Quit cleanly when an exception is thrown

The trick is to break the ev loop from within its
same thread. This can be done with the async event
for example, so I repurposed the one I already had
to call stop() in case an error has been set.
Otherwise its callback is a no-op just like before.
This commit is contained in:
King_DuckZ 2020-09-12 00:40:41 +01:00
parent 4cb47eb82e
commit 497d797e32
12 changed files with 102 additions and 58 deletions

View file

@ -49,6 +49,10 @@ void Event::set_exception (std::exception_ptr ptr) {
m_eventia->set_exception(ptr);
}
bool Event::is_stopped() const noexcept {
return m_was_stopped;
}
#if !defined(NDEBUG)
std::string Event::demangle (const char* name) {
//see:

View file

@ -37,6 +37,7 @@ protected:
void set_exception (std::exception_ptr ptr);
template <typename EV> void stop_ifn(EV& eve);
bool is_stopped() const noexcept;
private:
void on_loop_stopping_local() noexcept;

View file

@ -47,20 +47,22 @@ struct Eventia::LocalData {
explicit LocalData(Eventia* eventia);
std::mutex ev_mutex;
std::mutex exception_mutex;
boost::container::flat_map<unsigned int, std::function<void()>> stop_cbs;
ev::async async;
ev::default_loop loop;
Context context;
std::promise<void> loop_end_promise;
std::future<void> loop_end;
std::atomic_bool mark_loop_end;
std::atomic_bool errored_out;
std::exception_ptr last_exception;
};
Eventia::LocalData::LocalData(Eventia* eventia) :
loop(ev::AUTO),
context{ &loop, &ev_mutex, &async, eventia },
loop_end(loop_end_promise.get_future()),
mark_loop_end(true)
errored_out(false)
{
}
@ -69,57 +71,64 @@ Eventia::Eventia() :
{
assert(nullptr == ev_userdata(m_local->loop));
m_local->async.set(m_local->loop);
m_local->async.set<Eventia, &Eventia::do_nothing>(this);
m_local->async.set<Eventia, &Eventia::on_async>(this);
m_local->async.start();
ev_set_userdata(m_local->loop, &m_local->context);
ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev);
}
Eventia::~Eventia() noexcept {
try {
this->stop();
this->wait();
}
catch (std::runtime_error& err) {
std::cerr << "Exception in main ev loop: " << err.what() << '\n';
}
catch (...) {
std::cerr << "Unknown exception in main ev loop\n";
assert(false);
}
}
Eventia::~Eventia() noexcept = default;
std::function<void()> Eventia::event_functor() {
return std::function<void()>([this]() {
m_local->mark_loop_end = true;
m_local->errored_out = false;
m_local->last_exception = nullptr;
std::unique_lock<std::mutex> lock(m_local->ev_mutex);
m_local->loop.run(0);
#if !defined(NDEBUG)
std::cout << "ev loop stopped\n";
#endif
if (m_local->mark_loop_end)
if (not m_local->errored_out) {
m_local->loop_end_promise.set_value();
}
else {
std::unique_lock lock(m_local->exception_mutex);
m_local->loop_end_promise.set_exception(m_local->last_exception);
}
});
}
void Eventia::do_nothing() {
}
void Eventia::stop() {
void Eventia::stop() noexcept {
#if !defined(NDEBUG)
std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n";
try {
std::cout << "Stopping Eventia (" << m_local->stop_cbs.size() << " observers)...\n";
}
catch (...) {}
#endif
for (auto& stop_cb : m_local->stop_cbs) {
stop_cb.second();
}
m_local->async.send();
m_local->async.stop();
m_local->loop.break_loop(ev::ALL);
}
void Eventia::wait() {
if (m_local->loop_end.valid())
m_local->loop_end.get();
m_local->loop_end.get();
}
void Eventia::set_exception (std::exception_ptr ptr) noexcept {
try {
std::unique_lock lock(m_local->exception_mutex);
if (m_local->errored_out)
return;
m_local->last_exception = ptr;
m_local->errored_out = true;
m_local->async.send();
}
catch (const std::system_error& err) {
std::cerr << "Unexpected exception in Eventia::set_exception(), this is bad! Exception message: " <<
err.what() << '\n';
}
}
void Eventia::lock_mutex_libev() noexcept {
@ -136,16 +145,17 @@ void Eventia::unlock_mutex_libev() noexcept {
m_local->ev_mutex.unlock();
}
void Eventia::set_exception (std::exception_ptr ptr) {
m_local->mark_loop_end = false;
this->stop();
m_local->loop_end_promise.set_exception(ptr);
}
const Context& Eventia::context() {
return m_local->context;
}
void Eventia::on_async() {
if (m_local->errored_out) {
m_local->async.stop();
this->stop();
}
}
unsigned int Eventia::register_stop_callback (std::function<void()>&& func) {
const unsigned int id = static_cast<unsigned int>(m_local->stop_cbs.size());
m_local->stop_cbs.insert(std::make_pair(id, std::move(func)));

View file

@ -33,17 +33,15 @@ public:
~Eventia() noexcept;
std::function<void()> event_functor();
void do_nothing();
template <typename T, typename... Args>
T make_event(Args&&... args) {
return T{this->context(), std::forward<Args>(args)...};
}
void stop();
void stop() noexcept;
void wait();
void set_exception (std::exception_ptr ptr);
void set_exception (std::exception_ptr ptr) noexcept;
private:
struct LocalData;
@ -51,6 +49,7 @@ private:
void lock_mutex_libev() noexcept;
void unlock_mutex_libev() noexcept;
const Context& context();
void on_async();
unsigned int register_stop_callback (std::function<void()>&& cb);
void unregister_stop_callback (unsigned int id) noexcept;

View file

@ -24,6 +24,7 @@
#if !defined(NDEBUG)
# include <iostream>
#endif
#include <exception>
namespace eve {
@ -64,17 +65,22 @@ Timer::~Timer() noexcept {
Timer::on_loop_stopping();
}
void Timer::on_timer_ev() {
double& elapsed = m_local->elapsed;
const double& timer_value = m_local->timer_value;
const double& timeout = m_local->timeout;
void Timer::on_timer_ev() noexcept {
try {
double& elapsed = m_local->elapsed;
const double& timer_value = m_local->timer_value;
const double& timeout = m_local->timeout;
elapsed += timer_value;
if (elapsed >= timeout) {
this->on_timer();
elapsed += timer_value;
if (elapsed >= timeout) {
this->on_timer();
}
else {
m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0);
}
}
else {
m_local->timer.start(std::min(timeout - elapsed, g_timer_max), 0.0);
catch (...) {
set_exception(std::current_exception());
}
}
@ -84,6 +90,13 @@ void Timer::set_timer (double delay) {
m_local->timeout = delay;
m_local->timer_value = std::min(g_timer_max, m_local->timeout);
if (this->is_stopped()) {
#if !defined(NDEBUG)
std::cout << "Timer::set_timer(" << delay << ") refusing to act because underlying event was stopped\n";
#endif
return;
}
m_local->timer.stop();
ev_now_update(*m_local->context.loop);
m_local->timer.start(m_local->timer_value, 0.0);

View file

@ -41,7 +41,7 @@ protected:
private:
struct LocalData;
void on_timer_ev();
void on_timer_ev() noexcept;
virtual void on_loop_stopping() noexcept override;
std::unique_ptr<LocalData> m_local;

View file

@ -37,6 +37,10 @@ EventiaThreadPool::EventiaThreadPool (eve::Eventia* eventia, std::uint32_t num_t
EventiaThreadPool::~EventiaThreadPool() = default;
void EventiaThreadPool::wait() noexcept {
this->destroy();
}
void EventiaThreadPool::set_exception (std::exception_ptr ptr) {
m_eventia->set_exception(ptr);
}

View file

@ -39,6 +39,8 @@ public:
template <typename Func, typename... Args>
auto submit(Func&& func, Args&&... args);
void wait() noexcept;
private:
void set_exception (std::exception_ptr ptr);
@ -63,7 +65,7 @@ auto EventiaThreadPool::submit(Func&& func, Args&&... args) {
}
#endif
catch (...) {
roar11::ThreadPool::destroy();
roar11::ThreadPool::invalidate();
this->set_exception(std::current_exception());
}
};

View file

@ -67,7 +67,6 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) {
std::cout << "Running with " << app_conf.worker_threads() << " worker threads\n";
eve::Eventia worker;
EventiaThreadPool pool(&worker, app_conf.worker_threads());
pool.submit(worker.event_functor());
const double ed = static_cast<double>(app_conf.fetch_extra_delay());
@ -85,7 +84,14 @@ void test(oro::Api* api, oro::OriginsDB* db, const AppConfig& app_conf) {
auto timer_shops = worker.make_event<TimerShops>(TSet{w3, ed, sf}, &pool, api, db);
auto timer_creat = worker.make_event<TimerCreators>(TSet{w4, ed, sf}, &pool, api, db);
worker.wait();
try {
pool.submit(worker.event_functor());
worker.wait();
}
catch (...) {
pool.wait();
throw;
}
#if !defined(NDEBUG)
std::cout << "all tasks completed\n";
#endif

View file

@ -184,7 +184,7 @@ namespace roar11
return submit(std::move(boundTask));
}
protected:
protected:
/**
* Invalidates the queue and joins all running threads.
*/
@ -195,6 +195,11 @@ namespace roar11
join();
}
void invalidate() noexcept
{
m_workQueue.invalidate();
}
private:
/**
* Constantly running function each thread uses to acquire work items from the queue.

View file

@ -76,8 +76,11 @@ namespace roar11
void push(T value)
{
std::lock_guard<std::mutex> lock{m_mutex};
m_queue.push(std::move(value));
m_condition.notify_one();
if (m_valid)
{
m_queue.push(std::move(value));
m_condition.notify_one();
}
}
/**

View file

@ -99,7 +99,7 @@ inline void TimerOroApi<Op>::fetch_data (oro::SourceFormat store_mode) {
//server received too many requests, give up for now
}
else {
throw err;
throw;
}
}
#elif defined(OROTOOL_WITH_NAP)
@ -108,13 +108,10 @@ inline void TimerOroApi<Op>::fetch_data (oro::SourceFormat store_mode) {
if (429 == status_code) {
}
else {
this->set_exception(std::current_exception());
throw;
}
}
#endif
catch (...) {
this->set_exception(std::current_exception());
}
if (429 == status_code) {