I've been digging into the OBS source code lately. One of its most important modules is the streaming module: OBS pushes streams over RTMP, and the RTMP sources are bundled with the project. I couldn't find a thoroughly annotated walkthrough of the RTMP code online, so this series goes through it in detail based on the OBS project, covering the core data structures and the public function interfaces. The RTMP protocol itself is documented in plenty of places, so it is only touched on briefly here; the focus is on annotating and analyzing the code. Where the core RTMP structures appear in the code I point them out; a separate post analyzes those structures in detail.
There are a lot of interfaces and the notes are fairly fine-grained, so this is a long article. Code that doesn't affect the logic (logging, error-handling boilerplate) is omitted. Bear with me.
This took quite a bit of effort to write, so a follow would be appreciated.
All of the actual streaming tests here push to Douyu (it's the platform I usually use to watch live streams).
This post covers how RTMP reads data from the server and processes it. As described in the previous post, after the connection and handshake the client sends the connect command; once the server replies, the client has to react to those messages. RTMP_ConnectStream's job is to read those reply messages and dispatch them to the matching handlers.
int RTMP_ConnectStream(RTMP *r, int seekTime)
{
RTMPPacket packet = { 0 }; //a freshly zero-initialized packet
/* seekTime was already set by SetupStream / SetupURL.
* This is only needed by ReconnectStream.
*/
if (seekTime > 0)
r->Link.seekTime = seekTime;
r->m_mediaChannel = 0;
//keep reading packets from the socket buffer
while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet))
{
if (RTMPPacket_IsReady(&packet))//is the message complete, i.e. does m_nBytesRead equal m_nBodySize?
{
if (!packet.m_nBodySize)
continue;
//skip audio/video data received before play()
if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) ||
(packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) ||
(packet.m_packetType == RTMP_PACKET_TYPE_INFO))
{
RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
RTMPPacket_Free(&packet);
continue;
}
RTMP_ClientPacket(r, &packet);
RTMPPacket_Free(&packet);
}
}
return r->m_bPlaying;
}
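For context, here is a minimal sketch of where RTMP_ConnectStream sits in the publish-side call sequence. It uses the standard librtmp entry points; the URL is only a placeholder and error handling is left out:
// Minimal publish-side sketch (assumes the standard librtmp API; placeholder URL, no error handling)
char url[] = "rtmp://example.com/live/STREAM-KEY";
RTMP *r = RTMP_Alloc();
RTMP_Init(r);
RTMP_SetupURL(r, url);      // parse tcUrl / app / playpath into r->Link
RTMP_EnableWrite(r);        // we are a publisher, not a player
RTMP_Connect(r, NULL);      // TCP connect + handshake + send the "connect" command
RTMP_ConnectStream(r, 0);   // read the server's replies (handled below) until publishing starts
/* ... push A/V data with RTMP_SendPacket / RTMP_Write ... */
RTMP_Close(r);
RTMP_Free(r);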
First, the key data structure:
typedef struct RTMPPacket //the data for one Tag Data unit (here including the FLV Tag header) or for one "command"; the layout follows the chunk structure
{
uint8_t m_headerType;// chunk header fmt (2 bits): selects a message-header size of 11/7/3/0 bytes; the basic header also carries the 6-bit chunk stream id (usually one byte in total)
uint8_t m_packetType;// message type ID (1-7 protocol control; 8/9 audio/video; higher values are AMF-encoded messages) - one byte
uint8_t m_hasAbsTimestamp; /* timestamp absolute or relative? */ // TRUE when the header carried an absolute timestamp (fmt 0)
int m_nChannel; //chunk stream id (from the chunk basic header)
uint32_t m_nTimeStamp; /* timestamp */
int32_t m_nInfoField2; //message stream id /* last 4 bytes in a long header */
uint32_t m_nBodySize; //size of the message body after AMF encoding (for A/V data: the Tag Data size of one FLV tag)
uint32_t m_nBytesRead; //bytes of the body read so far (on the send path in RTMP_Write: bytes already written)
RTMPChunk *m_chunk;
char *m_body;//points to the start of the body (Tag Data); it stays fixed while successive chunks are copied in at increasing offsets until the whole Tag Data has been read
//when the chunk carries a command, m_body points to the start of the chunk body (data)
} RTMPPacket;
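To make the fields concrete, here is a rough sketch of how librtmp's own send path fills in an RTMPPacket for a command message, modeled on SendConnectPacket (the AMF body encoding is elided and pbuf is just a stack buffer):
// Sketch of an invoke ("command") packet, modeled on SendConnectPacket; AMF body encoding elided
char pbuf[4096], *pend = pbuf + sizeof(pbuf);
RTMPPacket packet;
packet.m_nChannel = 0x03;                      // chunk stream id used for commands
packet.m_headerType = RTMP_PACKET_SIZE_LARGE;  // fmt 0: full 11-byte message header
packet.m_packetType = RTMP_PACKET_TYPE_INVOKE; // 0x14: AMF0 command message
packet.m_nTimeStamp = 0;
packet.m_nInfoField2 = 0;                      // message stream id
packet.m_hasAbsTimestamp = 0;
packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;   // leave headroom so the chunk header can be prepended
char *enc = packet.m_body;
/* enc = AMF_EncodeString(enc, pend, &av_connect); ... encode the rest of the AMF body ... */
packet.m_nBodySize = enc - packet.m_body;
/* RTMP_SendPacket(r, &packet, TRUE); // TRUE queues the method name for the _result (see HandleInvoke below) */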
//Reads one chunk. A message is often split across several chunks, so RTMP_ReadPacket is called in a loop.
//RTMPPacket is defined around the chunk structure: one chunk corresponds to one RTMPPacket.
//r->m_vecChannelsIn (an array of packet pointers) caches the previous chunk's info per channel.
//Only the first chunk of a message carries the full 11-byte message header, so its size, type and so on
//have to be remembered and reused while the remaining chunks of that message are read.
//memcpy is the standard C memory-copy function.
int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
{
uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
char *header = (char *)hbuf;//walking pointer into the chunk header buffer
int nSize;//message header size
int hSize;//chunk basic header + message header size
int nToRead;//total body size minus bytes already read = bytes still to read
int nChunk;//size of this chunk's payload (chunk data)
// int didAlloc = FALSE;
int extendedTimestamp = 0;
RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, (int)r->m_sb.sb_socket);
if (ReadN(r, (char *)hbuf, 1) == 0) //read the first byte; fails if nothing can be read from the socket
{
RTMP_Log(RTMP_LOGDEBUG, "%s, failed to read RTMP packet header", __FUNCTION__);
return FALSE;
}
packet->m_headerType = (hbuf[0] & 0xc0) >> 6; //shift the first byte right by 6 bits to get fmt
packet->m_nChannel = (hbuf[0] & 0x3f);//low 6 bits: chunk stream id
header++; //advance past the first basic-header byte
if (packet->m_nChannel == 0)//value 0 means the chunk basic header is 2 bytes long
{
if (ReadN(r, (char *)&hbuf[1], 1) != 1)//failed to read the 2nd byte
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
__FUNCTION__);
return FALSE;
}
packet->m_nChannel = hbuf[1];
packet->m_nChannel += 64;
header++;
}
else if (packet->m_nChannel == 1)//value 1 means the chunk basic header is 3 bytes long
{
int tmp;
if (ReadN(r, (char *)&hbuf[1], 2) != 2)//failed to read bytes 2-3
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
__FUNCTION__);
return FALSE;
}
tmp = (hbuf[2] << 8) + hbuf[1];
packet->m_nChannel = tmp + 64;
RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
header += 2;
}
//Tip: static const int packetSize[] = { 12, 8, 4, 1 } gives the header size for each fmt.
//These are the message-header sizes { 11, 7, 3, 0 } plus one byte for the (minimal) chunk basic header,
//which has already been read above - that is why nSize-- appears further down.
nSize = packetSize[packet->m_headerType];//header size for this fmt
if (packet->m_nChannel >= r->m_channelsAllocatedIn)//m_channelsAllocatedIn starts at 0, so grow the per-channel arrays on demand
{
int n = packet->m_nChannel + 10; //allocate a little headroom beyond the current channel id
int *timestamp = realloc(r->m_channelTimestamp, sizeof(int) * n);
//r->m_vecChannelsIn caches the previous packet per channel (see the note above RTMP_ReadPacket):
//only the first chunk of a message carries the full 11-byte header, so its size, type, etc. must be reused here.
RTMPPacket **packets = realloc(r->m_vecChannelsIn, sizeof(RTMPPacket*) * n);
if (!timestamp)
free(r->m_channelTimestamp);
if (!packets)
free(r->m_vecChannelsIn);
r->m_channelTimestamp = timestamp;
r->m_vecChannelsIn = packets;
if (!timestamp || !packets)
{
r->m_channelsAllocatedIn = 0;
return FALSE;
}
//zero the newly allocated entries of m_channelTimestamp and m_vecChannelsIn
memset(r->m_channelTimestamp + r->m_channelsAllocatedIn, 0, sizeof(int) * (n - r->m_channelsAllocatedIn));
memset(r->m_vecChannelsIn + r->m_channelsAllocatedIn, 0, sizeof(RTMPPacket*) * (n - r->m_channelsAllocatedIn));
r->m_channelsAllocatedIn = n;
}
if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */
packet->m_hasAbsTimestamp = TRUE; //RTMP_LARGE_HEADER_SIZE is 12 (fmt 0): the timestamp is absolute
else if (nSize < RTMP_LARGE_HEADER_SIZE)//shorter header: reuse fields from the previous chunk on this channel
{
/* using values from the last message of this channel */
//reuse the previous chunk's info (a message split across chunks is read by repeated RTMP_ReadPacket calls)
if (r->m_vecChannelsIn[packet->m_nChannel])
//copy the cached packet info for this channel into packet
memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
sizeof(RTMPPacket));
}
nSize--; //drop the one basic-header byte that packetSize[] includes - it was already read above
if (nSize > 0 && ReadN(r, header, nSize) != nSize) //read the remaining nSize header bytes from the socket
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
__FUNCTION__, (unsigned int)hbuf[0]);
return FALSE;
}
hSize = nSize + (header - (char *)hbuf);//chunk basic header + message header size
if (nSize >= 3)
{
packet->m_nTimeStamp = AMF_DecodeInt24(header);
if (nSize >= 6)
{
//message body size after AMF packing (for A/V data: the Tag Data size of one FLV tag), decoded from big-endian
packet->m_nBodySize = AMF_DecodeInt24(header + 3);
packet->m_nBytesRead = 0;
RTMPPacket_Free(packet);
if (nSize > 6)
{
packet->m_packetType = header[6];//message type ID (1-7 protocol control; 8/9 audio/video; higher values are AMF-encoded messages)
if (nSize == 11)
packet->m_nInfoField2 = DecodeInt32LE(header + 7);//message stream id (little-endian)
}
}
extendedTimestamp = (packet->m_nTimeStamp == 0xffffff); //0xffffff in the 24-bit field means an extended timestamp follows
if (extendedTimestamp)
{
if (ReadN(r, header + nSize, 4) != 4)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
__FUNCTION__);
return FALSE;
}
packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
hSize += 4;
}
}
RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);
if (packet->m_nBodySize > 0 && packet->m_body == NULL)
{
if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)) //heap-allocate the buffer that m_body will point into
{
RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
return FALSE;
}
// didAlloc = TRUE;
packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
}
nToRead = packet->m_nBodySize - packet->m_nBytesRead; //total body size minus bytes already read = bytes still to read
nChunk = r->m_inChunkSize;//the negotiated incoming chunk size (chunk payload size)
if (nToRead < nChunk)
nChunk = nToRead;
/* Does the caller want the raw chunk? */
if (packet->m_chunk)
{
packet->m_chunk->c_headerSize = hSize;
memcpy(packet->m_chunk->c_header, hbuf, hSize);
packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
packet->m_chunk->c_chunkSize = nChunk;
}
//Tip: packet->m_body itself never changes; every read copies data to packet->m_body + packet->m_nBytesRead,
//so the buffer fills up chunk by chunk. Keep this in mind - it took me a while to untangle.
if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)
{
RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u",
__FUNCTION__, packet->m_nBodySize);
return FALSE;
}
RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);
packet->m_nBytesRead += nChunk; //bytes of the body read so far
/* keep the packet as ref for other packets on this channel */
if (!r->m_vecChannelsIn[packet->m_nChannel])
//allocate the cache slot for this channel
r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
//cache the current packet in m_vecChannelsIn[m_nChannel]: when a message spans several chunks and RTMP_ReadPacket is called repeatedly, the later chunks need the earlier chunk's info
memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));
if (extendedTimestamp)//remember that this channel's timestamp field was the extended-timestamp marker
r->m_vecChannelsIn[packet->m_nChannel]->m_nTimeStamp = 0xffffff;
if (RTMPPacket_IsReady(packet))//is the message complete, i.e. does m_nBytesRead equal m_nBodySize?
{
/* make packet's timestamp absolute */
if (!packet->m_hasAbsTimestamp)
packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */
r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;
/* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */
/* arrives and requests to re-use some info (small packet header) */
//the message is complete: reset the cached copy's body pointer and read counter
r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */
}
else
{
//Note: the cached copy in r->m_vecChannelsIn[packet->m_nChannel] keeps m_body, m_nBytesRead and the rest,
//so when the next chunk of this message arrives, the memcpy above restores the buffer pointer and read offset.
//Tip: packet->m_body itself never changes; each read lands at packet->m_body + packet->m_nBytesRead.
//Clearing packet->m_body here only keeps the caller's copy from freeing (or dangling on) a buffer
//that the cached per-channel packet still owns.
packet->m_body = NULL; /* so it won't be erased on free */
}
return TRUE;
}
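To summarize the basic-header parsing at the top of RTMP_ReadPacket, here is a small self-contained helper. It is hypothetical - written only to illustrate the fmt/csid encoding and the 1/2/3-byte forms - and not part of librtmp:
#include <stdint.h>
// Hypothetical helper mirroring the basic-header logic in RTMP_ReadPacket.
// Examples: 0x03           -> fmt 0, csid 3                   (1-byte form)
//           0x40 0x0a      -> fmt 1, csid 10 + 64 = 74        (2-byte form)
//           0xc1 0x2c 0x01 -> fmt 3, csid 44 + 256 + 64 = 364 (3-byte form)
// Returns the number of basic-header bytes consumed.
static int parse_basic_header(const uint8_t *p, int *fmt, int *csid)
{
    *fmt = (p[0] & 0xc0) >> 6;      // top 2 bits select the message-header size
    int id = p[0] & 0x3f;           // low 6 bits
    if (id == 0) { *csid = p[1] + 64; return 2; }               // ids 64..319
    if (id == 1) { *csid = p[1] + (p[2] << 8) + 64; return 3; } // ids 64..65599
    *csid = id; return 1;                                       // ids 2..63
}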
int RTMPPacket_Alloc(RTMPPacket *p, int nSize) //(re)allocate the buffer that m_body points into
{
char *ptr = calloc(1, nSize + RTMP_MAX_HEADER_SIZE);//body size + 18 bytes of header headroom
if (!ptr)
return FALSE;
//m_body points 18 bytes into the block so the chunk header can later be written directly in front of the body (RTMP_Write relies on this); once set, m_body stays put
p->m_body = ptr + RTMP_MAX_HEADER_SIZE;
p->m_nBytesRead = 0;//nothing read yet
return TRUE;
}
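A short usage sketch of the allocate/fill/free life cycle (RTMPPacket_Free is the matching librtmp call that releases the block; data and dataSize stand in for the caller's payload):
// Sketch: the 18-byte headroom lets the chunk header later be written directly in front of m_body
RTMPPacket pkt = { 0 };
pkt.m_nChannel = 0x04;
pkt.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
pkt.m_packetType = RTMP_PACKET_TYPE_AUDIO;
if (RTMPPacket_Alloc(&pkt, dataSize))      // pkt.m_body now points 18 bytes into the block
{
    memcpy(pkt.m_body, data, dataSize);    // data/dataSize: the caller's encoded payload (placeholders)
    pkt.m_nBodySize = dataSize;
    /* RTMP_SendPacket(r, &pkt, FALSE); */
    RTMPPacket_Free(&pkt);                 // frees m_body - RTMP_MAX_HEADER_SIZE
}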
Another key function:
static int ReadN(RTMP *r, char *buffer, int n)
{
int nOriginalSize = n;
int avail;
char *ptr;
r->m_sb.sb_timedout = FALSE;
#ifdef _DEBUG
memset(buffer, 0, n); //zero the destination in debug builds
#endif
ptr = buffer;
while (n > 0)
{
int nBytes = 0, nRead;
if (r->Link.protocol & RTMP_FEATURE_HTTP) //RTMPT (RTMP over HTTP) - ignored here, since we are on plain TCP
{
int refill = 0;
while (!r->m_resplen)
{
int ret;
if (r->m_sb.sb_size < 13 || refill)
{
if (!r->m_unackd)
HTTP_Post(r, RTMPT_IDLE, "", 1);
if (RTMPSockBuf_Fill(&r->m_sb) < 1)
{
if (!r->m_sb.sb_timedout)
RTMP_Close(r);
return 0;
}
}
if ((ret = HTTP_read(r, 0)) == -1)
{
RTMP_Log(RTMP_LOGDEBUG, "%s, No valid HTTP response found", __FUNCTION__);
RTMP_Close(r);
return 0;
}
else if (ret == -2)
{
refill = 1;
}
else
{
refill = 0;
}
}
if (r->m_resplen && !r->m_sb.sb_size)
RTMPSockBuf_Fill(&r->m_sb);
avail = r->m_sb.sb_size;
if (avail > r->m_resplen)
avail = r->m_resplen;
}
else
{
avail = r->m_sb.sb_size;
if (avail == 0)
{
//pull more data from the socket into the buffer
//(the while loop inside RTMPSockBuf_Fill runs only once)
if (RTMPSockBuf_Fill(&r->m_sb) < 1)
{
if (!r->m_sb.sb_timedout)
RTMP_Close(r);
return 0;
}
avail = r->m_sb.sb_size;
}
}
nRead = ((n < avail) ? n : avail);
if (nRead > 0)
{
memcpy(ptr, r->m_sb.sb_start, nRead);
r->m_sb.sb_start += nRead;
r->m_sb.sb_size -= nRead;
nBytes = nRead;
r->m_nBytesIn += nRead;
if (r->m_bSendCounter
&& r->m_nBytesIn > ( r->m_nBytesInSent + r->m_nClientBW / 10))
if (!SendBytesReceived(r))//send an acknowledgement (bytes-received report)
return FALSE;
}
/*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */
#if defined(RTMP_NETSTACK_DUMP)
fwrite(ptr, 1, nBytes, netstackdump_read);
#endif
if (nBytes == 0)
{
RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);
/*goto again; */
RTMP_Close(r);
break;
}
if (r->Link.protocol & RTMP_FEATURE_HTTP)
r->m_resplen -= nBytes;
#ifdef CRYPTO
if (r->Link.rc4keyIn)
{
RC4_encrypt(r->Link.rc4keyIn, nBytes, ptr);
}
#endif
n -= nBytes;
ptr += nBytes;
}
return nOriginalSize - n;
}
//Fills the socket buffer from the network.
//The while loop below runs exactly once (it always breaks at the bottom).
int RTMPSockBuf_Fill(RTMPSockBuf *sb)
{
int nBytes; //number of bytes received
if (!sb->sb_size) //sb_size is the amount of unprocessed data in the buffer
sb->sb_start = sb->sb_buf;//reset the read pointer to the start of the buffer
while (1)
{
nBytes = (int)sizeof(sb->sb_buf) - 1 - sb->sb_size - (sb->sb_start - sb->sb_buf);
#if defined(CRYPTO) && !defined(NO_SSL) //TLS support
if (sb->sb_ssl)
{
nBytes = TLS_read(sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);
}
else
#endif
{
nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);
}
if (nBytes > 0)
{
sb->sb_size += nBytes;
}
else if (nBytes == 0)
{
...
}
else
{
int level;
int sockerr = GetSockError();
......
}
break; //runs only once
}
return nBytes;
}
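Taken together, ReadN and RTMPSockBuf_Fill are just a buffered read: recv() into one large buffer, then hand bytes out of it until it runs dry. A stripped-down sketch of that pattern (illustrative only, not the library code; POSIX sockets assumed):
#include <string.h>
#include <sys/socket.h>
// Illustrative buffered-read sketch: buf_start/buf_size play the role of sb_start/sb_size
static char buf[16384], *buf_start = buf;
static int buf_size = 0;
static int read_n(int fd, char *out, int n) // returns the number of bytes delivered
{
    int want = n;
    while (n > 0)
    {
        if (buf_size == 0)                  // window empty: refill from the socket
        {
            buf_start = buf;
            int got = recv(fd, buf, sizeof(buf), 0);
            if (got <= 0)
                break;                      // peer closed the connection, or an error occurred
            buf_size = got;
        }
        int take = n < buf_size ? n : buf_size;
        memcpy(out, buf_start, take);       // hand out what the buffer already holds
        buf_start += take; buf_size -= take;
        out += take; n -= take;
    }
    return want - n;
}
On top of this, the real ReadN also counts r->m_nBytesIn and, once the acknowledgement window is crossed, calls SendBytesReceived as seen above.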
//Another key function:
//this one dispatches messages received from the server.
int RTMP_ClientPacket(RTMP *r, RTMPPacket *packet) //handle one complete chunk-stream message
{
int bHasMediaPacket = 0;
switch (packet->m_packetType)//message type
{
case RTMP_PACKET_TYPE_CHUNK_SIZE: //the peer is changing its chunk size
/* chunk size */
HandleChangeChunkSize(r, packet);
RTMP_Log(RTMP_LOGERROR, "%s, received: bytes read report", "RTMP_PACKET_TYPE_CHUNK_SIZE");
break;
case RTMP_PACKET_TYPE_BYTES_READ_REPORT: //acknowledgement the peer sends each time a Window Acknowledgement Size worth of data has been received
/* bytes read report */
RTMP_Log(RTMP_LOGERROR, "%s, received: bytes read report", __FUNCTION__);
break;
case RTMP_PACKET_TYPE_CONTROL:
/* ctrl */
HandleCtrl(r, packet);
RTMP_Log(RTMP_LOGERROR, "%s, received: bytes read report", "RTMP_PACKET_TYPE_CONTROL");
break;
case RTMP_PACKET_TYPE_SERVER_BW:
/* server bw */
HandleServerBW(r, packet); //Window Acknowledgement Size: how much data to receive before an acknowledgement must be sent back
RTMP_Log(RTMP_LOGERROR, "%s, received: bytes read report", "RTMP_PACKET_TYPE_SERVER_BW");
break;
case RTMP_PACKET_TYPE_CLIENT_BW:
/* client bw */
HandleClientBW(r, packet); //Set Peer Bandwidth: the window size the server asks the client to apply
RTMP_Log(RTMP_LOGERROR, "%s, received: bytes read report", "RTMP_PACKET_TYPE_CLIENT_BW");
break;
case RTMP_PACKET_TYPE_AUDIO: //audio data
/* audio data */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize); */
HandleAudio(r, packet);
bHasMediaPacket = 1; //media data
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case RTMP_PACKET_TYPE_VIDEO: //video data
/* video data */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize); */
HandleVideo(r, packet);
bHasMediaPacket = 1;//media data
if (!r->m_mediaChannel)
r->m_mediaChannel = packet->m_nChannel;
if (!r->m_pausing)
r->m_mediaStamp = packet->m_nTimeStamp;
break;
case RTMP_PACKET_TYPE_FLEX_STREAM_SEND:
/* flex stream send */
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex stream send, size %u bytes, not supported, ignoring",
__FUNCTION__, packet->m_nBodySize);
break;
case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT:
/* flex shared object */
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex shared object, size %u bytes, not supported, ignoring",
__FUNCTION__, packet->m_nBodySize);
break;
case RTMP_PACKET_TYPE_FLEX_MESSAGE:
/* flex message */
{
RTMP_Log(RTMP_LOGDEBUG,
"%s, flex message, size %u bytes, not fully supported",
__FUNCTION__, packet->m_nBodySize);
/*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */
/* some DEBUG code */
#if 0
RTMP_LIB_AMFObject obj;
int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1);
if(nRes < 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__);
/*return; */
}
obj.Dump();
#endif
if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1)
bHasMediaPacket = 2;
break;
}
case RTMP_PACKET_TYPE_INFO: //audio/video metadata
/* metadata (notify) */
RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__,
packet->m_nBodySize);
if (HandleMetadata(r, packet->m_body, packet->m_nBodySize))
bHasMediaPacket = 1;
break;
case RTMP_PACKET_TYPE_SHARED_OBJECT:
RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring",
__FUNCTION__);
break;
case RTMP_PACKET_TYPE_INVOKE: //"command" messages (the most important case: play, pause, publish and so on all go through here)
/* invoke */
RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__,
packet->m_nBodySize);
if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1)
bHasMediaPacket = 2;
break;
case RTMP_PACKET_TYPE_FLASH_VIDEO:
{
/* go through FLV packets and handle metadata packets */
unsigned int pos = 0;
uint32_t nTimeStamp = packet->m_nTimeStamp;
while (pos + 11 < packet->m_nBodySize)
{
uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */
if (pos + 11 + dataSize + 4 > packet->m_nBodySize)
{
RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!");
break;
}
if (packet->m_body[pos] == 0x12)
{
HandleMetadata(r, packet->m_body + pos + 11, dataSize);
}
else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9)
{
nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4);
nTimeStamp |= (packet->m_body[pos + 7] << 24);
}
pos += (11 + dataSize + 4);
}
if (!r->m_pausing)
r->m_mediaStamp = nTimeStamp;
/* FLV tag(s) */
/*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */
bHasMediaPacket = 1;
break;
}
default:
RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
packet->m_packetType);
#ifdef _DEBUG
RTMP_LogHex(RTMP_LOGDEBUG, (const uint8_t*)packet->m_body, packet->m_nBodySize);
#endif
}
return bHasMediaPacket;
}
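As an example of what those handlers do, the chunk-size case is essentially one line: the 4-byte body carries the peer's new chunk size, which is stored in m_inChunkSize so that later RTMP_ReadPacket calls split the stream correctly. Roughly (a sketch, not the verbatim source):
// Rough sketch of HandleChangeChunkSize: the body is one big-endian 32-bit value
static void HandleChangeChunkSize(RTMP *r, const RTMPPacket *packet)
{
    if (packet->m_nBodySize >= 4)
    {
        r->m_inChunkSize = AMF_DecodeInt32(packet->m_body); // new incoming chunk size
        RTMP_Log(RTMP_LOGDEBUG, "chunk size change to %d", r->m_inChunkSize);
    }
}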
//Handles "command" messages (the most important handler: play, pause, publish results and so on all arrive here).
static int HandleInvoke(RTMP *r, const char *body, unsigned int nBodySize)
{
AMFObject obj;
AVal method;
double txn;
int ret = 0, nRes;
if (body[0] != 0x02) /* make sure it is a string method name we start with */
{
RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
__FUNCTION__);
return 0;
}
nRes = AMF_Decode(&obj, body, nBodySize, FALSE);//deserialize the AMF-encoded body into the AMF data structures
if (nRes < 0)
{
RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
return 0;
}
AMF_Dump(&obj);
AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);//AMF helper: get the command name
txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
RTMP_Log(RTMP_LOGERROR, "%s, server invoking <%s>", __FUNCTION__, method.av_val);
//Match the command name and call the corresponding handler.
//Tip: method and av__result are both of type AVal - AMF uses the AVal struct for strings,
//so all comparisons here are AVal comparisons; the names are registered with this macro:
#define SAVC(x) static const AVal av_##x = AVC(#x) //turns a command name into an AVal constant named av_<name>
SAVC(app);
SAVC(connect);
SAVC(flashVer);
SAVC(swfUrl);
.......
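/* For reference, the AVal helpers behind SAVC/AVMATCH live in amf.h and look roughly like this
 * (an AVal is just a pointer + length pair), so SAVC(connect) expands to
 * static const AVal av_connect = {"connect", 7}:
 *   typedef struct AVal { char *av_val; int av_len; } AVal;
 *   #define AVC(str)       {str, sizeof(str)-1}
 *   #define AVMATCH(a1,a2) ((a1)->av_len == (a2)->av_len && \
 *                           !memcmp((a1)->av_val, (a2)->av_val, (a1)->av_len))
 */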
if (AVMATCH(&method, &av__result))
{
AVal methodInvoked = {0};
int i;
//Covered in the post on RTMP_Write (sending A/V data and commands):
//this works like an RPC call - when RTMP_SendPacket sends a command, the command name is stored in m_methodCalls;
//once the server's reply has been handled here, AV_erase removes the cached name from m_methodCalls.
for (i=0; i<r->m_numCalls; i++)
{
if (r->m_methodCalls[i].num == (int)txn)
{
methodInvoked = r->m_methodCalls[i].name;
AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
break;
}
}
if (!methodInvoked.av_val)
{
RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
__FUNCTION__, txn);
goto leave;
}
RTMP_Log(RTMP_LOGDEBUG, "%s, received result for method call <%s>", __FUNCTION__,
methodInvoked.av_val);
if (AVMATCH(&methodInvoked, &av_connect))
{
if (r->Link.token.av_len)
{
AMFObjectProperty p;
if (RTMP_FindFirstMatchingProperty(&obj, &av_secureToken, &p))
{
DecodeTEA(&r->Link.token, &p.p_vu.p_aval);
SendSecureTokenResponse(r, &p.p_vu.p_aval);
}
}
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
for (int i = 0; i < r->Link.nStreams; i++)
SendReleaseStream(r, i);//ask the server to release any stream previously bound to this play path
for (int i = 0; i < r->Link.nStreams; i++)
SendFCPublish(r, i);//announce that we are about to publish this stream (FCPublish)
}
else
{
RTMP_SendServerBW(r);
RTMP_SendCtrl(r, 3, 0, 300);
}
for (int i = 0; i < r->Link.nStreams; i++)
RTMP_SendCreateStream(r);
if (!(r->Link.protocol & RTMP_FEATURE_WRITE))
{
/* Authenticate on Justin.tv legacy servers before sending FCSubscribe */
if (r->Link.usherToken.av_len)
SendUsherToken(r, &r->Link.usherToken);
/* Send the FCSubscribe if live stream or if subscribepath is set */
if (r->Link.subscribepath.av_len)
SendFCSubscribe(r, &r->Link.subscribepath);
else if (r->Link.lFlags & RTMP_LF_LIVE)
{
for (int i = 0; i < r->Link.nStreams; i++)
SendFCSubscribe(r, &r->Link.streams[i].playpath);
}
}
}
else if (AVMATCH(&methodInvoked, &av_createStream))
{
int id = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
r->Link.streams[r->Link.curStreamIdx].id = id; //stream id returned by the server
if (r->Link.protocol & RTMP_FEATURE_WRITE)
SendPublish(r, r->Link.curStreamIdx); //start publishing
else
{
if (r->Link.lFlags & RTMP_LF_PLST)
SendPlaylist(r, r->Link.curStreamIdx);
SendPlay(r, r->Link.curStreamIdx);
RTMP_SendCtrl(r, 3, id, r->m_nBufferMS);
}
r->Link.curStreamIdx++;
}
else if (AVMATCH(&methodInvoked, &av_play) ||
AVMATCH(&methodInvoked, &av_publish))
{
r->m_bPlaying = TRUE;
r->Link.playingStreams++;
}
RTMP_Log(RTMP_LOGERROR, "%s, server invoking <%s>", __FUNCTION__, methodInvoked.av_val);
free(methodInvoked.av_val);
}
else if (AVMATCH(&method, &av_onBWDone))
{
if (!r->m_nBWCheckCounter)
SendCheckBW(r);
}
else if (AVMATCH(&method, &av_onFCSubscribe))
{
/* SendOnFCSubscribe(); */
}
else if (AVMATCH(&method, &av_onFCUnsubscribe))
{
RTMP_Close(r);
ret = 1;
}
else if (AVMATCH(&method, &av_ping))
{
SendPong(r, txn);
}
else if (AVMATCH(&method, &av__onbwcheck))
{
SendCheckBWResult(r, txn);
}
else if (AVMATCH(&method, &av__onbwdone))
{
int i;
for (i = 0; i < r->m_numCalls; i++)
if (AVMATCH(&r->m_methodCalls[i].name, &av__checkbw))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
else if (AVMATCH(&method, &av__error))//the server reported an error
{
#if defined(CRYPTO) || defined(USE_ONLY_MD5)
AVal methodInvoked = {0};
int i;
if (r->Link.protocol & RTMP_FEATURE_WRITE)
{
for (i=0; i<r->m_numCalls; i++)
{
if (r->m_methodCalls[i].num == txn)
{
methodInvoked = r->m_methodCalls[i].name;
AV_erase(r->m_methodCalls, &r->m_numCalls, i, FALSE);
break;
}
}
if (!methodInvoked.av_val)
{
RTMP_Log(RTMP_LOGDEBUG, "%s, received result id %f without matching request",
__FUNCTION__, txn);
goto leave;
}
RTMP_Log(RTMP_LOGDEBUG, "%s, received error for method call <%s>", __FUNCTION__,
methodInvoked.av_val);
if (AVMATCH(&methodInvoked, &av_connect))
{
AMFObject obj2;
AVal code, level, description;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
AMFProp_GetString(AMF_GetProp(&obj2, &av_description, -1), &description);
RTMP_Log(RTMP_LOGDEBUG, "%s, error description: %s", __FUNCTION__, description.av_val);
/* if PublisherAuth returns 1, then reconnect */
if (PublisherAuth(r, &description) == 1)
{
RTMP_Close(r);
if (r->Link.pFlags & RTMP_PUB_CLATE)
{
r->Link.pFlags |= RTMP_PUB_CLEAN;
}
if (!RTMP_Connect(r, NULL) || !RTMP_ConnectStream(r, 0))
{
goto leave;
}
}
}
}
else
{
RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
}
free(methodInvoked.av_val);
#else
RTMP_Log(RTMP_LOGERROR, "rtmp server sent error");
#endif
}
else if (AVMATCH(&method, &av_close))
{
RTMP_Log(RTMP_LOGERROR, "rtmp server requested close");
RTMP_Close(r);
// disabled this for now, if the server sends an rtmp close message librtmp
// will enter an infinite loop here until stack is exhausted.
#if 0 && (defined(CRYPTO) || defined(USE_ONLY_MD5))
if ((r->Link.protocol & RTMP_FEATURE_WRITE) &&
!(r->Link.pFlags & RTMP_PUB_CLEAN) &&
( !(r->Link.pFlags & RTMP_PUB_NAME) ||
!(r->Link.pFlags & RTMP_PUB_RESP) ||
(r->Link.pFlags & RTMP_PUB_CLATE) ) )
{
/* clean later */
if(r->Link.pFlags & RTMP_PUB_CLATE)
r->Link.pFlags |= RTMP_PUB_CLEAN;
RTMP_Log(RTMP_LOGERROR, "authenticating publisher");
if (!RTMP_Connect(r, NULL) || !RTMP_ConnectStream(r, 0))
goto leave;
}
#endif
}
else if (AVMATCH(&method, &av_onStatus)) //status notification
{
AMFObject obj2;
AVal code, level, description;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
AMFProp_GetString(AMF_GetProp(&obj2, &av_description, -1), &description);
RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
if (AVMATCH(&code, &av_NetStream_Failed)
|| AVMATCH(&code, &av_NetStream_Play_Failed)
|| AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
|| AVMATCH(&code, &av_NetConnection_Connect_InvalidApp)
|| AVMATCH(&code, &av_NetStream_Publish_Rejected)
|| AVMATCH(&code, &av_NetStream_Publish_Denied))
{
r->m_stream_id = -1;
RTMP_Close(r);
if (description.av_len)
RTMP_Log(RTMP_LOGERROR, "%s:\n%s (%s)", r->Link.tcUrl.av_val, code.av_val, description.av_val);
else
RTMP_Log(RTMP_LOGERROR, "%s:\n%s", r->Link.tcUrl.av_val, code.av_val);
}
else if (AVMATCH(&code, &av_NetStream_Play_Start)
|| AVMATCH(&code, &av_NetStream_Play_PublishNotify))
{
int i;
r->m_bPlaying = TRUE;
for (i = 0; i < r->m_numCalls; i++)
{
//remove the cached command name
if (AVMATCH(&r->m_methodCalls[i].name, &av_play))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
else if (AVMATCH(&code, &av_NetStream_Publish_Start))
{
int i;
r->m_bPlaying = TRUE;
for (i = 0; i < r->m_numCalls; i++)
{
//remove the cached command name
if (AVMATCH(&r->m_methodCalls[i].name, &av_publish))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
/* Return 1 if this is a Play.Complete or Play.Stop */
else if (AVMATCH(&code, &av_NetStream_Play_Complete)
|| AVMATCH(&code, &av_NetStream_Play_Stop)
|| AVMATCH(&code, &av_NetStream_Play_UnpublishNotify))
{
RTMP_Close(r);
ret = 1;
}
else if (AVMATCH(&code, &av_NetStream_Seek_Notify))
{
r->m_read.flags &= ~RTMP_READ_SEEKING;
}
else if (AVMATCH(&code, &av_NetStream_Pause_Notify))
{
if (r->m_pausing == 1 || r->m_pausing == 2)
{
RTMP_SendPause(r, FALSE, r->m_pauseStamp);
r->m_pausing = 3;
}
}
}
else if (AVMATCH(&method, &av_playlist_ready))
{
int i;
for (i = 0; i < r->m_numCalls; i++)
{
if (AVMATCH(&r->m_methodCalls[i].name, &av_set_playlist))
{
AV_erase(r->m_methodCalls, &r->m_numCalls, i, TRUE);
break;
}
}
}
else
{
}
leave:
AMF_Reset(&obj);
return ret;
}
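To tie the _result branch above back to the send side: when RTMP_SendPacket sends an invoke packet with its queue flag set, it parses the method name and transaction number back out of the AMF body and stores them in m_methodCalls via AV_queue; HandleInvoke then matches the txn in the reply and removes the entry with AV_erase, as seen above. A condensed sketch of that send-side bookkeeping (illustrative, not the verbatim source):
// Condensed sketch of the bookkeeping inside RTMP_SendPacket for invoke packets
if (packet->m_packetType == RTMP_PACKET_TYPE_INVOKE && queue)
{
    AVal method;
    char *ptr = packet->m_body + 1;       // skip the AMF string type marker (0x02)
    AMF_DecodeString(ptr, &method);       // "connect", "createStream", "publish", ...
    ptr += 3 + method.av_len;             // skip the 2-byte length, the string and the number's type marker
    int txn = (int)AMF_DecodeNumber(ptr); // the transaction number
    AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); // remembered until _result/_error arrives
}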