天天看点

live555的RTCP .

live555中默认支持RTCP,如果要监视网络状态就要了解RTCP。我们这里以openRTSP为例看看RTCP的过程。

在前面的openRTSP分析中分析了openRTSP的流程,其中在在continueAfterDESCRIBE中有subsession->initiate(simpleRTPoffsetArg),在这里进行了RTP socket和RTCP socket的建立。

[cpp] view plain copy print ?

  1. if (isSSM()) {  
  2.       fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);  
  3.     } else {  
  4.       fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);  
  5.     }  
if (isSSM()) {
	  fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
	} else {
	  fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
	}
           

这里建立了RTCP 的socket,下面初始化fRTCPInstance

[cpp] view plain copy print ?

  1. if (fRTPSource != NULL && fRTCPSocket != NULL) {// Finally, create our RTCP instance. (It starts running automatically)   
  2.       // If bandwidth is specified, use it and add 5% for RTCP overhead.   
  3.       // Otherwise make a guess at 500 kbps.   
  4.       unsigned totSessionBandwidth  
  5.     = fBandwidth ? fBandwidth + fBandwidth / 20 : 500;  
  6.       fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket,  
  7.                           totSessionBandwidth,  
  8.                           (unsigned char const*)  
  9.                           fParent.CNAME(),  
  10.                           NULL ,  
  11.                           fRTPSource);  
  12.       if (fRTCPInstance == NULL) {  
  13.     env().setResultMsg("Failed to create RTCP instance");  
  14.     break;  
  15.       }  
  16.     }  
if (fRTPSource != NULL && fRTCPSocket != NULL) {// Finally, create our RTCP instance. (It starts running automatically)
      // If bandwidth is specified, use it and add 5% for RTCP overhead.
      // Otherwise make a guess at 500 kbps.
      unsigned totSessionBandwidth
	= fBandwidth ? fBandwidth + fBandwidth / 20 : 500;
      fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket,
					      totSessionBandwidth,
					      (unsigned char const*)
					      fParent.CNAME(),
					      NULL /* we're a client */,
					      fRTPSource);
      if (fRTCPInstance == NULL) {
	env().setResultMsg("Failed to create RTCP instance");
	break;
      }
    }
           

根据注释提示,fRTCPInstance会自动开始运行,如何运行的呢,我们再来看看。

[cpp] view plain copy print ?

  1. RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,  
  2.                       unsigned totSessionBW,  
  3.                       unsigned char const* cname,  
  4.                       RTPSink* sink, RTPSource const* source,  
  5.                       Boolean isSSMSource) {  
  6.   return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,  
  7.               isSSMSource);  
  8. }  
RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
				      unsigned totSessionBW,
				      unsigned char const* cname,
				      RTPSink* sink, RTPSource const* source,
				      Boolean isSSMSource) {
  return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
			  isSSMSource);
}
           

为了过程的清晰,这里还是贴出所有代码

[cpp] view plain copy print ?

  1. RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,  
  2.                unsigned totSessionBW,  
  3.                unsigned char const* cname,  
  4.                RTPSink* sink, RTPSource const* source,  
  5.                Boolean isSSMSource)  
  6.   : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),  
  7.     fSink(sink), fSource(source), fIsSSMSource(isSSMSource),  
  8.     fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),  
  9.     fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),  
  10.     fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),  
  11.     fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),  
  12.     fHaveJustSentPacket(False), fLastPacketSentSize(0),  
  13.     fByeHandlerTask(NULL), fByeHandlerClientData(NULL),  
  14.     fSRHandlerTask(NULL), fSRHandlerClientData(NULL),  
  15.     fRRHandlerTask(NULL), fRRHandlerClientData(NULL),  
  16.     fSpecificRRHandlerTable(NULL) {  
  17. #ifdef DEBUG   
  18.   fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);  
  19. #endif   
  20.   if (fTotSessionBW == 0) { // not allowed!   
  21.     env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";  
  22.     fTotSessionBW = 1;  
  23.   }  
  24.   if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast   
  25.   double timeNow = dTimeNow();  
  26.   fPrevReportTime = fNextReportTime = timeNow;  
  27.   fKnownMembers = new RTCPMemberDatabase(*this);  
  28.   fInBuf = new unsigned char[maxPacketSize];  
  29.   if (fKnownMembers == NULL || fInBuf == NULL) return;  
  30.   fNumBytesAlreadyRead = 0;  
  31.   // A hack to save buffer space, because RTCP packets are always small:   
  32.   unsigned savedMaxSize = OutPacketBuffer::maxSize;  
  33.   OutPacketBuffer::maxSize = maxPacketSize;  
  34.   fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);  
  35.   OutPacketBuffer::maxSize = savedMaxSize;  
  36.   if (fOutBuf == NULL) return;  
  37.   // Arrange to handle incoming reports from others:   
  38.   TaskScheduler::BackgroundHandlerProc* handler  
  39.     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;  
  40.   fRTCPInterface.startNetworkReading(handler);  
  41.   // Send our first report.   
  42.   fTypeOfEvent = EVENT_REPORT;  
  43.   onExpire(this);  
  44. }  
RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
			   unsigned totSessionBW,
			   unsigned char const* cname,
			   RTPSink* sink, RTPSource const* source,
			   Boolean isSSMSource)
  : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
    fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
    fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
    fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
    fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
    fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
    fHaveJustSentPacket(False), fLastPacketSentSize(0),
    fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
    fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
    fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
    fSpecificRRHandlerTable(NULL) {
#ifdef DEBUG
  fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
#endif
  if (fTotSessionBW == 0) { // not allowed!
    env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
    fTotSessionBW = 1;
  }

  if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast

  double timeNow = dTimeNow();
  fPrevReportTime = fNextReportTime = timeNow;

  fKnownMembers = new RTCPMemberDatabase(*this);
  fInBuf = new unsigned char[maxPacketSize];
  if (fKnownMembers == NULL || fInBuf == NULL) return;
  fNumBytesAlreadyRead = 0;

  // A hack to save buffer space, because RTCP packets are always small:
  unsigned savedMaxSize = OutPacketBuffer::maxSize;
  OutPacketBuffer::maxSize = maxPacketSize;
  fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
  OutPacketBuffer::maxSize = savedMaxSize;
  if (fOutBuf == NULL) return;

  // Arrange to handle incoming reports from others:
  TaskScheduler::BackgroundHandlerProc* handler
    = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
  fRTCPInterface.startNetworkReading(handler);

  // Send our first report.
  fTypeOfEvent = EVENT_REPORT;
  onExpire(this);
}
           

可以看到这里加入了handler,handler为incomingReportHandler。看看这个startNetworkReading。

[cpp] view plain copy print ?

  1. void RTPInterface  
  2. ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {  
  3.   // Normal case: Arrange to read UDP packets:   
  4.   envir().taskScheduler().  
  5.     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);  
  6.   // Also, receive RTP over TCP, on each of our TCP connections:   
  7.   fReadHandlerProc = handlerProc;  
  8.   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;  
  9.        streams = streams->fNext) {  
  10.     // Get a socket descriptor for "streams->fStreamSocketNum":   
  11.     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);  
  12.     // Tell it about our subChannel:   
  13.     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);  
  14.   }  
  15. }  
void RTPInterface
::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
  // Normal case: Arrange to read UDP packets:
  envir().taskScheduler().
    turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);

  // Also, receive RTP over TCP, on each of our TCP connections:
  fReadHandlerProc = handlerProc;
  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    // Get a socket descriptor for "streams->fStreamSocketNum":
    SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);

    // Tell it about our subChannel:
    socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
  }
}
           

第一句代码表示默认是通过udp来发送rtcp的包,如果rtp是tcp的就通过下面的代码处理。

这里的turnOnBackgroundReadHanding就是调用setBackgroundHandling(socketNum, SOCKET_READABLE, handlerProc, clientData);在前面分析过就是将socket加入到select set集中进行监视并传入handlerProc。这里不做重复讨论,接着看下面的注释是send our first report,接着调用了onExpire(this),来看看

[cpp] view plain copy print ?

  1. void RTCPInstance::onExpire(RTCPInstance* instance) {  
  2.   instance->onExpire1();  
  3. }  
void RTCPInstance::onExpire(RTCPInstance* instance) {
  instance->onExpire1();
}
           

[cpp] view plain copy print ?

  1. void RTCPInstance::onExpire1() {  
  2.   // Note: fTotSessionBW is kbits per second   
  3.   double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second   
  4.   OnExpire(this, // event   
  5.        numMembers(), // members   
  6.        (fSink != NULL) ? 1 : 0, // senders   
  7.        rtcpBW, // rtcp_bw   
  8.        (fSink != NULL) ? 1 : 0, // we_sent   
  9.        &fAveRTCPSize, // ave_rtcp_size   
  10.        &fIsInitial, // initial   
  11.        dTimeNow(), // tc   
  12.        &fPrevReportTime, // tp   
  13.        &fPrevNumMembers // pmembers   
  14.        );  
  15. }  
void RTCPInstance::onExpire1() {
  // Note: fTotSessionBW is kbits per second
  double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second

  OnExpire(this, // event
	   numMembers(), // members
	   (fSink != NULL) ? 1 : 0, // senders
	   rtcpBW, // rtcp_bw
	   (fSink != NULL) ? 1 : 0, // we_sent
	   &fAveRTCPSize, // ave_rtcp_size
	   &fIsInitial, // initial
	   dTimeNow(), // tc
	   &fPrevReportTime, // tp
	   &fPrevNumMembers // pmembers
	   );
}
           

再来看看这个OnExpire

[cpp] view plain copy print ?

  1. void OnExpire(event e,  
  2.                  int    members,  
  3.                  int    senders,  
  4.                  double rtcp_bw,  
  5.                  int    we_sent,  
  6.                  double *avg_rtcp_size,  
  7.                  int    *initial,  
  8.                  time_tp   tc,  
  9.                  time_tp   *tp,  
  10.                  int    *pmembers)  
  11.    {  
  12.        double t;       
  13.        double tn;      
  14.        if (TypeOfEvent(e) == EVENT_BYE) {  
  15.            t = rtcp_interval(members,  
  16.                              senders,  
  17.                              rtcp_bw,  
  18.                              we_sent,  
  19.                              *avg_rtcp_size,  
  20.                              *initial);  
  21.            tn = *tp + t;  
  22.            if (tn <= tc) {  
  23.                SendBYEPacket(e);  
  24.                exit(1);  
  25.            } else {  
  26.                Schedule(tn, e);  
  27.            }  
  28.        } else if (TypeOfEvent(e) == EVENT_REPORT) {  
  29.            t = rtcp_interval(members,  
  30.                              senders,  
  31.                              rtcp_bw,  
  32.                              we_sent,  
  33.                              *avg_rtcp_size,  
  34.                              *initial);  
  35.            tn = *tp + t;  
  36.            if (tn <= tc) {  
  37.                SendRTCPReport(e);  
  38.                *avg_rtcp_size = (1./16.)*SentPacketSize(e) +  
  39.                    (15./16.)*(*avg_rtcp_size);  
  40.                *tp = tc;  
  41.                t = rtcp_interval(members,  
  42.                                  senders,  
  43.                                  rtcp_bw,  
  44.                                  we_sent,  
  45.                                  *avg_rtcp_size,  
  46.                                  *initial);  
  47.                Schedule(t+tc,e);  
  48.                *initial = 0;  
  49.            } else {  
  50.                Schedule(tn, e);  
  51.            }  
  52.            *pmembers = members;  
  53.        }  
  54.    }  

继续阅读