diff --git a/src/backends/redis/CMakeLists.txt b/src/backends/redis/CMakeLists.txt index f268708..1595e81 100644 --- a/src/backends/redis/CMakeLists.txt +++ b/src/backends/redis/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(${PROJECT_NAME} SHARED command.cpp scan_iterator.cpp reply.cpp + batch.cpp ) target_include_directories(${PROJECT_NAME} SYSTEM diff --git a/src/backends/redis/batch.cpp b/src/backends/redis/batch.cpp new file mode 100644 index 0000000..a8a182d --- /dev/null +++ b/src/backends/redis/batch.cpp @@ -0,0 +1,159 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#include "batch.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace redis { + namespace { + const std::size_t g_max_redis_unanswered_commands = 1000; + + struct HiredisCallbackData { + HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::condition_variable& parSendCmdCond ) : + promise(), + pending_futures(parPendingFutures), + send_command_condition(parSendCmdCond) + { + } + + std::promise promise; + std::atomic_size_t& pending_futures; + std::condition_variable& send_command_condition; + }; + + Reply make_redis_reply_type (redisReply* parReply) { + using boost::transform_iterator; + using PtrToReplyIterator = transform_iterator; + + switch (parReply->type) { + case REDIS_REPLY_INTEGER: + return parReply->integer; + case REDIS_REPLY_STRING: + return std::string(parReply->str, parReply->len); + case REDIS_REPLY_ARRAY: + return std::vector( + PtrToReplyIterator(parReply->element, &make_redis_reply_type), + PtrToReplyIterator(parReply->element + parReply->elements, &make_redis_reply_type) + ); + case REDIS_REPLY_ERROR: + return ErrorString(parReply->str, parReply->len); + default: + return Reply(); + }; + } + + void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) { + assert(parPrivData); + auto* data = static_cast(parPrivData); + const auto old_count = data->pending_futures.fetch_add(-1); + assert(old_count > 0); + if (old_count == g_max_redis_unanswered_commands) + data->send_command_condition.notify_one(); + + if (parReply) { + auto reply = make_redis_reply_type(static_cast(parReply)); + data->promise.set_value(std::move(reply)); + } + + delete data; + } + } //unnamed namespace + + struct Batch::LocalData { + LocalData ( void ) : + free_cmd_slot(), + futures_mutex(), + pending_futures(0) + { + } + + std::condition_variable free_cmd_slot; + std::mutex futures_mutex; + std::atomic_size_t pending_futures; + }; + + Batch::Batch (Batch&&) = default; + + Batch::Batch (redisAsyncContext* parContext) : + m_futures(), + m_replies(), + m_local_data(new LocalData), + m_context(parContext) + { + assert(m_context); + } + + Batch::~Batch() noexcept { + } + + void Batch::run_pvt (int parArgc, const char** parArgv, std::size_t* parLengths) { + assert(not replies_requested()); + assert(parArgc >= 1); + assert(parArgv); + assert(parLengths); //This /could/ be null, but I don't see why it should + + const auto pending_futures = m_local_data->pending_futures.fetch_add(1); + auto* data = new HiredisCallbackData(m_local_data->pending_futures, m_local_data->free_cmd_slot); + + std::cout << "run_pvt(), " << pending_futures << " items pending... "; + if (pending_futures >= g_max_redis_unanswered_commands) { + std::cout << " waiting... "; + std::unique_lock u_lock(m_local_data->futures_mutex); + m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; }); + } + std::cout << "command sent to hiredis" << std::endl; + + m_futures.push_back(data->promise.get_future()); + const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths); + assert(REDIS_OK == command_added); // REDIS_ERR if error + } + + bool Batch::replies_requested() const { + return not m_replies.empty(); + } + + const std::vector& Batch::replies() { + if (not replies_requested()) { + m_replies.reserve(m_futures.size()); + for (auto& fut : m_futures) { + m_replies.push_back(fut.get()); + } + + auto empty_vec = std::move(m_futures); + } + return m_replies; + } + + void Batch::throw_if_failed() { + const auto& rep = replies(); + assert(false); //not implemented + } + + RedisError::RedisError (const char* parMessage, std::size_t parLength) : + std::runtime_error(std::string(parMessage, parLength)) + { + } +} //namespace redis diff --git a/src/backends/redis/batch.hpp b/src/backends/redis/batch.hpp new file mode 100644 index 0000000..491129c --- /dev/null +++ b/src/backends/redis/batch.hpp @@ -0,0 +1,95 @@ +/* Copyright 2015, 2016, Michele Santullo + * This file is part of "dindexer". + * + * "dindexer" is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * "dindexer" is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with "dindexer". If not, see . + */ + +#ifndef idD81C81D99196491A8C9B68DED8ADD260 +#define idD81C81D99196491A8C9B68DED8ADD260 + +#include "reply.hpp" +#include "arg_to_bin_safe.hpp" +#include +#include + +struct redisAsyncContext; + +namespace std { + template class future; + template struct atomic; +} //namespace std + +namespace redis { + class Command; + + class Batch { + friend class Command; + public: + Batch ( Batch&& parOther ); + Batch ( const Batch& ) = delete; + ~Batch ( void ) noexcept; + + const std::vector& replies ( void ); + bool replies_requested ( void ) const; + void throw_if_failed ( void ); + + template + Batch& run ( const char* parCommand, Args&&... parArgs ); + + template + Batch& operator() ( const char* parCommand, Args&&... parArgs ); + + private: + struct LocalData; + + explicit Batch ( redisAsyncContext* parContext ); + void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); + + std::vector> m_futures; + std::vector m_replies; + std::unique_ptr m_local_data; + redisAsyncContext* m_context; + }; + + class RedisError : public std::runtime_error { + public: + RedisError ( const char* parMessage, std::size_t parLength ); + }; + + template + Batch& Batch::run (const char* parCommand, Args&&... parArgs) { + constexpr const std::size_t arg_count = sizeof...(Args) + 1; + using CharPointerArray = std::array; + using LengthArray = std::array; + using implem::arg_to_bin_safe_char; + using implem::arg_to_bin_safe_length; + using implem::MakeCharInfo; + using boost::string_ref; + + this->run_pvt( + static_cast(arg_count), + CharPointerArray{ (arg_to_bin_safe_char(string_ref(parCommand))), MakeCharInfo::type>::type>(std::forward(parArgs)).data()... }.data(), + LengthArray{ arg_to_bin_safe_length(string_ref(parCommand)), arg_to_bin_safe_length(std::forward(parArgs))... }.data() + ); + + return *this; + } + + template + Batch& Batch::operator() (const char* parCommand, Args&&... parArgs) { + return this->run(parCommand, std::forward(parArgs)...); + } +} //namespace redis + +#endif diff --git a/src/backends/redis/command.cpp b/src/backends/redis/command.cpp index 2994cff..cb8044c 100644 --- a/src/backends/redis/command.cpp +++ b/src/backends/redis/command.cpp @@ -17,41 +17,19 @@ #include "command.hpp" #include +#include #include #include #include #include #include -#include namespace redis { namespace { - using RedisReply = std::unique_ptr; - - Reply make_redis_reply_type (redisReply* parReply) { - using boost::transform_iterator; - using PtrToReplyIterator = transform_iterator; - - switch (parReply->type) { - case REDIS_REPLY_INTEGER: - return parReply->integer; - case REDIS_REPLY_STRING: - return std::string(parReply->str, parReply->len); - case REDIS_REPLY_ARRAY: - return std::vector( - PtrToReplyIterator(parReply->element, &make_redis_reply_type), - PtrToReplyIterator(parReply->element + parReply->elements, &make_redis_reply_type) - ); - case REDIS_REPLY_ERROR: - return ErrorString(parReply->str, parReply->len); - default: - return Reply(); - }; - } } //unnamed namespace Command::Command (std::string&& parAddress, uint16_t parPort) : - m_conn(nullptr, &redisFree), + m_conn(nullptr, &redisAsyncDisconnect), m_address(std::move(parAddress)), m_port(parPort) { @@ -62,10 +40,9 @@ namespace redis { void Command::connect() { if (not m_conn) { - struct timeval timeout = {5, 500000}; //5.5 seconds? RedisConnection conn( - redisConnectWithTimeout(m_address.c_str(), m_port, timeout), - &redisFree + redisAsyncConnect(m_address.c_str(), m_port), + &redisAsyncDisconnect ); if (not conn) { std::ostringstream oss; @@ -86,44 +63,6 @@ namespace redis { m_conn.reset(); } - Reply Command::run_pvt (int parArgc, const char** parArgv, std::size_t* parLengths) { - assert(parArgc >= 1); - assert(parArgv); - assert(parLengths); //This /could/ be null, but I don't see why it should - assert(is_connected()); - - RedisReply reply( - static_cast(redisCommandArgv(m_conn.get(), parArgc, parArgv, parLengths)), - &freeReplyObject - ); - - return make_redis_reply_type(reply.get()); - - //std::string key; - //{ - // std::ostringstream key_oss; - // RedisReply incr_reply(static_cast(redisCommand(m_conn.get(), "incr set_counter")), &freeReplyObject); - // key_oss << "sets:" << incr_reply->integer; - // key = key_oss.str(); - //} - - //RedisReply insert_reply( - // static_cast(redisCommand( - // m_conn.get(), - // "hmset %b name %b disk_label %b fs_uuid %b", - // key.data(), - // key.size(), - // parSetData.name.data(), - // parSetData.name.size(), - // parSetData.disk_label.data(), - // parSetData.disk_label.size(), - // parSetData.fs_uuid.data(), - // parSetData.fs_uuid.size() - // )), - // &freeReplyObject - //); - } - bool Command::is_connected() const { return m_conn and not m_conn->err; } @@ -143,4 +82,9 @@ namespace redis { auto Command::zscan (boost::string_ref parKey) -> zscan_range { return zscan_range(zscan_iterator(this, parKey, false), zscan_iterator(this, parKey, true)); } + + Batch Command::make_batch() { + assert(is_connected()); + return Batch(m_conn.get()); + } } //namespace redis diff --git a/src/backends/redis/command.hpp b/src/backends/redis/command.hpp index 5b9b6b5..32cf891 100644 --- a/src/backends/redis/command.hpp +++ b/src/backends/redis/command.hpp @@ -18,9 +18,9 @@ #ifndef idD83EEBFC927840C6B9F32D61A1D1E582 #define idD83EEBFC927840C6B9F32D61A1D1E582 -#include "arg_to_bin_safe.hpp" #include "scan_iterator.hpp" #include "reply.hpp" +#include "batch.hpp" #include #include #include @@ -33,7 +33,7 @@ #include #include -struct redisContext; +struct redisAsyncContext; namespace redis { class Command { @@ -55,6 +55,8 @@ namespace redis { bool is_connected ( void ) const; + Batch make_batch ( void ); + template Reply run ( const char* parCommand, Args&&... parArgs ); @@ -65,9 +67,7 @@ namespace redis { zscan_range zscan ( boost::string_ref parKey ); private: - using RedisConnection = std::unique_ptr; - - Reply run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); + using RedisConnection = std::unique_ptr; RedisConnection m_conn; std::string m_address; @@ -76,19 +76,10 @@ namespace redis { template Reply Command::run (const char* parCommand, Args&&... parArgs) { - constexpr const std::size_t arg_count = sizeof...(Args) + 1; - using CharPointerArray = std::array; - using LengthArray = std::array; - using implem::arg_to_bin_safe_char; - using implem::arg_to_bin_safe_length; - using implem::MakeCharInfo; - using boost::string_ref; - - return this->run_pvt( - static_cast(arg_count), - CharPointerArray{ (arg_to_bin_safe_char(string_ref(parCommand))), MakeCharInfo::type>::type>(std::forward(parArgs)).data()... }.data(), - LengthArray{ arg_to_bin_safe_length(string_ref(parCommand)), arg_to_bin_safe_length(std::forward(parArgs))... }.data() - ); + auto batch = make_batch(); + batch.run(parCommand, std::forward(parArgs)...); + batch.throw_if_failed(); + return batch.replies().front(); } } //namespace redis