1
0
Fork 0
mirror of https://github.com/KingDuckZ/dindexer.git synced 2024-11-25 00:53:43 +00:00

Add locking so async communication works correctly.

helgrind says it works. I think.
This commit is contained in:
King_DuckZ 2016-06-29 22:51:45 +01:00
parent 7f25fdb37c
commit cf9ac6b296
12 changed files with 637 additions and 106 deletions

View file

@ -0,0 +1,31 @@
# Try to find libev
# Once done, this will define
#
# LIBEV_FOUND - system has libev
# LIBEV_INCLUDE_DIRS - libev include directories
# LIBEV_LIBRARIES - libraries needed to use libev
if(LIBEV_INCLUDE_DIRS AND LIBEV_LIBRARIES)
set(LIBEV_FIND_QUIETLY TRUE)
else()
find_path(
LIBEV_INCLUDE_DIR
NAMES ev.h
HINTS ${LIBEV_ROOT_DIR}
PATH_SUFFIXES include)
find_library(
LIBEV_LIBRARY
NAME ev
HINTS ${LIBEV_ROOT_DIR}
PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR})
set(LIBEV_INCLUDE_DIRS ${LIBEV_INCLUDE_DIR})
set(LIBEV_LIBRARIES ${LIBEV_LIBRARY})
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
libev DEFAULT_MSG LIBEV_LIBRARY LIBEV_INCLUDE_DIR)
mark_as_advanced(LIBEV_LIBRARY LIBEV_INCLUDE_DIR)
endif()

View file

@ -0,0 +1,166 @@
[TOC]
# Using hiredis+libev in a separate thread for events
Used sources:
1. [stackoverflow.com/questions/14621261][link_so_1]
2. [stackoverflow.com/questions/8611126][link_so_2]
3. [libev thread locking example][link_ev_doc]
## Explanation
### Link 1
Possibly due to the original question asked, [Link 1][link_so_1] seems to show that you need to have at least two ev_loop objects created in order to run one in a separate thread. The code on that page:
```cpp
//This program is demo for using pthreads with libev.
//Try using Timeout values as large as 1.0 and as small as 0.000001
//and notice the difference in the output
//(c) 2009 debuguo
//(c) 2013 enthusiasticgeek for stack overflow
//Free to distribute and improve the code. Leave credits intact
#include <ev.h>
#include <stdio.h> // for puts
#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t lock;
double timeout = 0.00001;
ev_timer timeout_watcher;
int timeout_count = 0;
ev_async async_watcher;
int async_count = 0;
struct ev_loop* loop2;
void* loop2thread(void* args)
{
printf("Inside loop 2"); // Here one could initiate another timeout watcher
ev_loop(loop2, 0); // similar to the main loop - call it say timeout_cb1
return NULL;
}
static void async_cb (EV_P_ ev_async *w, int revents)
{
//puts ("async ready");
pthread_mutex_lock(&lock); //Don't forget locking
++async_count;
printf("async = %d, timeout = %d \n", async_count, timeout_count);
pthread_mutex_unlock(&lock); //Don't forget unlocking
}
static void timeout_cb (EV_P_ ev_timer *w, int revents) // Timer callback function
{
//puts ("timeout");
if (ev_async_pending(&async_watcher)==false) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
ev_async_send(loop2, &async_watcher); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
}
pthread_mutex_lock(&lock); //Don't forget locking
++timeout_count;
pthread_mutex_unlock(&lock); //Don't forget unlocking
w->repeat = timeout;
ev_timer_again(loop, &timeout_watcher); //Start the timer again.
}
int main (int argc, char** argv)
{
if (argc < 2) {
puts("Timeout value missing.\n./demo <timeout>");
return -1;
}
timeout = atof(argv[1]);
struct ev_loop *loop = EV_DEFAULT; //or ev_default_loop (0);
//Initialize pthread
pthread_mutex_init(&lock, NULL);
pthread_t thread;
// This loop sits in the pthread
loop2 = ev_loop_new(0);
//This block is specifically used pre-empting thread (i.e. temporary interruption and suspension of a task, without asking for its cooperation, with the intention to resume that task later.)
//This takes into account thread safety
ev_async_init(&async_watcher, async_cb);
ev_async_start(loop2, &async_watcher);
pthread_create(&thread, NULL, loop2thread, NULL);
ev_timer_init (&timeout_watcher, timeout_cb, timeout, 0.); // Non repeating timer. The timer starts repeating in the timeout callback function
ev_timer_start (loop, &timeout_watcher);
// now wait for events to arrive
ev_loop(loop, 0);
//Wait on threads for execution
pthread_join(thread, NULL);
pthread_mutex_destroy(&lock);
return 0;
}
```
with the comment *"Note for libev 4+ ev_loop should be ev_run."* is still slightly useful but it shouldn't be taken as a model.
### Link 2
The [second link][link_so_2] is what shows that one ev_loop is enough. This is the code provided by the original poster:
```cpp
void RedisSubscriber::Start() {
m_redis = redisAsyncConnect(m_addr.c_str(),m_port);
m_redis->data = (void*)this;
m_loop = ev_loop_new(EVFLAG_NOINOTIFY);
redisLibevAttach(m_loop, m_redis);
redisAsyncSetConnectCallback(m_redis,connectCallback);
redisAsyncSetDisconnectCallback(m_redis,disconnectCallback);
redisAsyncCommand(m_redis, subscribeCallback, NULL, "SUBSCRIBE %s", m_channel.c_str());
m_thread = boost::thread(ev_loop,m_loop,0);
}
void RedisSubscriber::Stop() {
redisAsyncFree(m_redis);
m_thread.join();
m_redis = 0;
}
void RedisSubscriber::connectCallback(const redisAsyncContext *c) {
}
void RedisSubscriber::disconnectCallback(const redisAsyncContext *c, int status) {
RedisSubscriber* r = (RedisSubscriber*)(c->data);
ev_unloop(r->m_loop,EVUNLOOP_ALL);
}
void RedisSubscriber::subscribeCallback(redisAsyncContext *c, void *r, void *privdata) {
}
```
There are no accepted answers, but the answer from *themoondothshine* provides very useful info. Here is what it says:
Assuming that you mean ev_run for your boost::thread, here's what you can do:
1. Setup an `ev_async`
2. In the callback of `ev_async` call `ev_break`.
3. Call `ev_async_send` from `RedisSubscriber::Stop()`. `ev_async` watchers are thread-safe -- it uses memory barriers for synchronising between threads.
This will cause the event loop to stop, and `m_thread.join()` will return.
### Link 3
The [THREAD LOCKING EXAMPLE][link_ev_doc] shows how to lock in order to protect the ev_loop object in use.
[link_so_1]: http://stackoverflow.com/questions/14621261/using-libev-with-multiple-threads#14779930
[link_so_2]: http://stackoverflow.com/questions/8611126/hiredis-libev-and-boostthreads
[link_ev_doc]: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE

View file

@ -2,6 +2,7 @@ project(${bare_name}-backend-redis CXX)
find_package(hiredis 0.11.0 REQUIRED) find_package(hiredis 0.11.0 REQUIRED)
find_package(CryptoPP 5.6) find_package(CryptoPP 5.6)
find_package(libev 4.0 REQUIRED)
add_library(${PROJECT_NAME} SHARED add_library(${PROJECT_NAME} SHARED
backend_redis.cpp backend_redis.cpp
@ -9,12 +10,15 @@ add_library(${PROJECT_NAME} SHARED
scan_iterator.cpp scan_iterator.cpp
reply.cpp reply.cpp
batch.cpp batch.cpp
script.cpp
script_manager.cpp
) )
target_include_directories(${PROJECT_NAME} SYSTEM target_include_directories(${PROJECT_NAME} SYSTEM
PUBLIC ${Boost_INCLUDE_DIRS} PUBLIC ${Boost_INCLUDE_DIRS}
PRIVATE ${HIREDIS_INCLUDE_DIRS} PRIVATE ${HIREDIS_INCLUDE_DIRS}
PRIVATE ${CMAKE_SOURCE_DIR}/lib/better-enums PRIVATE ${CMAKE_SOURCE_DIR}/lib/better-enums
PRIVATE ${LIBEV_INCLUDE_DIRS}
) )
target_include_directories(${PROJECT_NAME} target_include_directories(${PROJECT_NAME}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}
@ -24,6 +28,11 @@ target_link_libraries(${PROJECT_NAME}
PRIVATE ${bare_name}-inc PRIVATE ${bare_name}-inc
PRIVATE ${bare_name}-pq PRIVATE ${bare_name}-pq
PRIVATE ${HIREDIS_LIBRARIES} PRIVATE ${HIREDIS_LIBRARIES}
PRIVATE ${LIBEV_LIBRARIES}
)
target_compile_definitions(${PROJECT_NAME}
PRIVATE EV_COMPAT3=0
) )
if (CryptoPP_FOUND) if (CryptoPP_FOUND)

View file

@ -35,11 +35,10 @@ namespace dindb {
uint16_t database; uint16_t database;
}; };
std::pair<std::string, mchlib::FileRecordData> pair_list_to_file_record (const redis::Command::hscan_range& parRange, const mchlib::TigerHash& parHash) { std::pair<std::string, mchlib::FileRecordData> pair_list_to_file_record (const redis::Command::hscan_range& parRange) {
using dinhelp::lexical_cast; using dinhelp::lexical_cast;
mchlib::FileRecordData retval; mchlib::FileRecordData retval;
retval.hash = parHash;
std::array<std::string, 2> mime; std::array<std::string, 2> mime;
std::string group_key; std::string group_key;
@ -47,7 +46,7 @@ namespace dindb {
if (itm.first == "path") if (itm.first == "path")
retval.abs_path = itm.second; retval.abs_path = itm.second;
else if (itm.first == "hash") else if (itm.first == "hash")
assert(tiger_to_string(parHash) == itm.second); retval.hash = mchlib::string_to_tiger(itm.second);
else if (itm.first == "size") else if (itm.first == "size")
retval.size = lexical_cast<decltype(retval.size)>( retval.size = lexical_cast<decltype(retval.size)>(
itm.second); itm.second);
@ -124,6 +123,19 @@ namespace YAML {
}; };
} //namespace YAML } //namespace YAML
//namespace redis {
// template <>
// struct RedisStructAdapt<mchlib::FileRecordData> {
// static bool decode (const Command::hscan_range& parFrom, mchlib::FileRecordData& parOut) {
// return true;
// }
//
// static void encode (const Command::hscan_range& parFrom, mchlib::FileRecordData& parOut) {
// return true;
// }
// };
//}
namespace dindb { namespace dindb {
BackendRedis::BackendRedis(std::string &&parAddress, uint16_t parPort, uint16_t parDatabase, bool parConnect) : BackendRedis::BackendRedis(std::string &&parAddress, uint16_t parPort, uint16_t parDatabase, bool parConnect) :
m_redis(std::move(parAddress), parPort), m_redis(std::move(parAddress), parPort),
@ -140,9 +152,18 @@ namespace dindb {
using dinhelp::lexical_cast; using dinhelp::lexical_cast;
m_redis.connect(); m_redis.connect();
if (m_redis.is_connected() and m_database > 0) { m_redis.wait_for_connect();
m_redis.run("SELECT", lexical_cast<std::string>(m_database)); if (m_redis.is_connected()) {
m_redis.run("CLIENT", "SETNAME", PROGRAM_NAME "_v" STRINGIZE(VERSION_MAJOR) "." STRINGIZE(VERSION_MINOR) "." STRINGIZE(VERSION_PATCH)); auto batch = m_redis.make_batch();
batch.run("SELECT", lexical_cast<std::string>(m_database));
batch.run("CLIENT", "SETNAME", PROGRAM_NAME "_v" STRINGIZE(VERSION_MAJOR) "." STRINGIZE(VERSION_MINOR) "." STRINGIZE(VERSION_PATCH));
batch.throw_if_failed();
m_redis.submit_lua_script("return 42;");
}
else {
std::ostringstream oss;
oss << "Error connecting to Redis: " << m_redis.connection_error();
throw std::runtime_error(oss.str());
} }
} }
@ -186,6 +207,7 @@ namespace dindb {
"type", parSetData.type, "type", parSetData.type,
"content_type", parSetData.content_type "content_type", parSetData.content_type
); );
//m_redis.hmset(parSetData);
for (const auto& file_data : parData) { for (const auto& file_data : parData) {
redis::Reply file_id_reply = m_redis.run("HINCRBY", PROGRAM_NAME ":indices", "files", "1"); redis::Reply file_id_reply = m_redis.run("HINCRBY", PROGRAM_NAME ":indices", "files", "1");
@ -223,7 +245,7 @@ namespace dindb {
} }
else { else {
const auto result_id = redis::get_string(hash_reply); const auto result_id = redis::get_string(hash_reply);
auto set_key_and_file_item = pair_list_to_file_record(m_redis.hscan(result_id), parHash); auto set_key_and_file_item = pair_list_to_file_record(m_redis.hscan(result_id));
parItem = std::move(set_key_and_file_item.second); parItem = std::move(set_key_and_file_item.second);
const std::string group_key = std::move(set_key_and_file_item.first); const std::string group_key = std::move(set_key_and_file_item.first);

View file

@ -16,6 +16,7 @@
*/ */
#include "batch.hpp" #include "batch.hpp"
#include "command.hpp"
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
#include <hiredis/async.h> #include <hiredis/async.h>
#include <cassert> #include <cassert>
@ -26,6 +27,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <iostream> #include <iostream>
#include <sstream>
namespace redis { namespace redis {
namespace { namespace {
@ -60,12 +62,14 @@ namespace redis {
); );
case REDIS_REPLY_ERROR: case REDIS_REPLY_ERROR:
return ErrorString(parReply->str, parReply->len); return ErrorString(parReply->str, parReply->len);
case REDIS_REPLY_STATUS:
return StatusString(parReply->str, parReply->len);
default: default:
assert(false); //not reached
return Reply(); return Reply();
}; };
} }
extern "C"
void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) { void hiredis_run_callback (redisAsyncContext*, void* parReply, void* parPrivData) {
assert(parPrivData); assert(parPrivData);
auto* data = static_cast<HiredisCallbackData*>(parPrivData); auto* data = static_cast<HiredisCallbackData*>(parPrivData);
@ -78,6 +82,9 @@ namespace redis {
auto reply = make_redis_reply_type(static_cast<redisReply*>(parReply)); auto reply = make_redis_reply_type(static_cast<redisReply*>(parReply));
data->promise.set_value(std::move(reply)); data->promise.set_value(std::move(reply));
} }
else {
assert(false); //Should this case also be managed?
}
delete data; delete data;
} }
@ -98,12 +105,14 @@ namespace redis {
Batch::Batch (Batch&&) = default; Batch::Batch (Batch&&) = default;
Batch::Batch (redisAsyncContext* parContext) : Batch::Batch (redisAsyncContext* parContext, Command* parCommand) :
m_futures(), m_futures(),
m_replies(), m_replies(),
m_local_data(new LocalData), m_local_data(new LocalData),
m_command(parCommand),
m_context(parContext) m_context(parContext)
{ {
assert(m_command);
assert(m_context); assert(m_context);
} }
@ -125,11 +134,16 @@ namespace redis {
std::unique_lock<std::mutex> u_lock(m_local_data->futures_mutex); std::unique_lock<std::mutex> u_lock(m_local_data->futures_mutex);
m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; }); m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; });
} }
std::cout << "command sent to hiredis" << std::endl; std::cout << " emplace_back(future)... ";
m_futures.push_back(data->promise.get_future()); m_futures.emplace_back(data->promise.get_future());
m_command->lock();
const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths); const int command_added = redisAsyncCommandArgv(m_context, &hiredis_run_callback, data, parArgc, parArgv, parLengths);
m_command->unlock();
assert(REDIS_OK == command_added); // REDIS_ERR if error assert(REDIS_OK == command_added); // REDIS_ERR if error
std::cout << "command sent to hiredis" << std::endl;
m_command->wakeup_thread();
} }
bool Batch::replies_requested() const { bool Batch::replies_requested() const {
@ -140,7 +154,7 @@ namespace redis {
if (not replies_requested()) { if (not replies_requested()) {
m_replies.reserve(m_futures.size()); m_replies.reserve(m_futures.size());
for (auto& fut : m_futures) { for (auto& fut : m_futures) {
m_replies.push_back(fut.get()); m_replies.emplace_back(fut.get());
} }
auto empty_vec = std::move(m_futures); auto empty_vec = std::move(m_futures);
@ -149,8 +163,22 @@ namespace redis {
} }
void Batch::throw_if_failed() { void Batch::throw_if_failed() {
const auto& rep = replies(); std::ostringstream oss;
assert(false); //not implemented int err_count = 0;
const int max_reported_errors = 3;
oss << "Error in reply: ";
for (const auto& rep : replies()) {
if (rep.which() == RedisVariantType_Error) {
++err_count;
if (err_count <= max_reported_errors)
oss << '"' << get_error_string(rep).message() << "\" ";
}
}
if (err_count) {
oss << " (showing " << err_count << '/' << max_reported_errors << " errors on " << replies().size() << " total replies)";
throw std::runtime_error(oss.str());
}
} }
RedisError::RedisError (const char* parMessage, std::size_t parLength) : RedisError::RedisError (const char* parMessage, std::size_t parLength) :

View file

@ -53,12 +53,13 @@ namespace redis {
private: private:
struct LocalData; struct LocalData;
explicit Batch ( redisAsyncContext* parContext ); Batch ( redisAsyncContext* parContext, Command* parCommand );
void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths ); void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
std::vector<std::future<Reply>> m_futures; std::vector<std::future<Reply>> m_futures;
std::vector<Reply> m_replies; std::vector<Reply> m_replies;
std::unique_ptr<LocalData> m_local_data; std::unique_ptr<LocalData> m_local_data;
Command* m_command;
redisAsyncContext* m_context; redisAsyncContext* m_context;
}; };

View file

@ -16,57 +16,115 @@
*/ */
#include "command.hpp" #include "command.hpp"
#include "helpers/lexical_cast.hpp" #include "script_manager.hpp"
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
#include <hiredis/async.h> #include <hiredis/async.h>
#include <hiredis/adapters/libev.h>
#include <ev.h>
#include <ciso646> #include <ciso646>
#include <cassert> #include <cassert>
#include <sstream> #include <sstream>
#include <algorithm> #include <algorithm>
#include <stdexcept> #include <stdexcept>
#if defined(WITH_CRYPTOPP) #include <signal.h>
# include <crypto++/sha.h> #include <thread>
#endif #include <condition_variable>
#include <atomic>
#include <mutex>
//See docs directory for info about hiredis/libev with multithreading
namespace redis { namespace redis {
namespace { namespace {
#if defined(WITH_CRYPTOPP) void async_callback (ev_loop* /*parLoop*/, ev_async* /*parObject*/, int /*parRevents*/) {
struct LuaScriptHash { }
union {
struct { void async_halt_loop (ev_loop* parLoop, ev_async* /*parObject*/, int /*parRevents*/) {
uint64_t part_a, part_b; ev_break(parLoop, EVBREAK_ALL);
uint32_t part_c; }
};
uint8_t raw_bytes[20]; void lock_mutex_libev (ev_loop* parLoop) {
}; std::mutex* mtx = static_cast<std::mutex*>(ev_userdata(parLoop));
}; assert(mtx);
#endif mtx->lock();
}
void unlock_mutex_libev (ev_loop* parLoop) {
std::mutex* mtx = static_cast<std::mutex*>(ev_userdata(parLoop));
assert(mtx);
mtx->unlock();
}
} //unnamed namespace } //unnamed namespace
struct Command::LocalData {
explicit LocalData (Command* parCommand) :
lua_scripts(parCommand),
redis_poll_thread(),
connect_processed(false),
disconnect_processed(true)
{
}
ScriptManager lua_scripts;
ev_async watcher_wakeup;
ev_async watcher_halt;
std::thread redis_poll_thread;
std::mutex hiredis_mutex;
std::mutex libev_mutex;
std::condition_variable condition_connected;
std::condition_variable condition_disconnected;
std::string connect_err_msg;
std::atomic_bool connect_processed;
std::atomic_bool disconnect_processed;
};
void on_connect (const redisAsyncContext* parContext, int parStatus) { void on_connect (const redisAsyncContext* parContext, int parStatus) {
assert(parContext and parContext->data); assert(parContext and parContext->data);
Command& self = *static_cast<Command*>(parContext->data); Command& self = *static_cast<Command*>(parContext->data);
assert(parContext == self.m_conn.get());
assert(not self.m_local_data->connect_processed);
self.m_connection_lost = false; self.m_connection_lost = false;
self.m_connected = (parStatus == REDIS_OK); self.m_connected = (parStatus == REDIS_OK);
self.m_local_data->connect_processed = true;
self.m_local_data->connect_err_msg = parContext->errstr;
self.m_local_data->condition_connected.notify_one();
} }
void on_disconnect (const redisAsyncContext* parContext, int parStatus) { void on_disconnect (const redisAsyncContext* parContext, int parStatus) {
assert(parContext and parContext->data); assert(parContext and parContext->data);
Command& self = *static_cast<Command*>(parContext->data); Command& self = *static_cast<Command*>(parContext->data);
assert(self.m_connected); assert(self.m_connected);
assert(not self.m_local_data->disconnect_processed);
self.m_connection_lost = (REDIS_ERR == parStatus); self.m_connection_lost = (REDIS_ERR == parStatus);
self.m_connected = false; self.m_connected = false;
self.m_local_data->disconnect_processed = true;
self.m_local_data->connect_err_msg.clear();
self.m_local_data->condition_disconnected.notify_one();
}; };
Command::Command (std::string&& parAddress, uint16_t parPort) : Command::Command (std::string&& parAddress, uint16_t parPort) :
m_conn(nullptr, &redisAsyncDisconnect), m_conn(nullptr, &redisAsyncDisconnect),
m_libev_loop_thread(ev_loop_new(EVFLAG_NOINOTIFY), &ev_loop_destroy),
m_address(std::move(parAddress)), m_address(std::move(parAddress)),
m_local_data(new LocalData(this)),
m_port(parPort), m_port(parPort),
m_connected(false), m_connected(false),
m_connection_lost(false) m_connection_lost(false)
{ {
//Init libev stuff
{
signal(SIGPIPE, SIG_IGN);
//See: http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#THREAD_LOCKING_EXAMPLE
ev_async_init(&m_local_data->watcher_wakeup, &async_callback);
ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup);
ev_async_init(&m_local_data->watcher_halt, &async_halt_loop);
ev_async_start(m_libev_loop_thread.get(), &m_local_data->watcher_halt);
ev_set_userdata(m_libev_loop_thread.get(), &m_local_data->libev_mutex);
ev_set_loop_release_cb(m_libev_loop_thread.get(), &unlock_mutex_libev, &lock_mutex_libev);
}
} }
Command::Command (std::string&& parSocket) : Command::Command (std::string&& parSocket) :
@ -75,10 +133,13 @@ namespace redis {
} }
Command::~Command() noexcept { Command::~Command() noexcept {
this->disconnect();
this->wait_for_disconnect();
} }
void Command::connect() { void Command::connect() {
if (not m_conn) { if (not m_conn) {
m_local_data->disconnect_processed = false;
RedisConnection conn( RedisConnection conn(
(is_socket_connection() ? (is_socket_connection() ?
redisAsyncConnectUnix(m_address.c_str()) redisAsyncConnectUnix(m_address.c_str())
@ -95,23 +156,49 @@ namespace redis {
else { else {
conn->data = this; conn->data = this;
} }
if (conn->err) { if (REDIS_OK != redisLibevAttach(m_libev_loop_thread.get(), conn.get()))
std::ostringstream oss; throw std::runtime_error("Unable to set event loop");
oss << "Unable to connect to Redis server at " << m_address << ':' << m_port <<
": " << conn->errstr;
throw std::runtime_error(oss.str());
}
if (REDIS_OK != redisAsyncSetConnectCallback(conn.get(), &on_connect)) if (REDIS_OK != redisAsyncSetConnectCallback(conn.get(), &on_connect))
throw std::runtime_error("Unable to set \"on_connect()\" callback"); throw std::runtime_error("Unable to set \"on_connect()\" callback");
if (REDIS_OK != redisAsyncSetDisconnectCallback(conn.get(), &on_disconnect)) if (REDIS_OK != redisAsyncSetDisconnectCallback(conn.get(), &on_disconnect))
throw std::runtime_error("Unable to set \"on_disconnect()\" callback"); throw std::runtime_error("Unable to set \"on_disconnect()\" callback");
std::swap(conn, m_conn); std::swap(conn, m_conn);
m_local_data->redis_poll_thread = std::thread([this]() {
m_local_data->libev_mutex.lock();
ev_run(m_libev_loop_thread.get(), 0);
m_local_data->libev_mutex.unlock();
});
wakeup_thread();
}
}
void Command::wait_for_connect() {
if (not m_local_data->connect_processed) {
std::unique_lock<std::mutex> lk(m_local_data->hiredis_mutex);
m_local_data->condition_connected.wait(lk, [this]() { return m_local_data->connect_processed.load(); });
assert(true == m_local_data->connect_processed);
} }
} }
void Command::disconnect() { void Command::disconnect() {
assert(m_local_data->redis_poll_thread.joinable());
m_local_data->connect_processed = false;
{
std::lock_guard<std::mutex> lock(m_local_data->libev_mutex);
assert(not ev_async_pending(&m_local_data->watcher_halt));
ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_halt);
m_conn.reset(); m_conn.reset();
} }
m_local_data->redis_poll_thread.join();
}
void Command::wait_for_disconnect() {
if (not m_local_data->disconnect_processed) {
std::unique_lock<std::mutex> lk(m_local_data->hiredis_mutex);
m_local_data->condition_disconnected.wait(lk, [this]() { return m_local_data->disconnect_processed.load(); });
assert(true == m_local_data->disconnect_processed);
}
}
bool Command::is_connected() const { bool Command::is_connected() const {
const bool connected = m_conn and not m_conn->err and m_connected; const bool connected = m_conn and not m_conn->err and m_connected;
@ -119,6 +206,10 @@ namespace redis {
return connected; return connected;
} }
boost::string_ref Command::connection_error() const {
return m_local_data->connect_err_msg;
}
auto Command::scan() -> scan_range { auto Command::scan() -> scan_range {
return scan_range(scan_iterator(this, false), scan_iterator(this, true)); return scan_range(scan_iterator(this, false), scan_iterator(this, true));
} }
@ -137,63 +228,28 @@ namespace redis {
Batch Command::make_batch() { Batch Command::make_batch() {
assert(is_connected()); assert(is_connected());
return Batch(m_conn.get()); return Batch(m_conn.get(), this);
} }
#if defined(WITH_CRYPTOPP)
boost::string_ref Command::add_lua_script_ifn (const std::string& parScript) {
if (parScript.empty())
return boost::string_ref();
using dinhelp::lexical_cast;
static_assert(20 == CryptoPP::SHA1::DIGESTSIZE, "Unexpected SHA1 digest size");
static_assert(sizeof(LuaScriptHash) >= CryptoPP::SHA1::DIGESTSIZE, "Wrong SHA1 struct size");
static_assert(Sha1Array().size() == CryptoPP::SHA1::DIGESTSIZE, "Wrong array size");
LuaScriptHash digest;
CryptoPP::SHA1().CalculateDigest(digest.raw_bytes, reinterpret_cast<const uint8_t*>(parScript.data()), parScript.size());
//TODO: change when lexical_cast will support arrays
const std::string sha1_str = lexical_cast<std::string>(digest.part_a) + lexical_cast<std::string>(digest.part_b) + lexical_cast<std::string>(digest.part_c);
Sha1Array sha1_array;
std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin());
auto it_found = m_known_hashes.find(sha1_array);
const bool was_present = (m_known_hashes.end() != it_found);
if (was_present) {
return boost::string_ref(it_found->data(), it_found->size());
}
auto reply = this->run("SCRIPT", "LOAD", parScript);
assert(not was_present);
assert(get_string(reply) == sha1_str);
const auto it_inserted = m_known_hashes.insert(it_found, sha1_array);
(void)reply;
return boost::string_ref(it_inserted->data(), it_inserted->size());
}
#else
boost::string_ref Command::add_lua_script_ifn (const std::string& parScript) {
auto it_found = m_known_hashes.find(parScript);
const bool was_present = (m_known_hashes.end() != it_found);
if (was_present) {
return boost::string_ref(it_found->second.data(), it_found->second.size());
}
auto reply = this->run("SCRIPT", "LOAD", parScript);
assert(not was_present);
const auto sha1_str = get_string(reply);
Sha1Array sha1_array;
std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin());
auto it_inserted = m_known_hashes.insert(it_found, std::make_pair(parScript, sha1_array));
return boost::string_ref(it_inserted->second.data(), it_inserted->second.size());
}
#endif
bool Command::is_socket_connection() const { bool Command::is_socket_connection() const {
return not (m_port or m_address.empty()); return not (m_port or m_address.empty());
} }
void Command::submit_lua_script (const std::string& parScript) {
m_local_data->lua_scripts.submit_lua_script(parScript);
}
void Command::wakeup_thread() {
std::lock_guard<std::mutex> lock(m_local_data->libev_mutex);
if (ev_async_pending(&m_local_data->watcher_wakeup) == false)
ev_async_send(m_libev_loop_thread.get(), &m_local_data->watcher_wakeup);
}
void Command::lock() {
m_local_data->libev_mutex.lock();
}
void Command::unlock() {
m_local_data->libev_mutex.unlock();
}
} //namespace redis } //namespace redis

View file

@ -33,14 +33,9 @@
#include <boost/range/iterator_range_core.hpp> #include <boost/range/iterator_range_core.hpp>
#include <boost/utility/string_ref.hpp> #include <boost/utility/string_ref.hpp>
#include <stdexcept> #include <stdexcept>
#if defined(WITH_CRYPTOPP)
# include <set>
#else
# include <map>
#endif
#include <boost/utility/string_ref.hpp>
struct redisAsyncContext; struct redisAsyncContext;
struct ev_loop;
namespace redis { namespace redis {
class Command { class Command {
@ -61,9 +56,12 @@ namespace redis {
~Command ( void ) noexcept; ~Command ( void ) noexcept;
void connect ( void ); void connect ( void );
void wait_for_connect ( void );
void disconnect ( void ); void disconnect ( void );
void wait_for_disconnect ( void );
bool is_connected ( void ) const; bool is_connected ( void ) const;
boost::string_ref connection_error ( void ) const;
Batch make_batch ( void ); Batch make_batch ( void );
@ -77,24 +75,26 @@ namespace redis {
zscan_range zscan ( boost::string_ref parKey ); zscan_range zscan ( boost::string_ref parKey );
void submit_lua_script ( const std::string& parScript ); void submit_lua_script ( const std::string& parScript );
void wakeup_thread();
void lock();
void unlock();
private: private:
using RedisConnection = std::unique_ptr<redisAsyncContext, void(*)(redisAsyncContext*)>; using RedisConnection = std::unique_ptr<redisAsyncContext, void(*)(redisAsyncContext*)>;
using Sha1Array = std::array<char, 20>; using LibevLoop = std::unique_ptr<ev_loop, void(*)(ev_loop*)>;
boost::string_ref add_lua_script_ifn ( const std::string& parScript );
bool is_socket_connection ( void ) const; bool is_socket_connection ( void ) const;
void on_connect_successful ( void );
struct LocalData;
RedisConnection m_conn; RedisConnection m_conn;
#if defined(WITH_CRYPTOPP) LibevLoop m_libev_loop_thread;
std::set<Sha1Array> m_known_hashes;
#else
std::map<std::string, Sha1Array> m_known_scripts;
#endif
std::string m_address; std::string m_address;
std::unique_ptr<LocalData> m_local_data;
uint16_t m_port; uint16_t m_port;
bool m_connected; volatile bool m_connected;
bool m_connection_lost; volatile bool m_connection_lost;
}; };
template <typename... Args> template <typename... Args>

View file

@ -0,0 +1,21 @@
/* Copyright 2015, 2016, Michele Santullo
* This file is part of "dindexer".
*
* "dindexer" is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* "dindexer" is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
*/
#include "script.hpp"
namespace redis {
} //namespace redis

View file

@ -0,0 +1,28 @@
/* Copyright 2015, 2016, Michele Santullo
* This file is part of "dindexer".
*
* "dindexer" is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* "dindexer" is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef id5B30CDA57F894CD6888093B64F9433DA
#define id5B30CDA57F894CD6888093B64F9433DA
#include "script_manager.hpp"
namespace redis {
class Script {
};
} //namespace redis
#endif

View file

@ -0,0 +1,115 @@
/* Copyright 2015, 2016, Michele Santullo
* This file is part of "dindexer".
*
* "dindexer" is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* "dindexer" is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
*/
#include "script_manager.hpp"
#include "helpers/lexical_cast.hpp"
#include "command.hpp"
#include <cassert>
#if defined(WITH_CRYPTOPP)
# include <crypto++/sha.h>
#endif
namespace redis {
namespace {
#if defined(WITH_CRYPTOPP)
struct LuaScriptHash {
union {
struct {
uint64_t part_a, part_b;
uint32_t part_c;
};
uint8_t raw_bytes[20];
};
};
#endif
} //unnamed namespace
ScriptManager::ScriptManager (Command* parCommand) :
m_command(parCommand)
{
assert(m_command);
}
#if defined(WITH_CRYPTOPP)
boost::string_ref ScriptManager::add_lua_script_ifn (const std::string& parScript) {
assert(m_command->is_connected());
if (parScript.empty())
return boost::string_ref();
using dinhelp::lexical_cast;
static_assert(20 == CryptoPP::SHA1::DIGESTSIZE, "Unexpected SHA1 digest size");
static_assert(sizeof(LuaScriptHash) >= CryptoPP::SHA1::DIGESTSIZE, "Wrong SHA1 struct size");
static_assert(Sha1Array().size() == CryptoPP::SHA1::DIGESTSIZE * 2, "Wrong array size");
LuaScriptHash digest;
CryptoPP::SHA1().CalculateDigest(digest.raw_bytes, reinterpret_cast<const uint8_t*>(parScript.data()), parScript.size());
//TODO: change when lexical_cast will support arrays
auto sha1_str_parta = lexical_cast<std::string, dinhelp::tags::hexl>(__builtin_bswap64(digest.part_a));
auto sha1_str_partb = lexical_cast<std::string, dinhelp::tags::hexl>(__builtin_bswap64(digest.part_b));
auto sha1_str_partc = lexical_cast<std::string, dinhelp::tags::hexl>(__builtin_bswap32(digest.part_c));
const std::string sha1_str =
std::string(sizeof(digest.part_a) * 2 - sha1_str_parta.size(), '0') + sha1_str_parta +
std::string(sizeof(digest.part_b) * 2 - sha1_str_partb.size(), '0') + sha1_str_partb +
std::string(sizeof(digest.part_c) * 2 - sha1_str_partc.size(), '0') + sha1_str_partc
;
Sha1Array sha1_array;
assert(sha1_str.size() == sha1_array.size());
std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin());
auto it_found = m_known_hashes.find(sha1_array);
const bool was_present = (m_known_hashes.end() != it_found);
if (was_present) {
return boost::string_ref(it_found->data(), it_found->size());
}
auto reply = m_command->run("SCRIPT", "LOAD", parScript);
assert(not was_present);
assert(get_string(reply) == sha1_str);
const auto it_inserted = m_known_hashes.insert(it_found, sha1_array);
(void)reply;
return boost::string_ref(it_inserted->data(), it_inserted->size());
}
#else
boost::string_ref ScriptManager::add_lua_script_ifn (const std::string& parScript) {
assert(m_command->is_connected());
auto it_found = m_known_hashes.find(parScript);
const bool was_present = (m_known_hashes.end() != it_found);
if (was_present) {
return boost::string_ref(it_found->second.data(), it_found->second.size());
}
auto reply = m_command->run("SCRIPT", "LOAD", parScript);
assert(not was_present);
const auto sha1_str = get_string(reply);
Sha1Array sha1_array;
std::copy(sha1_str.begin(), sha1_str.end(), sha1_array.begin());
auto it_inserted = m_known_hashes.insert(it_found, std::make_pair(parScript, sha1_array));
return boost::string_ref(it_inserted->second.data(), it_inserted->second.size());
}
#endif
void ScriptManager::submit_lua_script (const std::string& parScript) {
add_lua_script_ifn(parScript);
}
} //namespace redis

View file

@ -0,0 +1,54 @@
/* Copyright 2015, 2016, Michele Santullo
* This file is part of "dindexer".
*
* "dindexer" is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* "dindexer" is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with "dindexer". If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef id8E124FF76DF449CDB8FBA806F8EF4E78
#define id8E124FF76DF449CDB8FBA806F8EF4E78
#include "redisConfig.h"
#include <boost/utility/string_ref.hpp>
#if defined(WITH_CRYPTOPP)
# include <set>
#else
# include <map>
#endif
#include <string>
#include <array>
namespace redis {
class Command;
class ScriptManager {
public:
explicit ScriptManager ( Command* parCommand );
void submit_lua_script ( const std::string& parScript );
private:
using Sha1Array = std::array<char, 40>;
boost::string_ref add_lua_script_ifn ( const std::string& parScript );
Command* m_command;
#if defined(WITH_CRYPTOPP)
std::set<Sha1Array> m_known_hashes;
#else
std::map<std::string, Sha1Array> m_known_scripts;
#endif
};
} //namespace redis
#endif