diff --git a/src/eventia/eventia.cpp b/src/eventia/eventia.cpp index eeaaebd..0d93a4a 100644 --- a/src/eventia/eventia.cpp +++ b/src/eventia/eventia.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace eve { diff --git a/src/eventia/eventia.hpp b/src/eventia/eventia.hpp index 8c7b6f0..f99ec0d 100644 --- a/src/eventia/eventia.hpp +++ b/src/eventia/eventia.hpp @@ -43,12 +43,13 @@ public: void stop(); void wait(); + void set_exception (std::exception_ptr ptr); + private: struct LocalData; void lock_mutex_libev() noexcept; void unlock_mutex_libev() noexcept; - void set_exception (std::exception_ptr ptr); const Context& context(); unsigned int register_stop_callback (std::function&& cb); diff --git a/src/eventia_thread_pool.cpp b/src/eventia_thread_pool.cpp new file mode 100644 index 0000000..8b759ca --- /dev/null +++ b/src/eventia_thread_pool.cpp @@ -0,0 +1,44 @@ +/* Copyright 2020, Michele Santullo + * This file is part of orotool. + * + * Orotool is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Orotool is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Orotool. If not, see . + */ + +#include "eventia_thread_pool.hpp" +#include "eventia/eventia.hpp" +#include + +namespace duck { + +EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia) : + roar11::ThreadPool(), + m_eventia(eventia) +{ + assert(m_eventia); +} + +EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_threads) : + roar11::ThreadPool(num_threads), + m_eventia(eventia) +{ + assert(m_eventia); +} + +EventiaThreadPool::~EventiaThreadPool() = default; + +void EventiaThreadPool::set_exception (std::exception_ptr ptr) { + m_eventia->set_exception(ptr); +} + +} //namespace duck diff --git a/src/eventia_thread_pool.hpp b/src/eventia_thread_pool.hpp new file mode 100644 index 0000000..78caa31 --- /dev/null +++ b/src/eventia_thread_pool.hpp @@ -0,0 +1,74 @@ +/* Copyright 2020, Michele Santullo + * This file is part of orotool. + * + * Orotool is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Orotool is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Orotool. If not, see . + */ + +#pragma once + +#include "roar11/ThreadPool.hpp" +#include +#if __cplusplus <= 201703L +# include +#endif +#include + +namespace eve { +class Eventia; +} //namespace eve + +namespace duck { + +class EventiaThreadPool : roar11::ThreadPool { +public: + explicit EventiaThreadPool (eve::Eventia* eventia); + EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_threads); + ~EventiaThreadPool() noexcept; + + template + auto submit(Func&& func, Args&&... args); + +private: + void set_exception (std::exception_ptr ptr); + + eve::Eventia* m_eventia; +}; + +template +inline +auto EventiaThreadPool::submit(Func&& func, Args&&... args) { + using std::forward; + + auto catching_lambda = +#if __cplusplus <= 201703L + [this,func=forward(func), args=std::make_tuple(forward(args)...)]() mutable { + try { + std::apply(func, std::move(args)); + } +#else + [this,func=forward(func), ... args=forward(args)](){ + try { + func(std::forward(args)...); + } +#endif + catch (...) { + roar11::ThreadPool::destroy(); + this->set_exception(std::current_exception()); + } + }; + + return roar11::ThreadPool::submit(std::move(catching_lambda)); +} + +} //namespace duck diff --git a/src/evloop.cpp b/src/evloop.cpp index 43c288e..abcbc2c 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -19,7 +19,7 @@ #include "timer_oro_api.hpp" #include "eventia/eventia.hpp" #include "eventia/signal.hpp" -#include "roar11/ThreadPool.hpp" +#include "eventia_thread_pool.hpp" #include "oro/dboperation.hpp" #include "app_config.hpp" #include @@ -65,8 +65,8 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) { typedef TimerSettings TSet; std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n"; - roar11::ThreadPool pool(app_conf.worker_threads()); eve::Eventia worker; + EventiaThreadPool pool(&worker, app_conf.worker_threads()); pool.submit(worker.event_functor()); const double ed = static_cast(app_conf.fetch_extra_delay()); diff --git a/src/meson.build b/src/meson.build index e65cacd..42ba9c9 100644 --- a/src/meson.build +++ b/src/meson.build @@ -133,6 +133,7 @@ executable(meson.project_name(), 'oro/api_nap_exception.cpp', optional_sources, project_config_file, + 'eventia_thread_pool.cpp', install: true, dependencies: lib_deps, include_directories: [ diff --git a/src/roar11/ThreadPool.hpp b/src/roar11/ThreadPool.hpp index 030d308..fe5852b 100644 --- a/src/roar11/ThreadPool.hpp +++ b/src/roar11/ThreadPool.hpp @@ -157,16 +157,16 @@ namespace roar11 */ ~ThreadPool(void) { - destroy(); + if (!m_done) + destroy(); } /** * Submit a job to be run by the thread pool. */ - template - auto submit(Func&& func, Args&&... args) + template + auto submit (Func&& boundTask) { - auto boundTask = std::bind(std::forward(func), std::forward(args)...); using ResultType = std::result_of_t; using PackagedTask = std::packaged_task; using TaskType = ThreadTask; @@ -177,6 +177,24 @@ namespace roar11 return result; } + template + auto submit(Func&& func, Arg1&& arg1, Args&&... args) + { + auto boundTask = std::bind(std::forward(func), std::forward(arg1), std::forward(args)...); + return submit(std::move(boundTask)); + } + + protected: + /** + * Invalidates the queue and joins all running threads. + */ + void destroy(void) noexcept + { + m_done = true; + m_workQueue.invalidate(); + join(); + } + private: /** * Constantly running function each thread uses to acquire work items from the queue. @@ -209,16 +227,6 @@ namespace roar11 } } - /** - * Invalidates the queue and joins all running threads. - */ - void destroy(void) noexcept - { - m_done = true; - m_workQueue.invalidate(); - join(); - } - private: std::atomic_bool m_done; ThreadSafeQueue> m_workQueue; diff --git a/src/timer_base.cpp b/src/timer_base.cpp index 5aafd9b..2c802b9 100644 --- a/src/timer_base.cpp +++ b/src/timer_base.cpp @@ -16,7 +16,7 @@ */ #include "timer_base.hpp" -#include "roar11/ThreadPool.hpp" +#include "eventia_thread_pool.hpp" #include "oro/api.hpp" #include "oro/originsdb.hpp" #include @@ -69,7 +69,7 @@ TimerBase::TimerBase ( const eve::Context& ctx, oro::DBOperation type, const TimerSettings& settings, - roar11::ThreadPool* pool, + EventiaThreadPool* pool, oro::Api* oro_api, oro::OriginsDB* db ) : @@ -91,11 +91,10 @@ void TimerBase::on_timer() { void TimerBase::set_next_timer (const oro::Header& header) { const unsigned long next_timer = time_interval(header, m_min_wait, m_extra_delay); - std::cout << "Next timer in " << next_timer << " secs\n"; this->set_timer(static_cast(next_timer)); } -roar11::ThreadPool& TimerBase::pool() { +EventiaThreadPool& TimerBase::pool() { assert(m_pool); return *m_pool; } diff --git a/src/timer_base.hpp b/src/timer_base.hpp index d12ea70..7b969a3 100644 --- a/src/timer_base.hpp +++ b/src/timer_base.hpp @@ -22,10 +22,6 @@ #include "oro/dboperation.hpp" #include "oro/source.hpp" -namespace roar11 { - class ThreadPool; -} //namespace roar11 - namespace oro { class Api; class OriginsDB; @@ -38,6 +34,8 @@ namespace oro { namespace duck { +class EventiaThreadPool; + struct TimerSettings { TimerSettings (double min_wait, double extra_delay, oro::SourceFormat store_mode) : min_wait(min_wait), @@ -56,7 +54,7 @@ public: const eve::Context& ctx, oro::DBOperation type, const TimerSettings& settings, - roar11::ThreadPool* pool, + EventiaThreadPool* pool, oro::Api* oro_api, oro::OriginsDB* db ); @@ -70,7 +68,7 @@ protected: void update_db (const oro::Items& items, const oro::Header& header, const oro::Source& source); void update_db (const oro::Icons& icons, const oro::Header& header, const oro::Source& source); void update_db (const oro::Creators& creators, const oro::Header& header, const oro::Source& source); - roar11::ThreadPool& pool(); + EventiaThreadPool& pool(); oro::Api& oro_api(); oro::OriginsDB& db(); void reset_db_access_time (oro::DBOperation op); @@ -80,7 +78,7 @@ private: double m_extra_delay; double m_min_wait; - roar11::ThreadPool* m_pool; + EventiaThreadPool* m_pool; oro::Api* m_oro_api; oro::OriginsDB* m_db; oro::SourceFormat m_source_store_mode; diff --git a/src/timer_oro_api.cpp b/src/timer_oro_api.cpp index fd56b8c..76165bc 100644 --- a/src/timer_oro_api.cpp +++ b/src/timer_oro_api.cpp @@ -27,7 +27,6 @@ #elif defined(OROTOOL_WITH_NAP) # include "oro/api_nap_exception.hpp" #endif -#include namespace duck { @@ -59,7 +58,7 @@ template inline TimerOroApi::TimerOroApi ( const eve::Context& ctx, const TimerSettings& settings, - roar11::ThreadPool* pool, + EventiaThreadPool* pool, oro::Api* oro_api, oro::OriginsDB* db ) : diff --git a/src/timer_oro_api.hpp b/src/timer_oro_api.hpp index 0f99d3b..7eef958 100644 --- a/src/timer_oro_api.hpp +++ b/src/timer_oro_api.hpp @@ -32,7 +32,7 @@ public: TimerOroApi ( const eve::Context& ctx, const TimerSettings& settings, - roar11::ThreadPool* pool, + EventiaThreadPool* pool, oro::Api* oro_api, oro::OriginsDB* db );