Get the concept working

A bit hackish because see ThreadPool.hpp.
Also code is dirty and it can't be stopped once running.
Building up from here.
This commit is contained in:
King_DuckZ 2020-08-09 17:29:37 +01:00
parent 399f60190f
commit 9f0fed98e2
8 changed files with 282 additions and 78 deletions

View file

@ -1 +1,2 @@
option('base_url', type: 'string', value: 'https://api.originsro.org') option('base_url', type: 'string', value: 'https://api.originsro.org')
option('thread_pool', type: 'combo', description: '', choices: ['roar11', 'taskflow'], value: 'roar11')

View file

@ -1,5 +1,10 @@
#pragma once #pragma once
#define THREADPOOL_ROAR11 1
#define THREADPOOL_TASKFLOW 2
#define THREADPOOL THREADPOOL_@THREAD_POOL_TYPE@
namespace duck { namespace duck {
constexpr const char g_base_url[] = "@BASE_URL@"; constexpr const char g_base_url[] = "@BASE_URL@";

View file

@ -1,4 +1,5 @@
#include "evloop.hpp" #include "evloop.hpp"
#include "html_fetch_task.hpp"
#include <ev++.h> #include <ev++.h>
#include <thread> #include <thread>
#include <vector> #include <vector>
@ -7,79 +8,152 @@
#include <random> #include <random>
#include <functional> #include <functional>
#include <string> #include <string>
#include <future>
#include <cassert>
namespace duck { namespace duck {
namespace { namespace {
auto time_rand = std::bind(std::uniform_int_distribution<int>(2, 8), std::mt19937(std::time(0))); class KeepaliveTimer : public ev::timer {
class HtmlFetchTimer : public EvTimerTask {
public: public:
HtmlFetchTimer (ev::loop_ref& loop, tf::Subflow* subflow, std::string&& url) : KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) {
EvTimerTask(static_cast<double>(time_rand()), loop, subflow), this->set(loop);
m_url(std::move(url)) this->set<KeepaliveTimer, &KeepaliveTimer::on_timer_ev>(this);
{ ev::timer::start(5.0, 5.0);
} }
virtual void on_timer() override { void on_timer_ev() {
subflow().emplace([this]() {std::cout << "Timer elapsed for " << m_url << "!\n";});
} }
private:
std::string m_url;
};
class EvThreadPool {
public:
typedef std::thread thread_t;
EvThreadPool() :
m_loop(ev::AUTO)
{
}
void start() {
m_taskflow.emplace([this](tf::Subflow& subflow){main_loop(subflow);});
m_executor.run(m_taskflow);
}
void join() {
m_executor.wait_for_all();
std::cout << "all tasks completed\n";
}
private:
void main_loop (tf::Subflow& subflow) {
m_timer = std::make_unique<HtmlFetchTimer>(m_loop, &subflow, "lalalala");
this->m_loop.run(0);
}
ev::default_loop m_loop;
tf::Taskflow m_taskflow;
tf::Executor m_executor;
std::unique_ptr<HtmlFetchTimer> m_timer;
}; };
} //unnamed namespace } //unnamed namespace
EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, tf::Subflow* subflow) : void lock_mutex_libev (struct ev_loop* ev) noexcept {
m_subflow(subflow) EvThreadPool* const obj = static_cast<EvThreadPool*>(ev_userdata(ev));
{ obj->lock_mutex_libev();
this->set(loop);
this->set<EvTimerTask, &EvTimerTask::on_timer_ev>(this);
ev::timer::start(delay, 0.0);
} }
tf::Subflow& EvTimerTask::subflow() { void unlock_mutex_libev (struct ev_loop* ev) noexcept {
return *m_subflow; EvThreadPool* const obj = static_cast<EvThreadPool*>(ev_userdata(ev));
obj->unlock_mutex_libev();
}
RunningPool::RunningPool (ev::default_loop* loop, RunningPool::subpool_type* sub, std::mutex* mtx, ev::async* async) :
m_loop(loop),
m_subflow(sub),
m_ev_mutex(mtx),
m_async(async)
{
assert(m_ev_mutex);
std::cout << "Created RunningPool(" << m_loop << ", " << m_subflow << ")\n";
}
EvThreadPool::EvThreadPool() :
m_loop(ev::AUTO)
#if THREADPOOL == THREADPOOL_ROAR11
, m_pool(std::max(2U, std::thread::hardware_concurrency()) - 1)
#endif
{
assert(nullptr == ev_userdata(m_loop));
m_async.set(m_loop);
m_async.set<EvThreadPool, &EvThreadPool::do_nothing>(this);
m_async.start();
ev_set_userdata(m_loop, this);
ev_set_loop_release_cb(m_loop, &duck::unlock_mutex_libev, &duck::lock_mutex_libev);
}
RunningPool EvThreadPool::start() {
#if THREADPOOL == THREADPOOL_TASKFLOW
std::promise<RunningPool::subpool_type*> subflow_promise;
m_taskflow.emplace([this,&subflow_promise](RunningPool::subpool_type& sub){
subflow_promise.set_value(&sub);
//KeepaliveTimer keepalive{m_loop, &sub};
m_ev_mutex.lock();
m_loop.run(0);
m_ev_mutex.unlock();
});
m_executor.run(m_taskflow);
std::future<RunningPool::subpool_type*> future_subflow = subflow_promise.get_future();
return RunningPool{&m_loop, future_subflow.get(), &m_ev_mutex, &m_async};
#else
std::cout << "Submitting run job to thread pool\n";
m_pool.submit([this]() {
//KeepaliveTimer keepalive{m_loop, &m_pool};
m_ev_mutex.lock();
m_loop.run(0);
m_ev_mutex.unlock();
});
std::cout << "Work submitted, returing RunningPool object\n";
return RunningPool{&m_loop, &m_pool, &m_ev_mutex, &m_async};
#endif
}
void EvThreadPool::join() {
#if THREADPOOL == THREADPOOL_TASKFLOW
m_executor.wait_for_all();
#else
m_pool.join();
#endif
std::cout << "all tasks completed\n";
}
ev::loop_ref& EvThreadPool::loop() {
return m_loop;
}
void EvThreadPool::do_nothing() {
}
void EvThreadPool::lock_mutex_libev() noexcept {
try {
m_ev_mutex.lock();
}
catch (const std::system_error&) {
assert(false);
std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n";
}
}
void EvThreadPool::unlock_mutex_libev() noexcept {
m_ev_mutex.unlock();
}
EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* ev_mtx, ev::async* async) :
m_subpool(subflow),
m_ev_mutex(ev_mtx),
m_loop(&loop),
m_async(async)
{
assert(ev_mtx);
this->set(loop);
this->set<EvTimerTask, &EvTimerTask::on_timer_ev>(this);
set_timer(delay);
}
void EvTimerTask::set_timer (double delay) {
std::unique_lock<std::mutex> lock(*m_ev_mutex);
ev_now_update(*m_loop);
ev::timer::start(delay, 0.0);
m_async->send();
}
RunningPool::subpool_type& EvTimerTask::subflow() {
return *m_subpool;
} }
void EvTimerTask::on_timer_ev() { void EvTimerTask::on_timer_ev() {
std::cout << "EvTimerTask::on_timer_ev()\n";
ev::timer::stop();
this->on_timer(); this->on_timer();
} }
void test() { void test() {
//const auto processor_count = std::thread::hardware_concurrency(); EvThreadPool worker;
EvThreadPool worker; //(std::max(2U, processor_count) - 1); auto running_pool = worker.start();
worker.start();
std::cout << "Instantiating html timer\n";
auto fetcher = running_pool.make_timer<HtmlFetchTimer>("test_url_lol");
worker.join(); worker.join();
} }
} //namespace duck } //namespace duck

View file

@ -1,22 +1,86 @@
#pragma once #pragma once
#include "orotool_config.hpp"
#include <ev++.h> #include <ev++.h>
#include <taskflow/taskflow.hpp> #if THREADPOOL == THREADPOOL_TASKFLOW
# include <taskflow/taskflow.hpp>
#elif THREADPOOL == THREADPOOL_ROAR11
# include "roar11/ThreadPool.hpp"
#endif
#include <mutex>
namespace duck { namespace duck {
class EvTimerTask : public ev::timer {
public:
EvTimerTask (double delay, ev::loop_ref&, tf::Subflow*);
virtual ~EvTimerTask() noexcept = default;
virtual void on_timer() = 0; class RunningPool {
tf::Subflow& subflow(); public:
#if THREADPOOL == THREADPOOL_ROAR11
typedef roar11::ThreadPool threadpool_type;
typedef roar11::ThreadPool subpool_type;
#elif THREADPOOL == THREADPOOL_TASKFLOW
typedef tf::Taskflow threadpool_type;
typedef tf::Subflow subpool_type;
#endif
private: RunningPool (ev::default_loop* loop, subpool_type* sub, std::mutex* mtx, ev::async* async);
void on_timer_ev();
tf::Subflow* m_subflow; template <typename T, typename... Args>
}; T make_timer(Args&&... args) {
return T{*m_loop, m_subflow, m_ev_mutex, m_async, std::forward<Args>(args)...};
}
private:
ev::default_loop* m_loop;
subpool_type* m_subflow;
std::mutex* m_ev_mutex;
ev::async* m_async;
};
class EvThreadPool {
friend class EvTimerTask;
friend void lock_mutex_libev(struct ev_loop*) noexcept;
friend void unlock_mutex_libev(struct ev_loop*) noexcept;
public:
typedef std::thread thread_t;
EvThreadPool();
RunningPool start();
void join();
ev::loop_ref& loop();
void do_nothing();
private:
void lock_mutex_libev() noexcept;
void unlock_mutex_libev() noexcept;
std::mutex m_ev_mutex;
ev::async m_async;
ev::default_loop m_loop;
RunningPool::threadpool_type m_pool;
#if THREADPOOL == THREADPOOL_TASKFLOW
tf::Executor m_executor;
#endif
};
class EvTimerTask : public ev::timer {
public:
EvTimerTask (double delay, ev::loop_ref&, RunningPool::subpool_type*, std::mutex* ev_mtx, ev::async* async);
virtual ~EvTimerTask() noexcept = default;
virtual void on_timer() = 0;
RunningPool::subpool_type& subflow();
protected:
void set_timer (double delay);
private:
void on_timer_ev();
RunningPool::subpool_type* m_subpool;
std::mutex* m_ev_mutex;
ev::loop_ref* m_loop;
ev::async* m_async;
};
void test();
void test();
} //namespace duck } //namespace duck

43
src/html_fetch_task.hpp Normal file
View file

@ -0,0 +1,43 @@
#pragma once
#include "evloop.hpp"
#include "orotool_config.hpp"
#include <thread>
#include <chrono>
#include <random>
namespace duck {
auto time_rand = std::bind(std::uniform_int_distribution<int>(2, 8), std::mt19937(std::time(0)));
class HtmlFetchTimer : public EvTimerTask {
public:
HtmlFetchTimer (HtmlFetchTimer&&) = default;
HtmlFetchTimer (const HtmlFetchTimer&) = delete;
HtmlFetchTimer (ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* mtx, ev::async* async, std::string&& url) :
EvTimerTask(3.0, loop, subflow, mtx, async),
m_url(std::move(url))
{
}
virtual void on_timer() override {
auto work_chunk = [this]() {
using namespace std::chrono_literals;
std::cout << "Timer elapsed for " << m_url << "! Doing fake work...\n";
std::this_thread::sleep_for(5s);
const double new_delay = 10.0 + static_cast<double>(time_rand());
std::cout << "Now starting next timer for " << new_delay << " seconds\n";
set_timer(new_delay);
};
#if THREADPOOL == THREADPOOL_TASKFLOW
subflow().emplace(work_chunk);
std::cout << "subflow task enqueued to " << &subflow() << '\n';
#else
subflow().submit(work_chunk);
#endif
}
private:
std::string m_url;
};
} //namespace duck

View file

@ -1,9 +1,13 @@
#include "oro/api.hpp" #include "oro/api.hpp"
#include "orotool_config.hpp" #include "orotool_config.hpp"
#include "SQLiteCpp/SQLiteCpp.h" #include "SQLiteCpp/SQLiteCpp.h"
#include "evloop.hpp"
#include <iostream> #include <iostream>
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
duck::test();
return 0;
/*
if (2 != argc) { if (2 != argc) {
std::cerr << "Please provide your API key\n"; std::cerr << "Please provide your API key\n";
return 2; return 2;
@ -67,4 +71,5 @@ int main(int argc, char* argv[]) {
} }
return 0; return 0;
*/
} }

View file

@ -11,7 +11,9 @@ sqlitecpp_dep = dependency('sqlitecpp', version: '>=3.0.0',
ev_dep = dependency('libev', version: '>=4.31') ev_dep = dependency('libev', version: '>=4.31')
threads_dep = dependency('threads') threads_dep = dependency('threads')
taskflow_dep = dependency('Cpp-Taskflow', version: '>=2.4.0', method: 'cmake') if get_option('thread_pool') == 'taskflow'
taskflow_dep = dependency('Cpp-Taskflow', version: '>=2.4.0', method: 'cmake')
endif
base_url = get_option('base_url').strip() base_url = get_option('base_url').strip()
if not base_url.endswith('/') if not base_url.endswith('/')
@ -20,12 +22,27 @@ endif
conf = configuration_data() conf = configuration_data()
conf.set('BASE_URL', base_url) conf.set('BASE_URL', base_url)
if get_option('thread_pool') == 'roar11'
conf.set('THREAD_POOL_TYPE', 'ROAR11')
elif get_option('thread_pool') == 'taskflow'
conf.set('THREAD_POOL_TYPE', 'TASKFLOW')
endif
project_config_file = configure_file( project_config_file = configure_file(
input: 'config.hpp.in', input: 'config.hpp.in',
output: meson.project_name() + '_config.hpp', output: meson.project_name() + '_config.hpp',
configuration: conf, configuration: conf,
) )
lib_deps = [
restc_cpp_dep,
sqlitecpp_dep,
ev_dep,
threads_dep,
]
if get_option('thread_pool') == 'taskflow'
lib_deps += [taskflow_dep]
endif
executable(meson.project_name(), executable(meson.project_name(),
'main.cpp', 'main.cpp',
'oro/datatypes.cpp', 'oro/datatypes.cpp',
@ -36,12 +53,7 @@ executable(meson.project_name(),
'evloop.cpp', 'evloop.cpp',
project_config_file, project_config_file,
install: true, install: true,
dependencies: [ dependencies: lib_deps,
restc_cpp_dep,
sqlitecpp_dep,
ev_dep,
threads_dep,
taskflow_dep,
],
include_directories: date_incdir, include_directories: date_incdir,
cpp_args: ['-DEV_USE_STDEXCEPT'],
) )

View file

@ -90,10 +90,10 @@ namespace roar11
TaskFuture& operator=(TaskFuture&& other) = default; TaskFuture& operator=(TaskFuture&& other) = default;
~TaskFuture(void) ~TaskFuture(void)
{ {
if(m_future.valid()) //if(m_future.valid())
{ //{
m_future.get(); // m_future.get();
} //}
} }
auto get(void) auto get(void)