=====================================================
RTMPdump(libRTMP) 源代碼分析系列文章:
RTMPdump 源代碼分析 1: main()函數
RTMPDump (libRTMP) 源代碼分析2:解析RTMP位址——RTMP_ParseURL()
RTMPdump (libRTMP) 源代碼分析3: AMF編碼
RTMPdump (libRTMP) 源代碼分析4: 連接配接第一步——握手 (HandShake)
RTMPdump (libRTMP) 源代碼分析5: 建立一個流媒體連接配接 (NetConnection部分)
RTMPdump (libRTMP) 源代碼分析6: 建立一個流媒體連接配接 (NetStream部分 1)
RTMPdump (libRTMP) 源代碼分析7: 建立一個流媒體連接配接 (NetStream部分 2)
RTMPdump (libRTMP) 源代碼分析8: 發送消息 (Message)
RTMPdump (libRTMP) 源代碼分析9: 接收消息 (Message) (接收視音頻資料)
RTMPdump (libRTMP) 源代碼分析10: 處理各種消息 (Message)
=====================================================
函數調用結構圖
RTMPDump (libRTMP)的整體的函數調用結構圖如下圖所示。
單擊檢視大圖
詳細分析
之前寫了一系列的文章介紹RTMPDump各種函數。比如怎麼建立網絡連接配接(NetConnection),怎麼建立網絡流(NetStream)之類的,唯獨沒有介紹這些發送或接收的資料,在底層到底是怎麼實作的。本文就是要剖析一下其内部的實作。即這些消息(Message)到底是怎麼發送和接收的。
先來看看發送消息吧。
- 發送connect指令使用函數SendConnectPacket()
- 發送createstream指令使用RTMP_SendCreateStream()
- 發送realeaseStream指令使用SendReleaseStream()
- 發送publish指令使用SendPublish()
- 發送deleteStream的指令使用SendDeleteStream()
- 發送pause指令使用RTMP_SendPause()
不再一一例舉,發現函數命名有兩種規律:RTMP_Send***()或者Send***(),其中*号代表指令的名稱。
SendConnectPacket()這個指令是每次程式開始運作的時候發送的第一個指令消息,内容比較多,包含了很多AMF編碼的内容,在此不多做分析,貼上代碼:
[cpp]
view plain
copy
- //發送“connect”指令
- static int
- SendConnectPacket(RTMP *r, RTMPPacket *cp)
- {
- RTMPPacket packet;
- char pbuf[4096], *pend = pbuf + sizeof(pbuf);
- char *enc;
- if (cp)
- return RTMP_SendPacket(r, cp, TRUE);
- packet.m_nChannel = 0x03; /* control channel (invoke) */
- packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
- packet.m_packetType = 0x14; /* INVOKE */
- packet.m_nTimeStamp = 0;
- packet.m_nInfoField2 = 0;
- packet.m_hasAbsTimestamp = 0;
- packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
- enc = packet.m_body;
- enc = AMF_EncodeString(enc, pend, &av_connect);
- enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
- *enc++ = AMF_OBJECT;
- enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);
- if (!enc)
- return FALSE;
- if (r->Link.protocol & RTMP_FEATURE_WRITE)
- {
- enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);
- if (!enc)
- }
- if (r->Link.flashVer.av_len)
- enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);
- if (r->Link.swfUrl.av_len)
- enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl);
- if (r->Link.tcUrl.av_len)
- enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl);
- if (!(r->Link.protocol & RTMP_FEATURE_WRITE))
- enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE);
- enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0);
- enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs);
- enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs);
- enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0);
- if (r->Link.pageUrl.av_len)
- enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl);
- return FALSE;
- if (r->m_fEncoding != 0.0 || r->m_bSendEncoding)
- { /* AMF0, AMF3 not fully supported yet */
- enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding);
- if (enc + 3 >= pend)
- *enc++ = 0;
- *enc++ = 0; /* end of object - 0x00 0x00 0x09 */
- *enc++ = AMF_OBJECT_END;
- /* add auth string */
- if (r->Link.auth.av_len)
- enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH);
- enc = AMF_EncodeString(enc, pend, &r->Link.auth);
- if (r->Link.extras.o_num)
- int i;
- for (i = 0; i < r->Link.extras.o_num; i++)
- enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend);
- packet.m_nBodySize = enc - packet.m_body;
- //----------------
- r->dlg->AppendMLInfo(20,1,"指令消息","Connect");
- //-----------------------------
- return RTMP_SendPacket(r, &packet, TRUE);
- }
RTMP_SendCreateStream()指令相對而言比較簡單,代碼如下:
- //發送“createstream”指令
- int
- RTMP_SendCreateStream(RTMP *r)
- char pbuf[256], *pend = pbuf + sizeof(pbuf);
- packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
- enc = AMF_EncodeString(enc, pend, &av_createStream);
- *enc++ = AMF_NULL; /* NULL */
- r->dlg->AppendMLInfo(20,1,"指令消息","CreateStream");
同樣,SendReleaseStream()内容也比較簡單,我對其中部分内容作了注釋:
- //發送RealeaseStream指令
- SendReleaseStream(RTMP *r)
- char pbuf[1024], *pend = pbuf + sizeof(pbuf);
- enc = packet.m_body;
- //對“releaseStream”字元串進行AMF編碼
- enc = AMF_EncodeString(enc, pend, &av_releaseStream);
- //對傳輸ID(0)進行AMF編碼?
- //指令對象
- *enc++ = AMF_NULL;
- //對播放路徑字元串進行AMF編碼
- enc = AMF_EncodeString(enc, pend, &r->Link.playpath);
- r->dlg->AppendMLInfo(20,1,"指令消息","ReleaseStream");
- return RTMP_SendPacket(r, &packet, FALSE);
再來看一個SendPublish()函數,用于發送“publish”指令
- //發送Publish指令
- SendPublish(RTMP *r)
- //塊流ID為4
- packet.m_nChannel = 0x04; /* source channel (invoke) */
- //指令消息,類型20
- //流ID
- packet.m_nInfoField2 = r->m_stream_id;
- //指向Chunk的負載
- //對“publish”字元串進行AMF編碼
- enc = AMF_EncodeString(enc, pend, &av_publish);
- //指令對象為空
- /* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */
- enc = AMF_EncodeString(enc, pend, &av_live);
- r->dlg->AppendMLInfo(20,1,"指令消息","Pulish");
其他的指令不再一一例舉,總體的思路是聲明一個RTMPPacket類型的結構體,然後設定各種屬性值,最後交給RTMP_SendPacket()進行發送。
RTMPPacket類型的結構體定義如下,一個RTMPPacket對應RTMP協定規範裡面的一個塊(Chunk)。
- //Chunk資訊
- typedef struct RTMPPacket
- {
- uint8_t m_headerType;//ChunkMsgHeader的類型(4種)
- uint8_t m_packetType;//Message type ID(1-7協定控制;8,9音視訊;10以後為AMF編碼消息)
- uint8_t m_hasAbsTimestamp; /* Timestamp 是絕對值還是相對值? */
- int m_nChannel; //塊流ID
- uint32_t m_nTimeStamp; // Timestamp
- int32_t m_nInfoField2; /* last 4 bytes in a long header,消息流ID */
- uint32_t m_nBodySize; //消息長度
- uint32_t m_nBytesRead;
- RTMPChunk *m_chunk;
- char *m_body;
- } RTMPPacket;
下面我們來看看RTMP_SendPacket()吧,各種的RTMPPacket(即各種Chunk)都需要用這個函數進行發送。
- //自己編一個資料報發送出去!
- //非常常用
- RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue)
- const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];
- uint32_t last = 0;
- int nSize;
- int hSize, cSize;
- char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;
- uint32_t t;
- char *buffer, *tbuf = NULL, *toff = NULL;
- int nChunkSize;
- int tlen;
- //不是完整ChunkMsgHeader
- if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)
- /* compress a bit by using the prev packet's attributes */
- //擷取ChunkMsgHeader的類型
- //前一個Chunk和這個Chunk對比
- if (prevPacket->m_nBodySize == packet->m_nBodySize
- && prevPacket->m_packetType == packet->m_packetType
- && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)
- packet->m_headerType = RTMP_PACKET_SIZE_SMALL;
- if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp
- && packet->m_headerType == RTMP_PACKET_SIZE_SMALL)
- packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;
- //上一個packet的TimeStamp
- last = prevPacket->m_nTimeStamp;
- if (packet->m_headerType > 3) /* sanity */
- RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.",
- (unsigned char)packet->m_headerType);
- return FALSE;
- //chunk標頭大小;packetSize[] = { 12, 8, 4, 1 }
- nSize = packetSize[packet->m_headerType];
- hSize = nSize; cSize = 0;
- //相對的TimeStamp
- t = packet->m_nTimeStamp - last;
- if (packet->m_body)
- //Header的Start
- //m_body是指向負載資料首位址的指針;“-”号用于指針前移
- header = packet->m_body - nSize;
- //Header的End
- hend = packet->m_body;
- else
- header = hbuf + 6;
- hend = hbuf + sizeof(hbuf);
- //當ChunkStreamID大于319時
- if (packet->m_nChannel > 319)
- //ChunkBasicHeader是3個位元組
- cSize = 2;
- //當ChunkStreamID大于63時
- else if (packet->m_nChannel > 63)
- //ChunkBasicHeader是2個位元組
- cSize = 1;
- if (cSize)
- //header指針指向ChunkMsgHeader
- header -= cSize;
- //hsize加上ChunkBasicHeader的長度
- hSize += cSize;
- //相對TimeStamp大于0xffffff,此時需要使用ExtendTimeStamp
- if (nSize > 1 && t >= 0xffffff)
- header -= 4;
- hSize += 4;
- hptr = header;
- //把ChunkBasicHeader的Fmt類型左移6位
- c = packet->m_headerType << 6;
- switch (cSize)
- //把ChunkBasicHeader的低6位設定成ChunkStreamID
- case 0:
- c |= packet->m_nChannel;
- break;
- //同理,但低6位設定成000000
- case 1:
- //同理,但低6位設定成000001
- case 2:
- c |= 1;
- //可以拆分成兩句*hptr=c;hptr++,此時hptr指向第2個位元組
- *hptr++ = c;
- //CSize>0,即ChunkBasicHeader大于1位元組
- //将要放到第2位元組的内容tmp
- int tmp = packet->m_nChannel - 64;
- //擷取低位存儲與第2位元組
- *hptr++ = tmp & 0xff;
- //ChunkBasicHeader是最大的3位元組時
- if (cSize == 2)
- //擷取高位存儲于最後1個位元組(注意:排序使用大端序列,和主機相反)
- *hptr++ = tmp >> 8;
- //ChunkMsgHeader。注意一共有4種,包含的字段數不同。
- //TimeStamp(3B)
- if (nSize > 1)
- //相對TimeStamp和絕對TimeStamp?
- hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);
- //MessageLength+MessageTypeID(4B)
- if (nSize > 4)
- //MessageLength
- hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);
- //MessageTypeID
- *hptr++ = packet->m_packetType;
- //MessageStreamID(4B)
- if (nSize > 8)
- hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);
- //ExtendedTimeStamp
- hptr = AMF_EncodeInt32(hptr, hend, t);
- //負載長度,指向負載的指針
- nSize = packet->m_nBodySize;
- buffer = packet->m_body;
- //Chunk大小,預設128位元組
- nChunkSize = r->m_outChunkSize;
- RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket,
- nSize);
- /* send all chunks in one HTTP request */
- //使用HTTP
- if (r->Link.protocol & RTMP_FEATURE_HTTP)
- //nSize:Message負載長度;nChunkSize:Chunk長度;
- //例nSize:307,nChunkSize:128;
- //可分為(307+128-1)/128=3個
- //為什麼+nChunkSize-1?因為除法會隻取整數部分!
- int chunks = (nSize+nChunkSize-1) / nChunkSize;
- //Chunk個數超過一個
- if (chunks > 1)
- {
- //注意:CSize=1表示ChunkBasicHeader是2位元組
- //消息分n塊後總的開銷:
- //n個ChunkBasicHeader,1個ChunkMsgHeader,1個Message負載
- //實際中隻有第一個Chunk是完整的,剩下的隻有ChunkBasicHeader
- tlen = chunks * (cSize + 1) + nSize + hSize;
- //配置設定記憶體
- tbuf = (char *) malloc(tlen);
- if (!tbuf)
- toff = tbuf;
- //消息的負載+頭
- while (nSize + hSize)
- int wrote;
- //消息負載<Chunk大小(不用分塊)
- if (nSize < nChunkSize)
- //Chunk可能小于設定值
- nChunkSize = nSize;
- RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);
- RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);
- if (tbuf)
- //void *memcpy(void *dest, const void *src, int n);
- //由src指向位址為起始位址的連續n個位元組的資料複制到以dest指向位址為起始位址的空間内
- memcpy(toff, header, nChunkSize + hSize);
- toff += nChunkSize + hSize;
- else
- wrote = WriteN(r, header, nChunkSize + hSize);
- if (!wrote)
- //消息負載長度-Chunk負載長度
- nSize -= nChunkSize;
- //Buffer指針前移1個Chunk負載長度
- buffer += nChunkSize;
- hSize = 0;
- //如果消息沒有發完
- if (nSize > 0)
- //ChunkBasicHeader
- header = buffer - 1;
- hSize = 1;
- if (cSize)
- header -= cSize;
- hSize += cSize;
- }
- //ChunkBasicHeader第1個位元組
- *header = (0xc0 | c);
- //ChunkBasicHeader大于1位元組
- int tmp = packet->m_nChannel - 64;
- header[1] = tmp & 0xff;
- if (cSize == 2)
- header[2] = tmp >> 8;
- if (tbuf)
- //
- int wrote = WriteN(r, tbuf, toff-tbuf);
- free(tbuf);
- tbuf = NULL;
- /* we invoked a remote method */
- if (packet->m_packetType == 0x14)
- AVal method;
- char *ptr;
- ptr = packet->m_body + 1;
- AMF_DecodeString(ptr, &method);
- RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val);
- /* keep it in call queue till result arrives */
- if (queue) {
- int txn;
- ptr += 3 + method.av_len;
- txn = (int)AMF_DecodeNumber(ptr);
- AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);
- }
- if (!r->m_vecChannelsOut[packet->m_nChannel])
- r->m_vecChannelsOut[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket));
- memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));
- return TRUE;
這個函數乍一看好像非常複雜,其實不然,他隻是按照RTMP規範将資料編碼成符合規範的塊(Chunk),規範可以參考相關的文檔。
具體怎麼編碼成塊(Chunk)就不多分析了,在這裡需要注意一個函數:WriteN()。該函數完成了将資料發送出去的功能。
來看一下WriteN()函數:
- //發送資料報的時候調用(連接配接,buffer,長度)
- WriteN(RTMP *r, const char *buffer, int n)
- const char *ptr = buffer;
- #ifdef CRYPTO
- char *encrypted = 0;
- char buf[RTMP_BUFFER_CACHE_SIZE];
- if (r->Link.rc4keyOut)
- if (n > sizeof(buf))
- encrypted = (char *)malloc(n);
- encrypted = (char *)buf;
- ptr = encrypted;
- RC4_encrypt2((RC4_KEY *)r->Link.rc4keyOut, n, buffer, ptr);
- #endif
- while (n > 0)
- int nBytes;
- //因方式的不同而調用不同函數
- //如果使用的是HTTP協定進行連接配接
- if (r->Link.protocol & RTMP_FEATURE_HTTP)
- nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n);
- nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n);
- /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d\n", __FUNCTION__, nBytes); */
- //成功發送位元組數<0
- if (nBytes < 0)
- int sockerr = GetSockError();
- RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__,
- sockerr, n);
- if (sockerr == EINTR && !RTMP_ctrlC)
- continue;
- RTMP_Close(r);
- n = 1;
- if (nBytes == 0)
- break;
- n -= nBytes;
- ptr += nBytes;
- if (encrypted && encrypted != buf)
- free(encrypted);
- return n == 0;
該函數中,RTMPSockBuf_Send()完成了資料發送的功能,再來看看這個函數(函數調用真是好多啊。。。。)
- //Socket發送(指明套接字,buffer緩沖區,資料長度)
- //傳回所發資料量
- RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len)
- int rc;
- #ifdef _DEBUG
- fwrite(buf, 1, len, netstackdump);
- #if defined(CRYPTO) && !defined(NO_SSL)
- if (sb->sb_ssl)
- rc = TLS_write((SSL *)sb->sb_ssl, buf, len);
- //向一個已連接配接的套接口發送資料。
- //int send( SOCKET s, const char * buf, int len, int flags);
- //s:一個用于辨別已連接配接套接口的描述字。
- //buf:包含待發送資料的緩沖區。
- //len:緩沖區中資料的長度。
- //flags:調用執行方式。
- //rc:所發資料量。
- rc = send(sb->sb_socket, buf, len, 0);
- return rc;
- RTMPSockBuf_Close(RTMPSockBuf *sb)
- TLS_shutdown((SSL *)sb->sb_ssl);
- TLS_close((SSL *)sb->sb_ssl);
- sb->sb_ssl = NULL;
- return closesocket(sb->sb_socket);
到這個函數的時候,發現一層層的調用終于完成了,最後調用了系統Socket的send()函數完成了資料的發送功能。
之前貼過一張圖總結這個過程,可能了解起來要友善一些:RTMPDump源代碼分析 0: 主要函數調用分析