diff --git a/src/eventia/event.cpp b/src/eventia/event.cpp index d9328bc..bafa82b 100644 --- a/src/eventia/event.cpp +++ b/src/eventia/event.cpp @@ -49,6 +49,10 @@ void Event::set_exception (std::exception_ptr ptr) { m_eventia->set_exception(ptr); } +bool Event::is_stopped() const noexcept { + return m_was_stopped; +} + #if !defined(NDEBUG) std::string Event::demangle (const char* name) { //see: diff --git a/src/eventia/event.hpp b/src/eventia/event.hpp index 9e4af69..c4b4ca6 100644 --- a/src/eventia/event.hpp +++ b/src/eventia/event.hpp @@ -37,6 +37,7 @@ protected: void set_exception (std::exception_ptr ptr); template void stop_ifn(EV& eve); + bool is_stopped() const noexcept; private: void on_loop_stopping_local() noexcept; diff --git a/src/eventia/eventia.cpp b/src/eventia/eventia.cpp index 0d93a4a..d380a05 100644 --- a/src/eventia/eventia.cpp +++ b/src/eventia/eventia.cpp @@ -47,20 +47,22 @@ struct Eventia::LocalData { explicit LocalData(Eventia* eventia); std::mutex ev_mutex; + std::mutex exception_mutex; boost::container::flat_map> stop_cbs; ev::async async; ev::default_loop loop; Context context; std::promise loop_end_promise; std::future loop_end; - std::atomic_bool mark_loop_end; + std::atomic_bool errored_out; + std::exception_ptr last_exception; }; Eventia::LocalData::LocalData(Eventia* eventia) : loop(ev::AUTO), context{ &loop, &ev_mutex, &async, eventia }, loop_end(loop_end_promise.get_future()), - mark_loop_end(true) + errored_out(false) { } @@ -69,57 +71,64 @@ Eventia::Eventia() : { assert(nullptr == ev_userdata(m_local->loop)); m_local->async.set(m_local->loop); - m_local->async.set(this); + m_local->async.set(this); m_local->async.start(); ev_set_userdata(m_local->loop, &m_local->context); ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev); } -Eventia::~Eventia() noexcept { - try { - this->stop(); - this->wait(); - } - catch (std::runtime_error& err) { - std::cerr << "Exception in main ev loop: " << err.what() << '\n'; - } - catch (...) { - std::cerr << "Unknown exception in main ev loop\n"; - assert(false); - } -} +Eventia::~Eventia() noexcept = default; std::function Eventia::event_functor() { return std::function([this]() { - m_local->mark_loop_end = true; + m_local->errored_out = false; + m_local->last_exception = nullptr; std::unique_lock lock(m_local->ev_mutex); m_local->loop.run(0); #if !defined(NDEBUG) std::cout << "ev loop stopped\n"; #endif - if (m_local->mark_loop_end) + if (not m_local->errored_out) { m_local->loop_end_promise.set_value(); + } + else { + std::unique_lock lock(m_local->exception_mutex); + m_local->loop_end_promise.set_exception(m_local->last_exception); + } }); } -void Eventia::do_nothing() { -} - -void Eventia::stop() { +void Eventia::stop() noexcept { #if !defined(NDEBUG) - std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n"; + try { + std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n"; + } + catch (...) {} #endif for (auto& stop_cb : m_local->stop_cbs) { stop_cb.second(); } - m_local->async.send(); - m_local->async.stop(); m_local->loop.break_loop(ev::ALL); } void Eventia::wait() { - if (m_local->loop_end.valid()) - m_local->loop_end.get(); + m_local->loop_end.get(); +} + +void Eventia::set_exception (std::exception_ptr ptr) noexcept { + try { + std::unique_lock lock(m_local->exception_mutex); + if (m_local->errored_out) + return; + + m_local->last_exception = ptr; + m_local->errored_out = true; + m_local->async.send(); + } + catch (const std::system_error& err) { + std::cerr << "Unexpected exception in Eventia::set_exception(), this is bad! Exception message: " << + err.what() << '\n'; + } } void Eventia::lock_mutex_libev() noexcept { @@ -136,16 +145,17 @@ void Eventia::unlock_mutex_libev() noexcept { m_local->ev_mutex.unlock(); } -void Eventia::set_exception (std::exception_ptr ptr) { - m_local->mark_loop_end = false; - this->stop(); - m_local->loop_end_promise.set_exception(ptr); -} - const Context& Eventia::context() { return m_local->context; } +void Eventia::on_async() { + if (m_local->errored_out) { + m_local->async.stop(); + this->stop(); + } +} + unsigned int Eventia::register_stop_callback (std::function&& func) { const unsigned int id = static_cast(m_local->stop_cbs.size()); m_local->stop_cbs.insert(std::make_pair(id, std::move(func))); diff --git a/src/eventia/eventia.hpp b/src/eventia/eventia.hpp index f99ec0d..7e64caa 100644 --- a/src/eventia/eventia.hpp +++ b/src/eventia/eventia.hpp @@ -33,17 +33,15 @@ public: ~Eventia() noexcept; std::function event_functor(); - void do_nothing(); template T make_event(Args&&... args) { return T{this->context(), std::forward(args)...}; } - void stop(); + void stop() noexcept; void wait(); - - void set_exception (std::exception_ptr ptr); + void set_exception (std::exception_ptr ptr) noexcept; private: struct LocalData; @@ -51,6 +49,7 @@ private: void lock_mutex_libev() noexcept; void unlock_mutex_libev() noexcept; const Context& context(); + void on_async(); unsigned int register_stop_callback (std::function&& cb); void unregister_stop_callback (unsigned int id) noexcept; diff --git a/src/eventia/timer.cpp b/src/eventia/timer.cpp index 1906c45..d0eda4e 100644 --- a/src/eventia/timer.cpp +++ b/src/eventia/timer.cpp @@ -24,6 +24,7 @@ #if !defined(NDEBUG) # include #endif +#include namespace eve { @@ -64,17 +65,22 @@ Timer::~Timer() noexcept { Timer::on_loop_stopping(); } -void Timer::on_timer_ev() { - double& elapsed = m_local->elapsed; - const double& timer_value = m_local->timer_value; - const double& timeout = m_local->timeout; +void Timer::on_timer_ev() noexcept { + try { + double& elapsed = m_local->elapsed; + const double& timer_value = m_local->timer_value; + const double& timeout = m_local->timeout; - elapsed += timer_value; - if (elapsed >= timeout) { - this->on_timer(); + elapsed += timer_value; + if (elapsed >= timeout) { + this->on_timer(); + } + else { + m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0); + } } - else { - m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0); + catch (...) { + set_exception(std::current_exception()); } } @@ -84,6 +90,13 @@ void Timer::set_timer (double delay) { m_local->timeout = delay; m_local->timer_value = std::min(g_timer_max, m_local->timeout); + if (this->is_stopped()) { +#if !defined(NDEBUG) + std::cout << "Timer::set_timer(" << delay << ") refusing to act because underlying event was stopped\n"; +#endif + return; + } + m_local->timer.stop(); ev_now_update(*m_local->context.loop); m_local->timer.start(m_local->timer_value, 0.0); diff --git a/src/eventia/timer.hpp b/src/eventia/timer.hpp index a4ae46e..2f239ab 100644 --- a/src/eventia/timer.hpp +++ b/src/eventia/timer.hpp @@ -41,7 +41,7 @@ protected: private: struct LocalData; - void on_timer_ev(); + void on_timer_ev() noexcept; virtual void on_loop_stopping() noexcept override; std::unique_ptr m_local; diff --git a/src/eventia_thread_pool.cpp b/src/eventia_thread_pool.cpp index 8b759ca..7d3a83c 100644 --- a/src/eventia_thread_pool.cpp +++ b/src/eventia_thread_pool.cpp @@ -37,6 +37,10 @@ EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_t EventiaThreadPool::~EventiaThreadPool() = default; +void EventiaThreadPool::wait() noexcept { + this->destroy(); +} + void EventiaThreadPool::set_exception (std::exception_ptr ptr) { m_eventia->set_exception(ptr); } diff --git a/src/eventia_thread_pool.hpp b/src/eventia_thread_pool.hpp index 78caa31..e18240f 100644 --- a/src/eventia_thread_pool.hpp +++ b/src/eventia_thread_pool.hpp @@ -39,6 +39,8 @@ public: template auto submit(Func&& func, Args&&... args); + void wait() noexcept; + private: void set_exception (std::exception_ptr ptr); @@ -63,7 +65,7 @@ auto EventiaThreadPool::submit(Func&& func, Args&&... args) { } #endif catch (...) { - roar11::ThreadPool::destroy(); + roar11::ThreadPool::invalidate(); this->set_exception(std::current_exception()); } }; diff --git a/src/evloop.cpp b/src/evloop.cpp index abcbc2c..326bd3c 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -67,7 +67,6 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) { std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n"; eve::Eventia worker; EventiaThreadPool pool(&worker, app_conf.worker_threads()); - pool.submit(worker.event_functor()); const double ed = static_cast(app_conf.fetch_extra_delay()); @@ -85,7 +84,14 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) { auto timer_shops = worker.make_event(TSet{w3, ed, sf}, &pool, api, db); auto timer_creat = worker.make_event(TSet{w4, ed, sf}, &pool, api, db); - worker.wait(); + try { + pool.submit(worker.event_functor()); + worker.wait(); + } + catch (...) { + pool.wait(); + throw; + } #if !defined(NDEBUG) std::cout << "all tasks completed\n"; #endif diff --git a/src/roar11/ThreadPool.hpp b/src/roar11/ThreadPool.hpp index fe5852b..30231a3 100644 --- a/src/roar11/ThreadPool.hpp +++ b/src/roar11/ThreadPool.hpp @@ -184,7 +184,7 @@ namespace roar11 return submit(std::move(boundTask)); } - protected: + protected: /** * Invalidates the queue and joins all running threads. */ @@ -195,6 +195,11 @@ namespace roar11 join(); } + void invalidate() noexcept + { + m_workQueue.invalidate(); + } + private: /** * Constantly running function each thread uses to acquire work items from the queue. diff --git a/src/roar11/ThreadSafeQueue.hpp b/src/roar11/ThreadSafeQueue.hpp index f9fa355..7a580d1 100644 --- a/src/roar11/ThreadSafeQueue.hpp +++ b/src/roar11/ThreadSafeQueue.hpp @@ -76,8 +76,11 @@ namespace roar11 void push(T value) { std::lock_guard lock{m_mutex}; - m_queue.push(std::move(value)); - m_condition.notify_one(); + if (m_valid) + { + m_queue.push(std::move(value)); + m_condition.notify_one(); + } } /** diff --git a/src/timer_oro_api.cpp b/src/timer_oro_api.cpp index 76165bc..80af66d 100644 --- a/src/timer_oro_api.cpp +++ b/src/timer_oro_api.cpp @@ -99,7 +99,7 @@ inline void TimerOroApi::fetch_data (oro::SourceFormat store_mode) { //server received too many requests, give up for now } else { - throw err; + throw; } } #elif defined(OROTOOL_WITH_NAP) @@ -108,13 +108,10 @@ inline void TimerOroApi::fetch_data (oro::SourceFormat store_mode) { if (429 == status_code) { } else { - this->set_exception(std::current_exception()); + throw; } } #endif - catch (...) { - this->set_exception(std::current_exception()); - } if (429 == status_code) {