Embed try/catch/set_exception into the pool itself
This commit is contained in:
parent
6543f31acb
commit
317e8a3398
11 changed files with 156 additions and 31 deletions
|
@ -27,6 +27,7 @@
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <exception>
|
||||||
|
|
||||||
namespace eve {
|
namespace eve {
|
||||||
|
|
||||||
|
|
|
@ -43,12 +43,13 @@ public:
|
||||||
void stop();
|
void stop();
|
||||||
void wait();
|
void wait();
|
||||||
|
|
||||||
|
void set_exception (std::exception_ptr ptr);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct LocalData;
|
struct LocalData;
|
||||||
|
|
||||||
void lock_mutex_libev() noexcept;
|
void lock_mutex_libev() noexcept;
|
||||||
void unlock_mutex_libev() noexcept;
|
void unlock_mutex_libev() noexcept;
|
||||||
void set_exception (std::exception_ptr ptr);
|
|
||||||
const Context& context();
|
const Context& context();
|
||||||
|
|
||||||
unsigned int register_stop_callback (std::function<void()>&& cb);
|
unsigned int register_stop_callback (std::function<void()>&& cb);
|
||||||
|
|
44
src/eventia_thread_pool.cpp
Normal file
44
src/eventia_thread_pool.cpp
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
/* Copyright 2020, Michele Santullo
|
||||||
|
* This file is part of orotool.
|
||||||
|
*
|
||||||
|
* Orotool is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Orotool is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "eventia_thread_pool.hpp"
|
||||||
|
#include "eventia/eventia.hpp"
|
||||||
|
#include <cassert>
|
||||||
|
|
||||||
|
namespace duck {
|
||||||
|
|
||||||
|
EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia) :
|
||||||
|
roar11::ThreadPool(),
|
||||||
|
m_eventia(eventia)
|
||||||
|
{
|
||||||
|
assert(m_eventia);
|
||||||
|
}
|
||||||
|
|
||||||
|
EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_threads) :
|
||||||
|
roar11::ThreadPool(num_threads),
|
||||||
|
m_eventia(eventia)
|
||||||
|
{
|
||||||
|
assert(m_eventia);
|
||||||
|
}
|
||||||
|
|
||||||
|
EventiaThreadPool::~EventiaThreadPool() = default;
|
||||||
|
|
||||||
|
void EventiaThreadPool::set_exception (std::exception_ptr ptr) {
|
||||||
|
m_eventia->set_exception(ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
} //namespace duck
|
74
src/eventia_thread_pool.hpp
Normal file
74
src/eventia_thread_pool.hpp
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
/* Copyright 2020, Michele Santullo
|
||||||
|
* This file is part of orotool.
|
||||||
|
*
|
||||||
|
* Orotool is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Orotool is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "roar11/ThreadPool.hpp"
|
||||||
|
#include <exception>
|
||||||
|
#if __cplusplus <= 201703L
|
||||||
|
# include <tuple>
|
||||||
|
#endif
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
namespace eve {
|
||||||
|
class Eventia;
|
||||||
|
} //namespace eve
|
||||||
|
|
||||||
|
namespace duck {
|
||||||
|
|
||||||
|
class EventiaThreadPool : roar11::ThreadPool {
|
||||||
|
public:
|
||||||
|
explicit EventiaThreadPool (eve::Eventia* eventia);
|
||||||
|
EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_threads);
|
||||||
|
~EventiaThreadPool() noexcept;
|
||||||
|
|
||||||
|
template <typename Func, typename... Args>
|
||||||
|
auto submit(Func&& func, Args&&... args);
|
||||||
|
|
||||||
|
private:
|
||||||
|
void set_exception (std::exception_ptr ptr);
|
||||||
|
|
||||||
|
eve::Eventia* m_eventia;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Func, typename... Args>
|
||||||
|
inline
|
||||||
|
auto EventiaThreadPool::submit(Func&& func, Args&&... args) {
|
||||||
|
using std::forward;
|
||||||
|
|
||||||
|
auto catching_lambda =
|
||||||
|
#if __cplusplus <= 201703L
|
||||||
|
[this,func=forward<Func>(func), args=std::make_tuple(forward<Args>(args)...)]() mutable {
|
||||||
|
try {
|
||||||
|
std::apply(func, std::move(args));
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
[this,func=forward<Func>(func), ... args=forward<Args>(args)](){
|
||||||
|
try {
|
||||||
|
func(std::forward<Args>(args)...);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
catch (...) {
|
||||||
|
roar11::ThreadPool::destroy();
|
||||||
|
this->set_exception(std::current_exception());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return roar11::ThreadPool::submit(std::move(catching_lambda));
|
||||||
|
}
|
||||||
|
|
||||||
|
} //namespace duck
|
|
@ -19,7 +19,7 @@
|
||||||
#include "timer_oro_api.hpp"
|
#include "timer_oro_api.hpp"
|
||||||
#include "eventia/eventia.hpp"
|
#include "eventia/eventia.hpp"
|
||||||
#include "eventia/signal.hpp"
|
#include "eventia/signal.hpp"
|
||||||
#include "roar11/ThreadPool.hpp"
|
#include "eventia_thread_pool.hpp"
|
||||||
#include "oro/dboperation.hpp"
|
#include "oro/dboperation.hpp"
|
||||||
#include "app_config.hpp"
|
#include "app_config.hpp"
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
@ -65,8 +65,8 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) {
|
||||||
typedef TimerSettings TSet;
|
typedef TimerSettings TSet;
|
||||||
|
|
||||||
std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n";
|
std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n";
|
||||||
roar11::ThreadPool pool(app_conf.worker_threads());
|
|
||||||
eve::Eventia worker;
|
eve::Eventia worker;
|
||||||
|
EventiaThreadPool pool(&worker, app_conf.worker_threads());
|
||||||
pool.submit(worker.event_functor());
|
pool.submit(worker.event_functor());
|
||||||
|
|
||||||
const double ed = static_cast<double>(app_conf.fetch_extra_delay());
|
const double ed = static_cast<double>(app_conf.fetch_extra_delay());
|
||||||
|
|
|
@ -133,6 +133,7 @@ executable(meson.project_name(),
|
||||||
'oro/api_nap_exception.cpp',
|
'oro/api_nap_exception.cpp',
|
||||||
optional_sources,
|
optional_sources,
|
||||||
project_config_file,
|
project_config_file,
|
||||||
|
'eventia_thread_pool.cpp',
|
||||||
install: true,
|
install: true,
|
||||||
dependencies: lib_deps,
|
dependencies: lib_deps,
|
||||||
include_directories: [
|
include_directories: [
|
||||||
|
|
|
@ -157,16 +157,16 @@ namespace roar11
|
||||||
*/
|
*/
|
||||||
~ThreadPool(void)
|
~ThreadPool(void)
|
||||||
{
|
{
|
||||||
|
if (!m_done)
|
||||||
destroy();
|
destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submit a job to be run by the thread pool.
|
* Submit a job to be run by the thread pool.
|
||||||
*/
|
*/
|
||||||
template <typename Func, typename... Args>
|
template <typename Func>
|
||||||
auto submit(Func&& func, Args&&... args)
|
auto submit (Func&& boundTask)
|
||||||
{
|
{
|
||||||
auto boundTask = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
|
|
||||||
using ResultType = std::result_of_t<decltype(boundTask)()>;
|
using ResultType = std::result_of_t<decltype(boundTask)()>;
|
||||||
using PackagedTask = std::packaged_task<ResultType()>;
|
using PackagedTask = std::packaged_task<ResultType()>;
|
||||||
using TaskType = ThreadTask<PackagedTask>;
|
using TaskType = ThreadTask<PackagedTask>;
|
||||||
|
@ -177,6 +177,24 @@ namespace roar11
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Func, typename Arg1, typename... Args>
|
||||||
|
auto submit(Func&& func, Arg1&& arg1, Args&&... args)
|
||||||
|
{
|
||||||
|
auto boundTask = std::bind(std::forward<Func>(func), std::forward<Arg1>(arg1), std::forward<Args>(args)...);
|
||||||
|
return submit(std::move(boundTask));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* Invalidates the queue and joins all running threads.
|
||||||
|
*/
|
||||||
|
void destroy(void) noexcept
|
||||||
|
{
|
||||||
|
m_done = true;
|
||||||
|
m_workQueue.invalidate();
|
||||||
|
join();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
* Constantly running function each thread uses to acquire work items from the queue.
|
* Constantly running function each thread uses to acquire work items from the queue.
|
||||||
|
@ -209,16 +227,6 @@ namespace roar11
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Invalidates the queue and joins all running threads.
|
|
||||||
*/
|
|
||||||
void destroy(void) noexcept
|
|
||||||
{
|
|
||||||
m_done = true;
|
|
||||||
m_workQueue.invalidate();
|
|
||||||
join();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::atomic_bool m_done;
|
std::atomic_bool m_done;
|
||||||
ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_workQueue;
|
ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_workQueue;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "timer_base.hpp"
|
#include "timer_base.hpp"
|
||||||
#include "roar11/ThreadPool.hpp"
|
#include "eventia_thread_pool.hpp"
|
||||||
#include "oro/api.hpp"
|
#include "oro/api.hpp"
|
||||||
#include "oro/originsdb.hpp"
|
#include "oro/originsdb.hpp"
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
@ -69,7 +69,7 @@ TimerBase::TimerBase (
|
||||||
const eve::Context& ctx,
|
const eve::Context& ctx,
|
||||||
oro::DBOperation type,
|
oro::DBOperation type,
|
||||||
const TimerSettings& settings,
|
const TimerSettings& settings,
|
||||||
roar11::ThreadPool* pool,
|
EventiaThreadPool* pool,
|
||||||
oro::Api* oro_api,
|
oro::Api* oro_api,
|
||||||
oro::OriginsDB* db
|
oro::OriginsDB* db
|
||||||
) :
|
) :
|
||||||
|
@ -91,11 +91,10 @@ void TimerBase::on_timer() {
|
||||||
|
|
||||||
void TimerBase::set_next_timer (const oro::Header& header) {
|
void TimerBase::set_next_timer (const oro::Header& header) {
|
||||||
const unsigned long next_timer = time_interval(header, m_min_wait, m_extra_delay);
|
const unsigned long next_timer = time_interval(header, m_min_wait, m_extra_delay);
|
||||||
std::cout << "Next timer in " << next_timer << " secs\n";
|
|
||||||
this->set_timer(static_cast<double>(next_timer));
|
this->set_timer(static_cast<double>(next_timer));
|
||||||
}
|
}
|
||||||
|
|
||||||
roar11::ThreadPool& TimerBase::pool() {
|
EventiaThreadPool& TimerBase::pool() {
|
||||||
assert(m_pool);
|
assert(m_pool);
|
||||||
return *m_pool;
|
return *m_pool;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,6 @@
|
||||||
#include "oro/dboperation.hpp"
|
#include "oro/dboperation.hpp"
|
||||||
#include "oro/source.hpp"
|
#include "oro/source.hpp"
|
||||||
|
|
||||||
namespace roar11 {
|
|
||||||
class ThreadPool;
|
|
||||||
} //namespace roar11
|
|
||||||
|
|
||||||
namespace oro {
|
namespace oro {
|
||||||
class Api;
|
class Api;
|
||||||
class OriginsDB;
|
class OriginsDB;
|
||||||
|
@ -38,6 +34,8 @@ namespace oro {
|
||||||
|
|
||||||
namespace duck {
|
namespace duck {
|
||||||
|
|
||||||
|
class EventiaThreadPool;
|
||||||
|
|
||||||
struct TimerSettings {
|
struct TimerSettings {
|
||||||
TimerSettings (double min_wait, double extra_delay, oro::SourceFormat store_mode) :
|
TimerSettings (double min_wait, double extra_delay, oro::SourceFormat store_mode) :
|
||||||
min_wait(min_wait),
|
min_wait(min_wait),
|
||||||
|
@ -56,7 +54,7 @@ public:
|
||||||
const eve::Context& ctx,
|
const eve::Context& ctx,
|
||||||
oro::DBOperation type,
|
oro::DBOperation type,
|
||||||
const TimerSettings& settings,
|
const TimerSettings& settings,
|
||||||
roar11::ThreadPool* pool,
|
EventiaThreadPool* pool,
|
||||||
oro::Api* oro_api,
|
oro::Api* oro_api,
|
||||||
oro::OriginsDB* db
|
oro::OriginsDB* db
|
||||||
);
|
);
|
||||||
|
@ -70,7 +68,7 @@ protected:
|
||||||
void update_db (const oro::Items& items, const oro::Header& header, const oro::Source& source);
|
void update_db (const oro::Items& items, const oro::Header& header, const oro::Source& source);
|
||||||
void update_db (const oro::Icons& icons, const oro::Header& header, const oro::Source& source);
|
void update_db (const oro::Icons& icons, const oro::Header& header, const oro::Source& source);
|
||||||
void update_db (const oro::Creators& creators, const oro::Header& header, const oro::Source& source);
|
void update_db (const oro::Creators& creators, const oro::Header& header, const oro::Source& source);
|
||||||
roar11::ThreadPool& pool();
|
EventiaThreadPool& pool();
|
||||||
oro::Api& oro_api();
|
oro::Api& oro_api();
|
||||||
oro::OriginsDB& db();
|
oro::OriginsDB& db();
|
||||||
void reset_db_access_time (oro::DBOperation op);
|
void reset_db_access_time (oro::DBOperation op);
|
||||||
|
@ -80,7 +78,7 @@ private:
|
||||||
|
|
||||||
double m_extra_delay;
|
double m_extra_delay;
|
||||||
double m_min_wait;
|
double m_min_wait;
|
||||||
roar11::ThreadPool* m_pool;
|
EventiaThreadPool* m_pool;
|
||||||
oro::Api* m_oro_api;
|
oro::Api* m_oro_api;
|
||||||
oro::OriginsDB* m_db;
|
oro::OriginsDB* m_db;
|
||||||
oro::SourceFormat m_source_store_mode;
|
oro::SourceFormat m_source_store_mode;
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
#elif defined(OROTOOL_WITH_NAP)
|
#elif defined(OROTOOL_WITH_NAP)
|
||||||
# include "oro/api_nap_exception.hpp"
|
# include "oro/api_nap_exception.hpp"
|
||||||
#endif
|
#endif
|
||||||
#include <exception>
|
|
||||||
|
|
||||||
namespace duck {
|
namespace duck {
|
||||||
|
|
||||||
|
@ -59,7 +58,7 @@ template<oro::DBOperation Op>
|
||||||
inline TimerOroApi<Op>::TimerOroApi (
|
inline TimerOroApi<Op>::TimerOroApi (
|
||||||
const eve::Context& ctx,
|
const eve::Context& ctx,
|
||||||
const TimerSettings& settings,
|
const TimerSettings& settings,
|
||||||
roar11::ThreadPool* pool,
|
EventiaThreadPool* pool,
|
||||||
oro::Api* oro_api,
|
oro::Api* oro_api,
|
||||||
oro::OriginsDB* db
|
oro::OriginsDB* db
|
||||||
) :
|
) :
|
||||||
|
|
|
@ -32,7 +32,7 @@ public:
|
||||||
TimerOroApi (
|
TimerOroApi (
|
||||||
const eve::Context& ctx,
|
const eve::Context& ctx,
|
||||||
const TimerSettings& settings,
|
const TimerSettings& settings,
|
||||||
roar11::ThreadPool* pool,
|
EventiaThreadPool* pool,
|
||||||
oro::Api* oro_api,
|
oro::Api* oro_api,
|
||||||
oro::OriginsDB* db
|
oro::OriginsDB* db
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Reference in a new issue