1
0
Fork 0
mirror of https://github.com/KingDuckZ/incredis synced 2024-11-23 00:33:46 +00:00

Optimization - don't use futures, use simple pointers.

The local count atomic is used to tell if there are any unanswered
ex-futures yet.
This commit is contained in:
King_DuckZ 2016-12-05 20:45:07 +00:00
parent 930fde41c8
commit 657ecd63e2
5 changed files with 55 additions and 22 deletions

View file

@ -23,10 +23,6 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
namespace std {
template <class R> class future;
} //namespace std
namespace redis { namespace redis {
class Command; class Command;
class AsyncConnection; class AsyncConnection;
@ -40,6 +36,7 @@ namespace redis {
~Batch ( void ) noexcept; ~Batch ( void ) noexcept;
const std::vector<Reply>& replies ( void ); const std::vector<Reply>& replies ( void );
std::vector<Reply>& replies_nonconst ( void );
bool replies_requested ( void ) const; bool replies_requested ( void ) const;
void throw_if_failed ( void ); void throw_if_failed ( void );
@ -57,7 +54,7 @@ namespace redis {
explicit Batch ( AsyncConnection* parConn, ThreadContext& parThreadContext ); explicit Batch ( AsyncConnection* parConn, ThreadContext& parThreadContext );
void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
std::vector<std::future<Reply>> m_futures; std::vector<std::unique_ptr<Reply>> m_futures;
std::vector<Reply> m_replies; std::vector<Reply> m_replies;
std::unique_ptr<LocalData> m_local_data; std::unique_ptr<LocalData> m_local_data;
AsyncConnection* m_async_conn; AsyncConnection* m_async_conn;

View file

@ -65,7 +65,7 @@ namespace redis {
auto batch = make_batch(); auto batch = make_batch();
batch.run(parCommand, std::forward<Args>(parArgs)...); batch.run(parCommand, std::forward<Args>(parArgs)...);
batch.throw_if_failed(); batch.throw_if_failed();
return batch.replies().front(); return std::move(batch.replies_nonconst().front());
} }
template <typename T> template <typename T>

View file

@ -77,8 +77,14 @@ namespace redis {
Reply ( ErrorString&& parVal ) : base_class(std::move(parVal)) {} Reply ( ErrorString&& parVal ) : base_class(std::move(parVal)) {}
Reply ( StatusString&& parVal ) : base_class(std::move(parVal)) {} Reply ( StatusString&& parVal ) : base_class(std::move(parVal)) {}
Reply ( std::nullptr_t parVal ) : base_class(parVal) {} Reply ( std::nullptr_t parVal ) : base_class(parVal) {}
Reply ( Reply&& ) = default;
Reply ( const Reply& ) = default;
~Reply ( void ) noexcept = default; ~Reply ( void ) noexcept = default;
Reply& operator= ( Reply&& ) = default;
Reply& operator= ( const Reply& ) = default;
bool is_integer ( void ) const; bool is_integer ( void ) const;
bool is_string ( void ) const; bool is_string ( void ) const;
bool is_array ( void ) const; bool is_array ( void ) const;

View file

@ -21,7 +21,6 @@
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
#include <hiredis/async.h> #include <hiredis/async.h>
#include <cassert> #include <cassert>
#include <future>
#include <ciso646> #include <ciso646>
#include <boost/iterator/transform_iterator.hpp> #include <boost/iterator/transform_iterator.hpp>
#include <mutex> #include <mutex>
@ -39,16 +38,20 @@ namespace redis {
const std::size_t g_max_redis_unanswered_commands = 1000; const std::size_t g_max_redis_unanswered_commands = 1000;
struct HiredisCallbackData { struct HiredisCallbackData {
HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::condition_variable& parSendCmdCond ) : HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::atomic_size_t& parLocalPendingFutures, std::condition_variable& parSendCmdCond, std::condition_variable& parLocalCmdsCond ) :
promise(),
pending_futures(parPendingFutures), pending_futures(parPendingFutures),
send_command_condition(parSendCmdCond) local_pending_futures(parLocalPendingFutures),
reply_ptr(nullptr),
send_command_condition(parSendCmdCond),
local_commands_condition(parLocalCmdsCond)
{ {
} }
std::promise<Reply> promise; Reply* reply_ptr;
std::atomic_size_t& pending_futures; std::atomic_size_t& pending_futures;
std::atomic_size_t& local_pending_futures;
std::condition_variable& send_command_condition; std::condition_variable& send_command_condition;
std::condition_variable& local_commands_condition;
}; };
Reply make_redis_reply_type (redisReply* parReply) { Reply make_redis_reply_type (redisReply* parReply) {
@ -80,19 +83,28 @@ namespace redis {
void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) { void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) {
assert(parPrivData); assert(parPrivData);
auto* data = static_cast<HiredisCallbackData*>(parPrivData); auto* data = static_cast<HiredisCallbackData*>(parPrivData);
const auto old_count = data->pending_futures.fetch_add(-1); {
assert(old_count > 0); const auto old_count = data->pending_futures.fetch_add(-1);
if (old_count == g_max_redis_unanswered_commands) assert(old_count > 0);
data->send_command_condition.notify_one(); if (old_count == g_max_redis_unanswered_commands)
data->send_command_condition.notify_one();
}
if (parReply) { if (parReply) {
auto reply = make_redis_reply_type(static_cast<redisReply*>(parReply)); auto reply = make_redis_reply_type(static_cast<redisReply*>(parReply));
data->promise.set_value(std::move(reply)); assert(data->reply_ptr);
*data->reply_ptr = std::move(reply);
} }
else { else {
assert(false); //Should this case also be managed? assert(false); //Should this case also be managed?
} }
{
const auto old_value = data->local_pending_futures.fetch_add(-1);
assert(old_value > 0);
if (1 == old_value)
data->local_commands_condition.notify_one();
}
delete data; delete data;
} }
@ -115,13 +127,19 @@ namespace redis {
struct Batch::LocalData { struct Batch::LocalData {
explicit LocalData ( ThreadContext& parThreadContext ) : explicit LocalData ( ThreadContext& parThreadContext ) :
free_cmd_slot(), free_cmd_slot(),
no_more_pending_futures(),
futures_mutex(), futures_mutex(),
pending_futures_mutex(),
local_pending_futures(0),
thread_context(parThreadContext) thread_context(parThreadContext)
{ {
} }
std::condition_variable free_cmd_slot; std::condition_variable free_cmd_slot;
std::condition_variable no_more_pending_futures;
std::mutex futures_mutex; std::mutex futures_mutex;
std::mutex pending_futures_mutex;
std::atomic_size_t local_pending_futures;
ThreadContext& thread_context; ThreadContext& thread_context;
}; };
@ -149,8 +167,9 @@ namespace redis {
assert(parLengths); //This /could/ be null, but I don't see why it should assert(parLengths); //This /could/ be null, but I don't see why it should
assert(m_local_data); assert(m_local_data);
m_local_data->local_pending_futures.fetch_add(1);
const auto pending_futures = m_local_data->thread_context.pending_futures.fetch_add(1); const auto pending_futures = m_local_data->thread_context.pending_futures.fetch_add(1);
auto* data = new HiredisCallbackData(m_local_data->thread_context.pending_futures, m_local_data->free_cmd_slot); auto* data = new HiredisCallbackData(m_local_data->thread_context.pending_futures, m_local_data->local_pending_futures, m_local_data->free_cmd_slot, m_local_data->no_more_pending_futures);
#if defined(VERBOSE_HIREDIS_COMM) #if defined(VERBOSE_HIREDIS_COMM)
std::cout << "run_pvt(), " << pending_futures << " items pending... "; std::cout << "run_pvt(), " << pending_futures << " items pending... ";
@ -166,7 +185,9 @@ namespace redis {
std::cout << " emplace_back(future)... "; std::cout << " emplace_back(future)... ";
#endif #endif
m_futures.emplace_back(data->promise.get_future()); std::unique_ptr<Reply> new_reply(new Reply);
data->reply_ptr = new_reply.get();
m_futures.emplace_back(std::move(new_reply));
{ {
std::lock_guard<std::mutex> lock(m_async_conn->event_mutex()); std::lock_guard<std::mutex> lock(m_async_conn->event_mutex());
const int command_added = redisAsyncCommandArgv(m_async_conn->connection(), &hiredis_run_callback, data, parArgc, parArgv, parLengths); const int command_added = redisAsyncCommandArgv(m_async_conn->connection(), &hiredis_run_callback, data, parArgc, parArgv, parLengths);
@ -185,10 +206,19 @@ namespace redis {
} }
const std::vector<Reply>& Batch::replies() { const std::vector<Reply>& Batch::replies() {
return replies_nonconst();
}
std::vector<Reply>& Batch::replies_nonconst() {
if (not replies_requested()) { if (not replies_requested()) {
if (m_local_data->local_pending_futures > 0) {
std::unique_lock<std::mutex> u_lock(m_local_data->pending_futures_mutex);
m_local_data->no_more_pending_futures.wait(u_lock, [this]() { return m_local_data->local_pending_futures == 0; });
}
m_replies.reserve(m_futures.size()); m_replies.reserve(m_futures.size());
for (auto& fut : m_futures) { for (auto& itm : m_futures) {
m_replies.emplace_back(fut.get()); m_replies.emplace_back(std::move(*itm));
} }
auto empty_vec = std::move(m_futures); auto empty_vec = std::move(m_futures);
@ -217,7 +247,7 @@ namespace redis {
} }
assert(m_local_data); assert(m_local_data);
assert(0 == m_local_data->thread_context.pending_futures); assert(0 == m_local_data->local_pending_futures);
m_futures.clear(); m_futures.clear();
m_replies.clear(); m_replies.clear();
} }

View file

@ -40,7 +40,7 @@ namespace redis {
return boost::none; return boost::none;
} }
else { else {
auto replies = get_array(parReply); auto replies(get_array(parReply));
IncRedis::opt_string_list::value_type retval; IncRedis::opt_string_list::value_type retval;
retval.reserve(replies.size()); retval.reserve(replies.size());
for (const auto& rep : replies) { for (const auto& rep : replies) {