diff --git a/src/eventia/eventia.cpp b/src/eventia/eventia.cpp new file mode 100644 index 0000000..e8d0356 --- /dev/null +++ b/src/eventia/eventia.cpp @@ -0,0 +1,78 @@ +#include "eventia.hpp" +#include "private/context.hpp" +#include +#include +#include +#include + +namespace eve { + +namespace { + void lock_mutex_libev (struct ::ev_loop* ev) noexcept { + Context* const ctx = static_cast(ev_userdata(ev)); + ctx->ev_mutex->lock(); + } + + void unlock_mutex_libev (struct ::ev_loop* ev) noexcept { + Context* const ctx = static_cast(ev_userdata(ev)); + ctx->ev_mutex->unlock(); + } +} //unnamed namespace + +struct EvThreadPool::LocalData { + LocalData(); + + std::mutex ev_mutex; + ev::async async; + ev::default_loop loop; + Context context; +}; + +EvThreadPool::LocalData::LocalData() : + loop(ev::AUTO), + context{ &loop, &ev_mutex, &async } +{ +} + +EvThreadPool::EvThreadPool() : + m_local(std::make_unique()) +{ + assert(nullptr == ev_userdata(m_local->loop)); + m_local->async.set(m_local->loop); + m_local->async.set(this); + m_local->async.start(); + ev_set_userdata(m_local->loop, &m_local->context); + ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev); +} + +EvThreadPool::~EvThreadPool() noexcept = default; + +std::function EvThreadPool::event_functor() { + return std::function([this]() { + std::unique_lock lock(m_local->ev_mutex); + m_local->loop.run(0); + }); +} + +void EvThreadPool::do_nothing() { +} + +void EvThreadPool::lock_mutex_libev() noexcept { + try { + m_local->ev_mutex.lock(); + } + catch (const std::system_error&) { + assert(false); + std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n"; + } +} + +void EvThreadPool::unlock_mutex_libev() noexcept { + m_local->ev_mutex.unlock(); +} + +const Context& EvThreadPool::context() { + return m_local->context; +} + +} //namespace eve diff --git a/src/eventia/eventia.hpp b/src/eventia/eventia.hpp new file mode 100644 index 0000000..665e62b --- /dev/null +++ b/src/eventia/eventia.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace eve { + +class Context; + +class EvThreadPool { + friend class EvTimerTask; +public: + EvThreadPool(); + ~EvThreadPool() noexcept; + + std::function event_functor(); + void do_nothing(); + + template + T make_timer(Args&&... args) { + return T{this->context(), std::forward(args)...}; + } + + +private: + struct LocalData; + + void lock_mutex_libev() noexcept; + void unlock_mutex_libev() noexcept; + const Context& context(); + + std::unique_ptr m_local; +}; + +} //namespace eve diff --git a/src/eventia/private/context.hpp b/src/eventia/private/context.hpp new file mode 100644 index 0000000..9831c2e --- /dev/null +++ b/src/eventia/private/context.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +namespace ev { + struct loop_ref; + struct async; +} //namespace ev + +namespace eve { + struct Context { + Context ( + ev::loop_ref* l, + std::mutex* m, + ev::async* a + ) : + loop(l), + ev_mutex(m), + async(a) + { + } + + ev::loop_ref* loop; + std::mutex* ev_mutex; + ev::async* async; + }; +} //namespace eve diff --git a/src/eventia/timer.cpp b/src/eventia/timer.cpp new file mode 100644 index 0000000..90eea3c --- /dev/null +++ b/src/eventia/timer.cpp @@ -0,0 +1,47 @@ +#include "timer.hpp" +#include "private/context.hpp" +#include +#include +#include +#include + +namespace eve { + +struct EvTimerTask::LocalData { + explicit LocalData (const Context& ctx); + + ev::timer timer; + Context context; +}; + +EvTimerTask::LocalData::LocalData (const Context& ctx) : + context(ctx) +{ +} + +EvTimerTask::EvTimerTask (double delay, const Context& ctx) : + m_local(std::make_unique(ctx)) +{ + assert(m_local->context.ev_mutex); + + m_local->timer.set(*m_local->context.loop); + m_local->timer.set(this); + this->set_timer(delay); +} + +EvTimerTask::~EvTimerTask() noexcept = default; + +void EvTimerTask::on_timer_ev() { + std::cout << "EvTimerTask::on_timer_ev()\n"; + m_local->timer.stop(); + this->on_timer(); +} + +void EvTimerTask::set_timer (double delay) { + std::unique_lock lock(*m_local->context.ev_mutex); + ev_now_update(*m_local->context.loop); + m_local->timer.start(delay, 0.0); + m_local->context.async->send(); +} + +} //namespace eve diff --git a/src/eventia/timer.hpp b/src/eventia/timer.hpp new file mode 100644 index 0000000..d148deb --- /dev/null +++ b/src/eventia/timer.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +namespace eve { + +class Context; + +class EvTimerTask { +public: + EvTimerTask (double delay, const Context& ctx); + virtual ~EvTimerTask() noexcept; + + virtual void on_timer() = 0; + //RunningPool::subpool_type& subflow(); + +protected: + void set_timer (double delay); + +private: + struct LocalData; + + void on_timer_ev(); + + std::unique_ptr m_local; +}; + +} //namespace eve diff --git a/src/evloop.cpp b/src/evloop.cpp index 01060fc..4bd211e 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -1,159 +1,72 @@ #include "evloop.hpp" #include "html_fetch_task.hpp" -#include -#include -#include +#include "eventia/eventia.hpp" +#include "eventia/timer.hpp" + +#include "orotool_config.hpp" +#if THREADPOOL == THREADPOOL_TASKFLOW +# include +#elif THREADPOOL == THREADPOOL_ROAR11 +# include "roar11/ThreadPool.hpp" +#endif + #include -#include -#include -#include -#include -#include -#include + +#if THREADPOOL == THREADPOOL_TASKFLOW +namespace tf { + class Taskflow; + class Subflow; +} //namespace tf +#elif THREADPOOL == THREADPOOL_ROAR11 +namespace roar11 { + class ThreadPool; +} //namespace roar11 +#endif namespace duck { namespace { - class KeepaliveTimer : public ev::timer { - public: - KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) { - this->set(loop); - this->set(this); - ev::timer::start(5.0, 5.0); - } - - void on_timer_ev() { - } - }; -} //unnamed namespace - -void lock_mutex_libev (struct ev_loop* ev) noexcept { - EvThreadPool* const obj = static_cast(ev_userdata(ev)); - obj->lock_mutex_libev(); -} - -void unlock_mutex_libev (struct ev_loop* ev) noexcept { - EvThreadPool* const obj = static_cast(ev_userdata(ev)); - obj->unlock_mutex_libev(); -} - -RunningPool::RunningPool (ev::default_loop* loop, RunningPool::subpool_type* sub, std::mutex* mtx, ev::async* async) : - m_loop(loop), - m_subflow(sub), - m_ev_mutex(mtx), - m_async(async) -{ - assert(m_ev_mutex); - std::cout << "Created RunningPool(" << m_loop << ", " << m_subflow << ")\n"; -} - -EvThreadPool::EvThreadPool() : - m_loop(ev::AUTO) #if THREADPOOL == THREADPOOL_ROAR11 - , m_pool(std::max(2U, std::thread::hardware_concurrency()) - 1) + typedef roar11::ThreadPool threadpool_type; + typedef roar11::ThreadPool subpool_type; +#elif THREADPOOL == THREADPOOL_TASKFLOW + typedef tf::Taskflow threadpool_type; + typedef tf::Subflow subpool_type; #endif -{ - assert(nullptr == ev_userdata(m_loop)); - m_async.set(m_loop); - m_async.set(this); - m_async.start(); - ev_set_userdata(m_loop, this); - ev_set_loop_release_cb(m_loop, &duck::unlock_mutex_libev, &duck::lock_mutex_libev); -} -RunningPool EvThreadPool::start() { + //class KeepaliveTimer : public ev::timer { + //public: + // KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) { + // this->set(loop); + // this->set(this); + // ev::timer::start(5.0, 5.0); + // } + + // void on_timer_ev() { + // } + //}; + #if THREADPOOL == THREADPOOL_TASKFLOW - std::promise subflow_promise; - m_taskflow.emplace([this,&subflow_promise](RunningPool::subpool_type& sub){ - subflow_promise.set_value(&sub); - //KeepaliveTimer keepalive{m_loop, &sub}; - m_ev_mutex.lock(); - m_loop.run(0); - m_ev_mutex.unlock(); - }); - m_executor.run(m_taskflow); - - std::future future_subflow = subflow_promise.get_future(); - return RunningPool{&m_loop, future_subflow.get(), &m_ev_mutex, &m_async}; -#else - std::cout << "Submitting run job to thread pool\n"; - m_pool.submit([this]() { - //KeepaliveTimer keepalive{m_loop, &m_pool}; - m_ev_mutex.lock(); - m_loop.run(0); - m_ev_mutex.unlock(); - }); - std::cout << "Work submitted, returing RunningPool object\n"; - return RunningPool{&m_loop, &m_pool, &m_ev_mutex, &m_async}; -#endif -} - -void EvThreadPool::join() { -#if THREADPOOL == THREADPOOL_TASKFLOW - m_executor.wait_for_all(); -#else - m_pool.join(); -#endif +void join(tf::Executor& executor) { + executor.wait_for_all(); std::cout << "all tasks completed\n"; } - -ev::loop_ref& EvThreadPool::loop() { - return m_loop; +#else +void join(threadpool_type& pool) { + pool.join(); + std::cout << "all tasks completed\n"; } +#endif -void EvThreadPool::do_nothing() { -} - -void EvThreadPool::lock_mutex_libev() noexcept { - try { - m_ev_mutex.lock(); - } - catch (const std::system_error&) { - assert(false); - std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n"; - } -} - -void EvThreadPool::unlock_mutex_libev() noexcept { - m_ev_mutex.unlock(); -} - -EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* ev_mtx, ev::async* async) : - m_subpool(subflow), - m_ev_mutex(ev_mtx), - m_loop(&loop), - m_async(async) -{ - assert(ev_mtx); - - this->set(loop); - this->set(this); - set_timer(delay); -} - -void EvTimerTask::set_timer (double delay) { - std::unique_lock lock(*m_ev_mutex); - ev_now_update(*m_loop); - ev::timer::start(delay, 0.0); - m_async->send(); -} - -RunningPool::subpool_type& EvTimerTask::subflow() { - return *m_subpool; -} - -void EvTimerTask::on_timer_ev() { - std::cout << "EvTimerTask::on_timer_ev()\n"; - ev::timer::stop(); - this->on_timer(); -} +} //unnamed namespace void test() { - EvThreadPool worker; - auto running_pool = worker.start(); + roar11::ThreadPool pool(std::max(2U, std::thread::hardware_concurrency()) - 1); + eve::EvThreadPool worker; + pool.submit(worker.event_functor()); std::cout << "Instantiating html timer\n"; - auto fetcher = running_pool.make_timer("test_url_lol"); + auto fetcher = worker.make_timer(&pool, "test_url_lol"); - worker.join(); + join(pool); } } //namespace duck diff --git a/src/evloop.hpp b/src/evloop.hpp index 1ce61f3..12628cf 100644 --- a/src/evloop.hpp +++ b/src/evloop.hpp @@ -1,86 +1,7 @@ #pragma once -#include "orotool_config.hpp" -#include -#if THREADPOOL == THREADPOOL_TASKFLOW -# include -#elif THREADPOOL == THREADPOOL_ROAR11 -# include "roar11/ThreadPool.hpp" -#endif -#include - namespace duck { -class RunningPool { -public: -#if THREADPOOL == THREADPOOL_ROAR11 - typedef roar11::ThreadPool threadpool_type; - typedef roar11::ThreadPool subpool_type; -#elif THREADPOOL == THREADPOOL_TASKFLOW - typedef tf::Taskflow threadpool_type; - typedef tf::Subflow subpool_type; -#endif - - RunningPool (ev::default_loop* loop, subpool_type* sub, std::mutex* mtx, ev::async* async); - - template - T make_timer(Args&&... args) { - return T{*m_loop, m_subflow, m_ev_mutex, m_async, std::forward(args)...}; - } - -private: - ev::default_loop* m_loop; - subpool_type* m_subflow; - std::mutex* m_ev_mutex; - ev::async* m_async; -}; - -class EvThreadPool { - friend class EvTimerTask; - friend void lock_mutex_libev(struct ev_loop*) noexcept; - friend void unlock_mutex_libev(struct ev_loop*) noexcept; -public: - typedef std::thread thread_t; - EvThreadPool(); - - RunningPool start(); - void join(); - ev::loop_ref& loop(); - void do_nothing(); - -private: - void lock_mutex_libev() noexcept; - void unlock_mutex_libev() noexcept; - - std::mutex m_ev_mutex; - ev::async m_async; - ev::default_loop m_loop; - RunningPool::threadpool_type m_pool; -#if THREADPOOL == THREADPOOL_TASKFLOW - tf::Executor m_executor; -#endif -}; - -class EvTimerTask : public ev::timer { -public: - EvTimerTask (double delay, ev::loop_ref&, RunningPool::subpool_type*, std::mutex* ev_mtx, ev::async* async); - virtual ~EvTimerTask() noexcept = default; - - virtual void on_timer() = 0; - RunningPool::subpool_type& subflow(); - -protected: - void set_timer (double delay); - -private: - void on_timer_ev(); - - RunningPool::subpool_type* m_subpool; - std::mutex* m_ev_mutex; - ev::loop_ref* m_loop; - ev::async* m_async; -}; - void test(); } //namespace duck diff --git a/src/html_fetch_task.hpp b/src/html_fetch_task.hpp index 21f0bbc..338439d 100644 --- a/src/html_fetch_task.hpp +++ b/src/html_fetch_task.hpp @@ -1,21 +1,24 @@ #pragma once -#include "evloop.hpp" -#include "orotool_config.hpp" +#include "eventia/timer.hpp" +#include "roar11/ThreadPool.hpp" #include #include #include +#include +#include namespace duck { auto time_rand = std::bind(std::uniform_int_distribution(2, 8), std::mt19937(std::time(0))); - class HtmlFetchTimer : public EvTimerTask { + class HtmlFetchTimer : public eve::EvTimerTask { public: HtmlFetchTimer (HtmlFetchTimer&&) = default; HtmlFetchTimer (const HtmlFetchTimer&) = delete; - HtmlFetchTimer (ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* mtx, ev::async* async, std::string&& url) : - EvTimerTask(3.0, loop, subflow, mtx, async), - m_url(std::move(url)) + HtmlFetchTimer (const eve::Context& ctx, roar11::ThreadPool* pool, std::string&& url) : + EvTimerTask(3.0, ctx), + m_url(std::move(url)), + m_pool(pool) { } @@ -29,15 +32,12 @@ namespace duck { std::cout << "Now starting next timer for " << new_delay << " seconds\n"; set_timer(new_delay); }; -#if THREADPOOL == THREADPOOL_TASKFLOW - subflow().emplace(work_chunk); - std::cout << "subflow task enqueued to " << &subflow() << '\n'; -#else - subflow().submit(work_chunk); -#endif + + m_pool->submit(work_chunk); } private: std::string m_url; + roar11::ThreadPool* m_pool; }; } //namespace duck diff --git a/src/meson.build b/src/meson.build index dd57241..6c6e33d 100644 --- a/src/meson.build +++ b/src/meson.build @@ -51,6 +51,8 @@ executable(meson.project_name(), 'oro/items.cpp', 'oro/shops.cpp', 'evloop.cpp', + 'eventia/eventia.cpp', + 'eventia/timer.cpp', project_config_file, install: true, dependencies: lib_deps,