From 7fb055cd40322e0c32699d023e0c0fdb00d0c466 Mon Sep 17 00:00:00 2001 From: King_DuckZ Date: Sun, 9 Aug 2020 19:26:18 +0100 Subject: [PATCH] Refactor event code. This splits the thread pool code from the event code, and class names are now wrong (I will update them in the next commit). Also, this disregards the roar11 vs tf library thing, taskflow was not working anyways and I will probably remove all the conditional compilation stuff about it for now. --- src/eventia/eventia.cpp | 78 +++++++++++++ src/eventia/eventia.hpp | 35 ++++++ src/eventia/private/context.hpp | 28 +++++ src/eventia/timer.cpp | 47 ++++++++ src/eventia/timer.hpp | 28 +++++ src/evloop.cpp | 189 +++++++++----------------------- src/evloop.hpp | 79 ------------- src/html_fetch_task.hpp | 24 ++-- src/meson.build | 2 + 9 files changed, 281 insertions(+), 229 deletions(-) create mode 100644 src/eventia/eventia.cpp create mode 100644 src/eventia/eventia.hpp create mode 100644 src/eventia/private/context.hpp create mode 100644 src/eventia/timer.cpp create mode 100644 src/eventia/timer.hpp diff --git a/src/eventia/eventia.cpp b/src/eventia/eventia.cpp new file mode 100644 index 0000000..e8d0356 --- /dev/null +++ b/src/eventia/eventia.cpp @@ -0,0 +1,78 @@ +#include "eventia.hpp" +#include "private/context.hpp" +#include +#include +#include +#include + +namespace eve { + +namespace { + void lock_mutex_libev (struct ::ev_loop* ev) noexcept { + Context* const ctx = static_cast(ev_userdata(ev)); + ctx->ev_mutex->lock(); + } + + void unlock_mutex_libev (struct ::ev_loop* ev) noexcept { + Context* const ctx = static_cast(ev_userdata(ev)); + ctx->ev_mutex->unlock(); + } +} //unnamed namespace + +struct EvThreadPool::LocalData { + LocalData(); + + std::mutex ev_mutex; + ev::async async; + ev::default_loop loop; + Context context; +}; + +EvThreadPool::LocalData::LocalData() : + loop(ev::AUTO), + context{ &loop, &ev_mutex, &async } +{ +} + +EvThreadPool::EvThreadPool() : + m_local(std::make_unique()) +{ + assert(nullptr == ev_userdata(m_local->loop)); + m_local->async.set(m_local->loop); + m_local->async.set(this); + m_local->async.start(); + ev_set_userdata(m_local->loop, &m_local->context); + ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev); +} + +EvThreadPool::~EvThreadPool() noexcept = default; + +std::function EvThreadPool::event_functor() { + return std::function([this]() { + std::unique_lock lock(m_local->ev_mutex); + m_local->loop.run(0); + }); +} + +void EvThreadPool::do_nothing() { +} + +void EvThreadPool::lock_mutex_libev() noexcept { + try { + m_local->ev_mutex.lock(); + } + catch (const std::system_error&) { + assert(false); + std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n"; + } +} + +void EvThreadPool::unlock_mutex_libev() noexcept { + m_local->ev_mutex.unlock(); +} + +const Context& EvThreadPool::context() { + return m_local->context; +} + +} //namespace eve diff --git a/src/eventia/eventia.hpp b/src/eventia/eventia.hpp new file mode 100644 index 0000000..665e62b --- /dev/null +++ b/src/eventia/eventia.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace eve { + +class Context; + +class EvThreadPool { + friend class EvTimerTask; +public: + EvThreadPool(); + ~EvThreadPool() noexcept; + + std::function event_functor(); + void do_nothing(); + + template + T make_timer(Args&&... args) { + return T{this->context(), std::forward(args)...}; + } + + +private: + struct LocalData; + + void lock_mutex_libev() noexcept; + void unlock_mutex_libev() noexcept; + const Context& context(); + + std::unique_ptr m_local; +}; + +} //namespace eve diff --git a/src/eventia/private/context.hpp b/src/eventia/private/context.hpp new file mode 100644 index 0000000..9831c2e --- /dev/null +++ b/src/eventia/private/context.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +namespace ev { + struct loop_ref; + struct async; +} //namespace ev + +namespace eve { + struct Context { + Context ( + ev::loop_ref* l, + std::mutex* m, + ev::async* a + ) : + loop(l), + ev_mutex(m), + async(a) + { + } + + ev::loop_ref* loop; + std::mutex* ev_mutex; + ev::async* async; + }; +} //namespace eve diff --git a/src/eventia/timer.cpp b/src/eventia/timer.cpp new file mode 100644 index 0000000..90eea3c --- /dev/null +++ b/src/eventia/timer.cpp @@ -0,0 +1,47 @@ +#include "timer.hpp" +#include "private/context.hpp" +#include +#include +#include +#include + +namespace eve { + +struct EvTimerTask::LocalData { + explicit LocalData (const Context& ctx); + + ev::timer timer; + Context context; +}; + +EvTimerTask::LocalData::LocalData (const Context& ctx) : + context(ctx) +{ +} + +EvTimerTask::EvTimerTask (double delay, const Context& ctx) : + m_local(std::make_unique(ctx)) +{ + assert(m_local->context.ev_mutex); + + m_local->timer.set(*m_local->context.loop); + m_local->timer.set(this); + this->set_timer(delay); +} + +EvTimerTask::~EvTimerTask() noexcept = default; + +void EvTimerTask::on_timer_ev() { + std::cout << "EvTimerTask::on_timer_ev()\n"; + m_local->timer.stop(); + this->on_timer(); +} + +void EvTimerTask::set_timer (double delay) { + std::unique_lock lock(*m_local->context.ev_mutex); + ev_now_update(*m_local->context.loop); + m_local->timer.start(delay, 0.0); + m_local->context.async->send(); +} + +} //namespace eve diff --git a/src/eventia/timer.hpp b/src/eventia/timer.hpp new file mode 100644 index 0000000..d148deb --- /dev/null +++ b/src/eventia/timer.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +namespace eve { + +class Context; + +class EvTimerTask { +public: + EvTimerTask (double delay, const Context& ctx); + virtual ~EvTimerTask() noexcept; + + virtual void on_timer() = 0; + //RunningPool::subpool_type& subflow(); + +protected: + void set_timer (double delay); + +private: + struct LocalData; + + void on_timer_ev(); + + std::unique_ptr m_local; +}; + +} //namespace eve diff --git a/src/evloop.cpp b/src/evloop.cpp index 01060fc..4bd211e 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -1,159 +1,72 @@ #include "evloop.hpp" #include "html_fetch_task.hpp" -#include -#include -#include +#include "eventia/eventia.hpp" +#include "eventia/timer.hpp" + +#include "orotool_config.hpp" +#if THREADPOOL == THREADPOOL_TASKFLOW +# include +#elif THREADPOOL == THREADPOOL_ROAR11 +# include "roar11/ThreadPool.hpp" +#endif + #include -#include -#include -#include -#include -#include -#include + +#if THREADPOOL == THREADPOOL_TASKFLOW +namespace tf { + class Taskflow; + class Subflow; +} //namespace tf +#elif THREADPOOL == THREADPOOL_ROAR11 +namespace roar11 { + class ThreadPool; +} //namespace roar11 +#endif namespace duck { namespace { - class KeepaliveTimer : public ev::timer { - public: - KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) { - this->set(loop); - this->set(this); - ev::timer::start(5.0, 5.0); - } - - void on_timer_ev() { - } - }; -} //unnamed namespace - -void lock_mutex_libev (struct ev_loop* ev) noexcept { - EvThreadPool* const obj = static_cast(ev_userdata(ev)); - obj->lock_mutex_libev(); -} - -void unlock_mutex_libev (struct ev_loop* ev) noexcept { - EvThreadPool* const obj = static_cast(ev_userdata(ev)); - obj->unlock_mutex_libev(); -} - -RunningPool::RunningPool (ev::default_loop* loop, RunningPool::subpool_type* sub, std::mutex* mtx, ev::async* async) : - m_loop(loop), - m_subflow(sub), - m_ev_mutex(mtx), - m_async(async) -{ - assert(m_ev_mutex); - std::cout << "Created RunningPool(" << m_loop << ", " << m_subflow << ")\n"; -} - -EvThreadPool::EvThreadPool() : - m_loop(ev::AUTO) #if THREADPOOL == THREADPOOL_ROAR11 - , m_pool(std::max(2U, std::thread::hardware_concurrency()) - 1) + typedef roar11::ThreadPool threadpool_type; + typedef roar11::ThreadPool subpool_type; +#elif THREADPOOL == THREADPOOL_TASKFLOW + typedef tf::Taskflow threadpool_type; + typedef tf::Subflow subpool_type; #endif -{ - assert(nullptr == ev_userdata(m_loop)); - m_async.set(m_loop); - m_async.set(this); - m_async.start(); - ev_set_userdata(m_loop, this); - ev_set_loop_release_cb(m_loop, &duck::unlock_mutex_libev, &duck::lock_mutex_libev); -} -RunningPool EvThreadPool::start() { + //class KeepaliveTimer : public ev::timer { + //public: + // KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) { + // this->set(loop); + // this->set(this); + // ev::timer::start(5.0, 5.0); + // } + + // void on_timer_ev() { + // } + //}; + #if THREADPOOL == THREADPOOL_TASKFLOW - std::promise subflow_promise; - m_taskflow.emplace([this,&subflow_promise](RunningPool::subpool_type& sub){ - subflow_promise.set_value(&sub); - //KeepaliveTimer keepalive{m_loop, &sub}; - m_ev_mutex.lock(); - m_loop.run(0); - m_ev_mutex.unlock(); - }); - m_executor.run(m_taskflow); - - std::future future_subflow = subflow_promise.get_future(); - return RunningPool{&m_loop, future_subflow.get(), &m_ev_mutex, &m_async}; -#else - std::cout << "Submitting run job to thread pool\n"; - m_pool.submit([this]() { - //KeepaliveTimer keepalive{m_loop, &m_pool}; - m_ev_mutex.lock(); - m_loop.run(0); - m_ev_mutex.unlock(); - }); - std::cout << "Work submitted, returing RunningPool object\n"; - return RunningPool{&m_loop, &m_pool, &m_ev_mutex, &m_async}; -#endif -} - -void EvThreadPool::join() { -#if THREADPOOL == THREADPOOL_TASKFLOW - m_executor.wait_for_all(); -#else - m_pool.join(); -#endif +void join(tf::Executor& executor) { + executor.wait_for_all(); std::cout << "all tasks completed\n"; } - -ev::loop_ref& EvThreadPool::loop() { - return m_loop; +#else +void join(threadpool_type& pool) { + pool.join(); + std::cout << "all tasks completed\n"; } +#endif -void EvThreadPool::do_nothing() { -} - -void EvThreadPool::lock_mutex_libev() noexcept { - try { - m_ev_mutex.lock(); - } - catch (const std::system_error&) { - assert(false); - std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n"; - } -} - -void EvThreadPool::unlock_mutex_libev() noexcept { - m_ev_mutex.unlock(); -} - -EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* ev_mtx, ev::async* async) : - m_subpool(subflow), - m_ev_mutex(ev_mtx), - m_loop(&loop), - m_async(async) -{ - assert(ev_mtx); - - this->set(loop); - this->set(this); - set_timer(delay); -} - -void EvTimerTask::set_timer (double delay) { - std::unique_lock lock(*m_ev_mutex); - ev_now_update(*m_loop); - ev::timer::start(delay, 0.0); - m_async->send(); -} - -RunningPool::subpool_type& EvTimerTask::subflow() { - return *m_subpool; -} - -void EvTimerTask::on_timer_ev() { - std::cout << "EvTimerTask::on_timer_ev()\n"; - ev::timer::stop(); - this->on_timer(); -} +} //unnamed namespace void test() { - EvThreadPool worker; - auto running_pool = worker.start(); + roar11::ThreadPool pool(std::max(2U, std::thread::hardware_concurrency()) - 1); + eve::EvThreadPool worker; + pool.submit(worker.event_functor()); std::cout << "Instantiating html timer\n"; - auto fetcher = running_pool.make_timer("test_url_lol"); + auto fetcher = worker.make_timer(&pool, "test_url_lol"); - worker.join(); + join(pool); } } //namespace duck diff --git a/src/evloop.hpp b/src/evloop.hpp index 1ce61f3..12628cf 100644 --- a/src/evloop.hpp +++ b/src/evloop.hpp @@ -1,86 +1,7 @@ #pragma once -#include "orotool_config.hpp" -#include -#if THREADPOOL == THREADPOOL_TASKFLOW -# include -#elif THREADPOOL == THREADPOOL_ROAR11 -# include "roar11/ThreadPool.hpp" -#endif -#include - namespace duck { -class RunningPool { -public: -#if THREADPOOL == THREADPOOL_ROAR11 - typedef roar11::ThreadPool threadpool_type; - typedef roar11::ThreadPool subpool_type; -#elif THREADPOOL == THREADPOOL_TASKFLOW - typedef tf::Taskflow threadpool_type; - typedef tf::Subflow subpool_type; -#endif - - RunningPool (ev::default_loop* loop, subpool_type* sub, std::mutex* mtx, ev::async* async); - - template - T make_timer(Args&&... args) { - return T{*m_loop, m_subflow, m_ev_mutex, m_async, std::forward(args)...}; - } - -private: - ev::default_loop* m_loop; - subpool_type* m_subflow; - std::mutex* m_ev_mutex; - ev::async* m_async; -}; - -class EvThreadPool { - friend class EvTimerTask; - friend void lock_mutex_libev(struct ev_loop*) noexcept; - friend void unlock_mutex_libev(struct ev_loop*) noexcept; -public: - typedef std::thread thread_t; - EvThreadPool(); - - RunningPool start(); - void join(); - ev::loop_ref& loop(); - void do_nothing(); - -private: - void lock_mutex_libev() noexcept; - void unlock_mutex_libev() noexcept; - - std::mutex m_ev_mutex; - ev::async m_async; - ev::default_loop m_loop; - RunningPool::threadpool_type m_pool; -#if THREADPOOL == THREADPOOL_TASKFLOW - tf::Executor m_executor; -#endif -}; - -class EvTimerTask : public ev::timer { -public: - EvTimerTask (double delay, ev::loop_ref&, RunningPool::subpool_type*, std::mutex* ev_mtx, ev::async* async); - virtual ~EvTimerTask() noexcept = default; - - virtual void on_timer() = 0; - RunningPool::subpool_type& subflow(); - -protected: - void set_timer (double delay); - -private: - void on_timer_ev(); - - RunningPool::subpool_type* m_subpool; - std::mutex* m_ev_mutex; - ev::loop_ref* m_loop; - ev::async* m_async; -}; - void test(); } //namespace duck diff --git a/src/html_fetch_task.hpp b/src/html_fetch_task.hpp index 21f0bbc..338439d 100644 --- a/src/html_fetch_task.hpp +++ b/src/html_fetch_task.hpp @@ -1,21 +1,24 @@ #pragma once -#include "evloop.hpp" -#include "orotool_config.hpp" +#include "eventia/timer.hpp" +#include "roar11/ThreadPool.hpp" #include #include #include +#include +#include namespace duck { auto time_rand = std::bind(std::uniform_int_distribution(2, 8), std::mt19937(std::time(0))); - class HtmlFetchTimer : public EvTimerTask { + class HtmlFetchTimer : public eve::EvTimerTask { public: HtmlFetchTimer (HtmlFetchTimer&&) = default; HtmlFetchTimer (const HtmlFetchTimer&) = delete; - HtmlFetchTimer (ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* mtx, ev::async* async, std::string&& url) : - EvTimerTask(3.0, loop, subflow, mtx, async), - m_url(std::move(url)) + HtmlFetchTimer (const eve::Context& ctx, roar11::ThreadPool* pool, std::string&& url) : + EvTimerTask(3.0, ctx), + m_url(std::move(url)), + m_pool(pool) { } @@ -29,15 +32,12 @@ namespace duck { std::cout << "Now starting next timer for " << new_delay << " seconds\n"; set_timer(new_delay); }; -#if THREADPOOL == THREADPOOL_TASKFLOW - subflow().emplace(work_chunk); - std::cout << "subflow task enqueued to " << &subflow() << '\n'; -#else - subflow().submit(work_chunk); -#endif + + m_pool->submit(work_chunk); } private: std::string m_url; + roar11::ThreadPool* m_pool; }; } //namespace duck diff --git a/src/meson.build b/src/meson.build index dd57241..6c6e33d 100644 --- a/src/meson.build +++ b/src/meson.build @@ -51,6 +51,8 @@ executable(meson.project_name(), 'oro/items.cpp', 'oro/shops.cpp', 'evloop.cpp', + 'eventia/eventia.cpp', + 'eventia/timer.cpp', project_config_file, install: true, dependencies: lib_deps,