天天看點

RTMPdump(libRTMP) 源代碼分析 8: 發送消息(Message)

=====================================================

RTMPdump(libRTMP) 源代碼分析系列文章:

RTMPdump 源代碼分析 1: main()函數

RTMPDump (libRTMP) 源代碼分析2:解析RTMP位址——RTMP_ParseURL()

RTMPdump (libRTMP) 源代碼分析3: AMF編碼

RTMPdump (libRTMP) 源代碼分析4: 連接配接第一步——握手 (HandShake)

RTMPdump (libRTMP) 源代碼分析5: 建立一個流媒體連接配接  (NetConnection部分)

RTMPdump (libRTMP) 源代碼分析6: 建立一個流媒體連接配接  (NetStream部分 1)

RTMPdump (libRTMP) 源代碼分析7: 建立一個流媒體連接配接  (NetStream部分 2)

RTMPdump (libRTMP) 源代碼分析8: 發送消息 (Message)

RTMPdump (libRTMP) 源代碼分析9: 接收消息 (Message)  (接收視音頻資料)

RTMPdump (libRTMP) 源代碼分析10: 處理各種消息 (Message)

=====================================================

函數調用結構圖

RTMPDump (libRTMP)的整體的函數調用結構圖如下圖所示。

RTMPdump(libRTMP) 源代碼分析 8: 發送消息(Message)

單擊檢視大圖

詳細分析

之前寫了一系列的文章介紹RTMPDump各種函數。比如怎麼建立網絡連接配接(NetConnection),怎麼建立網絡流(NetStream)之類的,唯獨沒有介紹這些發送或接收的資料,在底層到底是怎麼實作的。本文就是要剖析一下其内部的實作。即這些消息(Message)到底是怎麼發送和接收的。

先來看看發送消息吧。

  • 發送connect指令使用函數SendConnectPacket()
  • 發送createstream指令使用RTMP_SendCreateStream()
  • 發送realeaseStream指令使用SendReleaseStream()
  • 發送publish指令使用SendPublish()
  • 發送deleteStream的指令使用SendDeleteStream()
  • 發送pause指令使用RTMP_SendPause()

不再一一例舉,發現函數命名有兩種規律:RTMP_Send***()或者Send***(),其中*号代表指令的名稱。

SendConnectPacket()這個指令是每次程式開始運作的時候發送的第一個指令消息,内容比較多,包含了很多AMF編碼的内容,在此不多做分析,貼上代碼:

[cpp] 

view plain

 copy

  1. //發送“connect”指令  
  2. static int  
  3. SendConnectPacket(RTMP *r, RTMPPacket *cp)  
  4. {  
  5.   RTMPPacket packet;  
  6.   char pbuf[4096], *pend = pbuf + sizeof(pbuf);  
  7.   char *enc;  
  8.   if (cp)  
  9.     return RTMP_SendPacket(r, cp, TRUE);  
  10.   packet.m_nChannel = 0x03; /* control channel (invoke) */  
  11.   packet.m_headerType = RTMP_PACKET_SIZE_LARGE;  
  12.   packet.m_packetType = 0x14;   /* INVOKE */  
  13.   packet.m_nTimeStamp = 0;  
  14.   packet.m_nInfoField2 = 0;  
  15.   packet.m_hasAbsTimestamp = 0;  
  16.   packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;  
  17.   enc = packet.m_body;  
  18.   enc = AMF_EncodeString(enc, pend, &av_connect);  
  19.   enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);  
  20.   *enc++ = AMF_OBJECT;  
  21.   enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);  
  22.   if (!enc)  
  23.     return FALSE;  
  24.   if (r->Link.protocol & RTMP_FEATURE_WRITE)  
  25.     {  
  26.       enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);  
  27.       if (!enc)  
  28.     }  
  29.   if (r->Link.flashVer.av_len)  
  30.       enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);  
  31.   if (r->Link.swfUrl.av_len)  
  32.       enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl);  
  33.   if (r->Link.tcUrl.av_len)  
  34.       enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl);  
  35.   if (!(r->Link.protocol & RTMP_FEATURE_WRITE))  
  36.       enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE);  
  37.       enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0);  
  38.       enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs);  
  39.       enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs);  
  40.       enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0);  
  41.       if (r->Link.pageUrl.av_len)  
  42.       enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl);  
  43.         return FALSE;  
  44.   if (r->m_fEncoding != 0.0 || r->m_bSendEncoding)  
  45.     {   /* AMF0, AMF3 not fully supported yet */  
  46.       enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding);  
  47.   if (enc + 3 >= pend)  
  48.   *enc++ = 0;  
  49.   *enc++ = 0;           /* end of object - 0x00 0x00 0x09 */  
  50.   *enc++ = AMF_OBJECT_END;  
  51.   /* add auth string */  
  52.   if (r->Link.auth.av_len)  
  53.       enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH);  
  54.       enc = AMF_EncodeString(enc, pend, &r->Link.auth);  
  55.   if (r->Link.extras.o_num)  
  56.       int i;  
  57.       for (i = 0; i < r->Link.extras.o_num; i++)  
  58.       enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend);  
  59.   packet.m_nBodySize = enc - packet.m_body;  
  60.   //----------------  
  61.   r->dlg->AppendMLInfo(20,1,"指令消息","Connect");  
  62.   //-----------------------------  
  63.   return RTMP_SendPacket(r, &packet, TRUE);  
  64. }  

RTMP_SendCreateStream()指令相對而言比較簡單,代碼如下:

  1. //發送“createstream”指令  
  2. int  
  3. RTMP_SendCreateStream(RTMP *r)  
  4.   char pbuf[256], *pend = pbuf + sizeof(pbuf);  
  5.   packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;  
  6.   enc = AMF_EncodeString(enc, pend, &av_createStream);  
  7.   *enc++ = AMF_NULL;        /* NULL */  
  8.   r->dlg->AppendMLInfo(20,1,"指令消息","CreateStream");  

同樣,SendReleaseStream()内容也比較簡單,我對其中部分内容作了注釋:

  1. //發送RealeaseStream指令  
  2. SendReleaseStream(RTMP *r)  
  3.   char pbuf[1024], *pend = pbuf + sizeof(pbuf);  
  4.  enc = packet.m_body;  
  5.   //對“releaseStream”字元串進行AMF編碼  
  6.   enc = AMF_EncodeString(enc, pend, &av_releaseStream);  
  7.   //對傳輸ID(0)進行AMF編碼?  
  8.   //指令對象  
  9.   *enc++ = AMF_NULL;  
  10.   //對播放路徑字元串進行AMF編碼  
  11.   enc = AMF_EncodeString(enc, pend, &r->Link.playpath);  
  12.   r->dlg->AppendMLInfo(20,1,"指令消息","ReleaseStream");  
  13.   return RTMP_SendPacket(r, &packet, FALSE);  

再來看一個SendPublish()函數,用于發送“publish”指令

  1. //發送Publish指令  
  2. SendPublish(RTMP *r)  
  3.   //塊流ID為4  
  4.   packet.m_nChannel = 0x04; /* source channel (invoke) */  
  5.   //指令消息,類型20  
  6.   //流ID  
  7.   packet.m_nInfoField2 = r->m_stream_id;  
  8.   //指向Chunk的負載  
  9.    //對“publish”字元串進行AMF編碼  
  10.   enc = AMF_EncodeString(enc, pend, &av_publish);  
  11.   //指令對象為空  
  12.   /* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */  
  13.   enc = AMF_EncodeString(enc, pend, &av_live);  
  14.   r->dlg->AppendMLInfo(20,1,"指令消息","Pulish");  

其他的指令不再一一例舉,總體的思路是聲明一個RTMPPacket類型的結構體,然後設定各種屬性值,最後交給RTMP_SendPacket()進行發送。

RTMPPacket類型的結構體定義如下,一個RTMPPacket對應RTMP協定規範裡面的一個塊(Chunk)。

  1. //Chunk資訊  
  2.   typedef struct RTMPPacket  
  3.   {  
  4.     uint8_t m_headerType;//ChunkMsgHeader的類型(4種)  
  5.     uint8_t m_packetType;//Message type ID(1-7協定控制;8,9音視訊;10以後為AMF編碼消息)  
  6.     uint8_t m_hasAbsTimestamp;  /* Timestamp 是絕對值還是相對值? */  
  7.     int m_nChannel;         //塊流ID  
  8.     uint32_t m_nTimeStamp;  // Timestamp  
  9.     int32_t m_nInfoField2;  /* last 4 bytes in a long header,消息流ID */  
  10.     uint32_t m_nBodySize;   //消息長度  
  11.     uint32_t m_nBytesRead;  
  12.     RTMPChunk *m_chunk;  
  13.     char *m_body;  
  14.   } RTMPPacket;  

下面我們來看看RTMP_SendPacket()吧,各種的RTMPPacket(即各種Chunk)都需要用這個函數進行發送。

  1. //自己編一個資料報發送出去!  
  2. //非常常用  
  3. RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue)  
  4.   const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];  
  5.   uint32_t last = 0;  
  6.   int nSize;  
  7.   int hSize, cSize;  
  8.   char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;  
  9.   uint32_t t;  
  10.   char *buffer, *tbuf = NULL, *toff = NULL;  
  11.   int nChunkSize;  
  12.   int tlen;  
  13.   //不是完整ChunkMsgHeader  
  14.   if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)  
  15.       /* compress a bit by using the prev packet's attributes */  
  16.     //擷取ChunkMsgHeader的類型  
  17.     //前一個Chunk和這個Chunk對比  
  18.       if (prevPacket->m_nBodySize == packet->m_nBodySize  
  19.       && prevPacket->m_packetType == packet->m_packetType  
  20.       && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)  
  21.     packet->m_headerType = RTMP_PACKET_SIZE_SMALL;  
  22.       if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp  
  23.       && packet->m_headerType == RTMP_PACKET_SIZE_SMALL)  
  24.     packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;  
  25.       //上一個packet的TimeStamp  
  26.       last = prevPacket->m_nTimeStamp;  
  27.   if (packet->m_headerType > 3)   /* sanity */  
  28.       RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.",  
  29.       (unsigned char)packet->m_headerType);  
  30.       return FALSE;  
  31.   //chunk標頭大小;packetSize[] = { 12, 8, 4, 1 }  
  32.   nSize = packetSize[packet->m_headerType];  
  33.   hSize = nSize; cSize = 0;  
  34.   //相對的TimeStamp  
  35.   t = packet->m_nTimeStamp - last;  
  36.   if (packet->m_body)  
  37.     //Header的Start  
  38.     //m_body是指向負載資料首位址的指針;“-”号用于指針前移  
  39.       header = packet->m_body - nSize;  
  40.     //Header的End  
  41.       hend = packet->m_body;  
  42.   else  
  43.       header = hbuf + 6;  
  44.       hend = hbuf + sizeof(hbuf);  
  45.   //當ChunkStreamID大于319時  
  46.   if (packet->m_nChannel > 319)  
  47.     //ChunkBasicHeader是3個位元組  
  48.     cSize = 2;  
  49.   //當ChunkStreamID大于63時  
  50.   else if (packet->m_nChannel > 63)  
  51.     //ChunkBasicHeader是2個位元組  
  52.     cSize = 1;  
  53.   if (cSize)  
  54.     //header指針指向ChunkMsgHeader  
  55.       header -= cSize;  
  56.     //hsize加上ChunkBasicHeader的長度  
  57.       hSize += cSize;  
  58.   //相對TimeStamp大于0xffffff,此時需要使用ExtendTimeStamp  
  59.   if (nSize > 1 && t >= 0xffffff)  
  60.       header -= 4;  
  61.       hSize += 4;  
  62.   hptr = header;  
  63.   //把ChunkBasicHeader的Fmt類型左移6位  
  64.   c = packet->m_headerType << 6;  
  65.   switch (cSize)  
  66.     //把ChunkBasicHeader的低6位設定成ChunkStreamID  
  67.     case 0:  
  68.       c |= packet->m_nChannel;  
  69.       break;  
  70.     //同理,但低6位設定成000000  
  71.     case 1:  
  72.     //同理,但低6位設定成000001  
  73.     case 2:  
  74.       c |= 1;  
  75.   //可以拆分成兩句*hptr=c;hptr++,此時hptr指向第2個位元組  
  76.   *hptr++ = c;  
  77.   //CSize>0,即ChunkBasicHeader大于1位元組  
  78.     //将要放到第2位元組的内容tmp  
  79.       int tmp = packet->m_nChannel - 64;  
  80.     //擷取低位存儲與第2位元組  
  81.       *hptr++ = tmp & 0xff;  
  82.     //ChunkBasicHeader是最大的3位元組時  
  83.       if (cSize == 2)  
  84.     //擷取高位存儲于最後1個位元組(注意:排序使用大端序列,和主機相反)  
  85.     *hptr++ = tmp >> 8;  
  86.   //ChunkMsgHeader。注意一共有4種,包含的字段數不同。  
  87.   //TimeStamp(3B)  
  88.   if (nSize > 1)  
  89.     //相對TimeStamp和絕對TimeStamp?  
  90.       hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);  
  91.   //MessageLength+MessageTypeID(4B)  
  92.   if (nSize > 4)  
  93.     //MessageLength  
  94.       hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);  
  95.     //MessageTypeID  
  96.       *hptr++ = packet->m_packetType;  
  97.   //MessageStreamID(4B)  
  98.   if (nSize > 8)  
  99.     hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);  
  100.   //ExtendedTimeStamp  
  101.     hptr = AMF_EncodeInt32(hptr, hend, t);  
  102.   //負載長度,指向負載的指針  
  103.   nSize = packet->m_nBodySize;  
  104.   buffer = packet->m_body;  
  105.   //Chunk大小,預設128位元組  
  106.   nChunkSize = r->m_outChunkSize;  
  107.   RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket,  
  108.       nSize);  
  109.   /* send all chunks in one HTTP request */  
  110.   //使用HTTP  
  111.   if (r->Link.protocol & RTMP_FEATURE_HTTP)  
  112.     //nSize:Message負載長度;nChunkSize:Chunk長度;  
  113.     //例nSize:307,nChunkSize:128;  
  114.     //可分為(307+128-1)/128=3個  
  115.     //為什麼+nChunkSize-1?因為除法會隻取整數部分!  
  116.       int chunks = (nSize+nChunkSize-1) / nChunkSize;  
  117.     //Chunk個數超過一個  
  118.       if (chunks > 1)  
  119.         {  
  120.     //注意:CSize=1表示ChunkBasicHeader是2位元組  
  121.     //消息分n塊後總的開銷:  
  122.     //n個ChunkBasicHeader,1個ChunkMsgHeader,1個Message負載  
  123.     //實際中隻有第一個Chunk是完整的,剩下的隻有ChunkBasicHeader  
  124.       tlen = chunks * (cSize + 1) + nSize + hSize;  
  125.     //配置設定記憶體  
  126.       tbuf = (char *) malloc(tlen);  
  127.       if (!tbuf)  
  128.       toff = tbuf;  
  129.     //消息的負載+頭  
  130.   while (nSize + hSize)  
  131.       int wrote;  
  132.       //消息負載<Chunk大小(不用分塊)  
  133.       if (nSize < nChunkSize)  
  134.     //Chunk可能小于設定值  
  135.     nChunkSize = nSize;  
  136.       RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);  
  137.       RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);  
  138.       if (tbuf)  
  139.     //void *memcpy(void *dest, const void *src, int n);  
  140.     //由src指向位址為起始位址的連續n個位元組的資料複制到以dest指向位址為起始位址的空間内  
  141.       memcpy(toff, header, nChunkSize + hSize);  
  142.       toff += nChunkSize + hSize;  
  143.       else  
  144.       wrote = WriteN(r, header, nChunkSize + hSize);  
  145.       if (!wrote)  
  146.       //消息負載長度-Chunk負載長度  
  147.       nSize -= nChunkSize;  
  148.       //Buffer指針前移1個Chunk負載長度  
  149.       buffer += nChunkSize;  
  150.       hSize = 0;  
  151.       //如果消息沒有發完  
  152.       if (nSize > 0)  
  153.     //ChunkBasicHeader  
  154.       header = buffer - 1;  
  155.       hSize = 1;  
  156.       if (cSize)  
  157.           header -= cSize;  
  158.           hSize += cSize;  
  159.         }  
  160.       //ChunkBasicHeader第1個位元組  
  161.       *header = (0xc0 | c);  
  162.       //ChunkBasicHeader大于1位元組  
  163.           int tmp = packet->m_nChannel - 64;  
  164.           header[1] = tmp & 0xff;  
  165.           if (cSize == 2)  
  166.         header[2] = tmp >> 8;  
  167.   if (tbuf)  
  168.     //  
  169.       int wrote = WriteN(r, tbuf, toff-tbuf);  
  170.       free(tbuf);  
  171.       tbuf = NULL;  
  172.   /* we invoked a remote method */  
  173.   if (packet->m_packetType == 0x14)  
  174.       AVal method;  
  175.       char *ptr;  
  176.       ptr = packet->m_body + 1;  
  177.       AMF_DecodeString(ptr, &method);  
  178.       RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val);  
  179.       /* keep it in call queue till result arrives */  
  180.       if (queue) {  
  181.         int txn;  
  182.         ptr += 3 + method.av_len;  
  183.         txn = (int)AMF_DecodeNumber(ptr);  
  184.     AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);  
  185.       }  
  186.   if (!r->m_vecChannelsOut[packet->m_nChannel])  
  187.     r->m_vecChannelsOut[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket));  
  188.   memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));  
  189.   return TRUE;  

這個函數乍一看好像非常複雜,其實不然,他隻是按照RTMP規範将資料編碼成符合規範的塊(Chunk),規範可以參考相關的文檔。

具體怎麼編碼成塊(Chunk)就不多分析了,在這裡需要注意一個函數:WriteN()。該函數完成了将資料發送出去的功能。

來看一下WriteN()函數:

  1. //發送資料報的時候調用(連接配接,buffer,長度)  
  2. WriteN(RTMP *r, const char *buffer, int n)  
  3.   const char *ptr = buffer;  
  4. #ifdef CRYPTO  
  5.   char *encrypted = 0;  
  6.   char buf[RTMP_BUFFER_CACHE_SIZE];  
  7.   if (r->Link.rc4keyOut)  
  8.       if (n > sizeof(buf))  
  9.     encrypted = (char *)malloc(n);  
  10.     encrypted = (char *)buf;  
  11.       ptr = encrypted;  
  12.       RC4_encrypt2((RC4_KEY *)r->Link.rc4keyOut, n, buffer, ptr);  
  13. #endif  
  14.   while (n > 0)  
  15.       int nBytes;  
  16.       //因方式的不同而調用不同函數  
  17.       //如果使用的是HTTP協定進行連接配接  
  18.       if (r->Link.protocol & RTMP_FEATURE_HTTP)  
  19.         nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n);  
  20.         nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n);  
  21.       /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d\n", __FUNCTION__, nBytes); */  
  22.       //成功發送位元組數<0  
  23.       if (nBytes < 0)  
  24.       int sockerr = GetSockError();  
  25.       RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__,  
  26.           sockerr, n);  
  27.       if (sockerr == EINTR && !RTMP_ctrlC)  
  28.         continue;  
  29.       RTMP_Close(r);  
  30.       n = 1;  
  31.       if (nBytes == 0)  
  32.     break;  
  33.       n -= nBytes;  
  34.       ptr += nBytes;  
  35.   if (encrypted && encrypted != buf)  
  36.     free(encrypted);  
  37.   return n == 0;  

該函數中,RTMPSockBuf_Send()完成了資料發送的功能,再來看看這個函數(函數調用真是好多啊。。。。)

  1. //Socket發送(指明套接字,buffer緩沖區,資料長度)  
  2. //傳回所發資料量  
  3. RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len)  
  4.   int rc;  
  5. #ifdef _DEBUG  
  6.   fwrite(buf, 1, len, netstackdump);  
  7. #if defined(CRYPTO) && !defined(NO_SSL)  
  8.   if (sb->sb_ssl)  
  9.       rc = TLS_write((SSL *)sb->sb_ssl, buf, len);  
  10.     //向一個已連接配接的套接口發送資料。  
  11.     //int send( SOCKET s, const char * buf, int len, int flags);  
  12.     //s:一個用于辨別已連接配接套接口的描述字。  
  13.     //buf:包含待發送資料的緩沖區。     
  14.     //len:緩沖區中資料的長度。  
  15.     //flags:調用執行方式。  
  16.     //rc:所發資料量。  
  17.       rc = send(sb->sb_socket, buf, len, 0);  
  18.   return rc;  
  19. RTMPSockBuf_Close(RTMPSockBuf *sb)  
  20.       TLS_shutdown((SSL *)sb->sb_ssl);  
  21.       TLS_close((SSL *)sb->sb_ssl);  
  22.       sb->sb_ssl = NULL;  
  23.   return closesocket(sb->sb_socket);  

到這個函數的時候,發現一層層的調用終于完成了,最後調用了系統Socket的send()函數完成了資料的發送功能。

之前貼過一張圖總結這個過程,可能了解起來要友善一些:RTMPDump源代碼分析 0: 主要函數調用分析

繼續閱讀