Properly handle SIGINT to quit.

This puts ThreadPool::join() back to being private.
Eventia events now should inherit from Event, which will take
care of setting up the required callbacks so that events will
be notified about the main event loop stopping. Inheriting
from Event is currently not enforced.
This commit is contained in:
King_DuckZ 2020-08-15 14:55:59 +01:00
parent 1b65162a18
commit 34040e5af1
12 changed files with 318 additions and 35 deletions

45
src/eventia/event.cpp Normal file
View file

@ -0,0 +1,45 @@
/* Copyright 2020, Michele Santullo
* This file is part of orotool.
*
* Orotool is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Orotool is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
*/
#include "event.hpp"
#include "eventia.hpp"
#include "private/context.hpp"
#include <cassert>
namespace eve {
Event::Event (const Context& ctx) :
m_eventia(ctx.eventia),
m_was_stopped(false)
{
assert(m_eventia);
m_cb_id = m_eventia->register_stop_callback([this](){this->on_loop_stopping_local();});
}
Event::~Event() noexcept {
m_eventia->unregister_stop_callback(m_cb_id);
on_loop_stopping_local();
}
void Event::on_loop_stopping_local() noexcept {
if (not m_was_stopped) {
m_was_stopped = true;
this->on_loop_stopping();
}
}
} //namespace eve

40
src/eventia/event.hpp Normal file
View file

@ -0,0 +1,40 @@
/* Copyright 2020, Michele Santullo
* This file is part of orotool.
*
* Orotool is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Orotool is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
namespace eve {
class Context;
class Eventia;
class Event {
protected:
explicit Event(const Context& ctx);
virtual ~Event() noexcept;
virtual void on_loop_stopping() noexcept = 0;
private:
void on_loop_stopping_local() noexcept;
unsigned int m_cb_id;
Eventia* m_eventia;
bool m_was_stopped;
};
} //namespace eve

View file

@ -16,11 +16,16 @@
*/
#include "eventia.hpp"
#include "event.hpp"
#include "private/context.hpp"
#include <ev++.h>
#include <mutex>
#include <iostream>
#include <cassert>
#include <boost/container/flat_map.hpp>
#include <utility>
#include <future>
#include <stdexcept>
namespace eve {
@ -37,22 +42,26 @@ namespace {
} //unnamed namespace
struct Eventia::LocalData {
LocalData();
explicit LocalData(Eventia* eventia);
std::mutex ev_mutex;
boost::container::flat_map<unsigned int, std::function<void()>> stop_cbs;
ev::async async;
ev::default_loop loop;
Context context;
std::promise<void> loop_end_promise;
std::future<void> loop_end;
};
Eventia::LocalData::LocalData() :
Eventia::LocalData::LocalData(Eventia* eventia) :
loop(ev::AUTO),
context{ &loop, &ev_mutex, &async }
context{ &loop, &ev_mutex, &async, eventia },
loop_end(loop_end_promise.get_future())
{
}
Eventia::Eventia() :
m_local(std::make_unique<LocalData>())
m_local(std::make_unique<LocalData>(this))
{
assert(nullptr == ev_userdata(m_local->loop));
m_local->async.set(m_local->loop);
@ -62,18 +71,42 @@ Eventia::Eventia() :
ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev);
}
Eventia::~Eventia() noexcept = default;
Eventia::~Eventia() noexcept {
try {
this->wait();
}
catch (std::runtime_error& err) {
std::cerr << "Exception in main ev loop: " << err.what() << '\n';
}
catch (...) {
std::cerr << "Unknown exception in main ev loop\n";
assert(false);
}
}
std::function<void()> Eventia::event_functor() {
return std::function<void()>([this]() {
std::unique_lock<std::mutex> lock(m_local->ev_mutex);
m_local->loop.run(0);
m_local->loop_end_promise.set_value();
});
}
void Eventia::do_nothing() {
}
void Eventia::stop() {
for (auto& stop_cb : m_local->stop_cbs) {
stop_cb.second();
}
m_local->loop.break_loop(ev::ALL);
}
void Eventia::wait() {
if (m_local->loop_end.valid())
m_local->loop_end.get();
}
void Eventia::lock_mutex_libev() noexcept {
try {
m_local->ev_mutex.lock();
@ -92,4 +125,19 @@ const Context& Eventia::context() {
return m_local->context;
}
unsigned int Eventia::register_stop_callback (std::function<void()>&& func) {
const unsigned int id = static_cast<unsigned int>(m_local->stop_cbs.size());
m_local->stop_cbs.insert(std::make_pair(id, std::move(func)));
return id;
}
void Eventia::unregister_stop_callback (unsigned int id) noexcept {
try {
m_local->stop_cbs.erase(id);
}
catch (...) {
assert(false);
}
}
} //namespace eve

View file

@ -23,9 +23,10 @@
namespace eve {
class Context;
class Event;
class Eventia {
friend class EvTimerTask;
friend class Event;
public:
Eventia();
~Eventia() noexcept;
@ -34,10 +35,12 @@ public:
void do_nothing();
template <typename T, typename... Args>
T make_timer(Args&&... args) {
T make_event(Args&&... args) {
return T{this->context(), std::forward<Args>(args)...};
}
void stop();
void wait();
private:
struct LocalData;
@ -46,6 +49,9 @@ private:
void unlock_mutex_libev() noexcept;
const Context& context();
unsigned int register_stop_callback (std::function<void()>&& cb);
void unregister_stop_callback (unsigned int id) noexcept;
std::unique_ptr<LocalData> m_local;
};

View file

@ -26,20 +26,25 @@ namespace ev {
} //namespace ev
namespace eve {
class Eventia;
struct Context {
Context (
ev::loop_ref* l,
std::mutex* m,
ev::async* a
ev::async* a,
Eventia* e
) :
loop(l),
ev_mutex(m),
async(a)
async(a),
eventia(e)
{
}
ev::loop_ref* loop;
std::mutex* ev_mutex;
ev::async* async;
Eventia* eventia;
};
} //namespace eve

69
src/eventia/signal.cpp Normal file
View file

@ -0,0 +1,69 @@
/* Copyright 2020, Michele Santullo
* This file is part of orotool.
*
* Orotool is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Orotool is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
*/
#include "signal.hpp"
#include "private/context.hpp"
#include <ev++.h>
#include <mutex>
#include <cassert>
namespace eve {
namespace {
} //unnamed namespace
struct Signal::LocalData {
explicit LocalData (const Context& ctx);
ev::sig signal;
Context context;
int sig_num;
};
Signal::LocalData::LocalData (const Context& ctx) :
context(ctx),
sig_num(0)
{
}
Signal::Signal (int sig, const Context& ctx) :
Event(ctx),
m_local(std::make_unique<LocalData>(ctx))
{
assert(m_local->context.ev_mutex);
m_local->signal.set(*m_local->context.loop);
m_local->signal.set<Signal, &Signal::on_signal_ev>(this);
{
std::unique_lock<std::mutex> lock(*m_local->context.ev_mutex);
m_local->signal.set(sig);
m_local->signal.start();
m_local->context.async->send();
}
}
Signal::~Signal() noexcept = default;
void Signal::on_signal_ev() {
this->on_signal();
}
void Signal::on_loop_stopping() noexcept {
m_local->signal.stop();
}
} //namespace eve

47
src/eventia/signal.hpp Normal file
View file

@ -0,0 +1,47 @@
/* Copyright 2020, Michele Santullo
* This file is part of orotool.
*
* Orotool is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Orotool is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Orotool. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "event.hpp"
#include <memory>
namespace eve {
class Context;
class Signal : public Event {
public:
Signal() = delete;
Signal(Signal&&) = default;
Signal(const Signal&) = delete;
Signal (int sig, const Context& ctx);
virtual ~Signal();
virtual void on_signal() = 0;
protected:
private:
struct LocalData;
void on_signal_ev();
virtual void on_loop_stopping() noexcept override;
std::unique_ptr<LocalData> m_local;
};
} //namespace eve

View file

@ -47,6 +47,7 @@ Timer::LocalData::LocalData (const Context& ctx) :
}
Timer::Timer (double delay, const Context& ctx) :
Event(ctx),
m_local(std::make_unique<LocalData>(ctx))
{
assert(m_local->context.ev_mutex);
@ -84,4 +85,8 @@ void Timer::set_timer (double delay) {
m_local->context.async->send();
}
void Timer::on_loop_stopping() noexcept {
m_local->timer.stop();
}
} //namespace eve

View file

@ -17,13 +17,14 @@
#pragma once
#include "event.hpp"
#include <memory>
namespace eve {
class Context;
class Timer {
class Timer : private Event {
public:
Timer() = delete;
Timer(Timer&&) = default;
@ -40,6 +41,7 @@ private:
struct LocalData;
void on_timer_ev();
virtual void on_loop_stopping() noexcept override;
std::unique_ptr<LocalData> m_local;
};

View file

@ -18,6 +18,7 @@
#include "evloop.hpp"
#include "timer_oro_api.hpp"
#include "eventia/eventia.hpp"
#include "eventia/signal.hpp"
#include "roar11/ThreadPool.hpp"
#include "oro/api.hpp"
#include <iostream>
@ -36,10 +37,20 @@ namespace {
// }
//};
void join(roar11::ThreadPool& pool) {
pool.join();
std::cout << "all tasks completed\n";
}
class SignalInt : public eve::Signal {
public:
SignalInt (const eve::Context& ctx, eve::Eventia* even) :
eve::Signal(SIGINT, ctx),
m_eventia(even)
{ }
virtual void on_signal() override {
m_eventia->stop();
}
private:
eve::Eventia* m_eventia;
};
} //unnamed namespace
@ -54,11 +65,14 @@ void test(oro::Api* api, oro::OriginsDB* db, std::size_t thread_count) {
eve::Eventia worker;
pool.submit(worker.event_functor());
auto timer_items = worker.make_timer<TimerItems>(0.5, &pool, api, db);
auto timer_icons = worker.make_timer<TimerIcons>(1.0, &pool, api, db);
auto timer_shops = worker.make_timer<TimerShops>(1.5, &pool, api, db);
auto timer_creat = worker.make_timer<TimerCreators>(2.0, &pool, api, db);
auto sig_int = worker.make_event<SignalInt>(&worker);
join(pool);
auto timer_items = worker.make_event<TimerItems>(0.5, &pool, api, db);
auto timer_icons = worker.make_event<TimerIcons>(1.0, &pool, api, db);
auto timer_shops = worker.make_event<TimerShops>(1.5, &pool, api, db);
auto timer_creat = worker.make_event<TimerCreators>(2.0, &pool, api, db);
worker.wait();
std::cout << "all tasks completed\n";
}
} //namespace duck

View file

@ -59,6 +59,8 @@ executable(meson.project_name(),
'app_config.cpp',
'oro/private/tiger.c',
'oro/private/tiger.cpp',
'eventia/signal.cpp',
'eventia/event.cpp',
project_config_file,
install: true,
dependencies: lib_deps,

View file

@ -177,22 +177,6 @@ namespace roar11
return result;
}
void join() noexcept {
for(auto& thread : m_threads)
{
if(thread.joinable())
{
try {
thread.join();
}
catch (const std::system_error& err) {
//idk?
std::cerr << "Exception in ThreadPool::join(): " << err.what() << '\n';
}
}
}
}
private:
/**
* Constantly running function each thread uses to acquire work items from the queue.
@ -209,6 +193,22 @@ namespace roar11
}
}
void join() noexcept {
for(auto& thread : m_threads)
{
if(thread.joinable())
{
try {
thread.join();
}
catch (const std::system_error& err) {
//idk?
std::cerr << "Exception in ThreadPool::join(): " << err.what() << '\n';
}
}
}
}
/**
* Invalidates the queue and joins all running threads.
*/