mirror of
https://github.com/WinampDesktop/winamp.git
synced 2024-09-24 15:54:12 +00:00
1135 lines
35 KiB
C++
1135 lines
35 KiB
C++
#ifdef _WIN32
|
|
#include <winsock2.h>
|
|
#endif
|
|
#include <algorithm>
|
|
#include "threadedRunner.h"
|
|
#include <openssl/ssl.h>
|
|
#include "global.h"
|
|
#include "stl/stringUtils.h"
|
|
#include "services/stdServiceImpl.h"
|
|
#include "protocol_HTTPStyle.h"
|
|
#include "protocol_HTTPSource.h"
|
|
#include "protocol_shoutcastSource.h"
|
|
#include "protocol_FlashPolicyServer.h"
|
|
#include "protocol_uvox2Source.h"
|
|
#include "uvox2Common.h"
|
|
#include "banList.h"
|
|
#include "ripList.h"
|
|
#ifdef _MSC_VER
|
|
#define MSG_DONTWAIT 0
|
|
#else
|
|
#include <netinet/tcp.h>
|
|
#include <sys/resource.h>
|
|
#ifndef EPOLLRDHUP
|
|
#define EPOLLRDHUP 0x2000
|
|
#endif
|
|
#endif
|
|
|
|
using namespace std;
|
|
using namespace stringUtil;
|
|
using namespace uniString;
|
|
|
|
#define LOGNAME "[THREADRUNNER] "
|
|
#define DEBUG_LOG(...) do { if (gOptions.threadRunnerDebug()) DLOG(__VA_ARGS__); } while (0)
|
|
|
|
#ifndef SSL_OP_NO_COMPRESSION
|
|
#define SSL_OP_NO_COMPRESSION 0
|
|
#endif
|
|
|
|
// make standard string for logging address
|
|
inline utf8 addrLogString(const utf8 &addr, const u_short port, const utf8 &xff) throw()
|
|
{
|
|
const bool use_xff = (gOptions.useXFF() && !xff.empty());
|
|
return (use_xff ? xff : addr) + ":" + tos(port) + (use_xff ? " (xff)" : "");
|
|
}
|
|
|
|
// make standard string for logging src address
|
|
utf8 srcAddrLogString(const utf8 &addr, const u_short port, const size_t sid) throw()
|
|
{
|
|
return "[SRC " + addrLogString(addr, port) + (sid > 0 ? " sid=" + tos(sid) : "") + "] ";
|
|
}
|
|
// make standard string for loggin dst address
|
|
utf8 dstAddrLogString(const utf8 &addr, const u_short port, const utf8 &xff, const size_t sid) throw()
|
|
{
|
|
return "[DST " + addrLogString(addr, port, xff) + (sid > 0 ? " sid=" + tos(sid) : "") + "] ";
|
|
}
|
|
// make standard string for logging unknown address
|
|
utf8 recvAddrLogString(const utf8 &addr, const u_short port) throw()
|
|
{
|
|
return "[RECV " + addrLogString(addr, port) + "] ";
|
|
}
|
|
// make standard string for socket error
|
|
string socketErrString(int err) throw() { return "err=" + socketOps::errMsg(err) + "(" + tos(err) + ")"; }
|
|
|
|
static AOL_namespace::mutex sm_globalRunnerLock;
|
|
static set<threadedRunner*> sm_runners;
|
|
|
|
SSL_CTX *threadedRunner::m_sslCtx = NULL;
|
|
AOL_namespace::mutex *threadedRunner::m_sslMutexes = NULL;
|
|
|
|
static bool cmp(threadedRunner* a, threadedRunner* b) throw()
|
|
{
|
|
return (a->sizeOfRunList() < b->sizeOfRunList());
|
|
}
|
|
|
|
bool threadedRunner::scheduleRunnable(runnable *r) throw()
|
|
{
|
|
if (r)
|
|
{
|
|
stackLock sml(sm_globalRunnerLock);
|
|
|
|
// diagnostics. Print load
|
|
if (gOptions.threadRunnerDebug())
|
|
{
|
|
utf8 msg = LOGNAME;
|
|
for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
|
|
{
|
|
msg += ((i == sm_runners.begin() ? "Thread " : ", thread ") + (*i)->threadNumber() + " amt=" + tos((*i)->sizeOfRunList()));
|
|
}
|
|
DEBUG_LOG(msg);
|
|
}
|
|
|
|
// find least busy
|
|
set<threadedRunner*>::const_iterator which = min_element(sm_runners.begin(), sm_runners.end(), cmp);
|
|
if ((which != sm_runners.end()) && (*which)->addRunnable(r))
|
|
{
|
|
return true;
|
|
}
|
|
|
|
// didn't work... schedule anywhere
|
|
DEBUG_LOG(LOGNAME "Schedule failure, trying any thread");
|
|
for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
|
|
{
|
|
if ((*i)->addRunnable(r))
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void threadedRunner::wakeup() throw()
|
|
{
|
|
stackLock sml(sm_globalRunnerLock);
|
|
|
|
for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
|
|
{
|
|
(*i)->wakeupRunnable();
|
|
}
|
|
}
|
|
|
|
uniString::utf8 threadedRunner::getRunnabledetails() throw()
|
|
{
|
|
stackLock sml(sm_globalRunnerLock);
|
|
|
|
utf8 details;
|
|
for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
|
|
{
|
|
details += (i != sm_runners.begin() ? "<br>" : (utf8)"") + "Thread #" +
|
|
(*i)->threadNumber() + ": <b>" + tos((*i)->sizeOfRunList()) +
|
|
"</b><div style=\"padding-left:1em;\">";
|
|
|
|
map<utf8, size_t> runners;
|
|
(*i)->enumRunnables(runners);
|
|
for (map<utf8, size_t>::const_iterator r = runners.begin(); r != runners.end(); ++r)
|
|
{
|
|
details += (r != runners.begin() ? "<br>" : "") + (*r).first + " - " + tos((*r).second);
|
|
}
|
|
details += "</div>";
|
|
}
|
|
return details;
|
|
}
|
|
|
|
threadedRunner::threadedRunner() throw() : m_stop(false), m_threadNumber((const short)(sm_runners.size() + 1))
|
|
{
|
|
stackLock sml(sm_globalRunnerLock);
|
|
sm_runners.insert(this);
|
|
}
|
|
|
|
threadedRunner::~threadedRunner() throw()
|
|
{
|
|
stackLock sml(sm_globalRunnerLock);
|
|
sm_runners.erase(this);
|
|
}
|
|
|
|
// main loop of thread
|
|
const unsigned threadedRunner::operator()() throw()
|
|
{
|
|
unsigned result = 1;
|
|
|
|
try
|
|
{
|
|
m_lock.lock();
|
|
while (!m_stop)
|
|
{
|
|
m_lock.unlock();
|
|
|
|
std::set<size_t> readSet, writeSet;
|
|
int timeout = -1;
|
|
|
|
// run everyone (as long as not scheduled) and get their status information, etc
|
|
set<runnable*>::const_iterator i = m_runList.begin();
|
|
while (i != m_runList.end())
|
|
{
|
|
runnable::timeSliceResult &tsr = (*i)->m_result;
|
|
__uint64 now = time_now_ms();
|
|
|
|
// if we're indicated as being scheduled then we
|
|
// need to skip doing anything else and look at
|
|
// checking if it's ok to process or not, etc
|
|
if (tsr.m_scheduleTime)
|
|
{
|
|
if (now < tsr.m_scheduleTime)
|
|
{
|
|
int time_diff = (int)(tsr.m_scheduleTime - now);
|
|
if (timeout == -1)
|
|
{
|
|
timeout = time_diff;
|
|
}
|
|
else
|
|
{
|
|
timeout = min(timeout, time_diff);
|
|
}
|
|
++i;
|
|
continue;
|
|
}
|
|
else
|
|
{
|
|
// we clear this as we're reached time
|
|
// but not the rest as we want what was
|
|
// set to now be used in the processing
|
|
tsr.m_scheduleTime = 0;
|
|
}
|
|
}
|
|
|
|
// make sure to reset otherwise it gets weird but
|
|
// we're only going to do this if we are able to
|
|
// run the runnable now (i.e. it's not scheduled)
|
|
tsr.reset(now);
|
|
|
|
bool exception_occured = false;
|
|
try
|
|
{
|
|
(*i)->timeSlice();
|
|
}
|
|
catch (const exception &ex)
|
|
{
|
|
exception_occured = true;
|
|
utf8 what = ex.what();
|
|
if (!what.empty())
|
|
{
|
|
ELOG(ex.what());
|
|
}
|
|
}
|
|
|
|
if (tsr.m_done || exception_occured)
|
|
{
|
|
set<runnable*>::const_iterator to_go = i;
|
|
DEBUG_LOG(LOGNAME "Removing " + (*i)->name() + " [done: " + tos(tsr.m_done) + ", exception: " + tos(exception_occured) + "]");
|
|
removeRunnable(*to_go);
|
|
++i;
|
|
m_runList.erase (to_go);
|
|
continue;
|
|
}
|
|
if (!tsr.m_runImmediately)
|
|
{
|
|
if (!tsr.m_scheduleTime)
|
|
{
|
|
update_sets:
|
|
if (tsr.m_readSet)
|
|
{
|
|
// filter out anything with an invalid socket
|
|
if ((*i)->m_socket != socketOps::cINVALID_SOCKET)
|
|
{
|
|
readSet.insert(readSet.end(), (*i)->m_socket);
|
|
}
|
|
if (tsr.m_customSocket != socketOps::cINVALID_SOCKET)
|
|
{
|
|
readSet.insert(readSet.end(), tsr.m_customSocket);
|
|
}
|
|
}
|
|
|
|
if (tsr.m_writeSet)
|
|
{
|
|
// filter out anything with an invalid socket
|
|
if ((*i)->m_socket != socketOps::cINVALID_SOCKET)
|
|
{
|
|
writeSet.insert(writeSet.end(), (*i)->m_socket);
|
|
}
|
|
if (tsr.m_customSocket != socketOps::cINVALID_SOCKET)
|
|
{
|
|
writeSet.insert(writeSet.end(), tsr.m_customSocket);
|
|
}
|
|
}
|
|
|
|
if (tsr.m_timeout != -1)
|
|
{
|
|
if (timeout == -1)
|
|
{
|
|
timeout = tsr.m_timeout;
|
|
}
|
|
else
|
|
{
|
|
timeout = min(timeout, tsr.m_timeout);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// if this is to be scheduled then we'll do a
|
|
// quick check to see if we're already after
|
|
// that time and if it isn't (which is how it
|
|
// should be) then we'll abort, else allow it
|
|
// and we get the time again to account for
|
|
// the time it's taken to process the runnable
|
|
now = time_now_ms();
|
|
if (now < tsr.m_scheduleTime)
|
|
{
|
|
int time_diff = (int)(tsr.m_scheduleTime - now);
|
|
if (timeout == -1)
|
|
{
|
|
timeout = time_diff;
|
|
}
|
|
else
|
|
{
|
|
timeout = min(timeout, time_diff);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// we clear this as we're reached time
|
|
// but not the rest as we want what was
|
|
// set to now be used in the processing
|
|
tsr.m_scheduleTime = 0;
|
|
timeout = 50;
|
|
goto update_sets;
|
|
}
|
|
}
|
|
++i;
|
|
}
|
|
} // for
|
|
|
|
// delete the old guys, no lock required here, only we add to this set
|
|
int released = 0;
|
|
while (true)
|
|
{
|
|
set<runnable*>::const_iterator it = m_runnablesToRemove.begin();
|
|
if (it == m_runnablesToRemove.end())
|
|
break;
|
|
if (++released > 300)
|
|
{
|
|
timeout &= 15; // prevent a large stall but force a quick retry
|
|
break;
|
|
}
|
|
stlx::delete_fntr<runnable> (*it);
|
|
m_runnablesToRemove.erase (it);
|
|
}
|
|
|
|
readSet.insert (m_signal.test());
|
|
if (timeout < 0)
|
|
timeout = 60000;
|
|
int n = socketOps::socketSelect(readSet, writeSet, timeout);
|
|
|
|
m_lock.lock();
|
|
|
|
// add the new guys, requires lock as set can be added from elsewhere
|
|
m_runList.insert(m_runnablesToAdd.begin(), m_runnablesToAdd.end());
|
|
m_runnablesToAdd.clear();
|
|
|
|
if (n > 0)
|
|
m_signal.clear();
|
|
}
|
|
m_lock.unlock();
|
|
result = 0;
|
|
}
|
|
catch (const exception &ex)
|
|
{
|
|
ELOG(LOGNAME + string(ex.what()));
|
|
}
|
|
catch (...)
|
|
{
|
|
ELOG(LOGNAME "Unknown exception");
|
|
}
|
|
|
|
// delete runnables in run list, and those that are queued to be added
|
|
m_lock.lock();
|
|
for_each(m_runnablesToAdd.begin(), m_runnablesToAdd.end(), stlx::delete_fntr<runnable>);
|
|
m_lock.unlock();
|
|
|
|
for_each(m_runList.begin(), m_runList.end(), stlx::delete_fntr<runnable>);
|
|
|
|
return result;
|
|
}
|
|
|
|
const size_t threadedRunner::sizeOfRunList() throw()
|
|
{
|
|
stackLock sml(m_lock);
|
|
const size_t result = (m_runList.size() + m_runnablesToAdd.size());
|
|
const size_t subtr = m_runnablesToRemove.size();
|
|
return (subtr > result ? 0 : result - subtr);
|
|
}
|
|
|
|
const bool threadedRunner::addRunnable(runnable* r) throw()
|
|
{
|
|
if (!r)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
stackLock sml(m_lock);
|
|
if (m_stop)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
m_runnablesToAdd.insert(r);
|
|
m_signal.set();
|
|
DEBUG_LOG(LOGNAME "Adding " + r->name() + " to thread " + tos(m_threadNumber));
|
|
|
|
return true;
|
|
}
|
|
|
|
const bool threadedRunner::removeRunnable(runnable *r) throw()
|
|
{
|
|
if (!r)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
m_runnablesToRemove.insert(r);
|
|
m_signal.set();
|
|
DEBUG_LOG(LOGNAME "Removing " + r->name() + " from thread " + tos(m_threadNumber));
|
|
return true;
|
|
}
|
|
|
|
void threadedRunner::enumRunnables(map<utf8, size_t>& runners) throw()
|
|
{
|
|
stackLock sml(m_lock);
|
|
|
|
for (set<runnable*>::const_iterator i = m_runList.begin(); i != m_runList.end(); ++i)
|
|
{
|
|
const utf8::size_type pos = (*i)->name().find((utf8)"protocol_");
|
|
if (pos != utf8::npos)
|
|
{
|
|
++runners[(*i)->name().substr(pos + 9, (*i)->name().length())];
|
|
}
|
|
else
|
|
{
|
|
++runners[(*i)->name()];
|
|
}
|
|
}
|
|
}
|
|
|
|
void threadedRunner::wakeupRunnable() throw()
|
|
{
|
|
if (m_lock.timedLock(1000))
|
|
{
|
|
m_signal.set();
|
|
m_lock.unlock();
|
|
}
|
|
}
|
|
|
|
void threadedRunner::stop() throw()
|
|
{
|
|
stackLock sml(m_lock);
|
|
m_stop = true;
|
|
m_signal.set();
|
|
}
|
|
|
|
///////////////////////////
|
|
#ifdef LOGNAME
|
|
#undef LOGNAME
|
|
#endif
|
|
#define LOGNAME "[MICROSERVER] "
|
|
|
|
#ifdef DEBUG_LOG
|
|
#undef DEBUG_LOG
|
|
#endif
|
|
#define DEBUG_LOG(x) { if (gOptions.microServerDebug()) DLOG((x)); }
|
|
|
|
microServer::microServer(const string &listenAddr, const u_short listenPort,
|
|
const AllowableProtocols_t protocols,
|
|
const ListenTypes_t types) throw(exception)
|
|
: m_protocols(protocols)
|
|
{
|
|
try
|
|
{
|
|
m_socket = socketOps::createTCPSocketTHROW();
|
|
#ifndef _WIN32
|
|
{
|
|
int bflag = 1;
|
|
setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &bflag, sizeof(bflag));
|
|
#if (defined PLATFORM_LINUX || defined PLATFORM_ARMv6 || defined PLATFORM_ARMv7)
|
|
int wait = 1;
|
|
setsockopt(m_socket, IPPROTO_TCP, TCP_DEFER_ACCEPT, &wait, sizeof(wait));
|
|
#endif
|
|
#ifdef PLATFORM_BSD
|
|
struct accept_filter_arg af = {"dataready", ""};
|
|
setsockopt(m_socket, SOL_SOCKET, SO_ACCEPTFILTER, &af, sizeof(af));
|
|
#endif
|
|
}
|
|
#endif
|
|
socketOps::bindTHROW(m_socket, listenPort, listenAddr);
|
|
socketOps::listenTHROW(m_socket);
|
|
socketOps::setNonblockTHROW(m_socket, true);
|
|
|
|
bindMessage(types, listenPort);
|
|
}
|
|
catch (const exception &ex)
|
|
{
|
|
socketOps::forgetTCPSocket(m_socket);
|
|
string error = ex.what();
|
|
throw runtime_error(LOGNAME "Error opening port " + tos(listenPort) + " because " + toLower(error));
|
|
}
|
|
}
|
|
|
|
void microServer::bindMessage(const ListenTypes_t types, const u_short listenPort) throw()
|
|
{
|
|
string message = "Listening for connections on port ";
|
|
|
|
if ((types & microServer::L_SOURCE) && (types & microServer::L_CLIENT))
|
|
{
|
|
message = "Listening for source and client connections on port ";
|
|
}
|
|
else if ((types & microServer::L_FLASH))
|
|
{
|
|
message = "Listening for flash policy server connection on port ";
|
|
}
|
|
else if ((types & microServer::L_SOURCE))
|
|
{
|
|
message = "Listening for legacy source connections on port ";
|
|
}
|
|
else if ((types & microServer::L_SOURCE2))
|
|
{
|
|
message = "Listening for source connections on port ";
|
|
}
|
|
else if ((types & microServer::L_CLIENT_ALT))
|
|
{
|
|
message = "Listening for client connections on alternate port ";
|
|
}
|
|
else if ((types & microServer::L_CLIENT))
|
|
{
|
|
message = "Listening for client connections on port ";
|
|
}
|
|
ILOG(LOGNAME + message + tos(listenPort));
|
|
}
|
|
|
|
void microServer::updateProtocols(AllowableProtocols_t protocols, ListenTypes_t types, const u_short listenPort) throw()
|
|
{
|
|
m_protocols = protocols;
|
|
bindMessage(types, listenPort);
|
|
}
|
|
|
|
microServer::~microServer() throw()
|
|
{
|
|
string addr;
|
|
u_short port = 0;
|
|
socketOps::getsockname(m_socket, addr, port);
|
|
socketOps::forgetTCPSocket(m_socket);
|
|
if (!iskilled())
|
|
{
|
|
ELOG(LOGNAME "Unexpected stop detected for listening for connections on port " + tos(port));
|
|
ELOG(LOGNAME "This should not happen and prevents the DNAS from working correctly.");
|
|
ELOG(LOGNAME "DNAS restart is required. If this keeps happening, enable all debugging options and provide the logs to Shoutcast support.");
|
|
}
|
|
else
|
|
{
|
|
DEBUG_LOG(LOGNAME "Stopped listening for connections on port " + tos(port));
|
|
}
|
|
}
|
|
|
|
void microServer::timeSlice() throw(exception)
|
|
{
|
|
static int repeated = 0;
|
|
|
|
// don't allow any new connections when the server is stopping
|
|
if (!iskilled())
|
|
{
|
|
try
|
|
{
|
|
string addr;
|
|
u_short port = 0;
|
|
|
|
socketOps::tSOCKET newSock = socketOps::acceptTHROW(m_socket, addr, port, true);
|
|
if (newSock != socketOps::cSOCKET_ERROR)
|
|
{
|
|
socketOps::getpeername(newSock, addr, port);
|
|
|
|
string hostName = addr;
|
|
if (gOptions.nameLookups())
|
|
{
|
|
if (socketOps::addressToHostName(addr, port, hostName))
|
|
{
|
|
hostName = addr;
|
|
}
|
|
}
|
|
|
|
socketOps::setNonblockTHROW(newSock, true);
|
|
DEBUG_LOG(LOGNAME "Connection received from " + addr + ":" + tos(port));
|
|
threadedRunner::scheduleRunnable(new microConnection(newSock, hostName, addr, port, m_protocols));
|
|
repeated = 0;
|
|
}
|
|
}
|
|
catch (const tagged_error &ex)
|
|
{
|
|
ELOG(ex.what());
|
|
}
|
|
catch (const exception &ex)
|
|
{
|
|
string msg = ex.what();
|
|
if (!msg.empty())
|
|
{
|
|
if (msg.find("Could not call") == 0)
|
|
{
|
|
// serious error, log unless repeated and delay a retry
|
|
if ((repeated & 255) == 0)
|
|
ELOG(LOGNAME + msg);
|
|
++repeated;
|
|
m_result.schedule (1000);
|
|
return;
|
|
}
|
|
ELOG(LOGNAME + msg);
|
|
}
|
|
}
|
|
|
|
m_result.read();
|
|
return;
|
|
}
|
|
|
|
m_result.done();
|
|
}
|
|
|
|
///////////////////////////////////////////////////
|
|
microConnection::microConnection(const socketOps::tSOCKET s, const string &hostName, const string &addr,
|
|
const u_short port, const microServer::AllowableProtocols_t protocols) throw()
|
|
: runnable(s), m_srcHostName(hostName),
|
|
m_srcAddress(addr), m_srcPort(port),
|
|
m_protocols(protocols)
|
|
{
|
|
}
|
|
|
|
microConnection::~microConnection() throw()
|
|
{
|
|
socketOps::forgetTCPSocket(m_socket);
|
|
}
|
|
|
|
void microConnection::timeSlice() throw(exception)
|
|
{
|
|
time_t cur_time;
|
|
const int autoDumpTime = ::detectAutoDumpTimeout(cur_time, m_lastActivityTime, (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Got timeout waiting for data"), gOptions.microServerDebug());
|
|
|
|
const int maxHeaderLineSize = gOptions.maxHeaderLineSize();
|
|
const bool flash_policy = !!(m_protocols & P_FLASHPOLICYFILE);
|
|
bool uvox_checked = false;
|
|
runnable *runnable = NULL;
|
|
|
|
char buf[MAX_MESSAGE_SIZE] = {0};
|
|
// if we've got a 1.x source connection then only handle
|
|
// on a per-byte basis, for everything else, try getting
|
|
// a few bytes so we can use that as a guide on how then
|
|
// to try to process things a bit quicker than per-byte.
|
|
// int amt = (!(m_protocols & P_SHOUTCAST1SOURCE) ? UV2X_HDR_SIZE : 1);
|
|
int amt = (m_ssl) ? 1 : 4096;
|
|
ssize_t rval = 0;
|
|
|
|
while (true)
|
|
{
|
|
if (iskilled() || (size_t)amt > sizeof(buf))
|
|
{
|
|
m_result.done();
|
|
return;
|
|
}
|
|
|
|
int flags = (m_lineBuffer.size() || m_ssl) ? 0 : MSG_PEEK; // use PEEK initially as SSL requires bytes in the socket
|
|
if ((rval = recv (buf, amt, flags|MSG_DONTWAIT)) < 1)
|
|
{
|
|
if (rval == 0)
|
|
{
|
|
throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Remote socket closed while waiting for data.") : (utf8)""));
|
|
}
|
|
else if (rval < 0)
|
|
{
|
|
rval = socketOps::errCode();
|
|
if (rval != SOCKETOPS_WOULDBLOCK)
|
|
{
|
|
throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Socket error while waiting for data. " + socketErrString(rval)) : (utf8)""));
|
|
}
|
|
|
|
m_result.schedule();
|
|
m_result.read();
|
|
m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
|
|
return;
|
|
}
|
|
}
|
|
|
|
m_lineBuffer.insert (m_lineBuffer.end(), buf, buf + rval);
|
|
int lineSize = (int)m_lineBuffer.size();
|
|
|
|
if (lineSize > maxHeaderLineSize)
|
|
{
|
|
throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Protocol header line is too large - exceeds " + tos(maxHeaderLineSize) +
|
|
" bytes") : (utf8)""));
|
|
}
|
|
if (m_ssl == NULL && flags && (m_lineBuffer [0] == 0x16)) // SSLv3 / TLSv1.x ?
|
|
{
|
|
if (threadedRunner::m_sslCtx == NULL)
|
|
{
|
|
throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Remote socket closed, no SSL configured.") : (utf8)""));
|
|
}
|
|
if (lineSize < 6)
|
|
{
|
|
m_result.schedule();
|
|
m_result.read();
|
|
m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
|
|
return;
|
|
}
|
|
if ((m_lineBuffer [1] == 0x3) && (m_lineBuffer [5] == 0x1))
|
|
{
|
|
DLOG ("detected ssl request, checking further");
|
|
m_ssl = SSL_new (threadedRunner::m_sslCtx);
|
|
SSL_set_accept_state (m_ssl);
|
|
SSL_set_fd (m_ssl, (int)m_socket);
|
|
SSL_set_mode (m_ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER|SSL_MODE_ENABLE_PARTIAL_WRITE);
|
|
m_lineBuffer.clear();
|
|
continue;
|
|
}
|
|
}
|
|
utf8::size_type nl = m_lineBuffer.find ((unsigned char)'\n');
|
|
if (nl != utf8::npos)
|
|
{
|
|
rval = lineSize = (int)nl+1; // 0 offset
|
|
if (flags) m_lineBuffer.erase (lineSize); // truncate line to maintain parsing
|
|
}
|
|
|
|
if (flags)
|
|
::recv (m_socket, buf, rval, MSG_DONTWAIT); // pull bytes from input, passed any PEEK requirement
|
|
|
|
if ((lineSize > 0) && (lineSize < UV2X_HDR_SIZE) && (m_lineBuffer [lineSize - 1] != '\n'))
|
|
{
|
|
if (m_lineBuffer[0] == UVOX2_SYNC_BYTE)
|
|
{
|
|
// if it looks like it might be a uvox
|
|
// frame then grab more on the next go
|
|
amt = 3;
|
|
}
|
|
else
|
|
{
|
|
// no point in doing any of the checks
|
|
// if there is not enough data to view
|
|
// e.g. we need to see a valid newline
|
|
amt = 1;
|
|
}
|
|
|
|
m_lastActivityTime = ::time(NULL);
|
|
continue;
|
|
}
|
|
if ((lineSize >= UV2X_HDR_SIZE) && (uvox_checked == false))
|
|
{
|
|
// look at first uvox packet to see if we're running uvox 2 or uvox 2.1
|
|
// NOTE: This is a protocol change. We need to add a new packet to 2.1 so request cipher key
|
|
const uv2xHdr *voxhdr = reinterpret_cast<const uv2xHdr*>(m_lineBuffer.c_str());
|
|
if ((voxhdr->sync == UVOX2_SYNC_BYTE) &&
|
|
(ntohs(voxhdr->msgType) == (u_short)MSG_CIPHER))
|
|
{
|
|
const int wanted = (ntohs(voxhdr->msgLen) + UV2X_OVERHEAD);
|
|
if (wanted == lineSize)
|
|
{
|
|
// we have uvox 2.1
|
|
if (m_protocols & P_SHOUTCAST2SOURCE) // only if allowed
|
|
{
|
|
runnable = new protocol_uvox2Source (*this, (const __uint8 *)m_lineBuffer.c_str(), lineSize);
|
|
}
|
|
break;
|
|
}
|
|
amt = min(MAX_MESSAGE_SIZE, (wanted - lineSize));
|
|
m_lastActivityTime = ::time(NULL);
|
|
continue;
|
|
}
|
|
// if we've got enough and there's no sync
|
|
// byte then there's not point to re-check.
|
|
uvox_checked = true;
|
|
}
|
|
|
|
if ((lineSize > 0) && (m_lineBuffer[lineSize - 1] == '\n'))
|
|
{
|
|
// look at start of line, if it's a GET or POST or some standard HTTP thing, then we
|
|
// have either a web request or a client connection request. If that is missing, then
|
|
// we have to assume it's a shoutcast source, and we have just received the password.
|
|
//
|
|
// this should be enough to detect absolute and relative requests made to the server
|
|
// if there's no / for absolute paths then we'll reject the request as a bad access.
|
|
if ((m_lineBuffer.find((utf8)"GET /") == 0) ||
|
|
(m_lineBuffer.find((utf8)"GET h") == 0) ||
|
|
(m_lineBuffer.find((utf8)"POST /") == 0) ||
|
|
(m_lineBuffer.find((utf8)"POST h") == 0) ||
|
|
(m_lineBuffer.find((utf8)"HEAD /") == 0) ||
|
|
(m_lineBuffer.find((utf8)"HEAD h") == 0))
|
|
{
|
|
if (m_protocols & (P_SHOUTCAST1CLIENT |
|
|
P_SHOUTCAST2CLIENT |
|
|
P_WEB | P_WEB_SETUP))
|
|
{
|
|
runnable = new protocol_HTTPStyle (*this, stripWhitespace(m_lineBuffer).hideAsString());
|
|
}
|
|
break;
|
|
}
|
|
else // assume shoutcast source, and this is the password (though do some checks to sanitise)
|
|
{
|
|
// and now look for invalid HTTP requests and
|
|
// reject them as the earlier handling should
|
|
// allow valid relative and absolute requests
|
|
if (lineSize > 5)
|
|
{
|
|
if ((m_lineBuffer.find((utf8)"GET ") == 0) ||
|
|
(m_lineBuffer.find((utf8)"POST ") == 0) ||
|
|
(m_lineBuffer.find((utf8)"SOURCE ") == 0) ||
|
|
(m_lineBuffer.find((utf8)"PUT ") == 0) ||
|
|
(m_lineBuffer.find((utf8)"HEAD ") == 0))
|
|
{
|
|
throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
|
|
"Invalid HTTP request detected - only valid relative and absolute paths are allowed.") : (utf8)""));
|
|
}
|
|
}
|
|
|
|
// if we appear to have a 'PUT' or 'SOURCE' request then we'll need to
|
|
// do some different handling in-order to get the correct details before
|
|
// we can then actually process the stream as a valid (icecast?) source
|
|
if (((m_lineBuffer.find((utf8)"SOURCE ") == 0) ||
|
|
(m_lineBuffer.find((utf8)"PUT ") == 0)) &&
|
|
((m_lineBuffer.find((utf8)"HTTP/1.") != utf8::npos) ||
|
|
(m_lineBuffer.find((utf8)"ICE/1.") != utf8::npos)))
|
|
{
|
|
runnable = new protocol_HTTPSource (*this, stripWhitespace(m_lineBuffer).hideAsString());
|
|
}
|
|
else
|
|
{
|
|
runnable = new protocol_shoutcastSource (*this, stripWhitespace(m_lineBuffer));
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
if (flash_policy && (m_lineBuffer.find((utf8)"<policy-file-request/>") == 0))
|
|
{
|
|
runnable = new protocol_FlashPolicyServer (m_socket, dstAddrLogString(m_srcHostName, m_srcPort));
|
|
break;
|
|
}
|
|
amt = 1;
|
|
m_lastActivityTime = ::time(NULL);
|
|
} // while
|
|
|
|
if (runnable)
|
|
{
|
|
threadedRunner::scheduleRunnable (runnable);
|
|
}
|
|
m_result.done();
|
|
return;
|
|
}
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// return 0 if line is ready, or a timeout in seconds for next select call if we are still waiting
|
|
// lineBuffer and lastActivityTime are updated by this call
|
|
const bool runnable::getHTTPStyleHeaderLine(const size_t sid, utf8 &lineBuffer, const utf8 &logMsgPrefix, int maxLineLength) throw(exception)
|
|
{
|
|
time_t cur_time;
|
|
const int autoDumpTime = ::detectAutoDumpTimeout (cur_time, m_lastActivityTime,
|
|
(logMsgPrefix + "Timeout waiting for data"), gOptions.microServerDebug(), sid);
|
|
|
|
const int maxHeaderLineSize = maxLineLength > 0 ? maxLineLength : gOptions.maxHeaderLineSize();
|
|
int count = 0;
|
|
bool ret = true;
|
|
|
|
while (true)
|
|
{
|
|
int rval = 0;
|
|
char buf[2] = {0};
|
|
if ((rval = recv(buf, 1, 0x0)) < 1)
|
|
{
|
|
if (rval == 0)
|
|
{
|
|
if (gOptions.microServerDebug())
|
|
ELOG (logMsgPrefix + "Remote socket closed while waiting for data.", LOGNAME, sid);
|
|
throwEx<runtime_error>((utf8)"");
|
|
}
|
|
rval = socketOps::errCode();
|
|
if (rval != SOCKETOPS_WOULDBLOCK)
|
|
{
|
|
if (gOptions.microServerDebug())
|
|
ELOG (logMsgPrefix + "Socket error while waiting for data. " + socketErrString(rval), LOGNAME, sid);
|
|
throwEx<runtime_error>((utf8)"");
|
|
}
|
|
|
|
// if we've read something then it's likely to be from a POST response
|
|
if (lineBuffer.empty() == false)
|
|
{
|
|
ret = false;
|
|
if (count) break;
|
|
}
|
|
|
|
// try again but wait a bit
|
|
// so we don't overload it.
|
|
m_result.schedule(30);
|
|
m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
|
|
return false;
|
|
}
|
|
++count;
|
|
|
|
lineBuffer.insert (lineBuffer.end(), buf, buf + rval);
|
|
|
|
const int lineSize = (int)lineBuffer.size();
|
|
if (lineSize == maxLineLength) break;
|
|
if (lineSize > maxHeaderLineSize)
|
|
{
|
|
ELOG (logMsgPrefix + "Protocol header line is too large - exceeds "
|
|
+ tos(maxHeaderLineSize) + " bytes", LOGNAME, sid);
|
|
throwEx<runtime_error> ((utf8)"");
|
|
}
|
|
if ((lineSize > 0) && lineBuffer [lineSize - 1] == '\n')
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
m_result.run();
|
|
m_lastActivityTime = ::time(NULL);
|
|
return ret;
|
|
}
|
|
|
|
// send a hunk of data out a socket - returns true if send is complete,
|
|
// outBuffer and outBufferSize should be initially set to point to the
|
|
// data and the size of the data - these values are moved and updated.
|
|
const bool runnable::sendDataBuffer(const size_t sid, const uniString::utf8::value_type *&outBuffer,
|
|
int &outBufferSize, const uniString::utf8 &logMsgPrefix) throw(std::exception)
|
|
{
|
|
#if defined(_DEBUG) || defined(DEBUG)
|
|
DEBUG_LOG(logMsgPrefix + __FUNCTION__ + " " + tos(outBufferSize));
|
|
#endif
|
|
|
|
if (outBufferSize > 0) // done
|
|
{
|
|
time_t cur_time;
|
|
const int autoDumpTime = ::detectAutoDumpTimeout(cur_time, m_lastActivityTime,
|
|
(logMsgPrefix + "Timeout waiting to send data"),
|
|
gOptions.microServerDebug(), sid);
|
|
|
|
int rval = send ((const char *)outBuffer, outBufferSize, 0);
|
|
if (rval == 0)
|
|
{
|
|
throwEx<std::runtime_error>((gOptions.microServerDebug() ? (logMsgPrefix +
|
|
"Remote socket closed while sending data") :
|
|
(uniString::utf8)""));
|
|
}
|
|
else if (rval < 0)
|
|
{
|
|
rval = socketOps::errCode();
|
|
if (rval != SOCKETOPS_WOULDBLOCK)
|
|
{
|
|
throwEx<std::runtime_error>((gOptions.microServerDebug() ? (((
|
|
#ifdef _WIN32
|
|
rval == WSAECONNABORTED || rval == WSAECONNRESET
|
|
#else
|
|
rval == ECONNABORTED || rval == ECONNRESET || rval == EPIPE
|
|
#endif
|
|
) ? (uniString::utf8)"" : logMsgPrefix +
|
|
"Socket error while waiting to send data. " +
|
|
socketErrString(rval))) : (uniString::utf8)""));
|
|
}
|
|
|
|
// try again but wait a bit
|
|
// so we don't overload it.
|
|
m_result.schedule();
|
|
m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
|
|
return false;
|
|
}
|
|
|
|
// move pointers
|
|
outBufferSize -= rval;
|
|
outBuffer += rval;
|
|
|
|
// update time
|
|
m_lastActivityTime = ::time(NULL);
|
|
m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
|
|
|
|
if (outBufferSize == 0) // done
|
|
{
|
|
m_result.schedule();
|
|
return true;
|
|
}
|
|
m_result.schedule (160);
|
|
return false;
|
|
}
|
|
|
|
m_result.write();
|
|
m_result.schedule();
|
|
m_result.timeoutSID(sid);
|
|
return true;
|
|
}
|
|
|
|
|
|
runnable::runnable (runnable &r) throw()
|
|
{
|
|
m_socket = r.m_socket;
|
|
m_ssl = r.m_ssl;
|
|
m_lastActivityTime = ::time (NULL);
|
|
// the following are handed off to this sub-protocol, so make sure they cannot affect them
|
|
r.m_socket = socketOps::cINVALID_SOCKET;
|
|
r.m_ssl = NULL;
|
|
}
|
|
|
|
|
|
ssize_t runnable::recv (void *buf, size_t len, int flags)
|
|
{
|
|
if (m_ssl)
|
|
{
|
|
ssize_t bytes = SSL_read (m_ssl, buf, len);
|
|
int code = SSL_get_error (m_ssl, bytes);
|
|
// char err[128];
|
|
|
|
switch (code)
|
|
{
|
|
case SSL_ERROR_NONE:
|
|
case SSL_ERROR_ZERO_RETURN:
|
|
break;
|
|
case SSL_ERROR_WANT_READ:
|
|
case SSL_ERROR_WANT_WRITE:
|
|
return -1;
|
|
default:
|
|
bytes = 0;
|
|
}
|
|
return bytes;
|
|
}
|
|
return (ssize_t)::recv (m_socket, (char*)buf, len, flags);
|
|
}
|
|
|
|
|
|
ssize_t runnable::send(const void *buf, size_t len, int flags)
|
|
{
|
|
if (m_ssl)
|
|
{
|
|
ssize_t bytes = SSL_write (m_ssl, buf, len);
|
|
int code = SSL_get_error (m_ssl, bytes);
|
|
// char err[128];
|
|
|
|
switch (code)
|
|
{
|
|
case SSL_ERROR_NONE:
|
|
case SSL_ERROR_ZERO_RETURN:
|
|
break;
|
|
case SSL_ERROR_WANT_READ:
|
|
case SSL_ERROR_WANT_WRITE:
|
|
return -1;
|
|
default:
|
|
return -1;
|
|
}
|
|
return bytes;
|
|
}
|
|
return (ssize_t)::send(m_socket, (char*)buf, len, flags);
|
|
}
|
|
|
|
// This pick the dump time for sources, as there is no general class for that yet, unlike listeners
|
|
int runnable::detectAutoDumpTimeout (time_t &cur_time, const size_t streamID, const utf8 &msg) throw(runtime_error)
|
|
{
|
|
const int autoDumpTime = gOptions.stream_autoDumpSourceTime(streamID);
|
|
|
|
cur_time = ::time(NULL);
|
|
if ((autoDumpTime > 0) && ((cur_time - m_lastActivityTime) >= autoDumpTime))
|
|
{
|
|
WLOG (msg, LOGNAME, streamID);
|
|
throwEx<runtime_error>("");
|
|
}
|
|
return autoDumpTime;
|
|
}
|
|
|
|
|
|
unsigned long threadedRunner::SSL_idFunction (void)
|
|
{
|
|
return threadedRunner::getCurrentThreadID();
|
|
}
|
|
|
|
void threadedRunner::SSL_lockingFunction (int mode, int n, const char * /*file*/, int /*line*/)
|
|
{
|
|
if (mode & CRYPTO_LOCK)
|
|
m_sslMutexes[n].lock();
|
|
else
|
|
m_sslMutexes[n].unlock();
|
|
}
|
|
|
|
|
|
void threadedRunner::SSL_shutdown ()
|
|
{
|
|
#if !defined(WIN32) && OPENSSL_VERSION_NUMBER < 0x10000000
|
|
CRYPTO_set_id_callback (NULL);
|
|
#endif
|
|
CRYPTO_set_locking_callback (NULL);
|
|
if (m_sslCtx)
|
|
{
|
|
::SSL_CTX_free (m_sslCtx);
|
|
m_sslCtx = NULL;
|
|
}
|
|
if (m_sslMutexes)
|
|
delete [] m_sslMutexes;
|
|
m_sslMutexes = NULL;
|
|
}
|
|
|
|
void threadedRunner::SSL_init ()
|
|
{
|
|
SSL_load_error_strings();
|
|
SSL_library_init ();
|
|
utf8 cert_file = gOptions.sslCertificateFile();
|
|
utf8 key_file = gOptions.sslCertificateKeyFile();
|
|
|
|
do {
|
|
if (cert_file == "") break;
|
|
|
|
CRYPTO_set_id_callback (&threadedRunner::SSL_idFunction);
|
|
#if !defined(WIN32) && OPENSSL_VERSION_NUMBER < 0x10000000
|
|
CRYPTO_set_locking_callback (&threadedRunner::SSL_lockingFunction);
|
|
#endif
|
|
|
|
m_sslMutexes = new AOL_namespace::mutex [CRYPTO_num_locks()];
|
|
if (m_sslMutexes == NULL)
|
|
break;
|
|
|
|
m_sslCtx = ::SSL_CTX_new (::SSLv23_server_method());
|
|
|
|
long ssl_opts = ::SSL_CTX_get_options (m_sslCtx);
|
|
::SSL_CTX_set_options (m_sslCtx, ssl_opts|SSL_OP_NO_SSLv2|SSL_OP_NO_SSLv3|SSL_OP_NO_COMPRESSION);
|
|
|
|
if (::SSL_CTX_use_certificate_chain_file (m_sslCtx, (char*)cert_file.c_str()) <= 0)
|
|
{
|
|
WLOG ("[MAIN] Invalid certificate file " + cert_file);
|
|
break;
|
|
}
|
|
|
|
utf8 &pkfile = key_file.empty() ? cert_file : key_file;
|
|
if (::SSL_CTX_use_PrivateKey_file (m_sslCtx, (char*)pkfile.c_str(), SSL_FILETYPE_PEM) <= 0)
|
|
{
|
|
WLOG ("[MAIN] Invalid private key file " + pkfile);
|
|
break;
|
|
}
|
|
if (! SSL_CTX_check_private_key (m_sslCtx))
|
|
{
|
|
WLOG ("[MAIN] Invalid, private key does not match public key, " + pkfile);
|
|
break;
|
|
}
|
|
ILOG ("[MAIN] SSL keys installed");
|
|
return;
|
|
|
|
} while (0);
|
|
if (m_sslCtx)
|
|
{
|
|
WLOG ("[MAIN] failed to set up SSL, " + utf8(::ERR_reason_error_string (::ERR_peek_last_error())));
|
|
::SSL_CTX_free (m_sslCtx);
|
|
m_sslCtx = NULL;
|
|
}
|
|
CRYPTO_set_id_callback (NULL);
|
|
CRYPTO_set_locking_callback (NULL);
|
|
if (m_sslMutexes)
|
|
delete [] m_sslMutexes;
|
|
m_sslMutexes = NULL;
|
|
}
|