From 0efd71a2fab485ec9886351a4f0769ab661ae150 Mon Sep 17 00:00:00 2001 From: King_DuckZ Date: Thu, 30 Jun 2016 09:35:34 +0100 Subject: [PATCH] Refactor connection code. Move all redis connection and libev code out from command. Command just wraps some useful methods in AsyncConnection now. --- src/backends/redis/CMakeLists.txt | 1 + src/backends/redis/async_connection.cpp | 212 ++++++++++++++++++++++++ src/backends/redis/async_connection.hpp | 74 +++++++++ src/backends/redis/batch.cpp | 23 +-- src/backends/redis/batch.hpp | 8 +- src/backends/redis/command.cpp | 179 ++------------------ src/backends/redis/command.hpp | 21 --- 7 files changed, 316 insertions(+), 202 deletions(-) create mode 100644 src/backends/redis/async_connection.cpp create mode 100644 src/backends/redis/async_connection.hpp diff --git a/src/backends/redis/CMakeLists.txt b/src/backends/redis/CMakeLists.txt index f5105db..d04a0b3 100644 --- a/src/backends/redis/CMakeLists.txt +++ b/src/backends/redis/CMakeLists.txt @@ -12,6 +12,7 @@ add_library(${PROJECT_NAME} SHARED batch.cpp script.cpp script_manager.cpp + async_connection.cpp ) target_include_directories(${PROJECT_NAME} SYSTEM diff --git a/src/backends/redis/async_connection.cpp b/src/backends/redis/async_connection.cpp new file mode 100644 index 0000000..cf48a01 --- /dev/null +++ b/src/backends/redis/async_connection.cpp @@ -0,0 +1,212 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#include "async_connection.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace redis { + namespace { + void async_callback (ev_loop* /*parLoop*/, ev_async* /*parObject*/, int /*parRevents*/) { + } + + void async_halt_loop (ev_loop* parLoop, ev_async* /*parObject*/, int /*parRevents*/) { + ev_break(parLoop, EVBREAK_ALL); + } + + void lock_mutex_libev (ev_loop* parLoop) { + std::mutex* mtx = static_cast(ev_userdata(parLoop)); + assert(mtx); + mtx->lock(); + } + + void unlock_mutex_libev (ev_loop* parLoop) { + std::mutex* mtx = static_cast(ev_userdata(parLoop)); + assert(mtx); + mtx->unlock(); + } + } //unnamed namespace + + struct AsyncConnection::LocalData { + LocalData() : + redis_poll_thread(), + connect_processed(false), + disconnect_processed(true) + { + } + + ev_async watcher_wakeup; + ev_async watcher_halt; + std::thread redis_poll_thread; + std::mutex hiredis_mutex; + std::mutex libev_mutex; + std::condition_variable condition_connected; + std::condition_variable condition_disconnected; + std::string connect_err_msg; + std::atomic_bool connect_processed; + std::atomic_bool disconnect_processed; + }; + + void on_connect (const redisAsyncContext* parContext, int parStatus) { + assert(parContext and parContext->data); + AsyncConnection& self = *static_cast(parContext->data); + assert(parContext == self.m_conn.get()); + assert(not self.m_local_data->connect_processed); + + self.m_connection_lost = false; + self.m_connected = (parStatus == REDIS_OK); + self.m_local_data->connect_processed = true; + self.m_local_data->connect_err_msg = parContext->errstr; + self.m_local_data->condition_connected.notify_one(); + } + + void on_disconnect (const redisAsyncContext* parContext, int parStatus) { + assert(parContext and parContext->data); + AsyncConnection& self = *static_cast(parContext->data); + assert(self.m_connected); + assert(not self.m_local_data->disconnect_processed); + + self.m_connection_lost = (REDIS_ERR == parStatus); + self.m_connected = false; + self.m_local_data->disconnect_processed = true; + self.m_local_data->connect_err_msg.clear(); + self.m_local_data->condition_disconnected.notify_one(); + }; + + AsyncConnection::AsyncConnection(std::string&& parAddress, uint16_t parPort) : + m_conn(nullptr, &redisAsyncDisconnect), + m_local_data(new LocalData()), + m_libev_loop_thread(ev_loop_new(EVFLAG_NOINOTIFY), &ev_loop_destroy), + m_address(std::move(parAddress)), + m_port(parPort), + m_connected(false), + m_connection_lost(false) + { + //Init libev stuff + { + signal(SIGPIPE, SIG_IGN); + + //See: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE + ev_async_init(&m_local_data->watcher_wakeup, &async_callback); + ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); + ev_async_init(&m_local_data->watcher_halt, &async_halt_loop); + ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_halt); + ev_set_userdata(m_libev_loop_thread.get(), &m_local_data->libev_mutex); + ev_set_loop_release_cb(m_libev_loop_thread.get(), &unlock_mutex_libev, &lock_mutex_libev); + } + } + + AsyncConnection::~AsyncConnection() noexcept { + this->disconnect(); + this->wait_for_disconnect(); + } + + void AsyncConnection::connect() { + if (not m_conn) { + m_local_data->disconnect_processed = false; + RedisConnection conn( + (is_socket_connection() ? + redisAsyncConnectUnix(m_address.c_str()) + : + redisAsyncConnect(m_address.c_str(), m_port) + ), + &redisAsyncDisconnect + ); + if (not conn) { + std::ostringstream oss; + oss << "Unable to connect to Redis server at " << m_address << ':' << m_port; + throw std::runtime_error(oss.str()); + } + else { + conn->data = this; + } + if (REDIS_OK != redisLibevAttach(m_libev_loop_thread.get(), conn.get())) + throw std::runtime_error("Unable to set event loop"); + if (REDIS_OK != redisAsyncSetConnectCallback(conn.get(), &on_connect)) + throw std::runtime_error("Unable to set \"on_connect()\" callback"); + if (REDIS_OK != redisAsyncSetDisconnectCallback(conn.get(), &on_disconnect)) + throw std::runtime_error("Unable to set \"on_disconnect()\" callback"); + std::swap(conn, m_conn); + m_local_data->redis_poll_thread = std::thread([this]() { + m_local_data->libev_mutex.lock(); + ev_run(m_libev_loop_thread.get(), 0); + m_local_data->libev_mutex.unlock(); + }); + wakeup_event_thread(); + } + } + + void AsyncConnection::wait_for_connect() { + if (not m_local_data->connect_processed) { + std::unique_lock lk(m_local_data->hiredis_mutex); + m_local_data->condition_connected.wait(lk, [this]() { return m_local_data->connect_processed.load(); }); + assert(true == m_local_data->connect_processed); + } + } + + void AsyncConnection::disconnect() { + assert(m_local_data->redis_poll_thread.joinable()); + m_local_data->connect_processed = false; + { + std::lock_guard lock(m_local_data->libev_mutex); + assert(not ev_async_pending(&m_local_data->watcher_halt)); + ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_halt); + m_conn.reset(); + } + m_local_data->redis_poll_thread.join(); + } + + void AsyncConnection::wait_for_disconnect() { + if (not m_local_data->disconnect_processed) { + std::unique_lock lk(m_local_data->hiredis_mutex); + m_local_data->condition_disconnected.wait(lk, [this]() { return m_local_data->disconnect_processed.load(); }); + assert(true == m_local_data->disconnect_processed); + } + } + + bool AsyncConnection::is_connected() const { + const bool connected = m_conn and not m_conn->err and m_connected; + assert(not m_connection_lost or connected); + return connected; + } + + boost::string_ref AsyncConnection::connection_error() const { + return m_local_data->connect_err_msg; + } + + void AsyncConnection::wakeup_event_thread() { + std::lock_guard lock(m_local_data->libev_mutex); + if (ev_async_pending(&m_local_data->watcher_wakeup) == false) + ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); + } + + std::mutex& AsyncConnection::event_mutex() { + return m_local_data->libev_mutex; + } + + bool AsyncConnection::is_socket_connection() const { + return not (m_port or m_address.empty()); + } +} //namespace redis diff --git a/src/backends/redis/async_connection.hpp b/src/backends/redis/async_connection.hpp new file mode 100644 index 0000000..7c1fce7 --- /dev/null +++ b/src/backends/redis/async_connection.hpp @@ -0,0 +1,74 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#ifndef id982A651A9BC34F6E9BA935A804B3A0A4 +#define id982A651A9BC34F6E9BA935A804B3A0A4 + +#include +#include +#include +#include + +struct redisAsyncContext; +struct ev_loop; + +namespace std { + class mutex; +} //namespace std + +namespace redis { + class AsyncConnection { + friend void on_connect ( const redisAsyncContext*, int ); + friend void on_disconnect ( const redisAsyncContext*, int ); + public: + AsyncConnection ( std::string&& parAddress, uint16_t parPort ); + ~AsyncConnection ( void ) noexcept; + + void connect ( void ); + void wait_for_connect ( void ); + void disconnect ( void ); + void wait_for_disconnect ( void ); + + bool is_connected ( void ) const; + boost::string_ref connection_error ( void ) const; + void wakeup_event_thread ( void ); + std::mutex& event_mutex ( void ); + redisAsyncContext* connection ( void ); + + private: + using RedisConnection = std::unique_ptr; + using LibevLoop = std::unique_ptr; + + bool is_socket_connection ( void ) const; + + struct LocalData; + + RedisConnection m_conn; + std::unique_ptr m_local_data; + LibevLoop m_libev_loop_thread; + std::string m_address; + uint16_t m_port; + volatile bool m_connected; + volatile bool m_connection_lost; + }; + + inline redisAsyncContext* AsyncConnection::connection() { + return m_conn.get(); + } +} //namespace redis + +#endif diff --git a/src/backends/redis/batch.cpp b/src/backends/redis/batch.cpp index e30bc83..d24cccf 100644 --- a/src/backends/redis/batch.cpp +++ b/src/backends/redis/batch.cpp @@ -16,7 +16,7 @@ */ #include "batch.hpp" -#include "command.hpp" +#include "async_connection.hpp" #include #include #include @@ -105,15 +105,14 @@ namespace redis { Batch::Batch (Batch&&) = default; - Batch::Batch (redisAsyncContext* parContext, Command* parCommand) : + Batch::Batch (AsyncConnection* parConn) : m_futures(), m_replies(), m_local_data(new LocalData), - m_command(parCommand), - m_context(parContext) + m_async_conn(parConn) { - assert(m_command); - assert(m_context); + assert(m_async_conn); + assert(m_async_conn->connection()); } Batch::~Batch() noexcept { @@ -137,13 +136,15 @@ namespace redis { std::cout << " emplace_back(future)... "; m_futures.emplace_back(data->promise.get_future()); - m_command->lock(); - const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths); - m_command->unlock(); - assert(REDIS_OK == command_added); // REDIS_ERR if error + { + std::lock_guard lock(m_async_conn->event_mutex()); + const int command_added = redisAsyncCommandArgv(m_async_conn->connection(), &hiredis_run_callback, data, parArgc, parArgv, parLengths); + assert(REDIS_OK == command_added); // REDIS_ERR if error + static_cast(command_added); + } std::cout << "command sent to hiredis" << std::endl; - m_command->wakeup_thread(); + m_async_conn->wakeup_event_thread(); } bool Batch::replies_requested() const { diff --git a/src/backends/redis/batch.hpp b/src/backends/redis/batch.hpp index bf28737..ccced3f 100644 --- a/src/backends/redis/batch.hpp +++ b/src/backends/redis/batch.hpp @@ -23,8 +23,6 @@ #include #include -struct redisAsyncContext; - namespace std { template class future; template struct atomic; @@ -32,6 +30,7 @@ namespace std { namespace redis { class Command; + class AsyncConnection; class Batch { friend class Command; @@ -53,14 +52,13 @@ namespace redis { private: struct LocalData; - Batch ( redisAsyncContext* parContext, Command* parCommand ); + explicit Batch ( AsyncConnection* parConn ); void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); std::vector> m_futures; std::vector m_replies; std::unique_ptr m_local_data; - Command* m_command; - redisAsyncContext* m_context; + AsyncConnection* m_async_conn; }; class RedisError : public std::runtime_error { diff --git a/src/backends/redis/command.cpp b/src/backends/redis/command.cpp index 5430829..00ca1ca 100644 --- a/src/backends/redis/command.cpp +++ b/src/backends/redis/command.cpp @@ -17,114 +17,33 @@ #include "command.hpp" #include "script_manager.hpp" +#include "async_connection.hpp" #include -#include -#include -#include #include #include -#include #include #include -#include -#include -#include -#include -#include //See docs directory for info about hiredis/libev with multithreading namespace redis { namespace { - void async_callback (ev_loop* /*parLoop*/, ev_async* /*parObject*/, int /*parRevents*/) { - } - - void async_halt_loop (ev_loop* parLoop, ev_async* /*parObject*/, int /*parRevents*/) { - ev_break(parLoop, EVBREAK_ALL); - } - - void lock_mutex_libev (ev_loop* parLoop) { - std::mutex* mtx = static_cast(ev_userdata(parLoop)); - assert(mtx); - mtx->lock(); - } - - void unlock_mutex_libev (ev_loop* parLoop) { - std::mutex* mtx = static_cast(ev_userdata(parLoop)); - assert(mtx); - mtx->unlock(); - } } //unnamed namespace struct Command::LocalData { - explicit LocalData (Command* parCommand) : - lua_scripts(parCommand), - redis_poll_thread(), - connect_processed(false), - disconnect_processed(true) + explicit LocalData (Command* parCommand, std::string&& parAddress, uint16_t parPort) : + async_connection(std::move(parAddress), parPort), + lua_scripts(parCommand) { } + AsyncConnection async_connection; ScriptManager lua_scripts; - ev_async watcher_wakeup; - ev_async watcher_halt; - std::thread redis_poll_thread; - std::mutex hiredis_mutex; - std::mutex libev_mutex; - std::condition_variable condition_connected; - std::condition_variable condition_disconnected; - std::string connect_err_msg; - std::atomic_bool connect_processed; - std::atomic_bool disconnect_processed; - }; - - void on_connect (const redisAsyncContext* parContext, int parStatus) { - assert(parContext and parContext->data); - Command& self = *static_cast(parContext->data); - assert(parContext == self.m_conn.get()); - assert(not self.m_local_data->connect_processed); - - self.m_connection_lost = false; - self.m_connected = (parStatus == REDIS_OK); - self.m_local_data->connect_processed = true; - self.m_local_data->connect_err_msg = parContext->errstr; - self.m_local_data->condition_connected.notify_one(); - } - - void on_disconnect (const redisAsyncContext* parContext, int parStatus) { - assert(parContext and parContext->data); - Command& self = *static_cast(parContext->data); - assert(self.m_connected); - assert(not self.m_local_data->disconnect_processed); - - self.m_connection_lost = (REDIS_ERR == parStatus); - self.m_connected = false; - self.m_local_data->disconnect_processed = true; - self.m_local_data->connect_err_msg.clear(); - self.m_local_data->condition_disconnected.notify_one(); }; Command::Command (std::string&& parAddress, uint16_t parPort) : - m_conn(nullptr, &redisAsyncDisconnect), - m_libev_loop_thread(ev_loop_new(EVFLAG_NOINOTIFY), &ev_loop_destroy), - m_address(std::move(parAddress)), - m_local_data(new LocalData(this)), - m_port(parPort), - m_connected(false), - m_connection_lost(false) + m_local_data(new LocalData(this, std::move(parAddress), parPort)) { - //Init libev stuff - { - signal(SIGPIPE, SIG_IGN); - - //See: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE - ev_async_init(&m_local_data->watcher_wakeup, &async_callback); - ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); - ev_async_init(&m_local_data->watcher_halt, &async_halt_loop); - ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_halt); - ev_set_userdata(m_libev_loop_thread.get(), &m_local_data->libev_mutex); - ev_set_loop_release_cb(m_libev_loop_thread.get(), &unlock_mutex_libev, &lock_mutex_libev); - } } Command::Command (std::string&& parSocket) : @@ -132,82 +51,30 @@ namespace redis { { } - Command::~Command() noexcept { - this->disconnect(); - this->wait_for_disconnect(); - } + Command::~Command() noexcept = default; void Command::connect() { - if (not m_conn) { - m_local_data->disconnect_processed = false; - RedisConnection conn( - (is_socket_connection() ? - redisAsyncConnectUnix(m_address.c_str()) - : - redisAsyncConnect(m_address.c_str(), m_port) - ), - &redisAsyncDisconnect - ); - if (not conn) { - std::ostringstream oss; - oss << "Unable to connect to Redis server at " << m_address << ':' << m_port; - throw std::runtime_error(oss.str()); - } - else { - conn->data = this; - } - if (REDIS_OK != redisLibevAttach(m_libev_loop_thread.get(), conn.get())) - throw std::runtime_error("Unable to set event loop"); - if (REDIS_OK != redisAsyncSetConnectCallback(conn.get(), &on_connect)) - throw std::runtime_error("Unable to set \"on_connect()\" callback"); - if (REDIS_OK != redisAsyncSetDisconnectCallback(conn.get(), &on_disconnect)) - throw std::runtime_error("Unable to set \"on_disconnect()\" callback"); - std::swap(conn, m_conn); - m_local_data->redis_poll_thread = std::thread([this]() { - m_local_data->libev_mutex.lock(); - ev_run(m_libev_loop_thread.get(), 0); - m_local_data->libev_mutex.unlock(); - }); - wakeup_thread(); - } + m_local_data->async_connection.connect(); } void Command::wait_for_connect() { - if (not m_local_data->connect_processed) { - std::unique_lock lk(m_local_data->hiredis_mutex); - m_local_data->condition_connected.wait(lk, [this]() { return m_local_data->connect_processed.load(); }); - assert(true == m_local_data->connect_processed); - } + m_local_data->async_connection.wait_for_connect(); } void Command::disconnect() { - assert(m_local_data->redis_poll_thread.joinable()); - m_local_data->connect_processed = false; - { - std::lock_guard lock(m_local_data->libev_mutex); - assert(not ev_async_pending(&m_local_data->watcher_halt)); - ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_halt); - m_conn.reset(); - } - m_local_data->redis_poll_thread.join(); + m_local_data->async_connection.disconnect(); } void Command::wait_for_disconnect() { - if (not m_local_data->disconnect_processed) { - std::unique_lock lk(m_local_data->hiredis_mutex); - m_local_data->condition_disconnected.wait(lk, [this]() { return m_local_data->disconnect_processed.load(); }); - assert(true == m_local_data->disconnect_processed); - } + m_local_data->async_connection.wait_for_disconnect(); } bool Command::is_connected() const { - const bool connected = m_conn and not m_conn->err and m_connected; - assert(not m_connection_lost or connected); - return connected; + return m_local_data->async_connection.is_connected(); } boost::string_ref Command::connection_error() const { - return m_local_data->connect_err_msg; + return m_local_data->async_connection.connection_error(); } auto Command::scan() -> scan_range { @@ -228,28 +95,10 @@ namespace redis { Batch Command::make_batch() { assert(is_connected()); - return Batch(m_conn.get(), this); - } - - bool Command::is_socket_connection() const { - return not (m_port or m_address.empty()); + return Batch(&m_local_data->async_connection); } void Command::submit_lua_script (const std::string& parScript) { m_local_data->lua_scripts.submit_lua_script(parScript); } - - void Command::wakeup_thread() { - std::lock_guard lock(m_local_data->libev_mutex); - if (ev_async_pending(&m_local_data->watcher_wakeup) == false) - ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); - } - - void Command::lock() { - m_local_data->libev_mutex.lock(); - } - - void Command::unlock() { - m_local_data->libev_mutex.unlock(); - } } //namespace redis diff --git a/src/backends/redis/command.hpp b/src/backends/redis/command.hpp index eb8a81e..32b37f3 100644 --- a/src/backends/redis/command.hpp +++ b/src/backends/redis/command.hpp @@ -23,7 +23,6 @@ #include "batch.hpp" #include "redisConfig.h" #include -#include #include #include #include @@ -34,13 +33,8 @@ #include #include -struct redisAsyncContext; -struct ev_loop; - namespace redis { class Command { - friend void on_connect ( const redisAsyncContext*, int ); - friend void on_disconnect ( const redisAsyncContext*, int ); public: typedef ScanIterator> scan_iterator; typedef boost::iterator_range scan_range; @@ -75,26 +69,11 @@ namespace redis { zscan_range zscan ( boost::string_ref parKey ); void submit_lua_script ( const std::string& parScript ); - void wakeup_thread(); - void lock(); - void unlock(); private: - using RedisConnection = std::unique_ptr; - using LibevLoop = std::unique_ptr; - - bool is_socket_connection ( void ) const; - void on_connect_successful ( void ); - struct LocalData; - RedisConnection m_conn; - LibevLoop m_libev_loop_thread; - std::string m_address; std::unique_ptr m_local_data; - uint16_t m_port; - volatile bool m_connected; - volatile bool m_connection_lost; }; template