diff --git a/cmake/Modules/Findlibev.cmake b/cmake/Modules/Findlibev.cmake new file mode 100644 index 0000000..665d4c0 --- /dev/null +++ b/cmake/Modules/Findlibev.cmake @@ -0,0 +1,31 @@ +# Try to find libev +# Once done, this will define +# +# LIBEV_FOUND - system has libev +# LIBEV_INCLUDE_DIRS - libev include directories +# LIBEV_LIBRARIES - libraries needed to use libev + +if(LIBEV_INCLUDE_DIRS AND LIBEV_LIBRARIES) + set(LIBEV_FIND_QUIETLY TRUE) +else() + find_path( + LIBEV_INCLUDE_DIR + NAMES ev.h + HINTS ${LIBEV_ROOT_DIR} + PATH_SUFFIXES include) + + find_library( + LIBEV_LIBRARY + NAME ev + HINTS ${LIBEV_ROOT_DIR} + PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR}) + + set(LIBEV_INCLUDE_DIRS ${LIBEV_INCLUDE_DIR}) + set(LIBEV_LIBRARIES ${LIBEV_LIBRARY}) + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args( + libev DEFAULT_MSG LIBEV_LIBRARY LIBEV_INCLUDE_DIR) + + mark_as_advanced(LIBEV_LIBRARY LIBEV_INCLUDE_DIR) +endif() diff --git a/docs/hiredis_libev_multithread.md b/docs/hiredis_libev_multithread.md new file mode 100644 index 0000000..c86602f --- /dev/null +++ b/docs/hiredis_libev_multithread.md @@ -0,0 +1,166 @@ +[TOC] + +# Using hiredis+libev in a separate thread for events + +Used sources: + +1. [stackoverflow.com/questions/14621261][link_so_1] +2. [stackoverflow.com/questions/8611126][link_so_2] +3. [libev thread locking example][link_ev_doc] + + +## Explanation + +### Link 1 + +Possibly due to the original question asked, [Link 1][link_so_1] seems to show that you need to have at least two ev_loop objects created in order to run one in a separate thread. The code on that page: + +```cpp +//This program is demo for using pthreads with libev. +//Try using Timeout values as large as 1.0 and as small as 0.000001 +//and notice the difference in the output + +//(c) 2009 debuguo +//(c) 2013 enthusiasticgeek for stack overflow +//Free to distribute and improve the code. Leave credits intact + +#include +#include // for puts +#include +#include + +pthread_mutex_t lock; +double timeout = 0.00001; +ev_timer timeout_watcher; +int timeout_count = 0; + +ev_async async_watcher; +int async_count = 0; + +struct ev_loop* loop2; + +void* loop2thread(void* args) +{ + printf("Inside loop 2"); // Here one could initiate another timeout watcher + ev_loop(loop2, 0); // similar to the main loop - call it say timeout_cb1 + return NULL; +} + +static void async_cb (EV_P_ ev_async *w, int revents) +{ + //puts ("async ready"); + pthread_mutex_lock(&lock); //Don't forget locking + ++async_count; + printf("async = %d, timeout = %d \n", async_count, timeout_count); + pthread_mutex_unlock(&lock); //Don't forget unlocking +} + +static void timeout_cb (EV_P_ ev_timer *w, int revents) // Timer callback function +{ + //puts ("timeout"); + if (ev_async_pending(&async_watcher)==false) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to) + ev_async_send(loop2, &async_watcher); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop. + } + + pthread_mutex_lock(&lock); //Don't forget locking + ++timeout_count; + pthread_mutex_unlock(&lock); //Don't forget unlocking + w->repeat = timeout; + ev_timer_again(loop, &timeout_watcher); //Start the timer again. +} + +int main (int argc, char** argv) +{ + if (argc < 2) { + puts("Timeout value missing.\n./demo "); + return -1; + } + timeout = atof(argv[1]); + + struct ev_loop *loop = EV_DEFAULT; //or ev_default_loop (0); + + //Initialize pthread + pthread_mutex_init(&lock, NULL); + pthread_t thread; + + // This loop sits in the pthread + loop2 = ev_loop_new(0); + + //This block is specifically used pre-empting thread (i.e. temporary interruption and suspension of a task, without asking for its cooperation, with the intention to resume that task later.) + //This takes into account thread safety + ev_async_init(&async_watcher, async_cb); + ev_async_start(loop2, &async_watcher); + pthread_create(&thread, NULL, loop2thread, NULL); + + ev_timer_init (&timeout_watcher, timeout_cb, timeout, 0.); // Non repeating timer. The timer starts repeating in the timeout callback function + ev_timer_start (loop, &timeout_watcher); + + // now wait for events to arrive + ev_loop(loop, 0); + //Wait on threads for execution + pthread_join(thread, NULL); + + pthread_mutex_destroy(&lock); + return 0; +} +``` + +with the comment *"Note for libev 4+ ev_loop should be ev_run."* is still slightly useful but it shouldn't be taken as a model. + + +### Link 2 + +The [second link][link_so_2] is what shows that one ev_loop is enough. This is the code provided by the original poster: + +```cpp + void RedisSubscriber::Start() { + m_redis = redisAsyncConnect(m_addr.c_str(),m_port); + m_redis->data = (void*)this; + + m_loop = ev_loop_new(EVFLAG_NOINOTIFY); + redisLibevAttach(m_loop, m_redis); + redisAsyncSetConnectCallback(m_redis,connectCallback); + redisAsyncSetDisconnectCallback(m_redis,disconnectCallback); + redisAsyncCommand(m_redis, subscribeCallback, NULL, "SUBSCRIBE %s", m_channel.c_str()); + + m_thread = boost::thread(ev_loop,m_loop,0); + } + + void RedisSubscriber::Stop() { + redisAsyncFree(m_redis); + m_thread.join(); + m_redis = 0; + } + + void RedisSubscriber::connectCallback(const redisAsyncContext *c) { + + } + + void RedisSubscriber::disconnectCallback(const redisAsyncContext *c, int status) { + RedisSubscriber* r = (RedisSubscriber*)(c->data); + ev_unloop(r->m_loop,EVUNLOOP_ALL); + } + + void RedisSubscriber::subscribeCallback(redisAsyncContext *c, void *r, void *privdata) { + + } +``` + +There are no accepted answers, but the answer from *themoondothshine* provides very useful info. Here is what it says: + +Assuming that you mean ev_run for your boost::thread, here's what you can do: + +1. Setup an `ev_async` +2. In the callback of `ev_async` call `ev_break`. +3. Call `ev_async_send` from `RedisSubscriber::Stop()`. `ev_async` watchers are thread-safe -- it uses memory barriers for synchronising between threads. + +This will cause the event loop to stop, and `m_thread.join()` will return. + + +### Link 3 + +The [THREAD LOCKING EXAMPLE][link_ev_doc] shows how to lock in order to protect the ev_loop object in use. + +[link_so_1]: http://stackoverflow.com/questions/14621261/using-libev-with-multiple-threads#14779930 +[link_so_2]: http://stackoverflow.com/questions/8611126/hiredis-libev-and-boostthreads +[link_ev_doc]: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE diff --git a/src/backends/redis/CMakeLists.txt b/src/backends/redis/CMakeLists.txt index 2ee8331..f5105db 100644 --- a/src/backends/redis/CMakeLists.txt +++ b/src/backends/redis/CMakeLists.txt @@ -2,6 +2,7 @@ project(${bare_name}-backend-redis CXX) find_package(hiredis 0.11.0 REQUIRED) find_package(CryptoPP 5.6) +find_package(libev 4.0 REQUIRED) add_library(${PROJECT_NAME} SHARED backend_redis.cpp @@ -9,12 +10,15 @@ add_library(${PROJECT_NAME} SHARED scan_iterator.cpp reply.cpp batch.cpp + script.cpp + script_manager.cpp ) target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC ${Boost_INCLUDE_DIRS} PRIVATE ${HIREDIS_INCLUDE_DIRS} PRIVATE ${CMAKE_SOURCE_DIR}/lib/better-enums + PRIVATE ${LIBEV_INCLUDE_DIRS} ) target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_BINARY_DIR} @@ -24,6 +28,11 @@ target_link_libraries(${PROJECT_NAME} PRIVATE ${bare_name}-inc PRIVATE ${bare_name}-pq PRIVATE ${HIREDIS_LIBRARIES} + PRIVATE ${LIBEV_LIBRARIES} +) + +target_compile_definitions(${PROJECT_NAME} + PRIVATE EV_COMPAT3=0 ) if (CryptoPP_FOUND) diff --git a/src/backends/redis/backend_redis.cpp b/src/backends/redis/backend_redis.cpp index 2efa315..427aec2 100644 --- a/src/backends/redis/backend_redis.cpp +++ b/src/backends/redis/backend_redis.cpp @@ -35,11 +35,10 @@ namespace dindb { uint16_t database; }; - std::pair pair_list_to_file_record (const redis::Command::hscan_range& parRange, const mchlib::TigerHash& parHash) { + std::pair pair_list_to_file_record (const redis::Command::hscan_range& parRange) { using dinhelp::lexical_cast; mchlib::FileRecordData retval; - retval.hash = parHash; std::array mime; std::string group_key; @@ -47,7 +46,7 @@ namespace dindb { if (itm.first == "path") retval.abs_path = itm.second; else if (itm.first == "hash") - assert(tiger_to_string(parHash) == itm.second); + retval.hash = mchlib::string_to_tiger(itm.second); else if (itm.first == "size") retval.size = lexical_cast( itm.second); @@ -124,6 +123,19 @@ namespace YAML { }; } //namespace YAML +//namespace redis { +// template <> +// struct RedisStructAdapt { +// static bool decode (const Command::hscan_range& parFrom, mchlib::FileRecordData& parOut) { +// return true; +// } +// +// static void encode (const Command::hscan_range& parFrom, mchlib::FileRecordData& parOut) { +// return true; +// } +// }; +//} + namespace dindb { BackendRedis::BackendRedis(std::string &&parAddress, uint16_t parPort, uint16_t parDatabase, bool parConnect) : m_redis(std::move(parAddress), parPort), @@ -140,9 +152,18 @@ namespace dindb { using dinhelp::lexical_cast; m_redis.connect(); - if (m_redis.is_connected() and m_database > 0) { - m_redis.run("SELECT", lexical_cast(m_database)); - m_redis.run("CLIENT", "SETNAME", PROGRAM_NAME "_v" STRINGIZE(VERSION_MAJOR) "." STRINGIZE(VERSION_MINOR) "." STRINGIZE(VERSION_PATCH)); + m_redis.wait_for_connect(); + if (m_redis.is_connected()) { + auto batch = m_redis.make_batch(); + batch.run("SELECT", lexical_cast(m_database)); + batch.run("CLIENT", "SETNAME", PROGRAM_NAME "_v" STRINGIZE(VERSION_MAJOR) "." STRINGIZE(VERSION_MINOR) "." STRINGIZE(VERSION_PATCH)); + batch.throw_if_failed(); + m_redis.submit_lua_script("return 42;"); + } + else { + std::ostringstream oss; + oss << "Error connecting to Redis: " << m_redis.connection_error(); + throw std::runtime_error(oss.str()); } } @@ -186,6 +207,7 @@ namespace dindb { "type", parSetData.type, "content_type", parSetData.content_type ); + //m_redis.hmset(parSetData); for (const auto& file_data : parData) { redis::Reply file_id_reply = m_redis.run("HINCRBY", PROGRAM_NAME ":indices", "files", "1"); @@ -223,7 +245,7 @@ namespace dindb { } else { const auto result_id = redis::get_string(hash_reply); - auto set_key_and_file_item = pair_list_to_file_record(m_redis.hscan(result_id), parHash); + auto set_key_and_file_item = pair_list_to_file_record(m_redis.hscan(result_id)); parItem = std::move(set_key_and_file_item.second); const std::string group_key = std::move(set_key_and_file_item.first); diff --git a/src/backends/redis/batch.cpp b/src/backends/redis/batch.cpp index e424fb4..e30bc83 100644 --- a/src/backends/redis/batch.cpp +++ b/src/backends/redis/batch.cpp @@ -16,6 +16,7 @@ */ #include "batch.hpp" +#include "command.hpp" #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include namespace redis { namespace { @@ -60,12 +62,14 @@ namespace redis { ); case REDIS_REPLY_ERROR: return ErrorString(parReply->str, parReply->len); + case REDIS_REPLY_STATUS: + return StatusString(parReply->str, parReply->len); default: + assert(false); //not reached return Reply(); }; } - extern "C" void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) { assert(parPrivData); auto* data = static_cast(parPrivData); @@ -78,6 +82,9 @@ namespace redis { auto reply = make_redis_reply_type(static_cast(parReply)); data->promise.set_value(std::move(reply)); } + else { + assert(false); //Should this case also be managed? + } delete data; } @@ -98,12 +105,14 @@ namespace redis { Batch::Batch (Batch&&) = default; - Batch::Batch (redisAsyncContext* parContext) : + Batch::Batch (redisAsyncContext* parContext, Command* parCommand) : m_futures(), m_replies(), m_local_data(new LocalData), + m_command(parCommand), m_context(parContext) { + assert(m_command); assert(m_context); } @@ -125,11 +134,16 @@ namespace redis { std::unique_lock u_lock(m_local_data->futures_mutex); m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; }); } - std::cout << "command sent to hiredis" << std::endl; + std::cout << " emplace_back(future)... "; - m_futures.push_back(data->promise.get_future()); + m_futures.emplace_back(data->promise.get_future()); + m_command->lock(); const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths); + m_command->unlock(); assert(REDIS_OK == command_added); // REDIS_ERR if error + + std::cout << "command sent to hiredis" << std::endl; + m_command->wakeup_thread(); } bool Batch::replies_requested() const { @@ -140,7 +154,7 @@ namespace redis { if (not replies_requested()) { m_replies.reserve(m_futures.size()); for (auto& fut : m_futures) { - m_replies.push_back(fut.get()); + m_replies.emplace_back(fut.get()); } auto empty_vec = std::move(m_futures); @@ -149,8 +163,22 @@ namespace redis { } void Batch::throw_if_failed() { - const auto& rep = replies(); - assert(false); //not implemented + std::ostringstream oss; + int err_count = 0; + const int max_reported_errors = 3; + + oss << "Error in reply: "; + for (const auto& rep : replies()) { + if (rep.which() == RedisVariantType_Error) { + ++err_count; + if (err_count <= max_reported_errors) + oss << '"' << get_error_string(rep).message() << "\" "; + } + } + if (err_count) { + oss << " (showing " << err_count << '/' << max_reported_errors << " errors on " << replies().size() << " total replies)"; + throw std::runtime_error(oss.str()); + } } RedisError::RedisError (const char* parMessage, std::size_t parLength) : diff --git a/src/backends/redis/batch.hpp b/src/backends/redis/batch.hpp index 491129c..bf28737 100644 --- a/src/backends/redis/batch.hpp +++ b/src/backends/redis/batch.hpp @@ -53,12 +53,13 @@ namespace redis { private: struct LocalData; - explicit Batch ( redisAsyncContext* parContext ); + Batch ( redisAsyncContext* parContext, Command* parCommand ); void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); std::vector> m_futures; std::vector m_replies; std::unique_ptr m_local_data; + Command* m_command; redisAsyncContext* m_context; }; diff --git a/src/backends/redis/command.cpp b/src/backends/redis/command.cpp index b3951ea..5430829 100644 --- a/src/backends/redis/command.cpp +++ b/src/backends/redis/command.cpp @@ -16,57 +16,115 @@ */ #include "command.hpp" -#include "helpers/lexical_cast.hpp" +#include "script_manager.hpp" #include #include +#include +#include #include #include #include #include #include -#if defined(WITH_CRYPTOPP) -# include -#endif +#include +#include +#include +#include +#include + +//See docs directory for info about hiredis/libev with multithreading namespace redis { namespace { -#if defined(WITH_CRYPTOPP) - struct LuaScriptHash { - union { - struct { - uint64_t part_a, part_b; - uint32_t part_c; - }; - uint8_t raw_bytes[20]; - }; - }; -#endif + void async_callback (ev_loop* /*parLoop*/, ev_async* /*parObject*/, int /*parRevents*/) { + } + + void async_halt_loop (ev_loop* parLoop, ev_async* /*parObject*/, int /*parRevents*/) { + ev_break(parLoop, EVBREAK_ALL); + } + + void lock_mutex_libev (ev_loop* parLoop) { + std::mutex* mtx = static_cast(ev_userdata(parLoop)); + assert(mtx); + mtx->lock(); + } + + void unlock_mutex_libev (ev_loop* parLoop) { + std::mutex* mtx = static_cast(ev_userdata(parLoop)); + assert(mtx); + mtx->unlock(); + } } //unnamed namespace + struct Command::LocalData { + explicit LocalData (Command* parCommand) : + lua_scripts(parCommand), + redis_poll_thread(), + connect_processed(false), + disconnect_processed(true) + { + } + + ScriptManager lua_scripts; + ev_async watcher_wakeup; + ev_async watcher_halt; + std::thread redis_poll_thread; + std::mutex hiredis_mutex; + std::mutex libev_mutex; + std::condition_variable condition_connected; + std::condition_variable condition_disconnected; + std::string connect_err_msg; + std::atomic_bool connect_processed; + std::atomic_bool disconnect_processed; + }; + void on_connect (const redisAsyncContext* parContext, int parStatus) { assert(parContext and parContext->data); Command& self = *static_cast(parContext->data); + assert(parContext == self.m_conn.get()); + assert(not self.m_local_data->connect_processed); self.m_connection_lost = false; self.m_connected = (parStatus == REDIS_OK); + self.m_local_data->connect_processed = true; + self.m_local_data->connect_err_msg = parContext->errstr; + self.m_local_data->condition_connected.notify_one(); } void on_disconnect (const redisAsyncContext* parContext, int parStatus) { assert(parContext and parContext->data); Command& self = *static_cast(parContext->data); assert(self.m_connected); + assert(not self.m_local_data->disconnect_processed); self.m_connection_lost = (REDIS_ERR == parStatus); self.m_connected = false; + self.m_local_data->disconnect_processed = true; + self.m_local_data->connect_err_msg.clear(); + self.m_local_data->condition_disconnected.notify_one(); }; Command::Command (std::string&& parAddress, uint16_t parPort) : m_conn(nullptr, &redisAsyncDisconnect), + m_libev_loop_thread(ev_loop_new(EVFLAG_NOINOTIFY), &ev_loop_destroy), m_address(std::move(parAddress)), + m_local_data(new LocalData(this)), m_port(parPort), m_connected(false), m_connection_lost(false) { + //Init libev stuff + { + signal(SIGPIPE, SIG_IGN); + + //See: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE + ev_async_init(&m_local_data->watcher_wakeup, &async_callback); + ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); + ev_async_init(&m_local_data->watcher_halt, &async_halt_loop); + ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_halt); + ev_set_userdata(m_libev_loop_thread.get(), &m_local_data->libev_mutex); + ev_set_loop_release_cb(m_libev_loop_thread.get(), &unlock_mutex_libev, &lock_mutex_libev); + } } Command::Command (std::string&& parSocket) : @@ -75,10 +133,13 @@ namespace redis { } Command::~Command() noexcept { + this->disconnect(); + this->wait_for_disconnect(); } void Command::connect() { if (not m_conn) { + m_local_data->disconnect_processed = false; RedisConnection conn( (is_socket_connection() ? redisAsyncConnectUnix(m_address.c_str()) @@ -95,22 +156,48 @@ namespace redis { else { conn->data = this; } - if (conn->err) { - std::ostringstream oss; - oss << "Unable to connect to Redis server at " << m_address << ':' << m_port << - ": " << conn->errstr; - throw std::runtime_error(oss.str()); - } + if (REDIS_OK != redisLibevAttach(m_libev_loop_thread.get(), conn.get())) + throw std::runtime_error("Unable to set event loop"); if (REDIS_OK != redisAsyncSetConnectCallback(conn.get(), &on_connect)) throw std::runtime_error("Unable to set \"on_connect()\" callback"); if (REDIS_OK != redisAsyncSetDisconnectCallback(conn.get(), &on_disconnect)) throw std::runtime_error("Unable to set \"on_disconnect()\" callback"); std::swap(conn, m_conn); + m_local_data->redis_poll_thread = std::thread([this]() { + m_local_data->libev_mutex.lock(); + ev_run(m_libev_loop_thread.get(), 0); + m_local_data->libev_mutex.unlock(); + }); + wakeup_thread(); + } + } + + void Command::wait_for_connect() { + if (not m_local_data->connect_processed) { + std::unique_lock lk(m_local_data->hiredis_mutex); + m_local_data->condition_connected.wait(lk, [this]() { return m_local_data->connect_processed.load(); }); + assert(true == m_local_data->connect_processed); } } void Command::disconnect() { - m_conn.reset(); + assert(m_local_data->redis_poll_thread.joinable()); + m_local_data->connect_processed = false; + { + std::lock_guard lock(m_local_data->libev_mutex); + assert(not ev_async_pending(&m_local_data->watcher_halt)); + ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_halt); + m_conn.reset(); + } + m_local_data->redis_poll_thread.join(); + } + + void Command::wait_for_disconnect() { + if (not m_local_data->disconnect_processed) { + std::unique_lock lk(m_local_data->hiredis_mutex); + m_local_data->condition_disconnected.wait(lk, [this]() { return m_local_data->disconnect_processed.load(); }); + assert(true == m_local_data->disconnect_processed); + } } bool Command::is_connected() const { @@ -119,6 +206,10 @@ namespace redis { return connected; } + boost::string_ref Command::connection_error() const { + return m_local_data->connect_err_msg; + } + auto Command::scan() -> scan_range { return scan_range(scan_iterator(this, false), scan_iterator(this, true)); } @@ -137,63 +228,28 @@ namespace redis { Batch Command::make_batch() { assert(is_connected()); - return Batch(m_conn.get()); + return Batch(m_conn.get(), this); } -#if defined(WITH_CRYPTOPP) - boost::string_ref Command::add_lua_script_ifn (const std::string& parScript) { - if (parScript.empty()) - return boost::string_ref(); - - using dinhelp::lexical_cast; - - static_assert(20 == CryptoPP::SHA1::DIGESTSIZE, "Unexpected SHA1 digest size"); - static_assert(sizeof(LuaScriptHash) >= CryptoPP::SHA1::DIGESTSIZE, "Wrong SHA1 struct size"); - static_assert(Sha1Array().size() == CryptoPP::SHA1::DIGESTSIZE, "Wrong array size"); - - LuaScriptHash digest; - CryptoPP::SHA1().CalculateDigest(digest.raw_bytes, reinterpret_cast(parScript.data()), parScript.size()); - //TODO: change when lexical_cast will support arrays - const std::string sha1_str = lexical_cast(digest.part_a) + lexical_cast(digest.part_b) + lexical_cast(digest.part_c); - Sha1Array sha1_array; - std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin()); - - auto it_found = m_known_hashes.find(sha1_array); - const bool was_present = (m_known_hashes.end() != it_found); - if (was_present) { - return boost::string_ref(it_found->data(), it_found->size()); - } - - auto reply = this->run("SCRIPT", "LOAD", parScript); - assert(not was_present); - - assert(get_string(reply) == sha1_str); - const auto it_inserted = m_known_hashes.insert(it_found, sha1_array); - (void)reply; - - return boost::string_ref(it_inserted->data(), it_inserted->size()); - } -#else - boost::string_ref Command::add_lua_script_ifn (const std::string& parScript) { - auto it_found = m_known_hashes.find(parScript); - const bool was_present = (m_known_hashes.end() != it_found); - if (was_present) { - return boost::string_ref(it_found->second.data(), it_found->second.size()); - } - - auto reply = this->run("SCRIPT", "LOAD", parScript); - assert(not was_present); - - const auto sha1_str = get_string(reply); - Sha1Array sha1_array; - std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin()); - auto it_inserted = m_known_hashes.insert(it_found, std::make_pair(parScript, sha1_array)); - - return boost::string_ref(it_inserted->second.data(), it_inserted->second.size()); - } -#endif - bool Command::is_socket_connection() const { return not (m_port or m_address.empty()); } + + void Command::submit_lua_script (const std::string& parScript) { + m_local_data->lua_scripts.submit_lua_script(parScript); + } + + void Command::wakeup_thread() { + std::lock_guard lock(m_local_data->libev_mutex); + if (ev_async_pending(&m_local_data->watcher_wakeup) == false) + ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup); + } + + void Command::lock() { + m_local_data->libev_mutex.lock(); + } + + void Command::unlock() { + m_local_data->libev_mutex.unlock(); + } } //namespace redis diff --git a/src/backends/redis/command.hpp b/src/backends/redis/command.hpp index 6b1fd66..eb8a81e 100644 --- a/src/backends/redis/command.hpp +++ b/src/backends/redis/command.hpp @@ -33,14 +33,9 @@ #include #include #include -#if defined(WITH_CRYPTOPP) -# include -#else -# include -#endif -#include struct redisAsyncContext; +struct ev_loop; namespace redis { class Command { @@ -61,9 +56,12 @@ namespace redis { ~Command ( void ) noexcept; void connect ( void ); + void wait_for_connect ( void ); void disconnect ( void ); + void wait_for_disconnect ( void ); bool is_connected ( void ) const; + boost::string_ref connection_error ( void ) const; Batch make_batch ( void ); @@ -77,24 +75,26 @@ namespace redis { zscan_range zscan ( boost::string_ref parKey ); void submit_lua_script ( const std::string& parScript ); + void wakeup_thread(); + void lock(); + void unlock(); private: using RedisConnection = std::unique_ptr; - using Sha1Array = std::array; + using LibevLoop = std::unique_ptr; - boost::string_ref add_lua_script_ifn ( const std::string& parScript ); bool is_socket_connection ( void ) const; + void on_connect_successful ( void ); + + struct LocalData; RedisConnection m_conn; -#if defined(WITH_CRYPTOPP) - std::set m_known_hashes; -#else - std::map m_known_scripts; -#endif + LibevLoop m_libev_loop_thread; std::string m_address; + std::unique_ptr m_local_data; uint16_t m_port; - bool m_connected; - bool m_connection_lost; + volatile bool m_connected; + volatile bool m_connection_lost; }; template diff --git a/src/backends/redis/script.cpp b/src/backends/redis/script.cpp new file mode 100644 index 0000000..2e1502c --- /dev/null +++ b/src/backends/redis/script.cpp @@ -0,0 +1,21 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#include "script.hpp" + +namespace redis { +} //namespace redis diff --git a/src/backends/redis/script.hpp b/src/backends/redis/script.hpp new file mode 100644 index 0000000..f547df3 --- /dev/null +++ b/src/backends/redis/script.hpp @@ -0,0 +1,28 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#ifndef id5B30CDA57F894CD6888093B64F9433DA +#define id5B30CDA57F894CD6888093B64F9433DA + +#include "script_manager.hpp" + +namespace redis { + class Script { + }; +} //namespace redis + +#endif diff --git a/src/backends/redis/script_manager.cpp b/src/backends/redis/script_manager.cpp new file mode 100644 index 0000000..0f7c98e --- /dev/null +++ b/src/backends/redis/script_manager.cpp @@ -0,0 +1,115 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#include "script_manager.hpp" +#include "helpers/lexical_cast.hpp" +#include "command.hpp" +#include +#if defined(WITH_CRYPTOPP) +# include +#endif + +namespace redis { + namespace { +#if defined(WITH_CRYPTOPP) + struct LuaScriptHash { + union { + struct { + uint64_t part_a, part_b; + uint32_t part_c; + }; + uint8_t raw_bytes[20]; + }; + }; +#endif + } //unnamed namespace + + ScriptManager::ScriptManager (Command* parCommand) : + m_command(parCommand) + { + assert(m_command); + } + +#if defined(WITH_CRYPTOPP) + boost::string_ref ScriptManager::add_lua_script_ifn (const std::string& parScript) { + assert(m_command->is_connected()); + + if (parScript.empty()) + return boost::string_ref(); + + using dinhelp::lexical_cast; + + static_assert(20 == CryptoPP::SHA1::DIGESTSIZE, "Unexpected SHA1 digest size"); + static_assert(sizeof(LuaScriptHash) >= CryptoPP::SHA1::DIGESTSIZE, "Wrong SHA1 struct size"); + static_assert(Sha1Array().size() == CryptoPP::SHA1::DIGESTSIZE * 2, "Wrong array size"); + + LuaScriptHash digest; + CryptoPP::SHA1().CalculateDigest(digest.raw_bytes, reinterpret_cast(parScript.data()), parScript.size()); + //TODO: change when lexical_cast will support arrays + auto sha1_str_parta = lexical_cast(__builtin_bswap64(digest.part_a)); + auto sha1_str_partb = lexical_cast(__builtin_bswap64(digest.part_b)); + auto sha1_str_partc = lexical_cast(__builtin_bswap32(digest.part_c)); + const std::string sha1_str = + std::string(sizeof(digest.part_a) * 2 - sha1_str_parta.size(), '0') + sha1_str_parta + + std::string(sizeof(digest.part_b) * 2 - sha1_str_partb.size(), '0') + sha1_str_partb + + std::string(sizeof(digest.part_c) * 2 - sha1_str_partc.size(), '0') + sha1_str_partc + ; + Sha1Array sha1_array; + assert(sha1_str.size() == sha1_array.size()); + std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin()); + + auto it_found = m_known_hashes.find(sha1_array); + const bool was_present = (m_known_hashes.end() != it_found); + if (was_present) { + return boost::string_ref(it_found->data(), it_found->size()); + } + + auto reply = m_command->run("SCRIPT", "LOAD", parScript); + assert(not was_present); + + assert(get_string(reply) == sha1_str); + const auto it_inserted = m_known_hashes.insert(it_found, sha1_array); + (void)reply; + + return boost::string_ref(it_inserted->data(), it_inserted->size()); + } +#else + boost::string_ref ScriptManager::add_lua_script_ifn (const std::string& parScript) { + assert(m_command->is_connected()); + + auto it_found = m_known_hashes.find(parScript); + const bool was_present = (m_known_hashes.end() != it_found); + if (was_present) { + return boost::string_ref(it_found->second.data(), it_found->second.size()); + } + + auto reply = m_command->run("SCRIPT", "LOAD", parScript); + assert(not was_present); + + const auto sha1_str = get_string(reply); + Sha1Array sha1_array; + std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin()); + auto it_inserted = m_known_hashes.insert(it_found, std::make_pair(parScript, sha1_array)); + + return boost::string_ref(it_inserted->second.data(), it_inserted->second.size()); + } +#endif + + void ScriptManager::submit_lua_script (const std::string& parScript) { + add_lua_script_ifn(parScript); + } +} //namespace redis diff --git a/src/backends/redis/script_manager.hpp b/src/backends/redis/script_manager.hpp new file mode 100644 index 0000000..08b744a --- /dev/null +++ b/src/backends/redis/script_manager.hpp @@ -0,0 +1,54 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#ifndef id8E124FF76DF449CDB8FBA806F8EF4E78 +#define id8E124FF76DF449CDB8FBA806F8EF4E78 + +#include "redisConfig.h" +#include +#if defined(WITH_CRYPTOPP) +# include +#else +# include +#endif +#include +#include + +namespace redis { + class Command; + + class ScriptManager { + public: + explicit ScriptManager ( Command* parCommand ); + + void submit_lua_script ( const std::string& parScript ); + + private: + using Sha1Array = std::array; + + boost::string_ref add_lua_script_ifn ( const std::string& parScript ); + + Command* m_command; +#if defined(WITH_CRYPTOPP) + std::set m_known_hashes; +#else + std::map m_known_scripts; +#endif + }; +} //namespace redis + +#endif