#if 0 #ifdef _WIN32 #include #endif #include #include "protocol_RTMPClient.h" #include "ripList.h" #include "stats.h" #include "streamData.h" #include "amf.h" #include "w3cLog.h" #include "log.h" #include "global.h" #include using namespace std; using namespace uniString; using namespace stringUtil; #define C1_S1_C2_S2_PACKET_SIZE 1536 #define DEFAULT_CHUNK_SIZE 128 #define RTMP_MSG__SET_CHUNK_SIZE 1 #define RTMP_MSG__ABORT 2 #define RTMP_MSG__ACKNOWLEDGEMENT 3 #define RTMP_MSG__USER_CONTROL_MESSAGE 4 #define RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE 5 #define RTMP_MSG__SET_PEER_BANDWIDTH 6 #define RTMP_MSG__AUDIO_DATA 8 #define RTMP_MSG__VIDEO_DATA 9 #define RTMP_MSG__DATA_AMF3 15 #define RTMP_MSG__COMMAND_AMF3 17 #define RTMP_MSG__DATA_AMF0 18 #define RTMP_MSG__COMMAND_AMF0 20 // user control messages #define RTMP_UCM_STREAM_BEGIN 0 #define RTMP_UCM_STREAM_EOF 1 #define RTMP_UCM_STREAM_DRY 2 #define RTMP_UCM_SET_BUFFER 3 #define RTMP_UCM_STREAM_IS_RECORDED 4 #define RTMP_UCM_PING_REQUEST 6 #define RTMP_UCM_PING_RESPONSE 7 ///////////////////////// #define RTMP_PEER_BANDWIDTH_HARD 0 #define RTMP_PEER_BANDWIDTH_SOFT 1 #define RTMP_PEER_BANDWIDTH_DYNAMIC 2 #define DEFAULT_SERVER_WINDOW 0x2625a0 //(16 * 1024) #define DEBUG_LOG(x) { if (gOptions.RTMPClientDebug()) DLOG((x)); } #ifdef _WIN32 #define TIMEFUNC ::timeGetTime #else #include static unsigned long TIMEFUNC() throw() { struct timeval tp; ::gettimeofday(&tp,NULL); return (tp.tv_sec * 1000) + (tp.tv_usec / 1000); } #endif static void createRTMPMsg(__uint8 msgType,long timestamp,int outboundChunkSize,const __uint8 *payload,size_t payloadSize,vector<__uint8> &result, int chunkStreamID = 2,int msgStreamID = 0); static void createRTMPMsg(const protocol_RTMPClient::message &msg,long timestamp,int outboundChunkSize,vector<__uint8> &result); //void getRTMPMsgInfo(const vector<__uint8> &msg,__uint8 &msgType,int &payloadLength,long ×tamp,int &streamID,int &payloadOffset); static size_t decode4ByteValue(const __uint8 *data,size_t offset = 0) throw(); static utf8 prettyPrintMessage(const protocol_RTMPClient::message &msg) throw() { utf8 EOL(eol()); utf8 result(EOL); result += "msg type=" + tos((int)msg.m_messageType) + EOL; try { switch(msg.m_messageType) { case RTMP_MSG__SET_CHUNK_SIZE: result += "chunk size=" + tos(decode4ByteValue(&(msg.m_messageData[0]))) + EOL; break; case RTMP_MSG__ABORT: result += "chunk stream id=" + tos(decode4ByteValue(&(msg.m_messageData[0]))) + EOL; break; case RTMP_MSG__ACKNOWLEDGEMENT: result += "sequence number=" + tos(decode4ByteValue(&(msg.m_messageData[0]))) + EOL; break; case RTMP_MSG__USER_CONTROL_MESSAGE: break; case RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE: result += "acknowledgement window size=" + tos(decode4ByteValue(&(msg.m_messageData[0]))) + EOL; break; case RTMP_MSG__SET_PEER_BANDWIDTH: result += "acknowledgement window size=" + tos(decode4ByteValue(&(msg.m_messageData[0]))) + EOL; result += "limit type=" + tos(msg.m_messageData[4]) + EOL; break; case RTMP_MSG__DATA_AMF0: case RTMP_MSG__COMMAND_AMF0: { AMFEncoding amf0; amf0.loadFromBitstream((const char *)&(msg.m_messageData[0]),(int)msg.m_messageData.size(),""); result += amf0.prettyPrint(); } break; case RTMP_MSG__DATA_AMF3: case RTMP_MSG__COMMAND_AMF3: { AMFEncoding amf3(3); amf3.loadFromBitstream((const char *)&(msg.m_messageData[0]),(int)msg.m_messageData.size(),""); result += amf3.prettyPrint(); } break; } } catch(const exception &ex) { result += string("Exception: ") + ex.what() + EOL; DEBUG_LOG(msg.packetDump()); } catch(...) { result += "Exception: " + EOL; DEBUG_LOG(msg.packetDump()); } return result; } #define TEST_FILE "C:\\Documents and Settings\\nradisch\\My Documents\\programming\\shoutcast\\current\\sc_serv2\\test.aac" FILE *fff = 0; utf8 protocol_RTMPClient::message::packetDump() const throw() { ostringstream o; int x = 0; for(std::vector<__uint8>::const_iterator i = m_messageData.begin(); i != m_messageData.end(); ++i) { if ((x++) % 16 == 0) o << stringUtil::eol(); o << setw(2) << hex << (int)(*i) << " "; } return o.str(); } protocol_RTMPClient::protocol_RTMPClient(socketOps::tSOCKET s,const utf8 &hostName,const utf8 &addr,int port,__uint8 C0)throw(exception) :m_socket(s),m_clientHostName(hostName),m_clientAddr(addr),m_clientPort(port),m_clientLogString(dstAddrLogString(hostName,port)), m_C0(C0), m_S0(3), m_inDataBuffer(0), m_lastActivityTime(::time(0)),m_startTime(::time(0)), m_lastInboundMessageStreamID(-1), m_lastInboundMessageLength(-1), m_lastInboundMessageTypeID(-1), m_lastInboundTimestamp(-1), m_windowSizeFromClient(-1), m_bufferSizeFromClient(-1), m_lastTitleTime(::time(0)), m_bytesSentForCurrentTitle(0), m_totalBytesSent(0), m_objectEncodingMode(0), m_removeClientFromStats(false), //m_state(&protocol_RTMPClient::state_AttachToStream), m_state(&protocol_RTMPClient::state_SendS0), m_streamData(0) { #ifdef TEST_FILE if (fff) ::fclose(fff); fff = 0; fff = ::fopen(TEST_FILE,"rb"); #endif DEBUG_LOG(__FUNCTION__); m_inDataBufferMax = 16 * 1024; m_inDataBuffer = new __uint8[m_inDataBufferMax]; m_inDataBufferAmt = 0; // intialize s1 to all zeros m_S1orS2.resize(C1_S1_C2_S2_PACKET_SIZE,0); memset(&(m_S1orS2[0]),0,m_S1orS2.size()); // set base time m_serverBaseTime = TIMEFUNC(); __uint32 sbt = htonl(m_serverBaseTime); memcpy(&(m_S1orS2[0]),&sbt,4); // initialize from 10 bytes of random section to random stuff. Faster than // doing entire 1528 byte block and still valid for(int x = 0; x < 10; ++x) { m_S1orS2[8+x] = rand(); } m_outDataPtr = &m_S0; m_outDataSize = 1; m_inboundChunkSize = DEFAULT_CHUNK_SIZE; m_outboundChunkSize = DEFAULT_CHUNK_SIZE; } protocol_RTMPClient::~protocol_RTMPClient() throw() { #ifdef TEST_FILE if (fff) ::fclose(fff); fff = 0; #endif DEBUG_LOG(__FUNCTION__); try { /*ILOG(m_clientLogString + " SHOUTcast 1 client connection closed (" + tos(::time(0) - m_startTime).c_str() + " seconds). " + mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8("")) + " [Bytes: " + tos(m_totalBytesSent).c_str() + "]");*/ if (m_removeClientFromStats) stats::removeClient(m_streamID,this); if (m_streamData) { m_streamData->abandonRTMPLimitTrigger(&m_limitTrigger); streamData::streamClientLost(m_streamData); m_streamData = 0; logW3C(); } } catch(const exception &ex) { ELOG(ex.what()); } delete [] m_inDataBuffer; m_streamData = 0; socketOps::forgetTCPSocket(m_socket); } runnable::timeSliceResult protocol_RTMPClient::timeSlice() throw(exception) { size_t listenerTime = gOptions.stream_listenerTime(DEFAULT_SHOUTCAST_SOURCE_STREAM); if(!gOptions.read_stream_listenerTime(DEFAULT_SHOUTCAST_SOURCE_STREAM)) { listenerTime = gOptions.listenerTime(); } listenerTime *= 60; // convert to seconds bool timesUp = (listenerTime && ((::time(0) - m_startTime) > (int)listenerTime)); if (m_kickNextRound || timesUp || (m_streamData && m_streamData->isDead())) { if (timesUp) { ILOG(m_clientLogString + " listener time exceeded.");} else if (m_kickNextRound) { ILOG(m_clientLogString + " kicked");} timeSliceResult result; result.m_done = true; return result; } return (this->*m_state)(); } ////////////////////////////////////////////////////////////////////////////////// ///////////////////// Initial Handshake States ////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////// runnable::timeSliceResult protocol_RTMPClient::state_Send() throw(exception) { DEBUG_LOG(__FUNCTION__); timeSliceResult result; long to = sendDataBuffer(m_socket,m_outDataPtr,m_outDataSize,m_lastActivityTime,m_clientLogString); if (to == 0) { // done m_state = m_nextState; result.m_runImmediately = true; } else { // some more result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = to; } return result; } runnable::timeSliceResult protocol_RTMPClient::state_RecvFixedAmt() throw(exception) { DEBUG_LOG(__FUNCTION__); timeSliceResult result; assert(m_inDataRequested <= m_inDataBufferMax); size_t amt_left = m_inDataRequested - m_inDataBufferAmt; size_t amt_left2 = amt_left; long to = getSocketData(m_socket,m_inDataBuffer,m_inDataBufferAmt,amt_left,m_lastActivityTime,m_clientLogString); m_inDataBufferAmt += (amt_left2 - amt_left); if (to == 0) { // got data m_state = m_nextState; result.m_runImmediately = true; } else { // wait some more result.m_readSet.insert(m_socket); result.m_timeout.tv_sec = to; } return result; } runnable::timeSliceResult protocol_RTMPClient::state_RecvMsg() throw(exception) { DEBUG_LOG(__FUNCTION__); timeSliceResult result; m_inMsg.clear(); long to = getMsg(m_inMsg); if (to == 0) { m_state = m_nextState; result.m_runImmediately = true; DEBUG_LOG(string(__FUNCTION__) + " received msg" + prettyPrintMessage(m_inMsg)); } else { result.m_readSet.insert(m_socket); result.m_timeout.tv_sec = to; } return result; } #define NEXT_STATE timeSliceResult result; result.m_runImmediately = true; return result; runnable::timeSliceResult protocol_RTMPClient::state_SendS0() throw(exception) { DEBUG_LOG(__FUNCTION__); m_outDataPtr = &m_S0; m_outDataSize = 1; m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_SendS1; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_SendS1() throw(exception) { DEBUG_LOG(__FUNCTION__); m_outDataPtr = &m_S1orS2[0]; m_outDataSize = m_S1orS2.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_WaitForC1; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_WaitForC1() throw(exception) { DEBUG_LOG(__FUNCTION__); m_inDataBufferAmt = 0; m_inDataRequested = C1_S1_C2_S2_PACKET_SIZE; m_state = &protocol_RTMPClient::state_RecvFixedAmt; m_nextState = &protocol_RTMPClient::state_GotC1; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_GotC1() throw(exception) { DEBUG_LOG(__FUNCTION__); m_C1ReadTime = TIMEFUNC() - m_serverBaseTime; m_C1.clear(); m_C1.insert(m_C1.end(),m_inDataBuffer,m_inDataBuffer + C1_S1_C2_S2_PACKET_SIZE); m_state = &protocol_RTMPClient::state_SendS2; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_SendS2() throw(exception) { DEBUG_LOG(__FUNCTION__); m_S1orS2 = m_C1; __uint32 t = htonl(m_C1ReadTime); memcpy(&(m_S1orS2[4]),&t,4); m_outDataPtr = &m_S1orS2[0]; m_outDataSize = m_S1orS2.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_WaitForC2; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_WaitForC2() throw(exception) { DEBUG_LOG(__FUNCTION__); m_inDataBufferAmt = 0; m_inDataRequested = C1_S1_C2_S2_PACKET_SIZE; m_state = &protocol_RTMPClient::state_RecvFixedAmt; m_nextState = &protocol_RTMPClient::state_GotC2; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_GotC2() throw(exception) { DEBUG_LOG(__FUNCTION__); m_C2.clear(); m_C2.insert(m_C2.end(),m_inDataBuffer,m_inDataBuffer + C1_S1_C2_S2_PACKET_SIZE); m_state = &protocol_RTMPClient::state_WaitForMessage; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_WaitForMessage() throw(exception) { DEBUG_LOG(__FUNCTION__); m_inDataBufferAmt = 0; m_state = &protocol_RTMPClient::state_RecvMsg; m_nextState = &protocol_RTMPClient::state_GotMessage; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_GotMessage() throw(exception) { DEBUG_LOG(__FUNCTION__); switch(m_inMsg.m_messageType) { case RTMP_MSG__USER_CONTROL_MESSAGE: return handle_USER_CONTROL_message(); case RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE: { if (m_inMsg.m_messageData.size() != 4) throwEx(m_clientLogString + " Bad payload size for Window Acknowledgement message, got " + tos(m_inMsg.m_messageData.size()) + " expected 4."); m_windowSizeFromClient = decode4ByteValue(&(m_inMsg.m_messageData[0]),0); DEBUG_LOG(m_clientLogString + " WAS from client is " + tos(m_windowSizeFromClient)); m_state = &protocol_RTMPClient::state_WaitForMessage; NEXT_STATE } break; case RTMP_MSG__COMMAND_AMF0: return handle_AMF0_message(); default: throwEx(m_clientLogString + " " + __FUNCTION__ + " cannot dispatch message type " + tos((int)m_inMsg.m_messageType)); } NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_USER_CONTROL_message() throw(exception) { DEBUG_LOG(__FUNCTION__); if (m_inMsg.m_messageData.size() < 2) throwEx(m_clientLogString + " User control message has insufficient data."); __uint16 t = ntohs(*(const __uint16 *)(&m_inMsg.m_messageData[0])); switch(t) { case RTMP_UCM_SET_BUFFER: { if (m_inMsg.m_messageData.size() < 10) throwEx(m_clientLogString + " Set Buffer user control message has insufficient data."); m_bufferSizeFromClient = ntohl(*(const __uint32*)&(m_inMsg.m_messageData[6])); DEBUG_LOG(m_clientLogString + " Buffer size from client is " + tos(m_bufferSizeFromClient) + " milliseconds."); m_state = &protocol_RTMPClient::state_WaitForMessage; NEXT_STATE } break; case RTMP_UCM_STREAM_BEGIN: //return handle_UCM_StreamBegin(); break; } throwEx(m_clientLogString + " User control message type " + tos(t) + " is not supported."); NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_UCM_StreamBegin() throw(exception) { DEBUG_LOG(__FUNCTION__); NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_message() throw(exception) { DEBUG_LOG(__FUNCTION__); AMFEncoding amf; amf.loadFromBitstream((const char *)&(m_inMsg.m_messageData[0]),(int)m_inMsg.m_messageData.size(),m_clientLogString); const AMFVal &v0 = amf.getValue(0); if (v0.getString() == "connect") return handle_AMF0_connect(amf); if (v0.getString() == "createStream") return handle_AMF0_createStream(amf); if (v0.getString() == "play") return handle_AMF0_play(amf); throwEx(m_clientLogString + " " + __FUNCTION__ + " Unknown AMF0 message " + v0.getString()); NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_connect(const AMFEncoding &amf) throw(exception) { DEBUG_LOG(__FUNCTION__); // const AMFVal &v0 = amf.getValue(0); const AMFVal &v1 = amf.getValue(1); const AMFVal &v2 = amf.getValue(2); if (v1.getNumber() != 1) throwEx(m_clientLogString + " Unexpected transaction number. Wanted 1, got " + tos(v1.getNumber())); const AMFObject &o = v2.getObject(); const AMFVal *pv = o.getProperty("tcUrl"); if (!pv) throwEx(m_clientLogString + " Connect command has no tcUrl value."); utf8 url = pv->getString(); // use this value to create stream accessor pv = o.getProperty("objectEncoding"); m_objectEncodingMode = (pv ? (int)pv->getNumber() : 0); m_state = &protocol_RTMPClient::state_SendConnectResponse; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_createStream(const AMFEncoding &amf) throw(exception) { DEBUG_LOG(__FUNCTION__); // const AMFVal &v0 = amf.getValue(0); const AMFVal &v1 = amf.getValue(1); double transaction = v1.getNumber(); AMFEncoding amf0; AMFObject obj; amf0.appendValue(AMFVal(utf8("_result"))); // or "_error" amf0.appendValue(AMFVal((double)transaction)); amf0.appendValue(AMFVal()); amf0.appendValue(AMFVal((double)1)); //1234)); /// stream ID for client vector<__uint8> resp; amf0.serialize(resp,m_clientLogString); message msg(m_inMsg); msg.m_messageData = resp; msg.m_messageType = RTMP_MSG__COMMAND_AMF0; createRTMPMsg(msg,TIMEFUNC() - m_serverBaseTime,(int)m_outboundChunkSize,m_outDataBuffer); //createRTMPMsg(RTMP_MSG__COMMAND_AMF0,m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer); m_outDataPtr = &m_outDataBuffer[0]; m_outDataSize = m_outDataBuffer.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_WaitForMessage; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::handle_AMF0_play(const AMFEncoding &amf) throw(exception) { DEBUG_LOG(__FUNCTION__); AMFEncoding amf0; amf0.appendValue(AMFVal(utf8("onStatus"))); // or "_error" amf0.appendValue(AMFVal(utf8("NetStream.Play.Start"))); vector<__uint8> resp; amf0.serialize(resp,m_clientLogString); long ttt = TIMEFUNC(); createRTMPMsg(RTMP_MSG__COMMAND_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID); amf0.clear(); amf0.appendValue(AMFVal(utf8("onHeaderData"))); AMFEMCAArray amfA; amfA.addProperty("protocol",new AMFVal(utf8("ICY"))); amfA.addProperty("content-type",new AMFVal(utf8("audio/aacp"))); amf0.appendValue(AMFVal(amfA)); resp.clear(); amf0.serialize(resp,m_clientLogString); createRTMPMsg(RTMP_MSG__DATA_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID); m_outDataPtr = &m_outDataBuffer[0]; m_outDataSize = m_outDataBuffer.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_SendAudio; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_SendConnectResponse() throw(exception) { DEBUG_LOG(__FUNCTION__); m_outDataBuffer.clear(); __uint8 data[5]; long ttt = TIMEFUNC(); m_serverBaseTime = ttt; (*(__uint32*)data) = htonl(DEFAULT_SERVER_WINDOW); createRTMPMsg(RTMP_MSG__WINDOW_ACKNOWLEDGEMENT_SIZE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,data,4,m_outDataBuffer); data[4] = RTMP_PEER_BANDWIDTH_DYNAMIC; //_SOFT; createRTMPMsg(RTMP_MSG__SET_PEER_BANDWIDTH,ttt - m_serverBaseTime,(int)m_outboundChunkSize,data,5,m_outDataBuffer); __uint8 streamBeginData[6] = {0,0,0,0,0,0}; createRTMPMsg(RTMP_MSG__USER_CONTROL_MESSAGE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,streamBeginData,6,m_outDataBuffer); __int32 chunkSize = htonl((int)m_outboundChunkSize); createRTMPMsg(RTMP_MSG__SET_CHUNK_SIZE,ttt - m_serverBaseTime,(int)m_outboundChunkSize,(const __uint8 *)&chunkSize,sizeof(chunkSize),m_outDataBuffer); AMFEncoding amf; //(m_objectEncodingMode > 1 ? 3 : 0); AMFObject obj; amf.appendValue(AMFVal(utf8("_result"))); // or "_error" amf.appendValue(AMFVal((double)1.0)); obj.addProperty("fmsVer",new AMFVal(utf8("FMS/3,5,3,824a"))); //new AMFVal(utf8("sc_serv " + version.first + " " + version.second))); obj.addProperty("capabilities",new AMFVal((double)127)); //31)); // ???? obj.addProperty("mode",new AMFVal((double)1)); amf.appendValue(AMFVal(obj)); obj.clearProperties(); obj.addProperty("level",new AMFVal(utf8("status"))); obj.addProperty("code",new AMFVal(utf8("NetConnection.Connect.Success"))); obj.addProperty("description",new AMFVal(utf8("Connection succeeded."))); obj.addProperty("clientid",new AMFVal((double)795525197.0)); obj.addProperty("objectEncoding",new AMFVal((double)m_objectEncodingMode)); AMFEMCAArray arry; arry.addProperty("version",new AMFVal(utf8("FMS/3,5,3,824a"))); obj.addProperty("data",new AMFVal(arry)); amf.appendValue(AMFVal(obj)); vector<__uint8> resp; amf.serialize(resp,m_clientLogString); createRTMPMsg(RTMP_MSG__COMMAND_AMF0,ttt - m_serverBaseTime,(int)m_outboundChunkSize,&(resp[0]),resp.size(),m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID); m_outDataPtr = &m_outDataBuffer[0]; m_outDataSize = m_outDataBuffer.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_WaitForMessage; //m_nextState = &protocol_RTMPClient::state_SendAudio; NEXT_STATE } runnable::timeSliceResult protocol_RTMPClient::state_SendAudio() throw(exception) { DEBUG_LOG(__FUNCTION__); #ifdef TEST_FILE __uint8 buffer[1024]; buffer[0] = 0xaf; //FLV audio header buffer[1] = 0x01; if (::fread(&(buffer[2]),1,1022,fff) != 1022) throwEx("Test done!"); m_outDataBuffer.clear(); createRTMPMsg(RTMP_MSG__AUDIO_DATA,TIMEFUNC() - m_serverBaseTime,(int)m_outboundChunkSize,buffer,1024,m_outDataBuffer,m_inMsg.m_chunkStreamID,m_inMsg.m_messageStreamID); m_outDataPtr = &m_outDataBuffer[0]; m_outDataSize = m_outDataBuffer.size(); m_state = &protocol_RTMPClient::state_Send; m_nextState = &protocol_RTMPClient::state_SendAudio; #endif NEXT_STATE } ///////////////////////////////////////////////////// ///////////////////////////////////////////////////// static void encode3ByteValue(long v,__uint8 *data) throw() { data[0] = (v >> 16) & 0xff; data[1] = (v >> 8) & 0xff; data[2] = (v & 0xff); } static void encode4ByteValue(long v,__uint8 *data) throw() { data[0] = (v >> 24) & 0xff; data[1] = (v >> 16) & 0xff; data[2] = (v >> 8) & 0xff; data[3] = (v & 0xff); } static void chunkify(int chunkStreamID,int msgStreamID,__uint8 msgTypeID,long timestamp,int outboundChunkSize,const __uint8 *data,size_t dataSize,vector<__uint8> &result) { assert(chunkStreamID >= 2); size_t chunkCount = 1; if (dataSize > 0) chunkCount = ((dataSize-1) / outboundChunkSize)+1; assert(chunkCount); for(size_t c = 0; c < chunkCount; ++c) { __uint8 header[18]; // largest possible header __uint8 *h = header; // size basic header if (chunkStreamID >=2 && chunkStreamID <= 63) { *(h++) = chunkStreamID; } else if (chunkStreamID >= 64 && chunkStreamID <= 319) { *(h++) = 0; *(h++) = (chunkStreamID - 64); } else { h[0] = 1; h[2] = ((chunkStreamID - 64) / 256); h[1] = (chunkStreamID - 64 - (h[2] * 256)); h+= 3; } // fill in format bits and header if (c) { header[0] |= 0xc0; // type 3 } else { // type zero, full header encode3ByteValue(timestamp < 0x00ffffff ? timestamp : 0x00ffffff,h); h += 3; encode3ByteValue((int)dataSize,h); h += 3; *(h++) = msgTypeID; encode4ByteValue(msgStreamID,h); h += 4; if (timestamp >= 0x00ffffff) { encode4ByteValue(timestamp,h); h += 4; } } // put header into result result.insert(result.end(),header,h); size_t payload_amt = min(dataSize,(size_t)outboundChunkSize); // put payload into result result.insert(result.end(),data,data + payload_amt); dataSize -= payload_amt; data += payload_amt; } assert(dataSize == 0); } static void createRTMPMsg(__uint8 msgType,long timestamp,int outboundChunkSize,const __uint8 *payload,size_t payloadSize,vector<__uint8> &result, int chunkStreamID,int msgStreamID) { //chunkify(chunkStreamID,msgStreamID,msgType,TIMEFUNC(),outboundChunkSize,payload,payloadSize,result); protocol_RTMPClient::message msg; msg.m_chunkStreamID = chunkStreamID; msg.m_messageStreamID = msgStreamID; msg.m_messageType = msgType; msg.m_messageData.clear(); msg.m_messageData.insert(msg.m_messageData.end(),payload,payload + payloadSize); createRTMPMsg(msg,timestamp,outboundChunkSize,result); #if 0 // create a buffer with the message long t = TIMEFUNC(); vector<__uint8> m; m.resize(11 + payloadSize); __uint8 *p = &m[0]; memset(p,0,11); (*(__uint32*)p) = htonl(payloadSize); p[0] = msgType; (*(__uint32*)(&(p[4]))) = htonl(t); memmove(&(p[11]),payload,payloadSize); // chunkify chunkify(2 /* chunk stream id */,0 /* msg stream ID */ ,msgType,t,outboundChunkSize,p,11 + payloadSize,result); #endif } static void createRTMPMsg(const protocol_RTMPClient::message &msg,long timestamp,int outboundChunkSize,vector<__uint8> &result) { utf8 s = prettyPrintMessage(msg); DEBUG_LOG(" send " + eol() + "CSID=" + tos(msg.m_chunkStreamID) + eol() + "MSID=" + tos(msg.m_messageStreamID) + eol() + "Time=" + tos(timestamp) + eol() + "MTYPE=" + tos((int)msg.m_messageType) + eol() + "MLEN=" + tos(msg.m_messageData.size()) + eol() + s); chunkify(msg.m_chunkStreamID,msg.m_messageStreamID,msg.m_messageType,timestamp,outboundChunkSize,&(msg.m_messageData[0]),msg.m_messageData.size(),result); } ///////////////////////////////////////////////////// ///////////////////////////////////////////////////// static int decode3ByteValue(const __uint8 *data,size_t offset = 0) throw() { int result = 0; result += data[offset]; result <<= 8; result += data[offset + 1]; result <<= 8; result += data[offset + 2]; return result; } static size_t decode4ByteValue(const __uint8 *data,size_t offset) throw() { size_t result = 0; result += data[offset]; result <<= 8; result += data[offset + 1]; result <<= 8; result += data[offset + 2]; result <<= 8; result += data[offset + 3]; return result; } #if 0 void getRTMPMsgInfo(const vector<__uint8> &msg,__uint8 &msgType,int &payloadLength,long ×tamp,int &streamID,int &payloadOffset) { assert(msg.size() >= 11); const __uint8 *p = &msg[0]; msgType = *(p++); payloadLength = decode3ByteValue(p); p+=3; timestamp = decode4ByteValue(p); p += 4; streamID = decode3ByteValue(p); p += 3; payloadOffset = 11; } #endif // get chunk type from first byte in basic header static int chunkType(const __uint8 *basicHeader) throw() { return (((*basicHeader) & 0xc0) >> 6); } static int calculateBasicChunkHeaderSize(const __uint8 *basicHeader) throw() { int b = ((*basicHeader) & 0x3f); switch(b) { case 0: return 2; case 1: return 3; } return 1; } // look at complete basic header and determine how many bytes to expect in // the chunk header static int calculateChunkMsgHeaderSize(const __uint8 *basicHeader) throw() { switch(chunkType(basicHeader)) { case 0: return 11; case 1: return 7; case 2: return 3; } return 0; } // calculate the complete size of a chunk header static int calculateCompleteChunkHeaderSize(const __uint8 *basicHeader) throw() { return calculateChunkMsgHeaderSize(basicHeader) + calculateBasicChunkHeaderSize(basicHeader); } // look at a complete chunk header (basic and msg header) and see if we need to get // an extended timestamp for the chunk static bool chunkNeedsExtendedTimestamp(const __uint8 *basicHeader) throw() { int f = chunkType(basicHeader); if (f == 3) return false; int s = calculateBasicChunkHeaderSize(basicHeader); // timestamp is always just after the basic header return ((basicHeader[s] == 0xff) && (basicHeader[s+1] == 0xff) && (basicHeader[s+2] == 0xff)); } // get chunkstream ID from the basic header static int getChunkStreamIDFromBasicHeader(const __uint8 *basicHeader) throw() { int b = ((*basicHeader) & 0x3f); switch(b) { case 0: return (basicHeader[1] + 64); case 1: return (basicHeader[2] * 256 + basicHeader[1] + 64); } return b; } static int calculateMessageSize(const __uint8 *basicHeader) throw() { if (chunkType(basicHeader) <= 1) { int s = calculateBasicChunkHeaderSize(basicHeader); return decode3ByteValue(basicHeader + s + 3); } return -1; } static int calculateMessageTypeID(const __uint8 *basicHeader) throw() { if (chunkType(basicHeader) <= 1) { int s = calculateBasicChunkHeaderSize(basicHeader); return basicHeader[s + 6]; } return -1; } static int calculateMessageStreamID(const __uint8 *basicHeader) throw() { if (chunkType(basicHeader) == 0) { int s = calculateBasicChunkHeaderSize(basicHeader); return decode3ByteValue(basicHeader + s + 7); } return -1; } static long calculateTimestamp(const __uint8 *basicHeader) throw() { return (chunkType(basicHeader) < 3 ? decode3ByteValue(basicHeader + calculateBasicChunkHeaderSize(basicHeader)) : 0); } // looks in inBuffer for a complete message. If it finds one it fills in msg and returns true. // whether true or false, data from head of inBuffer should be removed based on amtToRemoveFromInBuffer bool protocol_RTMPClient::chunkSequenceComplete(const __uint8 *inBuffer,size_t amtInBuffer,size_t &amtToRemoveFromInBuffer,size_t expectedChunkSize, vector<__uint8> &msg,__uint8 &msgType,int &chunkStreamID,int &messageStreamID,const uniString::utf8 &logMsgPrefix) throw(std::exception) { bool result = false; msg.clear(); amtToRemoveFromInBuffer = 0; // walk through chunks const __uint8 *pBegin = inBuffer; const __uint8 *pEnd = inBuffer + amtInBuffer; const __uint8 *p = inBuffer; chunkStreamID = -1; messageStreamID = -1; int messageLength = -1; int messageTypeID = -1; int payloadDataSeen = 0; int chunksConsolidated = 0; // for debugging while(p != pEnd) { chunksConsolidated += 1; // do all calculations necessary to see if we have a complete chunk int tmp; int hs = calculateCompleteChunkHeaderSize(p); if ((p + hs) > pEnd) break; // not enough data if (chunkNeedsExtendedTimestamp(p)) hs += 4; if ((p + hs) > pEnd) break; // not enough data // calculate timestamp long tt = calculateTimestamp(p); switch(chunkType(p)) { case 0: m_lastInboundTimestamp = tt; break; case 1: case 2: m_lastInboundTimestamp += tt; break; } // gather and confirm chunk stream id tmp = getChunkStreamIDFromBasicHeader(p); if (chunkStreamID == -1) chunkStreamID = tmp; else if (chunkStreamID != tmp) throwEx(logMsgPrefix + " expected chunk stream ID " + tos(chunkStreamID) + " but got " + tos(tmp) + " instead."); // gather and confirm message length for this chunk sequence tmp = calculateMessageSize(p); if ((tmp == -1) && (m_lastInboundMessageLength == -1)) throwEx(logMsgPrefix + " No message length for chunk with chunk stream ID " + tos(chunkStreamID)); if ((tmp != -1) && (messageLength != -1) && (tmp != messageLength)) throwEx(logMsgPrefix + " mismatch message length for chunk with chunk stream ID " + tos(chunkStreamID) + " initially got " + tos(messageLength) + " then received " + tos(tmp)); if (tmp != -1) m_lastInboundMessageLength = messageLength = tmp; tmp = calculateMessageTypeID(p); if ((tmp == -1) && (m_lastInboundMessageTypeID == -1)) throwEx(logMsgPrefix + " No message type ID for chunk with chunk stream ID " + tos(chunkStreamID)); if ((tmp != -1) && (messageTypeID != -1) && (tmp != messageTypeID)) throwEx(logMsgPrefix + " mismatch message type ID for chunk with chunk stream ID " + tos(chunkStreamID) + " initially got " + tos(messageTypeID) + " then received " + tos(tmp)); if (tmp != -1) m_lastInboundMessageTypeID = messageTypeID = tmp; tmp = calculateMessageStreamID(p); if ((tmp == -1) && (m_lastInboundMessageStreamID == -1)) throwEx(logMsgPrefix + " No message stream ID for chunk with chunk stream ID " + tos(chunkStreamID)); if ((tmp != -1) && (messageStreamID != -1) && (tmp != messageStreamID)) throwEx(logMsgPrefix + " mismatch message stream ID for chunk with stream ID " + tos(chunkStreamID) + " initially got " + tos(messageStreamID) + " then received " + tos(tmp)); if (tmp != -1) m_lastInboundMessageStreamID = messageStreamID = tmp; // calculate data that should be in this chunk tmp = m_lastInboundMessageLength - payloadDataSeen; tmp = min(tmp,(int)expectedChunkSize); // see if we have enough if ((p + hs + tmp) > pEnd) break; // nope // yeah we do. Copy out data msg.insert(msg.end(),p+hs,p+hs+tmp); payloadDataSeen += tmp; p = p + hs + tmp; // are we all done? if (payloadDataSeen == m_lastInboundMessageLength) { // yes amtToRemoveFromInBuffer = p - pBegin; DEBUG_LOG(logMsgPrefix + " recv" + stringUtil::eol() + " CSID=" + tos(chunkStreamID) + stringUtil::eol() + " MSID=" + tos(m_lastInboundMessageStreamID) + stringUtil::eol() + " Time=" + tos(m_lastInboundTimestamp) + stringUtil::eol() + " MTYPE=" + tos(m_lastInboundMessageTypeID) + stringUtil::eol() + " LEN=" + tos(m_lastInboundMessageLength) + stringUtil::eol() + " REMOVED=" + tos(amtToRemoveFromInBuffer)); msgType = m_lastInboundMessageTypeID; messageStreamID = m_lastInboundMessageStreamID; return true; } } return result; } // return zero if get a msg, otherwise return timeout for read long protocol_RTMPClient::getMsg(message &msg) throw(exception) { size_t amt_left = m_inDataBufferMax - m_inDataBufferAmt; size_t amt_left2 = amt_left; long to = getSocketData(m_socket,m_inDataBuffer,m_inDataBufferAmt,amt_left,m_lastActivityTime,m_clientLogString); m_inDataBufferAmt += (amt_left2 - amt_left); size_t amtToRemoveFromInBuffer = 0; bool seqComplete = chunkSequenceComplete(m_inDataBuffer,m_inDataBufferAmt,amtToRemoveFromInBuffer,m_inboundChunkSize,msg.m_messageData,msg.m_messageType,msg.m_chunkStreamID,msg.m_messageStreamID,m_clientLogString); if (amtToRemoveFromInBuffer) { assert(m_inDataBufferAmt >= amtToRemoveFromInBuffer); if (m_inDataBufferAmt == amtToRemoveFromInBuffer) { m_inDataBufferAmt = 0; #ifndef NDEBUG memset(m_inDataBuffer,0,m_inDataBufferMax); #endif } else { memmove(m_inDataBuffer,m_inDataBuffer + amtToRemoveFromInBuffer,m_inDataBufferAmt - amtToRemoveFromInBuffer); m_inDataBufferAmt -= amtToRemoveFromInBuffer; #ifndef NDEBUG memset(m_inDataBuffer + m_inDataBufferAmt,0,m_inDataBufferMax - m_inDataBufferAmt); #endif } } if (seqComplete) { to = 0; } else { if (m_inDataBufferAmt == m_inDataBufferMax) throwEx(m_clientLogString + " inbound data buffer exceeded"); assert(to != 0); // ??? not sure if (to == 0) to = 1; } return to; } ///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////// runnable::timeSliceResult protocol_RTMPClient::state_Close() throw(exception) { DEBUG_LOG(__FUNCTION__); timeSliceResult result; result.m_done = true; return result; } void protocol_RTMPClient::logW3C() throw() { } #if 0 ///////////////////////////////////// W3C Logging ////////////////////////////////////////////// static utf8 titleFromMetadata(const utf8 &md) throw() { utf8 title; utf8::size_type p1 = utf8::npos; utf8::size_type p2 = utf8::npos; p1 = md.find(utf8("itle='")); if (p1 != utf8::npos) { p1 += 6; p2 = md.find(utf8("';"),p1); if (p2 != utf8::npos) { title = md.substr(p1,p2-p1); } } return title; } // create W3C entry. Entries describe the duration a client has listened to a specific title. // the entry is generated on a title change, or when the client disconnects void protocol_RTMPClient::logW3C() throw() { if (gOptions.w3cEnable()) { time_t t(::time(0)); time_t durationOfTitle = t - m_lastTitleTime; int bitrateOfTitle = (int)(durationOfTitle ? (8 * m_bytesSentForCurrentTitle) / durationOfTitle : 0); utf8 title = titleFromMetadata(m_lastSentMetadata); w3cLog::log(m_clientAddr, m_clientHostName, m_streamID, title, mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8("")), m_bytesSentForCurrentTitle, durationOfTitle, bitrateOfTitle); setW3CState(); } } // setup tracking variables for W3C log void protocol_RTMPClient::setW3CState() throw() { m_lastTitleTime = ::time(0); m_bytesSentForCurrentTitle = 0; } ////////////////////////////////////////////////////////////////////////////// void protocol_RTMPClient::aquireIntroFile() throw() { m_streamData->getIntroFile().getSc1Data(m_introFile); m_introFileOffset = 0; } void protocol_RTMPClient::aquireBackupFile() throw() { m_streamData->getBackupFile().getSc1Data(m_backupFile); m_backupFileOffset = 0; } runnable::timeSliceResult protocol_RTMPClient::state_SendText() throw(exception) { //DLOG(__FUNCTION__); timeSliceResult result; long to = sendHTTPStyleText(m_socket,m_outBuffer,m_outBufferSize,m_lastActivityTime,m_clientLogString); if (to == 0) { // sent m_state = m_nextState; result.m_runImmediately = true; } else { // try again result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = to; } return result; } runnable::timeSliceResult protocol_RTMPClient::state_AttachToStream() throw(exception) { DEBUG_LOG(__FUNCTION__); timeSliceResult result; assert(!m_streamData); m_streamID = DEFAULT_CLIENT_STREAM_ID; utf8::size_type pos = m_HTTPRequestInfo.m_url.find(utf8("/stream/")); if (pos != utf8::npos) m_streamID = atoi((const char *)m_HTTPRequestInfo.m_url.substr(pos + 8).c_str()); map stream_configs = gOptions.getStreamConfigs(); for(map::const_iterator i = stream_configs.begin(); i != stream_configs.end(); ++i) { if ((*i).second.m_urlPath == m_HTTPRequestInfo.m_url) { m_streamID = (*i).first; break; } pos = m_HTTPRequestInfo.m_url.find((*i).second.m_urlPath); if (pos != utf8::npos && pos == 0) { utf8 params = m_HTTPRequestInfo.m_url.substr(pos + (*i).second.m_urlPath.size()); if(params.find(utf8(";")) == 0 || params.find(utf8("/")) == 0 || params.find(utf8("/;")) == 0) { m_streamID = (*i).first; break; } } } m_clientLogString = dstAddrLogString(m_clientHostName,m_clientPort,m_streamID); utf8 user_agent = toLower(mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""))); bool isSourceActive = false; m_streamData = streamData::accessStream(m_streamID,isSourceActive); if (!m_streamData || !isSourceActive) { utf8 msg_icy401 = MSG_ICY401; msg_icy401.replace(msg_icy401.find(utf8("^")),1,gOptions.getVersionBuildStrings()); m_outBuffer = msg_icy401.c_str(); m_outBufferSize = strlen(m_outBuffer); m_state = &protocol_RTMPClient::state_SendText; m_nextState = &protocol_RTMPClient::state_Close; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime(); ELOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Stream not available. " + mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""))); } else { // construct the response text if (!stats::addClient(m_streamID,this,m_clientAddr,m_clientPort,mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8("")),g_ripList.find(m_clientAddr))) { utf8 backup_server = m_streamData->streamBackupServer(); if (backup_server.empty()) { utf8 msg_icy503 = MSG_ICY503; msg_icy503.replace(msg_icy503.find(utf8("^")),1,gOptions.getVersionBuildStrings()); m_outBuffer = msg_icy503.c_str(); m_outBufferSize = strlen(m_outBuffer); ELOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Max users reached. " + mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""))); } else { m_redirectResponse = http302(backup_server); m_outBuffer = m_redirectResponse.c_str(); m_outBufferSize = m_redirectResponse.size(); WLOG(m_clientLogString + " SHOUTcast 1 client connection rejected. Max users reached. Redirecting to " + backup_server + ". " + mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""))); } m_state = &protocol_RTMPClient::state_SendText; m_nextState = &protocol_RTMPClient::state_Close; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime(); } else { m_removeClientFromStats = true; m_sendMetadata = mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"icy-metadata",false); m_metaInterval = gOptions.metaInterval(); if (m_metaInterval < 256) m_metaInterval = 256; // clamp m_ICYOKResponse = MSG_ICY200; m_ICYOKResponse += "icy-name:" + m_streamData->streamName() + "\r\n"; m_ICYOKResponse += "icy-genre:" + m_streamData->streamGenre() + "\r\n"; m_ICYOKResponse += "icy-url:" + m_streamData->streamURL() + "\r\n"; m_ICYOKResponse += "content-type:" + m_streamData->streamContentType() + "\r\n"; if (isUserAgentRelay(user_agent) && (!m_streamData->allowPublicRelay())) m_ICYOKResponse += "icy-pub:0\r\n"; else m_ICYOKResponse += "icy-pub:" + tos(m_streamData->streamPublic()) + "\r\n"; if (m_sendMetadata) m_ICYOKResponse += "icy-metaint:" + tos(m_metaInterval) + "\r\n"; m_ICYOKResponse += "icy-br:" + m_streamData->streamName() + "\r\n"; m_ICYOKResponse += "\r\n"; DEBUG_LOG(m_clientLogString + " sending [" + m_ICYOKResponse + "]"); m_outBuffer = m_ICYOKResponse.c_str(); m_outBufferSize = strlen(m_outBuffer); m_state = &protocol_RTMPClient::state_SendText; m_nextState = &protocol_RTMPClient::state_InitiateStream; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = gOptions.getAutoDumpSourceTime(); ILOG(m_clientLogString + " SHOUTcast 1 client connection accepted. " + mapGet(m_HTTPRequestInfo.m_HTTPHeaders,"user-agent",utf8(""))); } } return result; } // set read pointer to a nice distance from the write pointer. Return that distance streamData::ringBufferAccess_t protocol_RTMPClient::resetReadPtr() throw() { m_readPtr = m_streamData->getSc1ClientStartPosition(); return m_streamData->getSc1RingBuffer().m_writePtr - m_readPtr; } runnable::timeSliceResult protocol_RTMPClient::state_InitiateStream() throw(exception) { DEBUG_LOG(__FUNCTION__); assert(m_streamData); m_metaIntervalCounter = 0; resetReadPtr(); m_underruns = 0; // if we have an intro file send it, otherwise start streaming aquireIntroFile(); m_state = (m_introFile.empty() ? &protocol_RTMPClient::state_Stream : &protocol_RTMPClient::state_SendIntroFile); setW3CState(); timeSliceResult result; result.m_runImmediately = true; return result; } // construct the necessary metadata information and load into outbound buffers. void protocol_RTMPClient::sendICYMetadata(const utf8 &md) throw() { m_ICYMetadata.clear(); m_ICYMetadata.push_back(1); // placeholder if (md != m_lastSentMetadata) // don't sent duplicates { m_ICYMetadata.insert(m_ICYMetadata.end(),md.begin(),md.end()); if (!m_lastSentMetadata.empty()) logW3C(); m_lastSentMetadata = md; } unsigned int dlen = m_ICYMetadata.size(); if (dlen == 1) dlen = 0; unsigned int l1=((dlen+15)&~15); m_ICYMetadata[0] = l1/16; unsigned int send_len = l1+1; m_ICYMetadata.insert(m_ICYMetadata.end(),send_len - m_ICYMetadata.size(),0); assert(m_ICYMetadata.size() == ((m_ICYMetadata[0] * 16)+1)); m_metaIntervalCounter = 0; m_outBuffer = &m_ICYMetadata[0]; m_outBufferSize = m_ICYMetadata.size(); m_state = &protocol_RTMPClient::state_SendText; } // handle state where we are sending intro files runnable::timeSliceResult protocol_RTMPClient::state_SendIntroFile() throw(exception) { assert(m_streamData); assert(!m_introFile.empty()); timeSliceResult result; int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call m_limitTrigger.clear(); size_t amt = m_introFile.size() - m_introFileOffset; if (amt == 0) { // we're done with the intro file m_introFile.clear(); m_state = &protocol_RTMPClient::state_Stream; resetReadPtr(); result.m_runImmediately = true; } else if ((m_metaIntervalCounter == m_metaInterval) && m_sendMetadata) // check to see if we have to send the metadata { sendICYMetadata("StreamTitle='';"); m_nextState = &protocol_RTMPClient::state_SendIntroFile; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } else { // clamp amount to send if we are supporting metadata if (m_sendMetadata) { amt = min(amt,(m_metaInterval - m_metaIntervalCounter)); } // send time_t cur_time = ::time(0); if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime) throwEx(m_clientLogString + " Timeout waiting to send data (" + tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )"); if (!amt) { // nothing in the source result.m_runImmediately = true; } else { int rval = ::send(m_socket,(const char *)&(m_introFile[m_introFileOffset]),amt,0); if (rval == 0) { throwEx(m_clientLogString + " Remote socket closed while sending data."); } else if (rval < 0) { rval = socketOps::errCode(); if (rval != SOCKETOPS_WOULDBLOCK) throwEx((( #ifdef _WIN32 rval == WSAECONNABORTED || rval == WSAECONNRESET #else rval == ECONNABORTED || rval == ECONNRESET || rval == EPIPE #endif ) ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(rval))); result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime)); result.m_writeSet.insert(m_socket); } else { m_bytesSentForCurrentTitle += rval; m_totalBytesSent += rval; m_lastActivityTime = ::time(NULL); m_metaIntervalCounter += rval; m_introFileOffset += rval; assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval)); result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } } } return result; } // handle state where we are sending backup files runnable::timeSliceResult protocol_RTMPClient::state_SendBackupFile() throw(exception) { assert(m_streamData); assert(!m_backupFile.empty()); timeSliceResult result; int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call m_limitTrigger.clear(); size_t amt = m_backupFile.size() - m_backupFileOffset; if (streamData::isSourceConnected(m_streamData)) { // we're done with the backup file m_backupFile.clear(); resetReadPtr(); m_state = &protocol_RTMPClient::state_Stream; result.m_runImmediately = true; } else if (amt == 0) { // we're done with the backup file. get more data aquireBackupFile(); if (m_backupFile.empty()) { // it got cleared out from under us? Try and stream resetReadPtr(); m_state = &protocol_RTMPClient::state_Stream; } result.m_runImmediately = true; } else if ((m_metaIntervalCounter == m_metaInterval) && m_sendMetadata) // check to see if we have to send the metadata { sendICYMetadata("StreamTitle='';"); m_nextState = &protocol_RTMPClient::state_SendBackupFile; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } else { // clamp amount to send if we are supporting metadata if (m_sendMetadata) { amt = min(amt,(m_metaInterval - m_metaIntervalCounter)); } // send time_t cur_time = ::time(0); if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime) throwEx(m_clientLogString + " Timeout waiting to send data (" + tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )"); if (!amt) { // nothing in the source result.m_runImmediately = true; } else { int rval = ::send(m_socket,(const char *)&(m_backupFile[m_backupFileOffset]),amt,0); if (rval == 0) { throwEx(m_clientLogString + " Remote socket closed while sending data."); } else if (rval < 0) { rval = socketOps::errCode(); if (rval != SOCKETOPS_WOULDBLOCK) throwEx((( #ifdef _WIN32 rval == WSAECONNABORTED || rval == WSAECONNRESET #else rval == ECONNABORTED || rval == ECONNRESET || rval == EPIPE #endif ) ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(rval))); result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime)); result.m_writeSet.insert(m_socket); } else { m_bytesSentForCurrentTitle += rval; m_totalBytesSent += rval; m_lastActivityTime = ::time(NULL); m_metaIntervalCounter += rval; m_backupFileOffset += rval; assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval)); result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } } } return result; } runnable::timeSliceResult protocol_RTMPClient::state_Stream() throw(exception) { assert(m_streamData); timeSliceResult result; int autoDumpTime = gOptions.getAutoDumpSourceTime(); // don't want this value to change during this call m_limitTrigger.clear(); // check to see if we have to send the metadata if ((m_metaIntervalCounter == m_metaInterval) && m_sendMetadata) { // send metadata sendICYMetadata(m_streamData->getSc1Metadata(m_readPtr).m_songTitle); m_nextState = &protocol_RTMPClient::state_Stream; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } else { streamData::ringBuffer_t rb = m_streamData->getSc1RingBuffer(); streamData::ringBufferAccess_t amt = rb.m_writePtr - m_readPtr; if (amt > rb.m_bufferSize) { // the pointers are too far apart. Underrun m_underruns += 1; amt = resetReadPtr(); } // clamp amount to send if we are supporting metadata if (m_sendMetadata) { amt = min(amt,(streamData::ringBufferAccess_t)(m_metaInterval - m_metaIntervalCounter)); } streamData::ringBufferAccess_t offset = m_readPtr & rb.m_ptrMask; // clamp again so we don't read passed end of buffer amt = min(amt,rb.m_bufferSize - offset); // send time_t cur_time = ::time(0); if (autoDumpTime > 0 && (cur_time - m_lastActivityTime) >= autoDumpTime) throwEx(m_clientLogString + " Timeout waiting to send data (" + tos(cur_time) + " " + tos(m_lastActivityTime) + " [" + tos(cur_time - m_lastActivityTime) + "] )"); if (!amt) { // nothing in the source // If the source has gone away, and we have a backup file, send it. bool sendBackupFile = false; if (!streamData::isSourceConnected(m_streamData)) { aquireBackupFile(); sendBackupFile = !m_backupFile.empty(); } if (sendBackupFile) { m_state = &protocol_RTMPClient::state_SendBackupFile; result.m_runImmediately = true; } else { result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime)); m_limitTrigger.clear(); result.m_readSet.insert(m_limitTrigger.test()); m_streamData->scheduleSc1LimitTrigger(&m_limitTrigger,m_readPtr); } } else { int rval = ::send(m_socket,(const char *)&(rb.m_data[offset]),amt,0); if (rval == 0) { throwEx(m_clientLogString + " Remote socket closed while sending data."); } else if (rval < 0) { rval = socketOps::errCode(); if (rval != SOCKETOPS_WOULDBLOCK) throwEx((( #ifdef _WIN32 rval == WSAECONNABORTED || rval == WSAECONNRESET #else rval == ECONNABORTED || rval == ECONNRESET || rval == EPIPE #endif ) ? uniString::utf8("") : m_clientLogString + "Socket error while waiting to send data. " + socketErrString(rval))); result.m_timeout.tv_sec = (long)(autoDumpTime - (cur_time - m_lastActivityTime)); result.m_writeSet.insert(m_socket); } else { m_bytesSentForCurrentTitle += rval; m_totalBytesSent += rval; m_lastActivityTime = ::time(NULL); m_metaIntervalCounter += rval; assert((!m_sendMetadata) || (m_metaIntervalCounter <= m_metaInterval)); m_readPtr += rval; result.m_writeSet.insert(m_socket); result.m_timeout.tv_sec = autoDumpTime; } } } return result; } #endif #endif