mirror of
https://github.com/KingDuckZ/dindexer.git
synced 2025-02-20 12:14:55 +00:00
Use async hiredis API.
This commit is contained in:
parent
cdae333e1f
commit
92ef212e44
5 changed files with 273 additions and 83 deletions
|
@ -7,6 +7,7 @@ add_library(${PROJECT_NAME} SHARED
|
||||||
command.cpp
|
command.cpp
|
||||||
scan_iterator.cpp
|
scan_iterator.cpp
|
||||||
reply.cpp
|
reply.cpp
|
||||||
|
batch.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(${PROJECT_NAME} SYSTEM
|
target_include_directories(${PROJECT_NAME} SYSTEM
|
||||||
|
|
159
src/backends/redis/batch.cpp
Normal file
159
src/backends/redis/batch.cpp
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
/* Copyright 2015, 2016, Michele Santullo
|
||||||
|
* This file is part of "dindexer".
|
||||||
|
*
|
||||||
|
* "dindexer" is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* "dindexer" is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "batch.hpp"
|
||||||
|
#include <hiredis/hiredis.h>
|
||||||
|
#include <hiredis/async.h>
|
||||||
|
#include <cassert>
|
||||||
|
#include <future>
|
||||||
|
#include <ciso646>
|
||||||
|
#include <boost/iterator/transform_iterator.hpp>
|
||||||
|
#include <atomic>
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace redis {
|
||||||
|
namespace {
|
||||||
|
const std::size_t g_max_redis_unanswered_commands = 1000;
|
||||||
|
|
||||||
|
struct HiredisCallbackData {
|
||||||
|
HiredisCallbackData ( std::atomic_size_t& parPendingFutures, std::condition_variable& parSendCmdCond ) :
|
||||||
|
promise(),
|
||||||
|
pending_futures(parPendingFutures),
|
||||||
|
send_command_condition(parSendCmdCond)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::promise<Reply> promise;
|
||||||
|
std::atomic_size_t& pending_futures;
|
||||||
|
std::condition_variable& send_command_condition;
|
||||||
|
};
|
||||||
|
|
||||||
|
Reply make_redis_reply_type (redisReply* parReply) {
|
||||||
|
using boost::transform_iterator;
|
||||||
|
using PtrToReplyIterator = transform_iterator<Reply(*)(redisReply*), redisReply**>;
|
||||||
|
|
||||||
|
switch (parReply->type) {
|
||||||
|
case REDIS_REPLY_INTEGER:
|
||||||
|
return parReply->integer;
|
||||||
|
case REDIS_REPLY_STRING:
|
||||||
|
return std::string(parReply->str, parReply->len);
|
||||||
|
case REDIS_REPLY_ARRAY:
|
||||||
|
return std::vector<Reply>(
|
||||||
|
PtrToReplyIterator(parReply->element, &make_redis_reply_type),
|
||||||
|
PtrToReplyIterator(parReply->element + parReply->elements, &make_redis_reply_type)
|
||||||
|
);
|
||||||
|
case REDIS_REPLY_ERROR:
|
||||||
|
return ErrorString(parReply->str, parReply->len);
|
||||||
|
default:
|
||||||
|
return Reply();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) {
|
||||||
|
assert(parPrivData);
|
||||||
|
auto* data = static_cast<HiredisCallbackData*>(parPrivData);
|
||||||
|
const auto old_count = data->pending_futures.fetch_add(-1);
|
||||||
|
assert(old_count > 0);
|
||||||
|
if (old_count == g_max_redis_unanswered_commands)
|
||||||
|
data->send_command_condition.notify_one();
|
||||||
|
|
||||||
|
if (parReply) {
|
||||||
|
auto reply = make_redis_reply_type(static_cast<redisReply*>(parReply));
|
||||||
|
data->promise.set_value(std::move(reply));
|
||||||
|
}
|
||||||
|
|
||||||
|
delete data;
|
||||||
|
}
|
||||||
|
} //unnamed namespace
|
||||||
|
|
||||||
|
struct Batch::LocalData {
|
||||||
|
LocalData ( void ) :
|
||||||
|
free_cmd_slot(),
|
||||||
|
futures_mutex(),
|
||||||
|
pending_futures(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::condition_variable free_cmd_slot;
|
||||||
|
std::mutex futures_mutex;
|
||||||
|
std::atomic_size_t pending_futures;
|
||||||
|
};
|
||||||
|
|
||||||
|
Batch::Batch (Batch&&) = default;
|
||||||
|
|
||||||
|
Batch::Batch (redisAsyncContext* parContext) :
|
||||||
|
m_futures(),
|
||||||
|
m_replies(),
|
||||||
|
m_local_data(new LocalData),
|
||||||
|
m_context(parContext)
|
||||||
|
{
|
||||||
|
assert(m_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
Batch::~Batch() noexcept {
|
||||||
|
}
|
||||||
|
|
||||||
|
void Batch::run_pvt (int parArgc, const char** parArgv, std::size_t* parLengths) {
|
||||||
|
assert(not replies_requested());
|
||||||
|
assert(parArgc >= 1);
|
||||||
|
assert(parArgv);
|
||||||
|
assert(parLengths); //This /could/ be null, but I don't see why it should
|
||||||
|
|
||||||
|
const auto pending_futures = m_local_data->pending_futures.fetch_add(1);
|
||||||
|
auto* data = new HiredisCallbackData(m_local_data->pending_futures, m_local_data->free_cmd_slot);
|
||||||
|
|
||||||
|
std::cout << "run_pvt(), " << pending_futures << " items pending... ";
|
||||||
|
if (pending_futures >= g_max_redis_unanswered_commands) {
|
||||||
|
std::cout << " waiting... ";
|
||||||
|
std::unique_lock<std::mutex> u_lock(m_local_data->futures_mutex);
|
||||||
|
m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; });
|
||||||
|
}
|
||||||
|
std::cout << "command sent to hiredis" << std::endl;
|
||||||
|
|
||||||
|
m_futures.push_back(data->promise.get_future());
|
||||||
|
const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths);
|
||||||
|
assert(REDIS_OK == command_added); // REDIS_ERR if error
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Batch::replies_requested() const {
|
||||||
|
return not m_replies.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<Reply>& Batch::replies() {
|
||||||
|
if (not replies_requested()) {
|
||||||
|
m_replies.reserve(m_futures.size());
|
||||||
|
for (auto& fut : m_futures) {
|
||||||
|
m_replies.push_back(fut.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto empty_vec = std::move(m_futures);
|
||||||
|
}
|
||||||
|
return m_replies;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Batch::throw_if_failed() {
|
||||||
|
const auto& rep = replies();
|
||||||
|
assert(false); //not implemented
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisError::RedisError (const char* parMessage, std::size_t parLength) :
|
||||||
|
std::runtime_error(std::string(parMessage, parLength))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
} //namespace redis
|
95
src/backends/redis/batch.hpp
Normal file
95
src/backends/redis/batch.hpp
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
/* Copyright 2015, 2016, Michele Santullo
|
||||||
|
* This file is part of "dindexer".
|
||||||
|
*
|
||||||
|
* "dindexer" is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* "dindexer" is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef idD81C81D99196491A8C9B68DED8ADD260
|
||||||
|
#define idD81C81D99196491A8C9B68DED8ADD260
|
||||||
|
|
||||||
|
#include "reply.hpp"
|
||||||
|
#include "arg_to_bin_safe.hpp"
|
||||||
|
#include <vector>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
struct redisAsyncContext;
|
||||||
|
|
||||||
|
namespace std {
|
||||||
|
template <class R> class future;
|
||||||
|
template <class T> struct atomic;
|
||||||
|
} //namespace std
|
||||||
|
|
||||||
|
namespace redis {
|
||||||
|
class Command;
|
||||||
|
|
||||||
|
class Batch {
|
||||||
|
friend class Command;
|
||||||
|
public:
|
||||||
|
Batch ( Batch&& parOther );
|
||||||
|
Batch ( const Batch& ) = delete;
|
||||||
|
~Batch ( void ) noexcept;
|
||||||
|
|
||||||
|
const std::vector<Reply>& replies ( void );
|
||||||
|
bool replies_requested ( void ) const;
|
||||||
|
void throw_if_failed ( void );
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
Batch& run ( const char* parCommand, Args&&... parArgs );
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
Batch& operator() ( const char* parCommand, Args&&... parArgs );
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct LocalData;
|
||||||
|
|
||||||
|
explicit Batch ( redisAsyncContext* parContext );
|
||||||
|
void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
|
||||||
|
|
||||||
|
std::vector<std::future<Reply>> m_futures;
|
||||||
|
std::vector<Reply> m_replies;
|
||||||
|
std::unique_ptr<LocalData> m_local_data;
|
||||||
|
redisAsyncContext* m_context;
|
||||||
|
};
|
||||||
|
|
||||||
|
class RedisError : public std::runtime_error {
|
||||||
|
public:
|
||||||
|
RedisError ( const char* parMessage, std::size_t parLength );
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
Batch& Batch::run (const char* parCommand, Args&&... parArgs) {
|
||||||
|
constexpr const std::size_t arg_count = sizeof...(Args) + 1;
|
||||||
|
using CharPointerArray = std::array<const char*, arg_count>;
|
||||||
|
using LengthArray = std::array<std::size_t, arg_count>;
|
||||||
|
using implem::arg_to_bin_safe_char;
|
||||||
|
using implem::arg_to_bin_safe_length;
|
||||||
|
using implem::MakeCharInfo;
|
||||||
|
using boost::string_ref;
|
||||||
|
|
||||||
|
this->run_pvt(
|
||||||
|
static_cast<int>(arg_count),
|
||||||
|
CharPointerArray{ (arg_to_bin_safe_char(string_ref(parCommand))), MakeCharInfo<typename std::remove_const<typename std::remove_reference<Args>::type>::type>(std::forward<Args>(parArgs)).data()... }.data(),
|
||||||
|
LengthArray{ arg_to_bin_safe_length(string_ref(parCommand)), arg_to_bin_safe_length(std::forward<Args>(parArgs))... }.data()
|
||||||
|
);
|
||||||
|
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
Batch& Batch::operator() (const char* parCommand, Args&&... parArgs) {
|
||||||
|
return this->run(parCommand, std::forward<Args>(parArgs)...);
|
||||||
|
}
|
||||||
|
} //namespace redis
|
||||||
|
|
||||||
|
#endif
|
|
@ -17,41 +17,19 @@
|
||||||
|
|
||||||
#include "command.hpp"
|
#include "command.hpp"
|
||||||
#include <hiredis/hiredis.h>
|
#include <hiredis/hiredis.h>
|
||||||
|
#include <hiredis/async.h>
|
||||||
#include <ciso646>
|
#include <ciso646>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <boost/iterator/transform_iterator.hpp>
|
|
||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
namespace {
|
namespace {
|
||||||
using RedisReply = std::unique_ptr<redisReply, void(*)(void*)>;
|
|
||||||
|
|
||||||
Reply make_redis_reply_type (redisReply* parReply) {
|
|
||||||
using boost::transform_iterator;
|
|
||||||
using PtrToReplyIterator = transform_iterator<Reply(*)(redisReply*), redisReply**>;
|
|
||||||
|
|
||||||
switch (parReply->type) {
|
|
||||||
case REDIS_REPLY_INTEGER:
|
|
||||||
return parReply->integer;
|
|
||||||
case REDIS_REPLY_STRING:
|
|
||||||
return std::string(parReply->str, parReply->len);
|
|
||||||
case REDIS_REPLY_ARRAY:
|
|
||||||
return std::vector<Reply>(
|
|
||||||
PtrToReplyIterator(parReply->element, &make_redis_reply_type),
|
|
||||||
PtrToReplyIterator(parReply->element + parReply->elements, &make_redis_reply_type)
|
|
||||||
);
|
|
||||||
case REDIS_REPLY_ERROR:
|
|
||||||
return ErrorString(parReply->str, parReply->len);
|
|
||||||
default:
|
|
||||||
return Reply();
|
|
||||||
};
|
|
||||||
}
|
|
||||||
} //unnamed namespace
|
} //unnamed namespace
|
||||||
|
|
||||||
Command::Command (std::string&& parAddress, uint16_t parPort) :
|
Command::Command (std::string&& parAddress, uint16_t parPort) :
|
||||||
m_conn(nullptr, &redisFree),
|
m_conn(nullptr, &redisAsyncDisconnect),
|
||||||
m_address(std::move(parAddress)),
|
m_address(std::move(parAddress)),
|
||||||
m_port(parPort)
|
m_port(parPort)
|
||||||
{
|
{
|
||||||
|
@ -62,10 +40,9 @@ namespace redis {
|
||||||
|
|
||||||
void Command::connect() {
|
void Command::connect() {
|
||||||
if (not m_conn) {
|
if (not m_conn) {
|
||||||
struct timeval timeout = {5, 500000}; //5.5 seconds?
|
|
||||||
RedisConnection conn(
|
RedisConnection conn(
|
||||||
redisConnectWithTimeout(m_address.c_str(), m_port, timeout),
|
redisAsyncConnect(m_address.c_str(), m_port),
|
||||||
&redisFree
|
&redisAsyncDisconnect
|
||||||
);
|
);
|
||||||
if (not conn) {
|
if (not conn) {
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
|
@ -86,44 +63,6 @@ namespace redis {
|
||||||
m_conn.reset();
|
m_conn.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
Reply Command::run_pvt (int parArgc, const char** parArgv, std::size_t* parLengths) {
|
|
||||||
assert(parArgc >= 1);
|
|
||||||
assert(parArgv);
|
|
||||||
assert(parLengths); //This /could/ be null, but I don't see why it should
|
|
||||||
assert(is_connected());
|
|
||||||
|
|
||||||
RedisReply reply(
|
|
||||||
static_cast<redisReply*>(redisCommandArgv(m_conn.get(), parArgc, parArgv, parLengths)),
|
|
||||||
&freeReplyObject
|
|
||||||
);
|
|
||||||
|
|
||||||
return make_redis_reply_type(reply.get());
|
|
||||||
|
|
||||||
//std::string key;
|
|
||||||
//{
|
|
||||||
// std::ostringstream key_oss;
|
|
||||||
// RedisReply incr_reply(static_cast<redisReply*>(redisCommand(m_conn.get(), "incr set_counter")), &freeReplyObject);
|
|
||||||
// key_oss << "sets:" << incr_reply->integer;
|
|
||||||
// key = key_oss.str();
|
|
||||||
//}
|
|
||||||
|
|
||||||
//RedisReply insert_reply(
|
|
||||||
// static_cast<redisReply*>(redisCommand(
|
|
||||||
// m_conn.get(),
|
|
||||||
// "hmset %b name %b disk_label %b fs_uuid %b",
|
|
||||||
// key.data(),
|
|
||||||
// key.size(),
|
|
||||||
// parSetData.name.data(),
|
|
||||||
// parSetData.name.size(),
|
|
||||||
// parSetData.disk_label.data(),
|
|
||||||
// parSetData.disk_label.size(),
|
|
||||||
// parSetData.fs_uuid.data(),
|
|
||||||
// parSetData.fs_uuid.size()
|
|
||||||
// )),
|
|
||||||
// &freeReplyObject
|
|
||||||
//);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Command::is_connected() const {
|
bool Command::is_connected() const {
|
||||||
return m_conn and not m_conn->err;
|
return m_conn and not m_conn->err;
|
||||||
}
|
}
|
||||||
|
@ -143,4 +82,9 @@ namespace redis {
|
||||||
auto Command::zscan (boost::string_ref parKey) -> zscan_range {
|
auto Command::zscan (boost::string_ref parKey) -> zscan_range {
|
||||||
return zscan_range(zscan_iterator(this, parKey, false), zscan_iterator(this, parKey, true));
|
return zscan_range(zscan_iterator(this, parKey, false), zscan_iterator(this, parKey, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Batch Command::make_batch() {
|
||||||
|
assert(is_connected());
|
||||||
|
return Batch(m_conn.get());
|
||||||
|
}
|
||||||
} //namespace redis
|
} //namespace redis
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
#ifndef idD83EEBFC927840C6B9F32D61A1D1E582
|
#ifndef idD83EEBFC927840C6B9F32D61A1D1E582
|
||||||
#define idD83EEBFC927840C6B9F32D61A1D1E582
|
#define idD83EEBFC927840C6B9F32D61A1D1E582
|
||||||
|
|
||||||
#include "arg_to_bin_safe.hpp"
|
|
||||||
#include "scan_iterator.hpp"
|
#include "scan_iterator.hpp"
|
||||||
#include "reply.hpp"
|
#include "reply.hpp"
|
||||||
|
#include "batch.hpp"
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
@ -33,7 +33,7 @@
|
||||||
#include <boost/utility/string_ref.hpp>
|
#include <boost/utility/string_ref.hpp>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
struct redisContext;
|
struct redisAsyncContext;
|
||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
class Command {
|
class Command {
|
||||||
|
@ -55,6 +55,8 @@ namespace redis {
|
||||||
|
|
||||||
bool is_connected ( void ) const;
|
bool is_connected ( void ) const;
|
||||||
|
|
||||||
|
Batch make_batch ( void );
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
Reply run ( const char* parCommand, Args&&... parArgs );
|
Reply run ( const char* parCommand, Args&&... parArgs );
|
||||||
|
|
||||||
|
@ -65,9 +67,7 @@ namespace redis {
|
||||||
zscan_range zscan ( boost::string_ref parKey );
|
zscan_range zscan ( boost::string_ref parKey );
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using RedisConnection = std::unique_ptr<redisContext, void(*)(redisContext*)>;
|
using RedisConnection = std::unique_ptr<redisAsyncContext, void(*)(redisAsyncContext*)>;
|
||||||
|
|
||||||
Reply run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
|
|
||||||
|
|
||||||
RedisConnection m_conn;
|
RedisConnection m_conn;
|
||||||
std::string m_address;
|
std::string m_address;
|
||||||
|
@ -76,19 +76,10 @@ namespace redis {
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
Reply Command::run (const char* parCommand, Args&&... parArgs) {
|
Reply Command::run (const char* parCommand, Args&&... parArgs) {
|
||||||
constexpr const std::size_t arg_count = sizeof...(Args) + 1;
|
auto batch = make_batch();
|
||||||
using CharPointerArray = std::array<const char*, arg_count>;
|
batch.run(parCommand, std::forward<Args>(parArgs)...);
|
||||||
using LengthArray = std::array<std::size_t, arg_count>;
|
batch.throw_if_failed();
|
||||||
using implem::arg_to_bin_safe_char;
|
return batch.replies().front();
|
||||||
using implem::arg_to_bin_safe_length;
|
|
||||||
using implem::MakeCharInfo;
|
|
||||||
using boost::string_ref;
|
|
||||||
|
|
||||||
return this->run_pvt(
|
|
||||||
static_cast<int>(arg_count),
|
|
||||||
CharPointerArray{ (arg_to_bin_safe_char(string_ref(parCommand))), MakeCharInfo<typename std::remove_const<typename std::remove_reference<Args>::type>::type>(std::forward<Args>(parArgs)).data()... }.data(),
|
|
||||||
LengthArray{ arg_to_bin_safe_length(string_ref(parCommand)), arg_to_bin_safe_length(std::forward<Args>(parArgs))... }.data()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
} //namespace redis
|
} //namespace redis
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue