Refactor event code.

This splits the thread pool code from the event code, and
class names are now wrong (I will update them in the next
commit). Also, this disregards the roar11 vs tf library
thing, taskflow was not working anyways and I will probably
remove all the conditional compilation stuff about it for
now.
This commit is contained in:
King_DuckZ 2020-08-09 19:26:18 +01:00
parent 9f0fed98e2
commit 7fb055cd40
9 changed files with 281 additions and 229 deletions

78
src/eventia/eventia.cpp Normal file
View file

@ -0,0 +1,78 @@
#include "eventia.hpp"
#include "private/context.hpp"
#include <ev++.h>
#include <mutex>
#include <iostream>
#include <cassert>
namespace eve {
namespace {
void lock_mutex_libev (struct ::ev_loop* ev) noexcept {
Context* const ctx = static_cast<Context*>(ev_userdata(ev));
ctx->ev_mutex->lock();
}
void unlock_mutex_libev (struct ::ev_loop* ev) noexcept {
Context* const ctx = static_cast<Context*>(ev_userdata(ev));
ctx->ev_mutex->unlock();
}
} //unnamed namespace
struct EvThreadPool::LocalData {
LocalData();
std::mutex ev_mutex;
ev::async async;
ev::default_loop loop;
Context context;
};
EvThreadPool::LocalData::LocalData() :
loop(ev::AUTO),
context{ &loop, &ev_mutex, &async }
{
}
EvThreadPool::EvThreadPool() :
m_local(std::make_unique<LocalData>())
{
assert(nullptr == ev_userdata(m_local->loop));
m_local->async.set(m_local->loop);
m_local->async.set<EvThreadPool, &EvThreadPool::do_nothing>(this);
m_local->async.start();
ev_set_userdata(m_local->loop, &m_local->context);
ev_set_loop_release_cb(m_local->loop, &eve::unlock_mutex_libev, &eve::lock_mutex_libev);
}
EvThreadPool::~EvThreadPool() noexcept = default;
std::function<void()> EvThreadPool::event_functor() {
return std::function<void()>([this]() {
std::unique_lock<std::mutex> lock(m_local->ev_mutex);
m_local->loop.run(0);
});
}
void EvThreadPool::do_nothing() {
}
void EvThreadPool::lock_mutex_libev() noexcept {
try {
m_local->ev_mutex.lock();
}
catch (const std::system_error&) {
assert(false);
std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n";
}
}
void EvThreadPool::unlock_mutex_libev() noexcept {
m_local->ev_mutex.unlock();
}
const Context& EvThreadPool::context() {
return m_local->context;
}
} //namespace eve

35
src/eventia/eventia.hpp Normal file
View file

@ -0,0 +1,35 @@
#pragma once
#include <memory>
#include <functional>
namespace eve {
class Context;
class EvThreadPool {
friend class EvTimerTask;
public:
EvThreadPool();
~EvThreadPool() noexcept;
std::function<void()> event_functor();
void do_nothing();
template <typename T, typename... Args>
T make_timer(Args&&... args) {
return T{this->context(), std::forward<Args>(args)...};
}
private:
struct LocalData;
void lock_mutex_libev() noexcept;
void unlock_mutex_libev() noexcept;
const Context& context();
std::unique_ptr<LocalData> m_local;
};
} //namespace eve

View file

@ -0,0 +1,28 @@
#pragma once
#include <ev++.h>
#include <mutex>
namespace ev {
struct loop_ref;
struct async;
} //namespace ev
namespace eve {
struct Context {
Context (
ev::loop_ref* l,
std::mutex* m,
ev::async* a
) :
loop(l),
ev_mutex(m),
async(a)
{
}
ev::loop_ref* loop;
std::mutex* ev_mutex;
ev::async* async;
};
} //namespace eve

47
src/eventia/timer.cpp Normal file
View file

@ -0,0 +1,47 @@
#include "timer.hpp"
#include "private/context.hpp"
#include <ev++.h>
#include <mutex>
#include <cassert>
#include <iostream>
namespace eve {
struct EvTimerTask::LocalData {
explicit LocalData (const Context& ctx);
ev::timer timer;
Context context;
};
EvTimerTask::LocalData::LocalData (const Context& ctx) :
context(ctx)
{
}
EvTimerTask::EvTimerTask (double delay, const Context& ctx) :
m_local(std::make_unique<LocalData>(ctx))
{
assert(m_local->context.ev_mutex);
m_local->timer.set(*m_local->context.loop);
m_local->timer.set<EvTimerTask, &EvTimerTask::on_timer_ev>(this);
this->set_timer(delay);
}
EvTimerTask::~EvTimerTask() noexcept = default;
void EvTimerTask::on_timer_ev() {
std::cout << "EvTimerTask::on_timer_ev()\n";
m_local->timer.stop();
this->on_timer();
}
void EvTimerTask::set_timer (double delay) {
std::unique_lock<std::mutex> lock(*m_local->context.ev_mutex);
ev_now_update(*m_local->context.loop);
m_local->timer.start(delay, 0.0);
m_local->context.async->send();
}
} //namespace eve

28
src/eventia/timer.hpp Normal file
View file

@ -0,0 +1,28 @@
#pragma once
#include <memory>
namespace eve {
class Context;
class EvTimerTask {
public:
EvTimerTask (double delay, const Context& ctx);
virtual ~EvTimerTask() noexcept;
virtual void on_timer() = 0;
//RunningPool::subpool_type& subflow();
protected:
void set_timer (double delay);
private:
struct LocalData;
void on_timer_ev();
std::unique_ptr<LocalData> m_local;
};
} //namespace eve

View file

@ -1,159 +1,72 @@
#include "evloop.hpp"
#include "html_fetch_task.hpp"
#include <ev++.h>
#include <thread>
#include <vector>
#include "eventia/eventia.hpp"
#include "eventia/timer.hpp"
#include "orotool_config.hpp"
#if THREADPOOL == THREADPOOL_TASKFLOW
# include <taskflow/taskflow.hpp>
#elif THREADPOOL == THREADPOOL_ROAR11
# include "roar11/ThreadPool.hpp"
#endif
#include <iostream>
#include <algorithm>
#include <random>
#include <functional>
#include <string>
#include <future>
#include <cassert>
#if THREADPOOL == THREADPOOL_TASKFLOW
namespace tf {
class Taskflow;
class Subflow;
} //namespace tf
#elif THREADPOOL == THREADPOOL_ROAR11
namespace roar11 {
class ThreadPool;
} //namespace roar11
#endif
namespace duck {
namespace {
class KeepaliveTimer : public ev::timer {
public:
KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) {
this->set(loop);
this->set<KeepaliveTimer, &KeepaliveTimer::on_timer_ev>(this);
ev::timer::start(5.0, 5.0);
}
void on_timer_ev() {
}
};
} //unnamed namespace
void lock_mutex_libev (struct ev_loop* ev) noexcept {
EvThreadPool* const obj = static_cast<EvThreadPool*>(ev_userdata(ev));
obj->lock_mutex_libev();
}
void unlock_mutex_libev (struct ev_loop* ev) noexcept {
EvThreadPool* const obj = static_cast<EvThreadPool*>(ev_userdata(ev));
obj->unlock_mutex_libev();
}
RunningPool::RunningPool (ev::default_loop* loop, RunningPool::subpool_type* sub, std::mutex* mtx, ev::async* async) :
m_loop(loop),
m_subflow(sub),
m_ev_mutex(mtx),
m_async(async)
{
assert(m_ev_mutex);
std::cout << "Created RunningPool(" << m_loop << ", " << m_subflow << ")\n";
}
EvThreadPool::EvThreadPool() :
m_loop(ev::AUTO)
#if THREADPOOL == THREADPOOL_ROAR11
, m_pool(std::max(2U, std::thread::hardware_concurrency()) - 1)
typedef roar11::ThreadPool threadpool_type;
typedef roar11::ThreadPool subpool_type;
#elif THREADPOOL == THREADPOOL_TASKFLOW
typedef tf::Taskflow threadpool_type;
typedef tf::Subflow subpool_type;
#endif
{
assert(nullptr == ev_userdata(m_loop));
m_async.set(m_loop);
m_async.set<EvThreadPool, &EvThreadPool::do_nothing>(this);
m_async.start();
ev_set_userdata(m_loop, this);
ev_set_loop_release_cb(m_loop, &duck::unlock_mutex_libev, &duck::lock_mutex_libev);
}
RunningPool EvThreadPool::start() {
//class KeepaliveTimer : public ev::timer {
//public:
// KeepaliveTimer (ev::loop_ref& loop, RunningPool::subpool_type*) {
// this->set(loop);
// this->set<KeepaliveTimer, &KeepaliveTimer::on_timer_ev>(this);
// ev::timer::start(5.0, 5.0);
// }
// void on_timer_ev() {
// }
//};
#if THREADPOOL == THREADPOOL_TASKFLOW
std::promise<RunningPool::subpool_type*> subflow_promise;
m_taskflow.emplace([this,&subflow_promise](RunningPool::subpool_type& sub){
subflow_promise.set_value(&sub);
//KeepaliveTimer keepalive{m_loop, &sub};
m_ev_mutex.lock();
m_loop.run(0);
m_ev_mutex.unlock();
});
m_executor.run(m_taskflow);
std::future<RunningPool::subpool_type*> future_subflow = subflow_promise.get_future();
return RunningPool{&m_loop, future_subflow.get(), &m_ev_mutex, &m_async};
#else
std::cout << "Submitting run job to thread pool\n";
m_pool.submit([this]() {
//KeepaliveTimer keepalive{m_loop, &m_pool};
m_ev_mutex.lock();
m_loop.run(0);
m_ev_mutex.unlock();
});
std::cout << "Work submitted, returing RunningPool object\n";
return RunningPool{&m_loop, &m_pool, &m_ev_mutex, &m_async};
#endif
}
void EvThreadPool::join() {
#if THREADPOOL == THREADPOOL_TASKFLOW
m_executor.wait_for_all();
#else
m_pool.join();
#endif
void join(tf::Executor& executor) {
executor.wait_for_all();
std::cout << "all tasks completed\n";
}
ev::loop_ref& EvThreadPool::loop() {
return m_loop;
#else
void join(threadpool_type& pool) {
pool.join();
std::cout << "all tasks completed\n";
}
#endif
void EvThreadPool::do_nothing() {
}
void EvThreadPool::lock_mutex_libev() noexcept {
try {
m_ev_mutex.lock();
}
catch (const std::system_error&) {
assert(false);
std::cerr << "Locking mutex failed, this will probably result in bad program behaviour\n";
}
}
void EvThreadPool::unlock_mutex_libev() noexcept {
m_ev_mutex.unlock();
}
EvTimerTask::EvTimerTask (double delay, ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* ev_mtx, ev::async* async) :
m_subpool(subflow),
m_ev_mutex(ev_mtx),
m_loop(&loop),
m_async(async)
{
assert(ev_mtx);
this->set(loop);
this->set<EvTimerTask, &EvTimerTask::on_timer_ev>(this);
set_timer(delay);
}
void EvTimerTask::set_timer (double delay) {
std::unique_lock<std::mutex> lock(*m_ev_mutex);
ev_now_update(*m_loop);
ev::timer::start(delay, 0.0);
m_async->send();
}
RunningPool::subpool_type& EvTimerTask::subflow() {
return *m_subpool;
}
void EvTimerTask::on_timer_ev() {
std::cout << "EvTimerTask::on_timer_ev()\n";
ev::timer::stop();
this->on_timer();
}
} //unnamed namespace
void test() {
EvThreadPool worker;
auto running_pool = worker.start();
roar11::ThreadPool pool(std::max(2U, std::thread::hardware_concurrency()) - 1);
eve::EvThreadPool worker;
pool.submit(worker.event_functor());
std::cout << "Instantiating html timer\n";
auto fetcher = running_pool.make_timer<HtmlFetchTimer>("test_url_lol");
auto fetcher = worker.make_timer<HtmlFetchTimer>(&pool, "test_url_lol");
worker.join();
join(pool);
}
} //namespace duck

View file

@ -1,86 +1,7 @@
#pragma once
#include "orotool_config.hpp"
#include <ev++.h>
#if THREADPOOL == THREADPOOL_TASKFLOW
# include <taskflow/taskflow.hpp>
#elif THREADPOOL == THREADPOOL_ROAR11
# include "roar11/ThreadPool.hpp"
#endif
#include <mutex>
namespace duck {
class RunningPool {
public:
#if THREADPOOL == THREADPOOL_ROAR11
typedef roar11::ThreadPool threadpool_type;
typedef roar11::ThreadPool subpool_type;
#elif THREADPOOL == THREADPOOL_TASKFLOW
typedef tf::Taskflow threadpool_type;
typedef tf::Subflow subpool_type;
#endif
RunningPool (ev::default_loop* loop, subpool_type* sub, std::mutex* mtx, ev::async* async);
template <typename T, typename... Args>
T make_timer(Args&&... args) {
return T{*m_loop, m_subflow, m_ev_mutex, m_async, std::forward<Args>(args)...};
}
private:
ev::default_loop* m_loop;
subpool_type* m_subflow;
std::mutex* m_ev_mutex;
ev::async* m_async;
};
class EvThreadPool {
friend class EvTimerTask;
friend void lock_mutex_libev(struct ev_loop*) noexcept;
friend void unlock_mutex_libev(struct ev_loop*) noexcept;
public:
typedef std::thread thread_t;
EvThreadPool();
RunningPool start();
void join();
ev::loop_ref& loop();
void do_nothing();
private:
void lock_mutex_libev() noexcept;
void unlock_mutex_libev() noexcept;
std::mutex m_ev_mutex;
ev::async m_async;
ev::default_loop m_loop;
RunningPool::threadpool_type m_pool;
#if THREADPOOL == THREADPOOL_TASKFLOW
tf::Executor m_executor;
#endif
};
class EvTimerTask : public ev::timer {
public:
EvTimerTask (double delay, ev::loop_ref&, RunningPool::subpool_type*, std::mutex* ev_mtx, ev::async* async);
virtual ~EvTimerTask() noexcept = default;
virtual void on_timer() = 0;
RunningPool::subpool_type& subflow();
protected:
void set_timer (double delay);
private:
void on_timer_ev();
RunningPool::subpool_type* m_subpool;
std::mutex* m_ev_mutex;
ev::loop_ref* m_loop;
ev::async* m_async;
};
void test();
} //namespace duck

View file

@ -1,21 +1,24 @@
#pragma once
#include "evloop.hpp"
#include "orotool_config.hpp"
#include "eventia/timer.hpp"
#include "roar11/ThreadPool.hpp"
#include <thread>
#include <chrono>
#include <random>
#include <functional>
#include <iostream>
namespace duck {
auto time_rand = std::bind(std::uniform_int_distribution<int>(2, 8), std::mt19937(std::time(0)));
class HtmlFetchTimer : public EvTimerTask {
class HtmlFetchTimer : public eve::EvTimerTask {
public:
HtmlFetchTimer (HtmlFetchTimer&&) = default;
HtmlFetchTimer (const HtmlFetchTimer&) = delete;
HtmlFetchTimer (ev::loop_ref& loop, RunningPool::subpool_type* subflow, std::mutex* mtx, ev::async* async, std::string&& url) :
EvTimerTask(3.0, loop, subflow, mtx, async),
m_url(std::move(url))
HtmlFetchTimer (const eve::Context& ctx, roar11::ThreadPool* pool, std::string&& url) :
EvTimerTask(3.0, ctx),
m_url(std::move(url)),
m_pool(pool)
{
}
@ -29,15 +32,12 @@ namespace duck {
std::cout << "Now starting next timer for " << new_delay << " seconds\n";
set_timer(new_delay);
};
#if THREADPOOL == THREADPOOL_TASKFLOW
subflow().emplace(work_chunk);
std::cout << "subflow task enqueued to " << &subflow() << '\n';
#else
subflow().submit(work_chunk);
#endif
m_pool->submit(work_chunk);
}
private:
std::string m_url;
roar11::ThreadPool* m_pool;
};
} //namespace duck

View file

@ -51,6 +51,8 @@ executable(meson.project_name(),
'oro/items.cpp',
'oro/shops.cpp',
'evloop.cpp',
'eventia/eventia.cpp',
'eventia/timer.cpp',
project_config_file,
install: true,
dependencies: lib_deps,