From 497d797e3260828463f3f2da7f266e76f0d2863b Mon Sep 17 00:00:00 2001 From: King_DuckZ Date: Sat, 12 Sep 2020 00:40:41 +0100 Subject: [PATCH] Quit cleanly when an exception is thrown The trick is to break the ev loop from within its same thread. This can be done with the async event for example, so I repurposed the one I already had to call stop() in case an error has been set. Otherwise its callback is a no-op just like before. --- src/eventia/event.cpp | 4 ++ src/eventia/event.hpp | 1 + src/eventia/eventia.cpp | 76 +++++++++++++++++++--------------- src/eventia/eventia.hpp | 7 ++-- src/eventia/timer.cpp | 31 ++++++++++---- src/eventia/timer.hpp | 2 +- src/eventia_thread_pool.cpp | 4 ++ src/eventia_thread_pool.hpp | 4 +- src/evloop.cpp | 10 ++++- src/roar11/ThreadPool.hpp | 7 +++- src/roar11/ThreadSafeQueue.hpp | 7 +++- src/timer_oro_api.cpp | 7 +--- 12 files changed, 102 insertions(+), 58 deletions(-) diff --git a/src/eventia/event.cpp b/src/eventia/event.cpp index d9328bc..bafa82b 100644 --- a/src/eventia/event.cpp +++ b/src/eventia/event.cpp @@ -49,6 +49,10 @@ void Event::set_exception (std::exception_ptr ptr) { m_eventia->set_exception(ptr); } +bool Event::is_stopped() const noexcept { + return m_was_stopped; +} + #if !defined(NDEBUG) std::string Event::demangle (const char* name) { //see: diff --git a/src/eventia/event.hpp b/src/eventia/event.hpp index 9e4af69..c4b4ca6 100644 --- a/src/eventia/event.hpp +++ b/src/eventia/event.hpp @@ -37,6 +37,7 @@ protected: void set_exception (std::exception_ptr ptr); template void stop_ifn(EV& eve); + bool is_stopped() const noexcept; private: void on_loop_stopping_local() noexcept; diff --git a/src/eventia/eventia.cpp b/src/eventia/eventia.cpp index 0d93a4a..d380a05 100644 --- a/src/eventia/eventia.cpp +++ b/src/eventia/eventia.cpp @@ -47,20 +47,22 @@ struct Eventia::LocalData { explicit LocalData(Eventia* eventia); std::mutex ev_mutex; + std::mutex exception_mutex; boost::container::flat_map> stop_cbs; ev::async async; ev::default_loop loop; Context context; std::promise loop_end_promise; std::future loop_end; - std::atomic_bool mark_loop_end; + std::atomic_bool errored_out; + std::exception_ptr last_exception; }; Eventia::LocalData::LocalData(Eventia* eventia) : loop(ev::AUTO), context{ &loop, &ev_mutex, &async, eventia }, loop_end(loop_end_promise.get_future()), - mark_loop_end(true) + errored_out(false) { } @@ -69,57 +71,64 @@ Eventia::Eventia() : { assert(nullptr == ev_userdata(m_local->loop)); m_local->async.set(m_local->loop); - m_local->async.set(this); + m_local->async.set(this); m_local->async.start(); ev_set_userdata(m_local->loop, &m_local->context); ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev); } -Eventia::~Eventia() noexcept { - try { - this->stop(); - this->wait(); - } - catch (std::runtime_error& err) { - std::cerr << "Exception in main ev loop: " << err.what() << '\n'; - } - catch (...) { - std::cerr << "Unknown exception in main ev loop\n"; - assert(false); - } -} +Eventia::~Eventia() noexcept = default; std::function Eventia::event_functor() { return std::function([this]() { - m_local->mark_loop_end = true; + m_local->errored_out = false; + m_local->last_exception = nullptr; std::unique_lock lock(m_local->ev_mutex); m_local->loop.run(0); #if !defined(NDEBUG) std::cout << "ev loop stopped\n"; #endif - if (m_local->mark_loop_end) + if (not m_local->errored_out) { m_local->loop_end_promise.set_value(); + } + else { + std::unique_lock lock(m_local->exception_mutex); + m_local->loop_end_promise.set_exception(m_local->last_exception); + } }); } -void Eventia::do_nothing() { -} - -void Eventia::stop() { +void Eventia::stop() noexcept { #if !defined(NDEBUG) - std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n"; + try { + std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n"; + } + catch (...) {} #endif for (auto& stop_cb : m_local->stop_cbs) { stop_cb.second(); } - m_local->async.send(); - m_local->async.stop(); m_local->loop.break_loop(ev::ALL); } void Eventia::wait() { - if (m_local->loop_end.valid()) - m_local->loop_end.get(); + m_local->loop_end.get(); +} + +void Eventia::set_exception (std::exception_ptr ptr) noexcept { + try { + std::unique_lock lock(m_local->exception_mutex); + if (m_local->errored_out) + return; + + m_local->last_exception = ptr; + m_local->errored_out = true; + m_local->async.send(); + } + catch (const std::system_error& err) { + std::cerr << "Unexpected exception in Eventia::set_exception(), this is bad! Exception message: " << + err.what() << '\n'; + } } void Eventia::lock_mutex_libev() noexcept { @@ -136,16 +145,17 @@ void Eventia::unlock_mutex_libev() noexcept { m_local->ev_mutex.unlock(); } -void Eventia::set_exception (std::exception_ptr ptr) { - m_local->mark_loop_end = false; - this->stop(); - m_local->loop_end_promise.set_exception(ptr); -} - const Context& Eventia::context() { return m_local->context; } +void Eventia::on_async() { + if (m_local->errored_out) { + m_local->async.stop(); + this->stop(); + } +} + unsigned int Eventia::register_stop_callback (std::function&& func) { const unsigned int id = static_cast(m_local->stop_cbs.size()); m_local->stop_cbs.insert(std::make_pair(id, std::move(func))); diff --git a/src/eventia/eventia.hpp b/src/eventia/eventia.hpp index f99ec0d..7e64caa 100644 --- a/src/eventia/eventia.hpp +++ b/src/eventia/eventia.hpp @@ -33,17 +33,15 @@ public: ~Eventia() noexcept; std::function event_functor(); - void do_nothing(); template T make_event(Args&&... args) { return T{this->context(), std::forward(args)...}; } - void stop(); + void stop() noexcept; void wait(); - - void set_exception (std::exception_ptr ptr); + void set_exception (std::exception_ptr ptr) noexcept; private: struct LocalData; @@ -51,6 +49,7 @@ private: void lock_mutex_libev() noexcept; void unlock_mutex_libev() noexcept; const Context& context(); + void on_async(); unsigned int register_stop_callback (std::function&& cb); void unregister_stop_callback (unsigned int id) noexcept; diff --git a/src/eventia/timer.cpp b/src/eventia/timer.cpp index 1906c45..d0eda4e 100644 --- a/src/eventia/timer.cpp +++ b/src/eventia/timer.cpp @@ -24,6 +24,7 @@ #if !defined(NDEBUG) # include #endif +#include namespace eve { @@ -64,17 +65,22 @@ Timer::~Timer() noexcept { Timer::on_loop_stopping(); } -void Timer::on_timer_ev() { - double& elapsed = m_local->elapsed; - const double& timer_value = m_local->timer_value; - const double& timeout = m_local->timeout; +void Timer::on_timer_ev() noexcept { + try { + double& elapsed = m_local->elapsed; + const double& timer_value = m_local->timer_value; + const double& timeout = m_local->timeout; - elapsed += timer_value; - if (elapsed >= timeout) { - this->on_timer(); + elapsed += timer_value; + if (elapsed >= timeout) { + this->on_timer(); + } + else { + m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0); + } } - else { - m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0); + catch (...) { + set_exception(std::current_exception()); } } @@ -84,6 +90,13 @@ void Timer::set_timer (double delay) { m_local->timeout = delay; m_local->timer_value = std::min(g_timer_max, m_local->timeout); + if (this->is_stopped()) { +#if !defined(NDEBUG) + std::cout << "Timer::set_timer(" << delay << ") refusing to act because underlying event was stopped\n"; +#endif + return; + } + m_local->timer.stop(); ev_now_update(*m_local->context.loop); m_local->timer.start(m_local->timer_value, 0.0); diff --git a/src/eventia/timer.hpp b/src/eventia/timer.hpp index a4ae46e..2f239ab 100644 --- a/src/eventia/timer.hpp +++ b/src/eventia/timer.hpp @@ -41,7 +41,7 @@ protected: private: struct LocalData; - void on_timer_ev(); + void on_timer_ev() noexcept; virtual void on_loop_stopping() noexcept override; std::unique_ptr m_local; diff --git a/src/eventia_thread_pool.cpp b/src/eventia_thread_pool.cpp index 8b759ca..7d3a83c 100644 --- a/src/eventia_thread_pool.cpp +++ b/src/eventia_thread_pool.cpp @@ -37,6 +37,10 @@ EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_t EventiaThreadPool::~EventiaThreadPool() = default; +void EventiaThreadPool::wait() noexcept { + this->destroy(); +} + void EventiaThreadPool::set_exception (std::exception_ptr ptr) { m_eventia->set_exception(ptr); } diff --git a/src/eventia_thread_pool.hpp b/src/eventia_thread_pool.hpp index 78caa31..e18240f 100644 --- a/src/eventia_thread_pool.hpp +++ b/src/eventia_thread_pool.hpp @@ -39,6 +39,8 @@ public: template auto submit(Func&& func, Args&&... args); + void wait() noexcept; + private: void set_exception (std::exception_ptr ptr); @@ -63,7 +65,7 @@ auto EventiaThreadPool::submit(Func&& func, Args&&... args) { } #endif catch (...) { - roar11::ThreadPool::destroy(); + roar11::ThreadPool::invalidate(); this->set_exception(std::current_exception()); } }; diff --git a/src/evloop.cpp b/src/evloop.cpp index abcbc2c..326bd3c 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -67,7 +67,6 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) { std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n"; eve::Eventia worker; EventiaThreadPool pool(&worker, app_conf.worker_threads()); - pool.submit(worker.event_functor()); const double ed = static_cast(app_conf.fetch_extra_delay()); @@ -85,7 +84,14 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) { auto timer_shops = worker.make_event(TSet{w3, ed, sf}, &pool, api, db); auto timer_creat = worker.make_event(TSet{w4, ed, sf}, &pool, api, db); - worker.wait(); + try { + pool.submit(worker.event_functor()); + worker.wait(); + } + catch (...) { + pool.wait(); + throw; + } #if !defined(NDEBUG) std::cout << "all tasks completed\n"; #endif diff --git a/src/roar11/ThreadPool.hpp b/src/roar11/ThreadPool.hpp index fe5852b..30231a3 100644 --- a/src/roar11/ThreadPool.hpp +++ b/src/roar11/ThreadPool.hpp @@ -184,7 +184,7 @@ namespace roar11 return submit(std::move(boundTask)); } - protected: + protected: /** * Invalidates the queue and joins all running threads. */ @@ -195,6 +195,11 @@ namespace roar11 join(); } + void invalidate() noexcept + { + m_workQueue.invalidate(); + } + private: /** * Constantly running function each thread uses to acquire work items from the queue. diff --git a/src/roar11/ThreadSafeQueue.hpp b/src/roar11/ThreadSafeQueue.hpp index f9fa355..7a580d1 100644 --- a/src/roar11/ThreadSafeQueue.hpp +++ b/src/roar11/ThreadSafeQueue.hpp @@ -76,8 +76,11 @@ namespace roar11 void push(T value) { std::lock_guard lock{m_mutex}; - m_queue.push(std::move(value)); - m_condition.notify_one(); + if (m_valid) + { + m_queue.push(std::move(value)); + m_condition.notify_one(); + } } /** diff --git a/src/timer_oro_api.cpp b/src/timer_oro_api.cpp index 76165bc..80af66d 100644 --- a/src/timer_oro_api.cpp +++ b/src/timer_oro_api.cpp @@ -99,7 +99,7 @@ inline void TimerOroApi::fetch_data (oro::SourceFormat store_mode) { //server received too many requests, give up for now } else { - throw err; + throw; } } #elif defined(OROTOOL_WITH_NAP) @@ -108,13 +108,10 @@ inline void TimerOroApi::fetch_data (oro::SourceFormat store_mode) { if (429 == status_code) { } else { - this->set_exception(std::current_exception()); + throw; } } #endif - catch (...) { - this->set_exception(std::current_exception()); - } if (429 == status_code) {