mirror of
https://github.com/KingDuckZ/incredis
synced 2024-11-27 00:43:50 +00:00
Pass down a ThreadContext instead of a bare std::atomic.
This commit is contained in:
parent
4d8adc19f2
commit
930fde41c8
4 changed files with 50 additions and 17 deletions
|
@ -25,12 +25,12 @@
|
||||||
|
|
||||||
namespace std {
|
namespace std {
|
||||||
template <class R> class future;
|
template <class R> class future;
|
||||||
template <class T> struct atomic;
|
|
||||||
} //namespace std
|
} //namespace std
|
||||||
|
|
||||||
namespace redis {
|
namespace redis {
|
||||||
class Command;
|
class Command;
|
||||||
class AsyncConnection;
|
class AsyncConnection;
|
||||||
|
class ThreadContext;
|
||||||
|
|
||||||
class Batch {
|
class Batch {
|
||||||
friend class Command;
|
friend class Command;
|
||||||
|
@ -54,7 +54,7 @@ namespace redis {
|
||||||
private:
|
private:
|
||||||
struct LocalData;
|
struct LocalData;
|
||||||
|
|
||||||
explicit Batch ( AsyncConnection* parConn, std::atomic<std::size_t>& parPendingFutures );
|
explicit Batch ( AsyncConnection* parConn, ThreadContext& parThreadContext );
|
||||||
void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
|
void run_pvt ( int parArgc, const char** parArgv, std::size_t* parLengths );
|
||||||
|
|
||||||
std::vector<std::future<Reply>> m_futures;
|
std::vector<std::future<Reply>> m_futures;
|
||||||
|
|
|
@ -17,13 +17,13 @@
|
||||||
|
|
||||||
#include "batch.hpp"
|
#include "batch.hpp"
|
||||||
#include "async_connection.hpp"
|
#include "async_connection.hpp"
|
||||||
|
#include "thread_context.hpp"
|
||||||
#include <hiredis/hiredis.h>
|
#include <hiredis/hiredis.h>
|
||||||
#include <hiredis/async.h>
|
#include <hiredis/async.h>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <ciso646>
|
#include <ciso646>
|
||||||
#include <boost/iterator/transform_iterator.hpp>
|
#include <boost/iterator/transform_iterator.hpp>
|
||||||
#include <atomic>
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
@ -113,24 +113,24 @@ namespace redis {
|
||||||
} //unnamed namespace
|
} //unnamed namespace
|
||||||
|
|
||||||
struct Batch::LocalData {
|
struct Batch::LocalData {
|
||||||
explicit LocalData ( std::atomic_size_t& parPendingFutures ) :
|
explicit LocalData ( ThreadContext& parThreadContext ) :
|
||||||
free_cmd_slot(),
|
free_cmd_slot(),
|
||||||
futures_mutex(),
|
futures_mutex(),
|
||||||
pending_futures(parPendingFutures)
|
thread_context(parThreadContext)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::condition_variable free_cmd_slot;
|
std::condition_variable free_cmd_slot;
|
||||||
std::mutex futures_mutex;
|
std::mutex futures_mutex;
|
||||||
std::atomic_size_t& pending_futures;
|
ThreadContext& thread_context;
|
||||||
};
|
};
|
||||||
|
|
||||||
Batch::Batch (Batch&&) = default;
|
Batch::Batch (Batch&&) = default;
|
||||||
|
|
||||||
Batch::Batch (AsyncConnection* parConn, std::atomic_size_t& parPendingFutures) :
|
Batch::Batch (AsyncConnection* parConn, ThreadContext& parThreadContext) :
|
||||||
m_futures(),
|
m_futures(),
|
||||||
m_replies(),
|
m_replies(),
|
||||||
m_local_data(new LocalData(parPendingFutures)),
|
m_local_data(new LocalData(parThreadContext)),
|
||||||
m_async_conn(parConn)
|
m_async_conn(parConn)
|
||||||
{
|
{
|
||||||
assert(m_async_conn);
|
assert(m_async_conn);
|
||||||
|
@ -149,8 +149,8 @@ namespace redis {
|
||||||
assert(parLengths); //This /could/ be null, but I don't see why it should
|
assert(parLengths); //This /could/ be null, but I don't see why it should
|
||||||
assert(m_local_data);
|
assert(m_local_data);
|
||||||
|
|
||||||
const auto pending_futures = m_local_data->pending_futures.fetch_add(1);
|
const auto pending_futures = m_local_data->thread_context.pending_futures.fetch_add(1);
|
||||||
auto* data = new HiredisCallbackData(m_local_data->pending_futures, m_local_data->free_cmd_slot);
|
auto* data = new HiredisCallbackData(m_local_data->thread_context.pending_futures, m_local_data->free_cmd_slot);
|
||||||
|
|
||||||
#if defined(VERBOSE_HIREDIS_COMM)
|
#if defined(VERBOSE_HIREDIS_COMM)
|
||||||
std::cout << "run_pvt(), " << pending_futures << " items pending... ";
|
std::cout << "run_pvt(), " << pending_futures << " items pending... ";
|
||||||
|
@ -160,7 +160,7 @@ namespace redis {
|
||||||
std::cout << " waiting... ";
|
std::cout << " waiting... ";
|
||||||
#endif
|
#endif
|
||||||
std::unique_lock<std::mutex> u_lock(m_local_data->futures_mutex);
|
std::unique_lock<std::mutex> u_lock(m_local_data->futures_mutex);
|
||||||
m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->pending_futures < g_max_redis_unanswered_commands; });
|
m_local_data->free_cmd_slot.wait(u_lock, [this]() { return m_local_data->thread_context.pending_futures < g_max_redis_unanswered_commands; });
|
||||||
}
|
}
|
||||||
#if defined(VERBOSE_HIREDIS_COMM)
|
#if defined(VERBOSE_HIREDIS_COMM)
|
||||||
std::cout << " emplace_back(future)... ";
|
std::cout << " emplace_back(future)... ";
|
||||||
|
@ -217,7 +217,7 @@ namespace redis {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(m_local_data);
|
assert(m_local_data);
|
||||||
assert(0 == m_local_data->pending_futures);
|
assert(0 == m_local_data->thread_context.pending_futures);
|
||||||
m_futures.clear();
|
m_futures.clear();
|
||||||
m_replies.clear();
|
m_replies.clear();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
#include "command.hpp"
|
#include "command.hpp"
|
||||||
#include "script_manager.hpp"
|
#include "script_manager.hpp"
|
||||||
#include "async_connection.hpp"
|
#include "async_connection.hpp"
|
||||||
|
#include "thread_context.hpp"
|
||||||
#include <hiredis/hiredis.h>
|
#include <hiredis/hiredis.h>
|
||||||
#include <ciso646>
|
#include <ciso646>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <atomic>
|
|
||||||
|
|
||||||
//See docs directory for info about hiredis/libev with multithreading
|
//See docs directory for info about hiredis/libev with multithreading
|
||||||
|
|
||||||
|
@ -34,14 +34,13 @@ namespace redis {
|
||||||
struct Command::LocalData {
|
struct Command::LocalData {
|
||||||
explicit LocalData (Command* parCommand, std::string&& parAddress, uint16_t parPort) :
|
explicit LocalData (Command* parCommand, std::string&& parAddress, uint16_t parPort) :
|
||||||
async_connection(std::move(parAddress), parPort),
|
async_connection(std::move(parAddress), parPort),
|
||||||
lua_scripts(parCommand),
|
lua_scripts(parCommand)
|
||||||
pending_futures(0)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncConnection async_connection;
|
AsyncConnection async_connection;
|
||||||
ScriptManager lua_scripts;
|
ScriptManager lua_scripts;
|
||||||
std::atomic_size_t pending_futures;
|
ThreadContext thread_context;
|
||||||
};
|
};
|
||||||
|
|
||||||
Command::Command (std::string&& parAddress, uint16_t parPort) :
|
Command::Command (std::string&& parAddress, uint16_t parPort) :
|
||||||
|
@ -82,7 +81,7 @@ namespace redis {
|
||||||
|
|
||||||
Batch Command::make_batch() {
|
Batch Command::make_batch() {
|
||||||
assert(is_connected());
|
assert(is_connected());
|
||||||
return Batch(&m_local_data->async_connection, m_local_data->pending_futures);
|
return Batch(&m_local_data->async_connection, m_local_data->thread_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
Script Command::make_script (const std::string &parScript) {
|
Script Command::make_script (const std::string &parScript) {
|
||||||
|
|
34
src/thread_context.hpp
Normal file
34
src/thread_context.hpp
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
/* Copyright 2016, Michele Santullo
|
||||||
|
* This file is part of "incredis".
|
||||||
|
*
|
||||||
|
* "incredis" is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* "incredis" is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with "incredis". If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef idCF662C64AAB440879A3BA23C74AFF9BF
|
||||||
|
#define idCF662C64AAB440879A3BA23C74AFF9BF
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
namespace redis {
|
||||||
|
struct ThreadContext {
|
||||||
|
ThreadContext() :
|
||||||
|
pending_futures(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::atomic_size_t pending_futures;
|
||||||
|
};
|
||||||
|
} //namespace redis
|
||||||
|
|
||||||
|
#endif
|
Loading…
Reference in a new issue