winamp/Src/Plugins/DSP/sc_serv3/protocol_RTMPClient.cpp

1682 lines
50 KiB
C++
Raw Normal View History

2024-09-24 12:54:57 +00:00
#if 0
#ifdef _WIN32
#include <winsock2.h>
#endif
#include <stdio.h>
#include "protocol_RTMPClient.h"
#include "ripList.h"
#include "stats.h"
#include "streamData.h"
#include "amf.h"
#include "w3cLog.h"
#include "log.h"
#include "global.h"
#include <iomanip>
using namespace std;
using namespace uniString;
using namespace stringUtil;
// size in bytes of each of the C1, S1, C2 and S2 handshake packets
#define C1_S1_C2_S2_PACKET_SIZE 1536
// chunk size used in both directions when the connection is created
#define DEFAULT_CHUNK_SIZE 128
// RTMP message type IDs
#define RTMP_MSG__SET_CHUNK_SIZE 1
#define RTMP_MSG__ABORT 2
#define RTMP_MSG__ACKNOWLEDGEMENT 3
#define RTMP_MSG__USER_CONTROL_MESSAGE 4
#define RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE 5
#define RTMP_MSG__SET_PEER_BANDWIDTH 6
#define RTMP_MSG__AUDIO_DATA 8
#define RTMP_MSG__VIDEO_DATA 9
#define RTMP_MSG__DATA_AMF3 15
#define RTMP_MSG__COMMAND_AMF3 17
#define RTMP_MSG__DATA_AMF0 18
#define RTMP_MSG__COMMAND_AMF0 20
// user control message event types (first two bytes of a user control payload)
#define RTMP_UCM_STREAM_BEGIN 0
#define RTMP_UCM_STREAM_EOF 1
#define RTMP_UCM_STREAM_DRY 2
#define RTMP_UCM_SET_BUFFER 3
#define RTMP_UCM_STREAM_IS_RECORDED 4
#define RTMP_UCM_PING_REQUEST 6
#define RTMP_UCM_PING_RESPONSE 7
/////////////////////////
// limit type byte sent as the fifth payload byte of a Set Peer Bandwidth message
#define RTMP_PEER_BANDWIDTH_HARD 0
#define RTMP_PEER_BANDWIDTH_SOFT 1
#define RTMP_PEER_BANDWIDTH_DYNAMIC 2
// window size advertised to the client; 0x2625a0 is 2,500,000
// (the trailing note records an alternative value of 16 * 1024)
#define DEFAULT_SERVER_WINDOW 0x2625a0 //(16 * 1024)
// log x through DLOG only when the RTMP client debug option is switched on.
// x is not evaluated at all when the option is off.
#define DEBUG_LOG(x) { if (gOptions.RTMPClientDebug()) DLOG((x)); }
#ifdef _WIN32
#define TIMEFUNC ::timeGetTime
#else
#include <sys/time.h>
// Non-Windows replacement for timeGetTime(): the current gettimeofday()
// reading expressed in milliseconds.
static unsigned long TIMEFUNC() throw()
{
	struct timeval now;
	::gettimeofday(&now,NULL);
	// microseconds are truncated to whole milliseconds
	return (now.tv_usec / 1000) + (now.tv_sec * 1000);
}
#endif
// Forward declarations for helpers defined further down in this file.
// Builds a message from a raw payload and appends its chunked form to result.
// Defaults: chunk stream 2, message stream 0.
static void createRTMPMsg(__uint8 msgType,long timestamp,int outboundChunkSize,const __uint8 *payload,size_t payloadSize,vector<__uint8> &result,
int chunkStreamID = 2,int msgStreamID = 0);
// Appends the chunked form of an already populated message to result.
static void createRTMPMsg(const protocol_RTMPClient::message &msg,long timestamp,int outboundChunkSize,vector<__uint8> &result);
//void getRTMPMsgInfo(const vector<__uint8> &msg,__uint8 &msgType,int &payloadLength,long &timestamp,int &streamID,int &payloadOffset);
// Reads four bytes starting at data[offset] as a big-endian value.
static size_t decode4ByteValue(const __uint8 *data,size_t offset = 0) throw();
// Produces a multi-line, human readable description of an RTMP message for
// the debug log. The text starts with an end-of-line and always contains the
// message type; fixed-layout control messages have their fields decoded and
// AMF0/AMF3 payloads are pretty printed through AMFEncoding.
//
// Never throws: any exception raised while decoding is turned into an
// "Exception: ..." line and the raw payload is dumped to the debug log.
// A payload that is too short for its message type is reported in the text
// instead of being read past its end.
static utf8 prettyPrintMessage(const protocol_RTMPClient::message &msg) throw()
{
	utf8 EOL(eol());
	utf8 result(EOL);
	result += "msg type=" + tos((int)msg.m_messageType) + EOL;
	try
	{
		const size_t payloadSize = msg.m_messageData.size();
		// indexing element zero of an empty vector is undefined, so point at a
		// dummy byte when there is no payload
		static const __uint8 noPayload = 0;
		const __uint8 *payload = (payloadSize ? &(msg.m_messageData[0]) : &noPayload);

		// smallest payload the fixed-layout messages below read from
		size_t required = 0;
		switch(msg.m_messageType)
		{
			case RTMP_MSG__SET_CHUNK_SIZE:
			case RTMP_MSG__ABORT:
			case RTMP_MSG__ACKNOWLEDGEMENT:
			case RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE:
				required = 4;
			break;
			case RTMP_MSG__SET_PEER_BANDWIDTH:
				required = 5; // four byte window plus the limit type byte
			break;
		}
		if (payloadSize < required)
		{
			result += "payload too short, expected at least=" + tos(required) + EOL;
			result += "payload size=" + tos(payloadSize) + EOL;
			return result;
		}

		switch(msg.m_messageType)
		{
			case RTMP_MSG__SET_CHUNK_SIZE:
				result += "chunk size=" + tos(decode4ByteValue(payload)) + EOL;
			break;
			case RTMP_MSG__ABORT:
				result += "chunk stream id=" + tos(decode4ByteValue(payload)) + EOL;
			break;
			case RTMP_MSG__ACKNOWLEDGEMENT:
				result += "sequence number=" + tos(decode4ByteValue(payload)) + EOL;
			break;
			case RTMP_MSG__USER_CONTROL_MESSAGE:
				// no decoding for user control messages
			break;
			case RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE:
				result += "acknowledgement window size=" + tos(decode4ByteValue(payload)) + EOL;
			break;
			case RTMP_MSG__SET_PEER_BANDWIDTH:
				result += "acknowledgement window size=" + tos(decode4ByteValue(payload)) + EOL;
				result += "limit type=" + tos(payload[4]) + EOL;
			break;
			case RTMP_MSG__DATA_AMF0:
			case RTMP_MSG__COMMAND_AMF0:
			{
				AMFEncoding amf0;
				amf0.loadFromBitstream((const char *)payload,(int)payloadSize,"");
				result += amf0.prettyPrint();
			}
			break;
			case RTMP_MSG__DATA_AMF3:
			case RTMP_MSG__COMMAND_AMF3:
			{
				AMFEncoding amf3(3);
				amf3.loadFromBitstream((const char *)payload,(int)payloadSize,"");
				result += amf3.prettyPrint();
			}
			break;
		}
	}
	catch(const exception &ex)
	{
		result += string("Exception: ") + ex.what() + EOL;
		DEBUG_LOG(msg.packetDump());
	}
	catch(...)
	{
		result += "Exception: <unknown>" + EOL;
		DEBUG_LOG(msg.packetDump());
	}
	return result;
}
// Test harness: while TEST_FILE is defined the constructor opens this file
// and state_SendAudio() reads its audio payload from it.
// NOTE(review): the path is specific to one developer's machine.
#define TEST_FILE "C:\\Documents and Settings\\nradisch\\My Documents\\programming\\shoutcast\\current\\sc_serv2\\test.aac"
// handle of the open test file. It is a single global, so every client
// object closes and reopens the same handle.
FILE *fff = 0;
// Renders the message payload as hexadecimal text, sixteen bytes per line.
// Every line, including the first, is preceded by an end-of-line and every
// byte is followed by a single space.
utf8 protocol_RTMPClient::message::packetDump() const throw()
{
	ostringstream dump;
	const size_t total = m_messageData.size();
	for (size_t idx = 0; idx < total; ++idx)
	{
		// start a new row before bytes 0, 16, 32, ...
		if ((idx % 16) == 0)
			dump << stringUtil::eol();
		dump << setw(2) << hex << (int)m_messageData[idx] << " ";
	}
	return dump.str();
}
// Sets up a client on an accepted socket. C0 is the version byte the caller
// has already read from the client. The object starts in state_SendS0 with the
// one byte S0 queued for sending, a 16 KiB inbound buffer allocated and the
// S1 packet prepared (server time stamp followed by a few random bytes).
protocol_RTMPClient::protocol_RTMPClient(socketOps::tSOCKET s,const utf8 &hostName,const utf8 &addr,int port,__uint8 C0)throw(exception)
:m_socket(s),m_clientHostName(hostName),m_clientAddr(addr),m_clientPort(port),m_clientLogString(dstAddrLogString(hostName,port)),
m_C0(C0),
m_S0(3),
m_inDataBuffer(0),
m_lastActivityTime(::time(0)),m_startTime(::time(0)),
m_lastInboundMessageStreamID(-1),
m_lastInboundMessageLength(-1),
m_lastInboundMessageTypeID(-1),
m_lastInboundTimestamp(-1),
m_windowSizeFromClient(-1),
m_bufferSizeFromClient(-1),
m_lastTitleTime(::time(0)),
m_bytesSentForCurrentTitle(0),
m_totalBytesSent(0),
m_objectEncodingMode(0),
m_removeClientFromStats(false),
//m_state(&protocol_RTMPClient::state_AttachToStream),
m_state(&protocol_RTMPClient::state_SendS0),
m_streamData(0)
{
#ifdef TEST_FILE
	// every new client restarts the shared test file from the beginning
	if (fff)
		::fclose(fff);
	fff = ::fopen(TEST_FILE,"rb");
#endif
	DEBUG_LOG(__FUNCTION__);

	// inbound receive buffer
	m_inDataBufferMax = 16 * 1024;
	m_inDataBuffer = new __uint8[m_inDataBufferMax];
	m_inDataBufferAmt = 0;

	// S1 starts out as all zeros
	m_S1orS2.assign(C1_S1_C2_S2_PACKET_SIZE,0);

	// first four bytes of S1: server base time in network byte order
	m_serverBaseTime = TIMEFUNC();
	const __uint32 baseTimeNet = htonl(m_serverBaseTime);
	memcpy(&(m_S1orS2[0]),&baseTimeNet,4);

	// only the first ten bytes of the random section (offsets 8..17) are
	// randomised; that is faster than filling all 1528 bytes and still valid
	for (size_t offset = 8; offset < 18; ++offset)
	{
		m_S1orS2[offset] = rand();
	}

	// first thing on the wire is the single S0 byte
	m_outDataSize = 1;
	m_outDataPtr = &m_S0;

	m_outboundChunkSize = DEFAULT_CHUNK_SIZE;
	m_inboundChunkSize = DEFAULT_CHUNK_SIZE;
}
// Tears the client down: closes the shared test file (test builds only),
// removes the client from the statistics if it was registered, detaches from
// the stream, then releases the inbound buffer and the socket.
// The destructor is declared throw(), so nothing may escape the cleanup code;
// both std::exception and any other exception type are logged and swallowed.
protocol_RTMPClient::~protocol_RTMPClient() throw()
{
#ifdef TEST_FILE
	if (fff) ::fclose(fff);
	fff = 0;
#endif
	DEBUG_LOG(__FUNCTION__);
	try
	{
		/*ILOG(m_clientLogString + " SHOUTcast 1 client connection closed (" +
		tos(::time(0) - m_startTime).c_str() + " seconds). " +
		mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8("")) +
		" [Bytes: " + tos(m_totalBytesSent).c_str() + "]");*/
		if (m_removeClientFromStats)
			stats::removeClient(m_streamID,this);
		if (m_streamData)
		{
			m_streamData->abandonRTMPLimitTrigger(&m_limitTrigger);
			streamData::streamClientLost(m_streamData);
			m_streamData = 0;
			logW3C();
		}
	}
	catch(const exception &ex)
	{
		ELOG(ex.what());
	}
	catch(...)
	{
		// an exception leaving a throw() destructor would terminate the process
		ELOG("Unknown exception while closing RTMP client");
	}
	delete [] m_inDataBuffer;
	m_streamData = 0;
	socketOps::forgetTCPSocket(m_socket);
}
// Scheduler entry point. Ends the connection when the client has been kicked,
// has exceeded the configured listener time or its stream has died; otherwise
// runs the handler for the current protocol state.
runnable::timeSliceResult protocol_RTMPClient::timeSlice() throw(exception)
{
	// listener time limit: stream specific value when one was read from the
	// configuration, the global value otherwise
	size_t listenerTime = gOptions.stream_listenerTime(DEFAULT_SHOUTCAST_SOURCE_STREAM);
	if(!gOptions.read_stream_listenerTime(DEFAULT_SHOUTCAST_SOURCE_STREAM))
	{
		listenerTime = gOptions.listenerTime();
	}
	const size_t limitSeconds = listenerTime * 60; // convert to seconds

	// a limit of zero means no limit
	const bool timesUp = (limitSeconds && ((::time(0) - m_startTime) > (int)limitSeconds));

	if (!m_kickNextRound && !timesUp && !(m_streamData && m_streamData->isDead()))
	{
		// nothing wrong, keep the state machine going
		return (this->*m_state)();
	}

	if (timesUp)
	{ ILOG(m_clientLogString + " listener time exceeded.");}
	else if (m_kickNextRound)
	{ ILOG(m_clientLogString + " kicked");}

	timeSliceResult finished;
	finished.m_done = true;
	return finished;
}
//////////////////////////////////////////////////////////////////////////////////
///////////////////// Initial Handshake States //////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////
// Generic send state: pushes the bytes described by m_outDataPtr and
// m_outDataSize to the socket. Once sendDataBuffer() reports zero the state
// machine advances to m_nextState; until then the socket is put in the write
// set with the returned value as the timeout in seconds.
runnable::timeSliceResult protocol_RTMPClient::state_Send() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	timeSliceResult result;
	const long timeout = sendDataBuffer(m_socket,m_outDataPtr,m_outDataSize,m_lastActivityTime,m_clientLogString);
	if (timeout != 0)
	{
		// more to send, wait until the socket is writable
		result.m_timeout.tv_sec = timeout;
		result.m_writeSet.insert(m_socket);
		return result;
	}
	// everything went out
	result.m_runImmediately = true;
	m_state = m_nextState;
	return result;
}
// Generic receive state for a fixed size read: collects data until
// m_inDataRequested bytes are present in m_inDataBuffer. When getSocketData()
// reports zero the state machine advances to m_nextState; otherwise the
// socket is put in the read set with the returned value as the timeout.
runnable::timeSliceResult protocol_RTMPClient::state_RecvFixedAmt() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	timeSliceResult result;
	assert(m_inDataRequested <= m_inDataBufferMax);

	// getSocketData() lowers 'outstanding' by what it read
	const size_t wanted = m_inDataRequested - m_inDataBufferAmt;
	size_t outstanding = wanted;
	const long timeout = getSocketData(m_socket,m_inDataBuffer,m_inDataBufferAmt,outstanding,m_lastActivityTime,m_clientLogString);
	m_inDataBufferAmt += (wanted - outstanding);

	if (timeout != 0)
	{
		// wait some more
		result.m_timeout.tv_sec = timeout;
		result.m_readSet.insert(m_socket);
		return result;
	}
	// got data
	result.m_runImmediately = true;
	m_state = m_nextState;
	return result;
}
// Generic receive state for one complete RTMP message. m_inMsg is cleared and
// refilled through getMsg(); when a message is available the state machine
// advances to m_nextState, otherwise the socket is put in the read set with
// the value returned by getMsg() as the timeout.
runnable::timeSliceResult protocol_RTMPClient::state_RecvMsg() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	timeSliceResult result;
	m_inMsg.clear();
	const long timeout = getMsg(m_inMsg);
	if (timeout != 0)
	{
		// message not complete yet
		result.m_timeout.tv_sec = timeout;
		result.m_readSet.insert(m_socket);
		return result;
	}
	m_state = m_nextState;
	result.m_runImmediately = true;
	DEBUG_LOG(string(__FUNCTION__) + " received msg" + prettyPrintMessage(m_inMsg));
	return result;
}
// Common tail of a state handler: returns a result that asks the scheduler
// to run this object again immediately. It declares a local named 'result',
// so it can be used only once per scope.
#define NEXT_STATE timeSliceResult result; result.m_runImmediately = true; return result;
// Handshake: queues the one byte S0 packet and continues with S1 afterwards.
runnable::timeSliceResult protocol_RTMPClient::state_SendS0() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// send now, then go on to S1
	m_nextState = &protocol_RTMPClient::state_SendS1;
	m_state = &protocol_RTMPClient::state_Send;

	m_outDataSize = 1;
	m_outDataPtr = &m_S0;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: queues the S1 packet held in m_S1orS2 and then waits for C1.
runnable::timeSliceResult protocol_RTMPClient::state_SendS1() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// send now, then wait for the client's C1
	m_nextState = &protocol_RTMPClient::state_WaitForC1;
	m_state = &protocol_RTMPClient::state_Send;

	m_outDataSize = m_S1orS2.size();
	m_outDataPtr = &m_S1orS2[0];

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: empties the inbound buffer and requests the C1 packet.
runnable::timeSliceResult protocol_RTMPClient::state_WaitForC1() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// receive exactly one handshake packet, then process it as C1
	m_nextState = &protocol_RTMPClient::state_GotC1;
	m_state = &protocol_RTMPClient::state_RecvFixedAmt;

	m_inDataRequested = C1_S1_C2_S2_PACKET_SIZE;
	m_inDataBufferAmt = 0;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: C1 is complete. Records when it was read, relative to the server
// base time, keeps a copy of the packet and moves on to sending S2.
runnable::timeSliceResult protocol_RTMPClient::state_GotC1() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	m_C1ReadTime = TIMEFUNC() - m_serverBaseTime;

	// m_C1 becomes exactly the received packet
	m_C1.assign(m_inDataBuffer,m_inDataBuffer + C1_S1_C2_S2_PACKET_SIZE);

	m_state = &protocol_RTMPClient::state_SendS2;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: builds S2 as a copy of C1 whose bytes 4..7 hold the time C1 was
// read (network byte order), queues it and then waits for C2.
runnable::timeSliceResult protocol_RTMPClient::state_SendS2() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// echo C1 back with our read time stamped into the second field
	m_S1orS2 = m_C1;
	const __uint32 readTimeNet = htonl(m_C1ReadTime);
	memcpy(&(m_S1orS2[4]),&readTimeNet,4);

	m_outDataSize = m_S1orS2.size();
	m_outDataPtr = &m_S1orS2[0];

	m_nextState = &protocol_RTMPClient::state_WaitForC2;
	m_state = &protocol_RTMPClient::state_Send;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: empties the inbound buffer and requests the C2 packet.
runnable::timeSliceResult protocol_RTMPClient::state_WaitForC2() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// receive exactly one handshake packet, then process it as C2
	m_nextState = &protocol_RTMPClient::state_GotC2;
	m_state = &protocol_RTMPClient::state_RecvFixedAmt;

	m_inDataRequested = C1_S1_C2_S2_PACKET_SIZE;
	m_inDataBufferAmt = 0;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handshake: C2 is complete. Keeps a copy of the packet (its contents are not
// checked here) and switches to regular message processing.
runnable::timeSliceResult protocol_RTMPClient::state_GotC2() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	// m_C2 becomes exactly the received packet
	m_C2.assign(m_inDataBuffer,m_inDataBuffer + C1_S1_C2_S2_PACKET_SIZE);

	m_state = &protocol_RTMPClient::state_WaitForMessage;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Prepares for the next inbound RTMP message: marks the inbound buffer as
// empty and arranges for state_GotMessage to run once a message has arrived.
runnable::timeSliceResult protocol_RTMPClient::state_WaitForMessage() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	m_nextState = &protocol_RTMPClient::state_GotMessage;
	m_state = &protocol_RTMPClient::state_RecvMsg;

	m_inDataBufferAmt = 0;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Dispatches the message held in m_inMsg by its type. User control and AMF0
// command messages go to their handlers, a Window Acknowledgement Size
// message is recorded here, and every other type is rejected with a
// runtime_error.
runnable::timeSliceResult protocol_RTMPClient::state_GotMessage() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	const int type = m_inMsg.m_messageType;

	if (type == RTMP_MSG__USER_CONTROL_MESSAGE)
		return handle_USER_CONTROL_message();

	if (type == RTMP_MSG__COMMAND_AMF0)
		return handle_AMF0_message();

	if (type == RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE)
	{
		// payload is a single four byte window size
		if (m_inMsg.m_messageData.size() != 4)
			throwEx<runtime_error>(m_clientLogString + " Bad payload size for Window Acknowledgement message, got " + tos(m_inMsg.m_messageData.size()) + " expected 4.");
		m_windowSizeFromClient = decode4ByteValue(&(m_inMsg.m_messageData[0]),0);
		DEBUG_LOG(m_clientLogString + " WAS from client is " + tos(m_windowSizeFromClient));
		m_state = &protocol_RTMPClient::state_WaitForMessage;
	}
	else
	{
		throwEx<runtime_error>(m_clientLogString + " " + __FUNCTION__ + " cannot dispatch message type " + tos((int)m_inMsg.m_messageType));
	}

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handles a User Control message held in m_inMsg. The first two payload bytes
// are the event type in network byte order. Only Set Buffer is acted upon: its
// four byte value at payload offset 6 is stored in m_bufferSizeFromClient and
// the state machine goes back to waiting for a message. Every other event
// type, including Stream Begin, is rejected with a runtime_error, as is a
// payload that is too short.
//
// The fields are copied out with memcpy instead of being read through a cast
// pointer because the payload bytes are not guaranteed to be aligned for
// 16/32 bit access.
runnable::timeSliceResult protocol_RTMPClient::handle_USER_CONTROL_message() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	const vector<__uint8> &payload = m_inMsg.m_messageData;
	if (payload.size() < 2)
		throwEx<runtime_error>(m_clientLogString + " User control message has insufficient data.");
	__uint16 eventTypeNet = 0;
	memcpy(&eventTypeNet,&payload[0],sizeof(eventTypeNet));
	__uint16 t = ntohs(eventTypeNet);
	switch(t)
	{
		case RTMP_UCM_SET_BUFFER:
		{
			if (payload.size() < 10)
				throwEx<runtime_error>(m_clientLogString + " Set Buffer user control message has insufficient data.");
			__uint32 bufferSizeNet = 0;
			memcpy(&bufferSizeNet,&payload[6],sizeof(bufferSizeNet));
			m_bufferSizeFromClient = ntohl(bufferSizeNet);
			DEBUG_LOG(m_clientLogString + " Buffer size from client is " + tos(m_bufferSizeFromClient) + " milliseconds.");
			m_state = &protocol_RTMPClient::state_WaitForMessage;
			NEXT_STATE
		}
		break;
		case RTMP_UCM_STREAM_BEGIN:
			//return handle_UCM_StreamBegin();
		break;
	}
	throwEx<runtime_error>(m_clientLogString + " User control message type " + tos(t) + " is not supported.");
	NEXT_STATE
}
// Placeholder for the Stream Begin user control event: logs the call and
// asks to be run again immediately without changing state.
runnable::timeSliceResult protocol_RTMPClient::handle_UCM_StreamBegin() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Decodes the AMF0 command held in m_inMsg and routes it by its first value,
// the command name: "connect", "createStream" or "play". Any other command
// name is rejected with a runtime_error.
runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_message() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	AMFEncoding command;
	command.loadFromBitstream((const char *)&(m_inMsg.m_messageData[0]),(int)m_inMsg.m_messageData.size(),m_clientLogString);

	// value zero is the command name
	const AMFVal &commandName = command.getValue(0);

	if (commandName.getString() == "connect")
	{
		return handle_AMF0_connect(command);
	}
	if (commandName.getString() == "createStream")
	{
		return handle_AMF0_createStream(command);
	}
	if (commandName.getString() == "play")
	{
		return handle_AMF0_play(command);
	}

	throwEx<runtime_error>(m_clientLogString + " " + __FUNCTION__ + " Unknown AMF0 message " + commandName.getString());

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handles the AMF0 "connect" command. Value 1 must be transaction number 1 and
// value 2 is the command object, which must carry a "tcUrl" property. The
// optional "objectEncoding" property is stored in m_objectEncodingMode (zero
// when absent). On success the next state sends the connect response.
runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_connect(const AMFEncoding &amf) throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	const AMFVal &transaction = amf.getValue(1);
	const AMFVal &commandObject = amf.getValue(2);

	if (transaction.getNumber() != 1)
	{
		throwEx<runtime_error>(m_clientLogString + " Unexpected transaction number. Wanted 1, got " + tos(transaction.getNumber()));
	}

	const AMFObject &properties = commandObject.getObject();

	const AMFVal *tcUrl = properties.getProperty("tcUrl");
	if (!tcUrl)
	{
		throwEx<runtime_error>(m_clientLogString + " Connect command has no tcUrl value.");
	}
	utf8 url = tcUrl->getString(); // use this value to create stream accessor

	const AMFVal *objectEncoding = properties.getProperty("objectEncoding");
	if (objectEncoding)
		m_objectEncodingMode = (int)objectEncoding->getNumber();
	else
		m_objectEncodingMode = 0;

	m_state = &protocol_RTMPClient::state_SendConnectResponse;

	timeSliceResult runAgain;
	runAgain.m_runImmediately = true;
	return runAgain;
}
// Handles the AMF0 "createStream" command: answers with a "_result" command
// carrying the caller's transaction number, a default constructed AMFVal and
// stream ID 1, sent on the chunk stream and message stream of the request.
// Afterwards the state machine waits for the next message.
//
// The outbound buffer is emptied first. createRTMPMsg() appends to it, so
// without the clear the bytes of the previously sent message would be
// transmitted a second time in front of this reply.
runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_createStream(const AMFEncoding &amf) throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	// const AMFVal &v0 = amf.getValue(0);
	const AMFVal &v1 = amf.getValue(1);
	double transaction = v1.getNumber();
	AMFEncoding amf0;
	amf0.appendValue(AMFVal(utf8("_result"))); // or "_error"
	amf0.appendValue(AMFVal((double)transaction));
	amf0.appendValue(AMFVal());
	amf0.appendValue(AMFVal((double)1)); //1234)); /// stream ID for client
	vector<__uint8> resp;
	amf0.serialize(resp,m_clientLogString);
	// reuse the request's chunk stream and message stream IDs for the reply
	message msg(m_inMsg);
	msg.m_messageData = resp;
	msg.m_messageType = RTMP_MSG__COMMAND_AMF0;
	m_outDataBuffer.clear(); // drop whatever was sent last
	createRTMPMsg(msg,TIMEFUNC() - m_serverBaseTime,(int)m_outboundChunkSize,m_outDataBuffer);
	//createRTMPMsg(RTMP_MSG__COMMAND_AMF0,m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer);
	m_outDataPtr = &m_outDataBuffer[0];
	m_outDataSize = m_outDataBuffer.size();
	m_state = &protocol_RTMPClient::state_Send;
	m_nextState = &protocol_RTMPClient::state_WaitForMessage;
	NEXT_STATE
}
// Handles the AMF0 "play" command. Two messages are queued on the chunk stream
// and message stream of the request: an "onStatus" command carrying
// "NetStream.Play.Start" and an "onHeaderData" data message describing the
// stream (protocol ICY, content type audio/aacp). Once they are sent the
// state machine starts sending audio.
//
// The outbound buffer is emptied first. createRTMPMsg() appends to it, so
// without the clear the bytes of the previously sent message would be
// transmitted a second time in front of these replies.
runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_play(const AMFEncoding &amf) throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	m_outDataBuffer.clear(); // drop whatever was sent last
	AMFEncoding amf0;
	amf0.appendValue(AMFVal(utf8("onStatus"))); // or "_error"
	amf0.appendValue(AMFVal(utf8("NetStream.Play.Start")));
	vector<__uint8> resp;
	amf0.serialize(resp,m_clientLogString);
	// both messages carry the same timestamp
	long ttt = TIMEFUNC();
	createRTMPMsg(RTMP_MSG__COMMAND_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID);
	amf0.clear();
	amf0.appendValue(AMFVal(utf8("onHeaderData")));
	AMFEMCAArray amfA;
	amfA.addProperty("protocol",new AMFVal(utf8("ICY")));
	amfA.addProperty("content-type",new AMFVal(utf8("audio/aacp")));
	amf0.appendValue(AMFVal(amfA));
	resp.clear();
	amf0.serialize(resp,m_clientLogString);
	createRTMPMsg(RTMP_MSG__DATA_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID);
	m_outDataPtr = &m_outDataBuffer[0];
	m_outDataSize = m_outDataBuffer.size();
	m_state = &protocol_RTMPClient::state_Send;
	m_nextState = &protocol_RTMPClient::state_SendAudio;
	NEXT_STATE
}
// Queues the reply to "connect" as one batch: Window Acknowledgement Size,
// Set Peer Bandwidth, a Stream Begin user control message, Set Chunk Size and
// finally the AMF0 "_result" command (sent on the chunk and message stream of
// the request). Afterwards the state machine waits for the next message.
//
// The server base time is reset to "now" here, so every message in this batch
// is stamped with timestamp zero.
//
// The window size is copied into the byte array with memcpy instead of being
// stored through a cast pointer: a byte array carries no alignment guarantee
// for 32 bit access.
runnable::timeSliceResult protocol_RTMPClient::state_SendConnectResponse() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	m_outDataBuffer.clear();
	__uint8 data[5] = {0,0,0,0,0};
	long ttt = TIMEFUNC();
	m_serverBaseTime = ttt;
	// bytes 0..3: window size in network byte order, shared by the next two messages
	const __uint32 serverWindowNet = htonl(DEFAULT_SERVER_WINDOW);
	memcpy(data,&serverWindowNet,sizeof(serverWindowNet));
	createRTMPMsg(RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,data,4,m_outDataBuffer);
	// byte 4: limit type, used by Set Peer Bandwidth only
	data[4] = RTMP_PEER_BANDWIDTH_DYNAMIC; //_SOFT;
	createRTMPMsg(RTMP_MSG__SET_PEER_BANDWIDTH,ttt - m_serverBaseTime,(int)m_outboundChunkSize,data,5,m_outDataBuffer);
	// event type 0 (Stream Begin) followed by four zero bytes
	__uint8 streamBeginData[6] = {0,0,0,0,0,0};
	createRTMPMsg(RTMP_MSG__USER_CONTROL_MESSAGE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,streamBeginData,6,m_outDataBuffer);
	__int32 chunkSize = htonl((int)m_outboundChunkSize);
	createRTMPMsg(RTMP_MSG__SET_CHUNK_SIZE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,(const __uint8 *)&chunkSize,sizeof(chunkSize),m_outDataBuffer);
	AMFEncoding amf; //(m_objectEncodingMode > 1 ? 3 : 0);
	AMFObject obj;
	amf.appendValue(AMFVal(utf8("_result"))); // or "_error"
	amf.appendValue(AMFVal((double)1.0));
	// first object: server properties
	obj.addProperty("fmsVer",new AMFVal(utf8("FMS/3,5,3,824a"))); //new AMFVal(utf8("sc_serv " + version.first + " " + version.second)));
	obj.addProperty("capabilities",new AMFVal((double)127)); //31)); // ????
	obj.addProperty("mode",new AMFVal((double)1));
	amf.appendValue(AMFVal(obj));
	// second object: connection status information
	obj.clearProperties();
	obj.addProperty("level",new AMFVal(utf8("status")));
	obj.addProperty("code",new AMFVal(utf8("NetConnection.Connect.Success")));
	obj.addProperty("description",new AMFVal(utf8("Connection succeeded.")));
	obj.addProperty("clientid",new AMFVal((double)795525197.0));
	obj.addProperty("objectEncoding",new AMFVal((double)m_objectEncodingMode));
	AMFEMCAArray arry;
	arry.addProperty("version",new AMFVal(utf8("FMS/3,5,3,824a")));
	obj.addProperty("data",new AMFVal(arry));
	amf.appendValue(AMFVal(obj));
	vector<__uint8> resp;
	amf.serialize(resp,m_clientLogString);
	createRTMPMsg(RTMP_MSG__COMMAND_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID);
	m_outDataPtr = &m_outDataBuffer[0];
	m_outDataSize = m_outDataBuffer.size();
	m_state = &protocol_RTMPClient::state_Send;
	m_nextState = &protocol_RTMPClient::state_WaitForMessage;
	//m_nextState = &protocol_RTMPClient::state_SendAudio;
	NEXT_STATE
}
// Test build only (TEST_FILE defined): reads the next 1022 bytes of the test
// file, prefixes them with a two byte FLV audio header and queues them as one
// audio message; the state then repeats after the send. Throws runtime_error
// when the test file could not be opened or has no full block left.
// Without TEST_FILE the function only asks to be run again.
runnable::timeSliceResult protocol_RTMPClient::state_SendAudio() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
#ifdef TEST_FILE
	// the constructor's fopen() may have failed; never hand a null FILE* to fread()
	if (!fff)
		throwEx<runtime_error>("Test file could not be opened!");
	__uint8 buffer[1024];
	buffer[0] = 0xaf; //FLV audio header
	buffer[1] = 0x01;
	if (::fread(&(buffer[2]),1,1022,fff) != 1022)
		throwEx<runtime_error>("Test done!");
	m_outDataBuffer.clear();
	createRTMPMsg(RTMP_MSG__AUDIO_DATA,TIMEFUNC() - m_serverBaseTime,(int)m_outboundChunkSize,buffer,1024,m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID);
	m_outDataPtr = &m_outDataBuffer[0];
	m_outDataSize = m_outDataBuffer.size();
	m_state = &protocol_RTMPClient::state_Send;
	m_nextState = &protocol_RTMPClient::state_SendAudio;
#endif
	NEXT_STATE
}
/////////////////////////////////////////////////////
/////////////////////////////////////////////////////
// Writes the low 24 bits of v to data[0..2], most significant byte first.
static void encode3ByteValue(long v,__uint8 *data) throw()
{
	int shift = 16;
	for (int i = 0; i < 3; ++i)
	{
		data[i] = (v >> shift) & 0xff;
		shift -= 8;
	}
}
// Writes the low 32 bits of v to data[0..3], most significant byte first.
static void encode4ByteValue(long v,__uint8 *data) throw()
{
	int shift = 24;
	for (int i = 0; i < 4; ++i)
	{
		data[i] = (v >> shift) & 0xff;
		shift -= 8;
	}
}
// Splits one RTMP message into chunks of at most outboundChunkSize payload
// bytes and appends them to result.
//
//   chunkStreamID      chunk stream to send on, must be 2 or greater
//   msgStreamID        message stream ID placed in the message header
//   msgTypeID          RTMP message type
//   timestamp          message timestamp; values of 0xffffff and above are
//                      sent through the extended timestamp field
//   outboundChunkSize  maximum payload bytes per chunk, must be positive
//   data / dataSize    message payload
//
// The first chunk carries a full (type 0) header, all following chunks carry
// a type 3 header, i.e. the basic header only. An empty payload still
// produces one header-only chunk.
//
// The message stream ID is written in little-endian byte order as the RTMP
// chunk format requires; every other header field is big-endian.
static void chunkify(int chunkStreamID,int msgStreamID,__uint8 msgTypeID,long timestamp,int outboundChunkSize,const __uint8 *data,size_t dataSize,vector<__uint8> &result)
{
	assert(chunkStreamID >= 2);
	size_t chunkCount = 1;
	if (dataSize > 0)
		chunkCount = ((dataSize-1) / outboundChunkSize)+1;
	assert(chunkCount);
	for(size_t c = 0; c < chunkCount; ++c)
	{
		__uint8 header[18]; // largest possible header
		__uint8 *h = header;
		// basic header: one, two or three bytes depending on the chunk stream ID
		if (chunkStreamID >=2 && chunkStreamID <= 63)
		{
			*(h++) = chunkStreamID;
		}
		else if (chunkStreamID >= 64 && chunkStreamID <= 319)
		{
			*(h++) = 0;
			*(h++) = (chunkStreamID - 64);
		}
		else
		{
			h[0] = 1;
			h[2] = ((chunkStreamID - 64) / 256);
			h[1] = (chunkStreamID - 64 - (h[2] * 256));
			h+= 3;
		}
		// fill in format bits and header
		if (c)
		{
			header[0] |= 0xc0; // type 3
		}
		else
		{
			// type zero, full header
			encode3ByteValue(timestamp < 0x00ffffff ? timestamp : 0x00ffffff,h);
			h += 3;
			encode3ByteValue((int)dataSize,h);
			h += 3;
			*(h++) = msgTypeID;
			// message stream ID, least significant byte first
			*(h++) = (__uint8)(msgStreamID & 0xff);
			*(h++) = (__uint8)((msgStreamID >> 8) & 0xff);
			*(h++) = (__uint8)((msgStreamID >> 16) & 0xff);
			*(h++) = (__uint8)((msgStreamID >> 24) & 0xff);
			if (timestamp >= 0x00ffffff)
			{
				// extended timestamp follows the message header
				encode4ByteValue(timestamp,h);
				h += 4;
			}
		}
		// put header into result
		result.insert(result.end(),header,h);
		size_t payload_amt = min(dataSize,(size_t)outboundChunkSize);
		// put payload into result
		result.insert(result.end(),data,data + payload_amt);
		dataSize -= payload_amt;
		data += payload_amt;
	}
	assert(dataSize == 0);
}
// Convenience overload: wraps a raw payload in a message object and hands it
// to the message based createRTMPMsg(), which appends the chunked form to
// result.
static void createRTMPMsg(__uint8 msgType,long timestamp,int outboundChunkSize,const __uint8 *payload,size_t payloadSize,vector<__uint8> &result,
int chunkStreamID,int msgStreamID)
{
	//chunkify(chunkStreamID,msgStreamID,msgType,TIMEFUNC(),outboundChunkSize,payload,payloadSize,result);
	protocol_RTMPClient::message wrapped;
	wrapped.m_messageType = msgType;
	wrapped.m_messageStreamID = msgStreamID;
	wrapped.m_chunkStreamID = chunkStreamID;
	// the message payload becomes exactly the caller's bytes
	wrapped.m_messageData.assign(payload,payload + payloadSize);
	createRTMPMsg(wrapped,timestamp,outboundChunkSize,result);
#if 0
	// create a buffer with the message
	long t = TIMEFUNC();
	vector<__uint8> m;
	m.resize(11 + payloadSize);
	__uint8 *p = &m[0];
	memset(p,0,11);
	(*(__uint32*)p) = htonl(payloadSize);
	p[0] = msgType;
	(*(__uint32*)(&(p[4]))) = htonl(t);
	memmove(&(p[11]),payload,payloadSize);
	// chunkify
	chunkify(2 /* chunk stream id */,0 /* msg stream ID */ ,msgType,t,outboundChunkSize,p,11 + payloadSize,result);
#endif
}
// Appends the chunked wire form of msg to result, stamped with timestamp and
// split into chunks of at most outboundChunkSize payload bytes. When RTMP
// client debugging is enabled a description of the message is logged first.
//
// The description is built inside the DEBUG_LOG argument so that the message
// is only pretty printed when debugging is on. A message without payload is
// passed to chunkify() as a null pointer with size zero instead of indexing
// an empty vector.
static void createRTMPMsg(const protocol_RTMPClient::message &msg,long timestamp,int outboundChunkSize,vector<__uint8> &result)
{
	DEBUG_LOG(" send " + eol() +
		"CSID=" + tos(msg.m_chunkStreamID) + eol() +
		"MSID=" + tos(msg.m_messageStreamID) + eol() +
		"Time=" + tos(timestamp) + eol() +
		"MTYPE=" + tos((int)msg.m_messageType) + eol() +
		"MLEN=" + tos(msg.m_messageData.size()) + eol() +
		prettyPrintMessage(msg));
	const __uint8 *payload = (msg.m_messageData.empty() ? 0 : &(msg.m_messageData[0]));
	chunkify(msg.m_chunkStreamID,msg.m_messageStreamID,msg.m_messageType,timestamp,outboundChunkSize,payload,msg.m_messageData.size(),result);
}
/////////////////////////////////////////////////////
/////////////////////////////////////////////////////
// Reads the three bytes starting at data[offset] as a big-endian value.
static int decode3ByteValue(const __uint8 *data,size_t offset = 0) throw()
{
	const __uint8 *bytes = data + offset;
	int value = bytes[0];
	value = (value << 8) + bytes[1];
	value = (value << 8) + bytes[2];
	return value;
}
// Reads the four bytes starting at data[offset] as a big-endian value.
static size_t decode4ByteValue(const __uint8 *data,size_t offset) throw()
{
	size_t value = 0;
	for (size_t i = 0; i < 4; ++i)
	{
		// shift in one byte at a time, most significant first
		value = (value << 8) + data[offset + i];
	}
	return value;
}
#if 0
void getRTMPMsgInfo(const vector<__uint8> &msg,__uint8 &msgType,int &payloadLength,long &timestamp,int &streamID,int &payloadOffset)
{
assert(msg.size() >= 11);
const __uint8 *p = &msg[0];
msgType = *(p++);
payloadLength = decode3ByteValue(p);
p+=3;
timestamp = decode4ByteValue(p);
p += 4;
streamID = decode3ByteValue(p);
p += 3;
payloadOffset = 11;
}
#endif
// chunk type (0..3) is held in the top two bits of the first basic header byte
static int chunkType(const __uint8 *basicHeader) throw()
{
	const int firstByte = basicHeader[0];
	return (firstByte >> 6) & 0x03;
}
// size in bytes of the basic header: the low six bits of the first byte select
// the two byte form (value 0), the three byte form (value 1) or the one byte
// form (anything else)
static int calculateBasicChunkHeaderSize(const __uint8 *basicHeader) throw()
{
	const int idField = (basicHeader[0] & 0x3f);
	if (idField == 0)
		return 2;
	if (idField == 1)
		return 3;
	return 1;
}
// number of message header bytes that follow the basic header, selected by
// the chunk type: 11, 7, 3 or none for types 0, 1, 2 and 3
static int calculateChunkMsgHeaderSize(const __uint8 *basicHeader) throw()
{
	const int type = chunkType(basicHeader);
	if (type == 0)
		return 11;
	if (type == 1)
		return 7;
	if (type == 2)
		return 3;
	return 0;
}
// total chunk header size: basic header plus message header. The extended
// timestamp, when present, is not included.
static int calculateCompleteChunkHeaderSize(const __uint8 *basicHeader) throw()
{
	const int messageHeaderSize = calculateChunkMsgHeaderSize(basicHeader);
	const int basicHeaderSize = calculateBasicChunkHeaderSize(basicHeader);
	return messageHeaderSize + basicHeaderSize;
}
// look at a complete chunk header (basic and msg header) and see if we need to get
// an extended timestamp for the chunk
static bool chunkNeedsExtendedTimestamp(const __uint8 *basicHeader) throw()
{
int f = chunkType(basicHeader);
if (f == 3) return false;
int s = calculateBasicChunkHeaderSize(basicHeader);
// timestamp is always just after the basic header
return ((basicHeader[s] == 0xff) && (basicHeader[s+1] == 0xff) && (basicHeader[s+2] == 0xff));
}
// Extracts the chunk stream ID from a basic header. The low six bits of the
// first byte are the ID itself, except that 0 selects the two byte form
// (second byte + 64) and 1 the three byte form
// (third byte * 256 + second byte + 64).
static int getChunkStreamIDFromBasicHeader(const __uint8 *basicHeader) throw()
{
	const int idField = (basicHeader[0] & 0x3f);
	if (idField == 0)
		return (basicHeader[1] + 64);
	if (idField == 1)
		return (basicHeader[2] * 256 + basicHeader[1] + 64);
	return idField;
}
// Message length from a chunk header, or -1 when the chunk type (2 or 3)
// carries no length field. The length is the three bytes that follow the
// three timestamp bytes.
static int calculateMessageSize(const __uint8 *basicHeader) throw()
{
	if (chunkType(basicHeader) > 1)
		return -1;
	const int lengthOffset = calculateBasicChunkHeaderSize(basicHeader) + 3;
	return decode3ByteValue(basicHeader + lengthOffset);
}
// Message type ID from a chunk header, or -1 when the chunk type (2 or 3)
// carries no type field. The type is the single byte that follows the
// timestamp and length fields.
static int calculateMessageTypeID(const __uint8 *basicHeader) throw()
{
	if (chunkType(basicHeader) > 1)
		return -1;
	const int typeOffset = calculateBasicChunkHeaderSize(basicHeader) + 6;
	return basicHeader[typeOffset];
}
static int calculateMessageStreamID(const __uint8 *basicHeader) throw()
{
if (chunkType(basicHeader) == 0)
{
int s = calculateBasicChunkHeaderSize(basicHeader);
return decode3ByteValue(basicHeader + s + 7);
}
return -1;
}
static long calculateTimestamp(const __uint8 *basicHeader) throw()
{
return (chunkType(basicHeader) < 3 ? decode3ByteValue(basicHeader + calculateBasicChunkHeaderSize(basicHeader)) : 0);
}
// Looks in inBuffer for a complete message, i.e. a run of chunks on one chunk
// stream whose payloads add up to the message length.
//
//   inBuffer / amtInBuffer    raw bytes received so far
//   amtToRemoveFromInBuffer   out: bytes the caller must drop from the head of
//                             inBuffer (non-zero only when a message was found)
//   expectedChunkSize         maximum payload bytes per inbound chunk
//   msg                       out: the concatenated chunk payloads
//   msgType, chunkStreamID,
//   messageStreamID           out: header values of the message
//   logMsgPrefix              prefix for log and exception text
//
// Returns true when a complete message was assembled, false when more data is
// needed. Throws runtime_error when chunks of different chunk streams are
// mixed, when header fields change within one message, or when a field is
// neither in the chunk header nor known from an earlier chunk.
//
// Message length, type, stream ID and timestamp of the most recent header are
// remembered in the m_lastInbound* members so that compressed headers (chunk
// types 1 to 3) can inherit them.
//
// NOTE(review): m_lastInboundTimestamp is updated before it is known that the
// chunk is complete. When this function is called again on the same buffered
// bytes, the deltas of type 1/2 chunks are added a second time.
// NOTE(review): the four extended timestamp bytes are skipped over but their
// value is not used.
bool protocol_RTMPClient::chunkSequenceComplete(const __uint8 *inBuffer,size_t amtInBuffer,size_t &amtToRemoveFromInBuffer,size_t expectedChunkSize,
vector<__uint8> &msg,__uint8 &msgType,int &chunkStreamID,int &messageStreamID,const uniString::utf8 &logMsgPrefix) throw(std::exception)
{
bool result = false;
msg.clear();
amtToRemoveFromInBuffer = 0;
// walk through chunks
const __uint8 *pBegin = inBuffer;
const __uint8 *pEnd = inBuffer + amtInBuffer;
const __uint8 *p = inBuffer;
// -1 means "not seen yet in this chunk sequence"
chunkStreamID = -1;
messageStreamID = -1;
int messageLength = -1;
int messageTypeID = -1;
int payloadDataSeen = 0;
int chunksConsolidated = 0; // for debugging
while(p != pEnd)
{
chunksConsolidated += 1;
// do all calculations necessary to see if we have a complete chunk
int tmp;
int hs = calculateCompleteChunkHeaderSize(p);
if ((p + hs) > pEnd) break; // not enough data
// the timestamp bytes are available now, so the extended timestamp can be detected
if (chunkNeedsExtendedTimestamp(p))
hs += 4;
if ((p + hs) > pEnd) break; // not enough data
// calculate timestamp
long tt = calculateTimestamp(p);
switch(chunkType(p))
{
// type 0 carries an absolute timestamp
case 0:
m_lastInboundTimestamp = tt;
break;
// types 1 and 2 carry a delta to the previous timestamp
case 1: case 2:
m_lastInboundTimestamp += tt;
break;
}
// gather and confirm chunk stream id
tmp = getChunkStreamIDFromBasicHeader(p);
if (chunkStreamID == -1)
chunkStreamID = tmp;
else if (chunkStreamID != tmp)
throwEx<runtime_error>(logMsgPrefix + " expected chunk stream ID " + tos(chunkStreamID) + " but got " + tos(tmp) + " instead.");
// gather and confirm message length for this chunk sequence
tmp = calculateMessageSize(p);
if ((tmp == -1) && (m_lastInboundMessageLength == -1)) throwEx<runtime_error>(logMsgPrefix + " No message length for chunk with chunk stream ID " + tos(chunkStreamID));
if ((tmp != -1) && (messageLength != -1) && (tmp != messageLength)) throwEx<runtime_error>(logMsgPrefix + " mismatch message length for chunk with chunk stream ID " + tos(chunkStreamID) + " initially got " + tos(messageLength) + " then received " + tos(tmp));
if (tmp != -1)
m_lastInboundMessageLength = messageLength = tmp;
// gather and confirm message type ID, same rules as for the length
tmp = calculateMessageTypeID(p);
if ((tmp == -1) && (m_lastInboundMessageTypeID == -1)) throwEx<runtime_error>(logMsgPrefix + " No message type ID for chunk with chunk stream ID " + tos(chunkStreamID));
if ((tmp != -1) && (messageTypeID != -1) && (tmp != messageTypeID)) throwEx<runtime_error>(logMsgPrefix + " mismatch message type ID for chunk with chunk stream ID " + tos(chunkStreamID) + " initially got " + tos(messageTypeID) + " then received " + tos(tmp));
if (tmp != -1)
m_lastInboundMessageTypeID = messageTypeID = tmp;
// gather and confirm message stream ID, same rules as for the length
tmp = calculateMessageStreamID(p);
if ((tmp == -1) && (m_lastInboundMessageStreamID == -1)) throwEx<runtime_error>(logMsgPrefix + " No message stream ID for chunk with chunk stream ID " + tos(chunkStreamID));
if ((tmp != -1) && (messageStreamID != -1) && (tmp != messageStreamID)) throwEx<runtime_error>(logMsgPrefix + " mismatch message stream ID for chunk with stream ID " + tos(chunkStreamID) + " initially got " + tos(messageStreamID) + " then received " + tos(tmp));
if (tmp != -1)
m_lastInboundMessageStreamID = messageStreamID = tmp;
// calculate data that should be in this chunk
tmp = m_lastInboundMessageLength - payloadDataSeen;
tmp = min(tmp,(int)expectedChunkSize);
// see if we have enough
if ((p + hs + tmp) > pEnd)
break; // nope
// yeah we do. Copy out data
msg.insert(msg.end(),p+hs,p+hs+tmp);
payloadDataSeen += tmp;
p = p + hs + tmp;
// are we all done?
if (payloadDataSeen == m_lastInboundMessageLength)
{
// yes
amtToRemoveFromInBuffer = p - pBegin;
DEBUG_LOG(logMsgPrefix + " recv" + stringUtil::eol() +
" CSID=" + tos(chunkStreamID) + stringUtil::eol() +
" MSID=" + tos(m_lastInboundMessageStreamID) + stringUtil::eol() +
" Time=" + tos(m_lastInboundTimestamp) + stringUtil::eol() +
" MTYPE=" + tos(m_lastInboundMessageTypeID) + stringUtil::eol() +
" LEN=" + tos(m_lastInboundMessageLength) + stringUtil::eol() +
" REMOVED=" + tos(amtToRemoveFromInBuffer));
// report the remembered values so that inherited header fields are covered too
msgType = m_lastInboundMessageTypeID;
messageStreamID = m_lastInboundMessageStreamID;
return true;
}
}
// ran out of buffered data before the message was complete
return result;
}
// Reads whatever the socket has to offer into the inbound buffer and tries to
// assemble one complete message from it. Returns zero when msg has been
// filled in, otherwise the timeout to use while waiting for more data.
// Throws runtime_error when the buffer is full and still holds no complete
// message.
long protocol_RTMPClient::getMsg(message &msg) throw(exception)
{
	// getSocketData() lowers 'roomAfter' by what it read
	const size_t roomBefore = m_inDataBufferMax - m_inDataBufferAmt;
	size_t roomAfter = roomBefore;
	long to = getSocketData(m_socket,m_inDataBuffer,m_inDataBufferAmt,roomAfter,m_lastActivityTime,m_clientLogString);
	m_inDataBufferAmt += (roomBefore - roomAfter);

	size_t consumed = 0;
	const bool seqComplete = chunkSequenceComplete(m_inDataBuffer,m_inDataBufferAmt,consumed,m_inboundChunkSize,msg.m_messageData,msg.m_messageType,msg.m_chunkStreamID,msg.m_messageStreamID,m_clientLogString);

	if (consumed)
	{
		assert(m_inDataBufferAmt >= consumed);
		// slide the unconsumed tail, if any, to the front of the buffer
		const size_t leftover = m_inDataBufferAmt - consumed;
		if (leftover)
		{
			memmove(m_inDataBuffer,m_inDataBuffer + consumed,leftover);
		}
		m_inDataBufferAmt = leftover;
#ifndef NDEBUG
		// debug builds wipe the unused part of the buffer
		memset(m_inDataBuffer + m_inDataBufferAmt,0,m_inDataBufferMax - m_inDataBufferAmt);
#endif
	}

	if (seqComplete)
		return 0;

	if (m_inDataBufferAmt == m_inDataBufferMax)
		throwEx<runtime_error>(m_clientLogString + " inbound data buffer exceeded");
	assert(to != 0); // ??? not sure
	// never report "got a message" without having one
	return (to == 0 ? 1 : to);
}
/////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
// Closing state: logs its own name (when RTMP client debugging is on) and
// reports the time slice as done.
runnable::timeSliceResult protocol_RTMPClient::state_Close() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);

	timeSliceResult finished;
	finished.m_done = true;
	return finished;
}
// No-op: the body is empty, so this definition writes no W3C log entry.
// NOTE(review): a fuller implementation with the same signature sits inside the
// `#if 0` region that follows - confirm whether it is meant to be revived.
void protocol_RTMPClient::logW3C() throw()
{
}
#if 0
///////////////////////////////////// W3C Logging //////////////////////////////////////////////
// Extract the text between the first "itle='" marker and the next "';" in md.
// Returns an empty string when either delimiter is missing.
static utf8 titleFromMetadata(const utf8 &md) throw()
{
	utf8 title;

	// the marker is matched without its first letter - presumably so that both
	// "Title" and "title" are accepted (TODO confirm)
	const utf8::size_type marker = md.find(utf8("itle='"));
	if (marker == utf8::npos)
		return title;

	const utf8::size_type valueBegin = marker + 6; // skip over "itle='"
	const utf8::size_type valueEnd = md.find(utf8("';"),valueBegin);
	if (valueEnd == utf8::npos)
		return title;

	title = md.substr(valueBegin,valueEnd - valueBegin);
	return title;
}
// Write one W3C log entry covering the time the client spent on the current
// title, then restart the per-title counters. Callers invoke this on a title
// change or when the client disconnects. Does nothing when W3C logging is off.
void protocol_RTMPClient::logW3C() throw()
{
	if (!gOptions.w3cEnable())
		return;

	const time_t now = ::time(0);
	time_t durationOfTitle = now - m_lastTitleTime;

	// bits per second over the title; left at 0 for a zero duration to avoid dividing by zero
	int bitrateOfTitle = 0;
	if (durationOfTitle)
		bitrateOfTitle = (int)((8 * m_bytesSentForCurrentTitle) / durationOfTitle);

	utf8 title = titleFromMetadata(m_lastSentMetadata);
	w3cLog::log(m_clientAddr,
		m_clientHostName,
		m_streamID,
		title,
		mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8("")),
		m_bytesSentForCurrentTitle,
		durationOfTitle,
		bitrateOfTitle);

	setW3CState();
}
// Restart the per-title W3C bookkeeping: zero the byte counter and stamp the
// current time as the start of the title.
void protocol_RTMPClient::setW3CState() throw()
{
	m_bytesSentForCurrentTitle = 0;
	m_lastTitleTime = ::time(0);
}
//////////////////////////////////////////////////////////////////////////////
// Rewind the intro file send offset and (re)load the stream's intro file data
// into m_introFile.
void protocol_RTMPClient::aquireIntroFile() throw()
{
	m_introFileOffset = 0;
	m_streamData->getIntroFile().getSc1Data(m_introFile);
}
// Rewind the backup file send offset and (re)load the stream's backup file data
// into m_backupFile.
void protocol_RTMPClient::aquireBackupFile() throw()
{
	m_backupFileOffset = 0;
	m_streamData->getBackupFile().getSc1Data(m_backupFile);
}
// Push the pending m_outBuffer text out on the socket. When sendHTTPStyleText()
// reports 0 the text is gone and the machine moves on to m_nextState; any other
// value is used as the timeout while waiting for the socket to become writable.
runnable::timeSliceResult protocol_RTMPClient::state_SendText() throw(exception)
{
	//DLOG(__FUNCTION__);
	timeSliceResult result;

	const long to = sendHTTPStyleText(m_socket,m_outBuffer,m_outBufferSize,m_lastActivityTime,m_clientLogString);
	if (to != 0)
	{
		// try again once the socket is writable
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = to;
		return result;
	}

	// sent
	m_state = m_nextState;
	result.m_runImmediately = true;
	return result;
}
// Resolve which stream the request is for, attach to it and queue the reply text:
//  - stream missing or its source inactive -> MSG_ICY401, then state_Close
//  - stats::addClient() refuses the client -> MSG_ICY503, or a 302 to the stream's
//                                             backup server when one is set, then state_Close
//  - otherwise                             -> MSG_ICY200 plus icy-* headers, then state_InitiateStream
// In every case the queued text is sent by state_SendText.
runnable::timeSliceResult protocol_RTMPClient::state_AttachToStream() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	timeSliceResult result;
	assert(!m_streamData);

	// --- work out the stream id ---
	// start from the default, honour an explicit "/stream/<n>" in the URL, then let a
	// configured URL path override it
	m_streamID = DEFAULT_CLIENT_STREAM_ID;
	utf8::size_type pos = m_HTTPRequestInfo.m_url.find(utf8("/stream/"));
	if (pos != utf8::npos)
		m_streamID = atoi((const char *)m_HTTPRequestInfo.m_url.substr(pos + 8).c_str());

	map<int,config::streamConfig> stream_configs = gOptions.getStreamConfigs();
	for (map<int,config::streamConfig>::const_iterator i = stream_configs.begin(); i != stream_configs.end(); ++i)
	{
		// exact match on the configured path
		if ((*i).second.m_urlPath == m_HTTPRequestInfo.m_url)
		{
			m_streamID = (*i).first;
			break;
		}

		// configured path as a prefix, directly followed by ';' or '/'
		// (a "/;" suffix is already covered by the '/' test)
		pos = m_HTTPRequestInfo.m_url.find((*i).second.m_urlPath);
		if (pos == 0)
		{
			utf8 params = m_HTTPRequestInfo.m_url.substr(pos + (*i).second.m_urlPath.size());
			if (params.find(utf8(";")) == 0 || params.find(utf8("/")) == 0)
			{
				m_streamID = (*i).first;
				break;
			}
		}
	}

	m_clientLogString = dstAddrLogString(m_clientHostName,m_clientPort,m_streamID);

	// the agent as sent is used for logging and stats, the lower-cased copy for relay detection
	const utf8 raw_user_agent = mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""));
	utf8 user_agent = toLower(raw_user_agent);

	bool isSourceActive = false;
	m_streamData = streamData::accessStream(m_streamID,isSourceActive);

	if (!m_streamData || !isSourceActive)
	{
		utf8 msg_icy401 = MSG_ICY401;
		// only substitute the version when the '^' marker is present; replace() would
		// throw std::out_of_range if handed npos
		const utf8::size_type marker = msg_icy401.find(utf8("^"));
		if (marker != utf8::npos)
			msg_icy401.replace(marker,1,gOptions.getVersionBuildStrings());

		// m_outBuffer is read by state_SendText after this function has returned, so the
		// text has to live in a member rather than in the local above. m_redirectResponse
		// is reused as that storage (it already backs the 302 reply below).
		m_redirectResponse = msg_icy401;
		m_outBuffer = m_redirectResponse.c_str();
		m_outBufferSize = m_redirectResponse.size();

		m_state = &protocol_RTMPClient::state_SendText;
		m_nextState = &protocol_RTMPClient::state_Close;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime();
		ELOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Stream not available. " + raw_user_agent);
		return result;
	}

	if (!stats::addClient(m_streamID,this,m_clientAddr,m_clientPort,raw_user_agent,g_ripList.find(m_clientAddr)))
	{
		// the client was not added: redirect to the backup server when there is one,
		// otherwise answer with the 503 text
		utf8 backup_server = m_streamData->streamBackupServer();
		if (backup_server.empty())
		{
			utf8 msg_icy503 = MSG_ICY503;
			const utf8::size_type marker = msg_icy503.find(utf8("^"));
			if (marker != utf8::npos)
				msg_icy503.replace(marker,1,gOptions.getVersionBuildStrings());

			// same lifetime concern as for the 401 reply: keep the text in a member
			m_redirectResponse = msg_icy503;
			ELOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Max users reached. " + raw_user_agent);
		}
		else
		{
			m_redirectResponse = http302(backup_server);
			WLOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Max users reached. Redirecting to " + backup_server + ". " + raw_user_agent);
		}
		m_outBuffer = m_redirectResponse.c_str();
		m_outBufferSize = m_redirectResponse.size();

		m_state = &protocol_RTMPClient::state_SendText;
		m_nextState = &protocol_RTMPClient::state_Close;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime();
		return result;
	}

	// --- accepted: construct the response text ---
	m_removeClientFromStats = true;
	m_sendMetadata = mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"icy-metadata",false);
	m_metaInterval = gOptions.metaInterval();
	if (m_metaInterval < 256) m_metaInterval = 256; // clamp

	m_ICYOKResponse = MSG_ICY200;
	m_ICYOKResponse += "icy-name:" + m_streamData->streamName() + "\r\n";
	m_ICYOKResponse += "icy-genre:" + m_streamData->streamGenre() + "\r\n";
	m_ICYOKResponse += "icy-url:" + m_streamData->streamURL() + "\r\n";
	m_ICYOKResponse += "content-type:" + m_streamData->streamContentType() + "\r\n";
	// relays are told the stream is private unless public relaying is allowed
	if (isUserAgentRelay(user_agent) && (!m_streamData->allowPublicRelay()))
		m_ICYOKResponse += "icy-pub:0\r\n";
	else
		m_ICYOKResponse += "icy-pub:" + tos(m_streamData->streamPublic()) + "\r\n";
	if (m_sendMetadata)
		m_ICYOKResponse += "icy-metaint:" + tos(m_metaInterval) + "\r\n";
	// NOTE(review): icy-br carries streamName() here, which looks like a copy/paste slip -
	// a bitrate would be expected. The bitrate accessor is not visible from this code;
	// confirm and replace.
	m_ICYOKResponse += "icy-br:" + m_streamData->streamName() + "\r\n";
	m_ICYOKResponse += "\r\n";
	DEBUG_LOG(m_clientLogString + " sending [" + m_ICYOKResponse + "]");

	m_outBuffer = m_ICYOKResponse.c_str();
	m_outBufferSize = strlen(m_outBuffer);
	m_state = &protocol_RTMPClient::state_SendText;
	m_nextState = &protocol_RTMPClient::state_InitiateStream;
	result.m_writeSet.insert(m_socket);
	result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime();
	ILOG(m_clientLogString + " SHOUTcast 1 client connection accepted. " + raw_user_agent);
	return result;
}
// Move the read pointer to the stream's client start position and return how far
// the ring buffer's write pointer is ahead of it.
streamData::ringBufferAccess_t protocol_RTMPClient::resetReadPtr() throw()
{
	m_readPtr = m_streamData->getSc1ClientStartPosition();

	const streamData::ringBufferAccess_t distance = m_streamData->getSc1RingBuffer().m_writePtr - m_readPtr;
	return distance;
}
// Prepare per-connection streaming state (counters, read pointer, W3C tracking)
// and choose the first sending state: the intro file when one is available,
// otherwise the live stream. Always asks to be run again immediately.
runnable::timeSliceResult protocol_RTMPClient::state_InitiateStream() throw(exception)
{
	DEBUG_LOG(__FUNCTION__);
	assert(m_streamData);

	m_underruns = 0;
	m_metaIntervalCounter = 0;
	resetReadPtr();

	// if we have an intro file send it, otherwise start streaming
	aquireIntroFile();
	if (m_introFile.empty())
		m_state = &protocol_RTMPClient::state_Stream;
	else
		m_state = &protocol_RTMPClient::state_SendIntroFile;

	setW3CState();

	timeSliceResult result;
	result.m_runImmediately = true;
	return result;
}
// Build the in-stream metadata block for md in m_ICYMetadata and queue it for
// sending through state_SendText (the caller sets m_nextState).
// Block layout: one length byte holding the payload size in 16 byte units, then
// the payload padded with zeros to a multiple of 16. Metadata identical to what
// was sent last is not repeated; a block with a zero length byte goes out instead.
// When the title changes, the previous title is first written to the W3C log.
void protocol_RTMPClient::sendICYMetadata(const utf8 &md) throw()
{
	// The length is stored as (bytes / 16) in the first element of the block, which is
	// sent as a single byte, so at most 255 * 16 bytes can be described. Without a cap
	// the stored length would wrap and no longer match what is sent.
	const unsigned int maxBlockLen = 255 * 16;

	m_ICYMetadata.clear();
	m_ICYMetadata.push_back(1); // placeholder for the length byte
	if (md != m_lastSentMetadata) // don't send duplicates
	{
		m_ICYMetadata.insert(m_ICYMetadata.end(),md.begin(),md.end());
		// Oversized metadata is cut to the largest block that can be described.
		// The cut is byte based, so it can land inside the title text.
		if (m_ICYMetadata.size() > maxBlockLen)
			m_ICYMetadata.resize(maxBlockLen);

		// logW3C() reports on m_lastSentMetadata, so it has to run before the update below
		if (!m_lastSentMetadata.empty())
			logW3C();
		m_lastSentMetadata = md;
	}

	// only the placeholder present -> nothing to send, use a zero length block
	unsigned int dlen = m_ICYMetadata.size();
	if (dlen == 1) dlen = 0;

	const unsigned int paddedLen = ((dlen + 15) & ~15); // round up to a multiple of 16
	m_ICYMetadata[0] = paddedLen / 16;
	const unsigned int send_len = paddedLen + 1; // + the length byte itself
	m_ICYMetadata.insert(m_ICYMetadata.end(),send_len - m_ICYMetadata.size(),0);
	assert(m_ICYMetadata.size() == ((m_ICYMetadata[0] * 16)+1));

	m_metaIntervalCounter = 0;
	m_outBuffer = &m_ICYMetadata[0];
	m_outBufferSize = m_ICYMetadata.size();
	m_state = &protocol_RTMPClient::state_SendText;
}
// Intro file state: sends the intro file to the client piece by piece, inserting
// an (empty title) metadata block whenever the metadata interval is reached, and
// switches to state_Stream once the whole file has gone out.
runnable::timeSliceResult protocol_RTMPClient::state_SendIntroFile() throw(exception)
{
	assert(m_streamData);
	assert(!m_introFile.empty());

	timeSliceResult result;
	const int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call
	m_limitTrigger.clear();

	size_t amt = m_introFile.size() - m_introFileOffset;
	if (amt == 0)
	{
		// we're done with the intro file
		m_introFile.clear();
		m_state = &protocol_RTMPClient::state_Stream;
		resetReadPtr();
		result.m_runImmediately = true;
		return result;
	}

	if (m_sendMetadata && (m_metaIntervalCounter == m_metaInterval))
	{
		// a metadata block is due before any more file data
		sendICYMetadata("StreamTitle='';");
		m_nextState = &protocol_RTMPClient::state_SendIntroFile;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
		return result;
	}

	// never send beyond the next metadata boundary
	if (m_sendMetadata)
	{
		amt = min(amt,(m_metaInterval - m_metaIntervalCounter));
	}

	time_t cur_time = ::time(0);
	if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime)
		throwEx<runtime_error>(m_clientLogString + " Timeout waiting to send data (" +
			tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )");

	if (!amt)
	{
		// nothing in the source
		result.m_runImmediately = true;
		return result;
	}

	const int sent = ::send(m_socket,(const char *)&(m_introFile[m_introFileOffset]),amt,0);
	if (sent > 0)
	{
		// account for what went out
		m_bytesSentForCurrentTitle += sent;
		m_totalBytesSent += sent;
		m_lastActivityTime = ::time(NULL);
		m_metaIntervalCounter += sent;
		m_introFileOffset += sent;
		assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval));
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
	}
	else if (sent == 0)
	{
		throwEx<runtime_error>(m_clientLogString + " Remote socket closed while sending data.");
	}
	else
	{
		const int err = socketOps::errCode();
		if (err != SOCKETOPS_WOULDBLOCK)
		{
			// aborted / reset connections are reported with an empty message
#ifdef _WIN32
			const bool connectionGone = (err == WSAECONNABORTED || err == WSAECONNRESET);
#else
			const bool connectionGone = (err == ECONNABORTED || err == ECONNRESET || err == EPIPE);
#endif
			throwEx<runtime_error>((connectionGone ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(err)));
		}
		// would block: wait for the socket to become writable for the rest of the timeout
		result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime));
		result.m_writeSet.insert(m_socket);
	}
	return result;
}
// Backup file state: while the source is not connected, sends the backup file to
// the client (re-acquiring it each time its end is reached) with metadata blocks
// inserted at the metadata interval. Returns to state_Stream as soon as the source
// is connected again or no backup data is available any more.
runnable::timeSliceResult protocol_RTMPClient::state_SendBackupFile() throw(exception)
{
	assert(m_streamData);
	assert(!m_backupFile.empty());

	timeSliceResult result;
	const int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call
	m_limitTrigger.clear();

	size_t amt = m_backupFile.size() - m_backupFileOffset;

	if (streamData::isSourceConnected(m_streamData))
	{
		// we're done with the backup file
		m_backupFile.clear();
		resetReadPtr();
		m_state = &protocol_RTMPClient::state_Stream;
		result.m_runImmediately = true;
		return result;
	}

	if (amt == 0)
	{
		// end of the backup file reached. get more data
		aquireBackupFile();
		if (m_backupFile.empty())
		{
			// it got cleared out from under us? Try and stream
			resetReadPtr();
			m_state = &protocol_RTMPClient::state_Stream;
		}
		result.m_runImmediately = true;
		return result;
	}

	if (m_sendMetadata && (m_metaIntervalCounter == m_metaInterval))
	{
		// a metadata block is due before any more file data
		sendICYMetadata("StreamTitle='';");
		m_nextState = &protocol_RTMPClient::state_SendBackupFile;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
		return result;
	}

	// never send beyond the next metadata boundary
	if (m_sendMetadata)
	{
		amt = min(amt,(m_metaInterval - m_metaIntervalCounter));
	}

	time_t cur_time = ::time(0);
	if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime)
		throwEx<runtime_error>(m_clientLogString + " Timeout waiting to send data (" +
			tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )");

	if (!amt)
	{
		// nothing in the source
		result.m_runImmediately = true;
		return result;
	}

	const int sent = ::send(m_socket,(const char *)&(m_backupFile[m_backupFileOffset]),amt,0);
	if (sent > 0)
	{
		// account for what went out
		m_bytesSentForCurrentTitle += sent;
		m_totalBytesSent += sent;
		m_lastActivityTime = ::time(NULL);
		m_metaIntervalCounter += sent;
		m_backupFileOffset += sent;
		assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval));
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
	}
	else if (sent == 0)
	{
		throwEx<runtime_error>(m_clientLogString + " Remote socket closed while sending data.");
	}
	else
	{
		const int err = socketOps::errCode();
		if (err != SOCKETOPS_WOULDBLOCK)
		{
			// aborted / reset connections are reported with an empty message
#ifdef _WIN32
			const bool connectionGone = (err == WSAECONNABORTED || err == WSAECONNRESET);
#else
			const bool connectionGone = (err == ECONNABORTED || err == ECONNRESET || err == EPIPE);
#endif
			throwEx<runtime_error>((connectionGone ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(err)));
		}
		// would block: wait for the socket to become writable for the rest of the timeout
		result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime));
		result.m_writeSet.insert(m_socket);
	}
	return result;
}
// Live streaming state: sends ring buffer data from m_readPtr to the client,
// inserting a metadata block whenever the metadata interval is reached. When
// there is nothing to send it either switches to the backup file (source gone and
// a backup file is available) or waits on the limit trigger.
runnable::timeSliceResult protocol_RTMPClient::state_Stream() throw(exception)
{
	assert(m_streamData);

	timeSliceResult result;
	const int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call
	m_limitTrigger.clear();

	// check to see if we have to send the metadata
	if (m_sendMetadata && (m_metaIntervalCounter == m_metaInterval))
	{
		sendICYMetadata(m_streamData->getSc1Metadata(m_readPtr).m_songTitle);
		m_nextState = &protocol_RTMPClient::state_Stream;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
		return result;
	}

	streamData::ringBuffer_t rb = m_streamData->getSc1RingBuffer();
	streamData::ringBufferAccess_t amt = rb.m_writePtr - m_readPtr;
	if (amt > rb.m_bufferSize)
	{
		// the pointers are too far apart. Underrun
		++m_underruns;
		amt = resetReadPtr();
	}

	// never send beyond the next metadata boundary
	if (m_sendMetadata)
	{
		amt = min(amt,(streamData::ringBufferAccess_t)(m_metaInterval - m_metaIntervalCounter));
	}

	// clamp again so we don't read past the end of the buffer
	const streamData::ringBufferAccess_t offset = m_readPtr & rb.m_ptrMask;
	amt = min(amt,rb.m_bufferSize - offset);

	time_t cur_time = ::time(0);
	if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime)
		throwEx<runtime_error>(m_clientLogString + " Timeout waiting to send data (" +
			tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )");

	if (!amt)
	{
		// nothing in the source
		// If the source has gone away, and we have a backup file, send it.
		if (!streamData::isSourceConnected(m_streamData))
		{
			aquireBackupFile();
			if (!m_backupFile.empty())
			{
				m_state = &protocol_RTMPClient::state_SendBackupFile;
				result.m_runImmediately = true;
				return result;
			}
		}

		// otherwise wait on the limit trigger for the remainder of the timeout
		result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime));
		m_limitTrigger.clear();
		result.m_readSet.insert(m_limitTrigger.test());
		m_streamData->scheduleSc1LimitTrigger(&m_limitTrigger,m_readPtr);
		return result;
	}

	const int sent = ::send(m_socket,(const char *)&(rb.m_data[offset]),amt,0);
	if (sent > 0)
	{
		// account for what went out and advance the read pointer
		m_bytesSentForCurrentTitle += sent;
		m_totalBytesSent += sent;
		m_lastActivityTime = ::time(NULL);
		m_metaIntervalCounter += sent;
		assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval));
		m_readPtr += sent;
		result.m_writeSet.insert(m_socket);
		result.m_timeout.tv_sec = autoDumpTime;
	}
	else if (sent == 0)
	{
		throwEx<runtime_error>(m_clientLogString + " Remote socket closed while sending data.");
	}
	else
	{
		const int err = socketOps::errCode();
		if (err != SOCKETOPS_WOULDBLOCK)
		{
			// aborted / reset connections are reported with an empty message
#ifdef _WIN32
			const bool connectionGone = (err == WSAECONNABORTED || err == WSAECONNRESET);
#else
			const bool connectionGone = (err == ECONNABORTED || err == ECONNRESET || err == EPIPE);
#endif
			throwEx<runtime_error>((connectionGone ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(err)));
		}
		// would block: wait for the socket to become writable for the rest of the timeout
		result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime));
		result.m_writeSet.insert(m_socket);
	}
	return result;
}
#endif
#endif