From 9f0fed98e2387b21cd39ba05b9c6ff95fabc1472 Mon Sep 17 00:00:00 2001 From: King_DuckZ Date: Sun, 9 Aug 2020 17:29:37 +0100 Subject: [PATCH] Get the concept working A bit hackish because see ThreadPool.hpp. Also code is dirty and it can't be stopped once running. Building up from here. --- meson_options.txt | 1 + src/config.hpp.in | 5 ++ src/evloop.cpp | 182 +++++++++++++++++++++++++++----------- src/evloop.hpp | 88 +++++++++++++++--- src/html_fetch_task.hpp | 43 +++++++++ src/main.cpp | 5 ++ src/meson.build | 28 ++++-- src/roar11/ThreadPool.hpp | 8 +- 8 files changed, 282 insertions(+), 78 deletions(-) create mode 100644 src/html_fetch_task.hpp diff --git a/meson_options.txt b/meson_options.txt index f45184c..b5130c5 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -1 +1,2 @@ option('base_url', type: 'string', value: 'https://api.originsro.org') +option('thread_pool', type: 'combo', description: '', choices: ['roar11', 'taskflow'], value: 'roar11') diff --git a/src/config.hpp.in b/src/config.hpp.in index 707a64f..38783a0 100644 --- a/src/config.hpp.in +++ b/src/config.hpp.in @@ -1,5 +1,10 @@ #pragma once +#define THREADPOOL_ROAR11 1 +#define THREADPOOL_TASKFLOW 2 + +#define THREADPOOL THREADPOOL_@THREAD_POOL_TYPE@ + namespace duck { constexpr const char g_base_url[] = "@BASE_URL@"; diff --git a/src/evloop.cpp b/src/evloop.cpp index 1707fbc..01060fc 100644 --- a/src/evloop.cpp +++ b/src/evloop.cpp @@ -1,4 +1,5 @@ #include "evloop.hpp" +#include "html_fetch_task.hpp" #include #include #include @@ -7,79 +8,152 @@ #include #include #include +#include +#include namespace duck { namespace { - auto time_rand = std::bind(std::uniform_int_distribution(2, 8), std::mt19937(std::time(0))); - - class HtmlFetchTimer : public EvTimerTask { + class KeepaliveTimer : public ev::timer { public: - HtmlFetchTimer (ev::loop_ref& loop, tf::Subflow* subflow, std::string&& url) : - EvTimerTask(static_cast(time_rand()), loop, subflow), - m_url(std::move(url)) - { + KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) { + this->set(loop); + this->set(this); + ev::timer::start(5.0, 5.0); } - virtual void on_timer() override { - subflow().emplace([this]() {std::cout << "Timer elapsed for " << m_url << "!\n";}); + void on_timer_ev() { } - - private: - std::string m_url; - }; - - class EvThreadPool { - public: - typedef std::thread thread_t; - EvThreadPool() : - m_loop(ev::AUTO) - { - } - - void start() { - m_taskflow.emplace([this](tf::Subflow& subflow){main_loop(subflow);}); - m_executor.run(m_taskflow); - } - - void join() { - m_executor.wait_for_all(); - std::cout << "all tasks completed\n"; - } - - private: - void main_loop (tf::Subflow& subflow) { - m_timer = std::make_unique(m_loop, &subflow, "lalalala"); - - this->m_loop.run(0); - } - - ev::default_loop m_loop; - tf::Taskflow m_taskflow; - tf::Executor m_executor; - std::unique_ptr m_timer; }; } //unnamed namespace -EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, tf::Subflow* subflow) : - m_subflow(subflow) -{ - this->set(loop); - this->set(this); - ev::timer::start(delay, 0.0); +void lock_mutex_libev (struct ev_loop* ev) noexcept { + EvThreadPool* const obj = static_cast(ev_userdata(ev)); + obj->lock_mutex_libev(); } -tf::Subflow& EvTimerTask::subflow() { - return *m_subflow; +void unlock_mutex_libev (struct ev_loop* ev) noexcept { + EvThreadPool* const obj = static_cast(ev_userdata(ev)); + obj->unlock_mutex_libev(); +} + +RunningPool::RunningPool (ev::default_loop* loop, RunningPool::subpool_type* sub, std::mutex* mtx, ev::async* async) : + m_loop(loop), + m_subflow(sub), + m_ev_mutex(mtx), + m_async(async) +{ + assert(m_ev_mutex); + std::cout << "Created RunningPool(" << m_loop << ", " << m_subflow << ")\n"; +} + +EvThreadPool::EvThreadPool() : + m_loop(ev::AUTO) +#if THREADPOOL == THREADPOOL_ROAR11 + , m_pool(std::max(2U, std::thread::hardware_concurrency()) - 1) +#endif +{ + assert(nullptr == ev_userdata(m_loop)); + m_async.set(m_loop); + m_async.set(this); + m_async.start(); + ev_set_userdata(m_loop, this); + ev_set_loop_release_cb(m_loop, &duck::unlock_mutex_libev, &duck::lock_mutex_libev); +} + +RunningPool EvThreadPool::start() { +#if THREADPOOL == THREADPOOL_TASKFLOW + std::promise subflow_promise; + m_taskflow.emplace([this,&subflow_promise](RunningPool::subpool_type& sub){ + subflow_promise.set_value(&sub); + //KeepaliveTimer keepalive{m_loop, &sub}; + m_ev_mutex.lock(); + m_loop.run(0); + m_ev_mutex.unlock(); + }); + m_executor.run(m_taskflow); + + std::future future_subflow = subflow_promise.get_future(); + return RunningPool{&m_loop, future_subflow.get(), &m_ev_mutex, &m_async}; +#else + std::cout << "Submitting run job to thread pool\n"; + m_pool.submit([this]() { + //KeepaliveTimer keepalive{m_loop, &m_pool}; + m_ev_mutex.lock(); + m_loop.run(0); + m_ev_mutex.unlock(); + }); + std::cout << "Work submitted, returing RunningPool object\n"; + return RunningPool{&m_loop, &m_pool, &m_ev_mutex, &m_async}; +#endif +} + +void EvThreadPool::join() { +#if THREADPOOL == THREADPOOL_TASKFLOW + m_executor.wait_for_all(); +#else + m_pool.join(); +#endif + std::cout << "all tasks completed\n"; +} + +ev::loop_ref& EvThreadPool::loop() { + return m_loop; +} + +void EvThreadPool::do_nothing() { +} + +void EvThreadPool::lock_mutex_libev() noexcept { + try { + m_ev_mutex.lock(); + } + catch (const std::system_error&) { + assert(false); + std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n"; + } +} + +void EvThreadPool::unlock_mutex_libev() noexcept { + m_ev_mutex.unlock(); +} + +EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* ev_mtx, ev::async* async) : + m_subpool(subflow), + m_ev_mutex(ev_mtx), + m_loop(&loop), + m_async(async) +{ + assert(ev_mtx); + + this->set(loop); + this->set(this); + set_timer(delay); +} + +void EvTimerTask::set_timer (double delay) { + std::unique_lock lock(*m_ev_mutex); + ev_now_update(*m_loop); + ev::timer::start(delay, 0.0); + m_async->send(); +} + +RunningPool::subpool_type& EvTimerTask::subflow() { + return *m_subpool; } void EvTimerTask::on_timer_ev() { + std::cout << "EvTimerTask::on_timer_ev()\n"; + ev::timer::stop(); this->on_timer(); } void test() { - //const auto processor_count = std::thread::hardware_concurrency(); - EvThreadPool worker; //(std::max(2U, processor_count) - 1); - worker.start(); + EvThreadPool worker; + auto running_pool = worker.start(); + + std::cout << "Instantiating html timer\n"; + auto fetcher = running_pool.make_timer("test_url_lol"); + worker.join(); } } //namespace duck diff --git a/src/evloop.hpp b/src/evloop.hpp index 7a334fc..1ce61f3 100644 --- a/src/evloop.hpp +++ b/src/evloop.hpp @@ -1,22 +1,86 @@ #pragma once +#include "orotool_config.hpp" #include -#include +#if THREADPOOL == THREADPOOL_TASKFLOW +# include +#elif THREADPOOL == THREADPOOL_ROAR11 +# include "roar11/ThreadPool.hpp" +#endif +#include namespace duck { - class EvTimerTask : public ev::timer { - public: - EvTimerTask (double delay, ev::loop_ref&, tf::Subflow*); - virtual ~EvTimerTask() noexcept = default; - virtual void on_timer() = 0; - tf::Subflow& subflow(); +class RunningPool { +public: +#if THREADPOOL == THREADPOOL_ROAR11 + typedef roar11::ThreadPool threadpool_type; + typedef roar11::ThreadPool subpool_type; +#elif THREADPOOL == THREADPOOL_TASKFLOW + typedef tf::Taskflow threadpool_type; + typedef tf::Subflow subpool_type; +#endif - private: - void on_timer_ev(); + RunningPool (ev::default_loop* loop, subpool_type* sub, std::mutex* mtx, ev::async* async); - tf::Subflow* m_subflow; - }; + template + T make_timer(Args&&... args) { + return T{*m_loop, m_subflow, m_ev_mutex, m_async, std::forward(args)...}; + } + +private: + ev::default_loop* m_loop; + subpool_type* m_subflow; + std::mutex* m_ev_mutex; + ev::async* m_async; +}; + +class EvThreadPool { + friend class EvTimerTask; + friend void lock_mutex_libev(struct ev_loop*) noexcept; + friend void unlock_mutex_libev(struct ev_loop*) noexcept; +public: + typedef std::thread thread_t; + EvThreadPool(); + + RunningPool start(); + void join(); + ev::loop_ref& loop(); + void do_nothing(); + +private: + void lock_mutex_libev() noexcept; + void unlock_mutex_libev() noexcept; + + std::mutex m_ev_mutex; + ev::async m_async; + ev::default_loop m_loop; + RunningPool::threadpool_type m_pool; +#if THREADPOOL == THREADPOOL_TASKFLOW + tf::Executor m_executor; +#endif +}; + +class EvTimerTask : public ev::timer { +public: + EvTimerTask (double delay, ev::loop_ref&, RunningPool::subpool_type*, std::mutex* ev_mtx, ev::async* async); + virtual ~EvTimerTask() noexcept = default; + + virtual void on_timer() = 0; + RunningPool::subpool_type& subflow(); + +protected: + void set_timer (double delay); + +private: + void on_timer_ev(); + + RunningPool::subpool_type* m_subpool; + std::mutex* m_ev_mutex; + ev::loop_ref* m_loop; + ev::async* m_async; +}; + +void test(); - void test(); } //namespace duck diff --git a/src/html_fetch_task.hpp b/src/html_fetch_task.hpp new file mode 100644 index 0000000..21f0bbc --- /dev/null +++ b/src/html_fetch_task.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "evloop.hpp" +#include "orotool_config.hpp" +#include +#include +#include + +namespace duck { + auto time_rand = std::bind(std::uniform_int_distribution(2, 8), std::mt19937(std::time(0))); + + class HtmlFetchTimer : public EvTimerTask { + public: + HtmlFetchTimer (HtmlFetchTimer&&) = default; + HtmlFetchTimer (const HtmlFetchTimer&) = delete; + HtmlFetchTimer (ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* mtx, ev::async* async, std::string&& url) : + EvTimerTask(3.0, loop, subflow, mtx, async), + m_url(std::move(url)) + { + } + + virtual void on_timer() override { + auto work_chunk = [this]() { + using namespace std::chrono_literals; + std::cout << "Timer elapsed for " << m_url << "! Doing fake work...\n"; + std::this_thread::sleep_for(5s); + + const double new_delay = 10.0 + static_cast(time_rand()); + std::cout << "Now starting next timer for " << new_delay << " seconds\n"; + set_timer(new_delay); + }; +#if THREADPOOL == THREADPOOL_TASKFLOW + subflow().emplace(work_chunk); + std::cout << "subflow task enqueued to " << &subflow() << '\n'; +#else + subflow().submit(work_chunk); +#endif + } + + private: + std::string m_url; + }; +} //namespace duck diff --git a/src/main.cpp b/src/main.cpp index 9c756b4..f8b6bc1 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,9 +1,13 @@ #include "oro/api.hpp" #include "orotool_config.hpp" #include "SQLiteCpp/SQLiteCpp.h" +#include "evloop.hpp" #include int main(int argc, char* argv[]) { + duck::test(); + return 0; +/* if (2 != argc) { std::cerr << "Please provide your API key\n"; return 2; @@ -67,4 +71,5 @@ int main(int argc, char* argv[]) { } return 0; + */ } diff --git a/src/meson.build b/src/meson.build index 78f9538..dd57241 100644 --- a/src/meson.build +++ b/src/meson.build @@ -11,7 +11,9 @@ sqlitecpp_dep = dependency('sqlitecpp', version: '>=3.0.0', ev_dep = dependency('libev', version: '>=4.31') threads_dep = dependency('threads') -taskflow_dep = dependency('Cpp-Taskflow', version: '>=2.4.0', method: 'cmake') +if get_option('thread_pool') == 'taskflow' + taskflow_dep = dependency('Cpp-Taskflow', version: '>=2.4.0', method: 'cmake') +endif base_url = get_option('base_url').strip() if not base_url.endswith('/') @@ -20,12 +22,27 @@ endif conf = configuration_data() conf.set('BASE_URL', base_url) +if get_option('thread_pool') == 'roar11' + conf.set('THREAD_POOL_TYPE', 'ROAR11') +elif get_option('thread_pool') == 'taskflow' + conf.set('THREAD_POOL_TYPE', 'TASKFLOW') +endif project_config_file = configure_file( input: 'config.hpp.in', output: meson.project_name() + '_config.hpp', configuration: conf, ) +lib_deps = [ + restc_cpp_dep, + sqlitecpp_dep, + ev_dep, + threads_dep, +] +if get_option('thread_pool') == 'taskflow' + lib_deps += [taskflow_dep] +endif + executable(meson.project_name(), 'main.cpp', 'oro/datatypes.cpp', @@ -36,12 +53,7 @@ executable(meson.project_name(), 'evloop.cpp', project_config_file, install: true, - dependencies: [ - restc_cpp_dep, - sqlitecpp_dep, - ev_dep, - threads_dep, - taskflow_dep, - ], + dependencies: lib_deps, include_directories: date_incdir, + cpp_args: ['-DEV_USE_STDEXCEPT'], ) diff --git a/src/roar11/ThreadPool.hpp b/src/roar11/ThreadPool.hpp index 8e38bec..d0d5b5b 100644 --- a/src/roar11/ThreadPool.hpp +++ b/src/roar11/ThreadPool.hpp @@ -90,10 +90,10 @@ namespace roar11 TaskFuture& operator=(TaskFuture&& other) = default; ~TaskFuture(void) { - if(m_future.valid()) - { - m_future.get(); - } + //if(m_future.valid()) + //{ + // m_future.get(); + //} } auto get(void)