diff --git a/include/incredis/batch.hpp b/include/incredis/batch.hpp index a5d54b1..e1b4740 100644 --- a/include/incredis/batch.hpp +++ b/include/incredis/batch.hpp @@ -23,10 +23,6 @@ #include #include -namespace std { - template class future; -} //namespace std - namespace redis { class Command; class AsyncConnection; @@ -40,6 +36,7 @@ namespace redis { ~Batch ( void ) noexcept; const std::vector& replies ( void ); + std::vector& replies_nonconst ( void ); bool replies_requested ( void ) const; void throw_if_failed ( void ); @@ -57,7 +54,7 @@ namespace redis { explicit Batch ( AsyncConnection* parConn, ThreadContext& parThreadContext ); void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); - std::vector> m_futures; + std::vector> m_futures; std::vector m_replies; std::unique_ptr m_local_data; AsyncConnection* m_async_conn; diff --git a/include/incredis/command.hpp b/include/incredis/command.hpp index 7231f2d..835a1ba 100644 --- a/include/incredis/command.hpp +++ b/include/incredis/command.hpp @@ -65,7 +65,7 @@ namespace redis { auto batch = make_batch(); batch.run(parCommand, std::forward(parArgs)...); batch.throw_if_failed(); - return batch.replies().front(); + return std::move(batch.replies_nonconst().front()); } template diff --git a/include/incredis/reply.hpp b/include/incredis/reply.hpp index 10ab56e..6e313d5 100644 --- a/include/incredis/reply.hpp +++ b/include/incredis/reply.hpp @@ -77,8 +77,14 @@ namespace redis { Reply ( ErrorString&& parVal ) : base_class(std::move(parVal)) {} Reply ( StatusString&& parVal ) : base_class(std::move(parVal)) {} Reply ( std::nullptr_t parVal ) : base_class(parVal) {} + + Reply ( Reply&& ) = default; + Reply ( const Reply& ) = default; ~Reply ( void ) noexcept = default; + Reply& operator= ( Reply&& ) = default; + Reply& operator= ( const Reply& ) = default; + bool is_integer ( void ) const; bool is_string ( void ) const; bool is_array ( void ) const; diff --git a/src/batch.cpp b/src/batch.cpp index 736dc2c..e70e092 100644 --- a/src/batch.cpp +++ b/src/batch.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -39,16 +38,20 @@ namespace redis { const std::size_t g_max_redis_unanswered_commands = 1000; struct HiredisCallbackData { - HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::condition_variable& parSendCmdCond ) : - promise(), + HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::atomic_size_t& parLocalPendingFutures, std::condition_variable& parSendCmdCond, std::condition_variable& parLocalCmdsCond ) : pending_futures(parPendingFutures), - send_command_condition(parSendCmdCond) + local_pending_futures(parLocalPendingFutures), + reply_ptr(nullptr), + send_command_condition(parSendCmdCond), + local_commands_condition(parLocalCmdsCond) { } - std::promise promise; + Reply* reply_ptr; std::atomic_size_t& pending_futures; + std::atomic_size_t& local_pending_futures; std::condition_variable& send_command_condition; + std::condition_variable& local_commands_condition; }; Reply make_redis_reply_type (redisReply* parReply) { @@ -80,19 +83,28 @@ namespace redis { void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) { assert(parPrivData); auto* data = static_cast(parPrivData); - const auto old_count = data->pending_futures.fetch_add(-1); - assert(old_count > 0); - if (old_count == g_max_redis_unanswered_commands) - data->send_command_condition.notify_one(); + { + const auto old_count = data->pending_futures.fetch_add(-1); + assert(old_count > 0); + if (old_count == g_max_redis_unanswered_commands) + data->send_command_condition.notify_one(); + } if (parReply) { auto reply = make_redis_reply_type(static_cast(parReply)); - data->promise.set_value(std::move(reply)); + assert(data->reply_ptr); + *data->reply_ptr = std::move(reply); } else { assert(false); //Should this case also be managed? } + { + const auto old_value = data->local_pending_futures.fetch_add(-1); + assert(old_value > 0); + if (1 == old_value) + data->local_commands_condition.notify_one(); + } delete data; } @@ -115,13 +127,19 @@ namespace redis { struct Batch::LocalData { explicit LocalData ( ThreadContext& parThreadContext ) : free_cmd_slot(), + no_more_pending_futures(), futures_mutex(), + pending_futures_mutex(), + local_pending_futures(0), thread_context(parThreadContext) { } std::condition_variable free_cmd_slot; + std::condition_variable no_more_pending_futures; std::mutex futures_mutex; + std::mutex pending_futures_mutex; + std::atomic_size_t local_pending_futures; ThreadContext& thread_context; }; @@ -149,8 +167,9 @@ namespace redis { assert(parLengths); //This /could/ be null, but I don't see why it should assert(m_local_data); + m_local_data->local_pending_futures.fetch_add(1); const auto pending_futures = m_local_data->thread_context.pending_futures.fetch_add(1); - auto* data = new HiredisCallbackData(m_local_data->thread_context.pending_futures, m_local_data->free_cmd_slot); + auto* data = new HiredisCallbackData(m_local_data->thread_context.pending_futures, m_local_data->local_pending_futures, m_local_data->free_cmd_slot, m_local_data->no_more_pending_futures); #if defined(VERBOSE_HIREDIS_COMM) std::cout << "run_pvt(), " << pending_futures << " items pending... "; @@ -166,7 +185,9 @@ namespace redis { std::cout << " emplace_back(future)... "; #endif - m_futures.emplace_back(data->promise.get_future()); + std::unique_ptr new_reply(new Reply); + data->reply_ptr = new_reply.get(); + m_futures.emplace_back(std::move(new_reply)); { std::lock_guard lock(m_async_conn->event_mutex()); const int command_added = redisAsyncCommandArgv(m_async_conn->connection(), &hiredis_run_callback, data, parArgc, parArgv, parLengths); @@ -185,10 +206,19 @@ namespace redis { } const std::vector& Batch::replies() { + return replies_nonconst(); + } + + std::vector& Batch::replies_nonconst() { if (not replies_requested()) { + if (m_local_data->local_pending_futures > 0) { + std::unique_lock u_lock(m_local_data->pending_futures_mutex); + m_local_data->no_more_pending_futures.wait(u_lock, [this]() { return m_local_data->local_pending_futures == 0; }); + } + m_replies.reserve(m_futures.size()); - for (auto& fut : m_futures) { - m_replies.emplace_back(fut.get()); + for (auto& itm : m_futures) { + m_replies.emplace_back(std::move(*itm)); } auto empty_vec = std::move(m_futures); @@ -217,7 +247,7 @@ namespace redis { } assert(m_local_data); - assert(0 == m_local_data->thread_context.pending_futures); + assert(0 == m_local_data->local_pending_futures); m_futures.clear(); m_replies.clear(); } diff --git a/src/incredis.cpp b/src/incredis.cpp index f3efdea..60b1bed 100644 --- a/src/incredis.cpp +++ b/src/incredis.cpp @@ -40,7 +40,7 @@ namespace redis { return boost::none; } else { - auto replies = get_array(parReply); + auto replies(get_array(parReply)); IncRedis::opt_string_list::value_type retval; retval.reserve(replies.size()); for (const auto& rep : replies) {