天天看點

Crtmp 源碼分析

    Crtmp Server接收rtmp音視訊流,并實作音視訊并發,可以作為直播背景的服務。整套代碼量并不大,算是輕量級的服務。

花了些時間研究源碼,現将研究的結果,記錄下來,友善以後查閱。

     先不從架構上分析,直接看代碼。我是将crtmp運作在windows環境下,是以代碼分析以windows參考。

//這個方法在一個while循環裡執行
bool IOHandlerManager::Pulse() {
    if (_isShuttingDown)
        return false;
    //1. Create a copy of all fd sets
    FD_ZERO(&_readFdsCopy);
    FD_ZERO(&_writeFdsCopy);
    FD_ZERO(&_writeFdsCopy);
    FD_COPY(&_readFds, &_readFdsCopy);
    FD_COPY(&_writeFds, &_writeFdsCopy);

    //2. compute the max fd
    if (_activeIOHandlers.size() == 0)
        return true;

    //3. do the select
  //檢查讀寫fd集合是否有變化。
    RESET_TIMER(_timeout, 1, 0);
    int32_t count = select(MAP_KEY(--_fdState.end()) + 1, &_readFdsCopy, &_writeFdsCopy, NULL, &_timeout);
    if (count < 0) {
        FATAL("Unable to do select: %u", (uint32_t) LASTSOCKETERROR);
        return false;
    }

    _pTimersManager->TimeElapsed(time(NULL));

    if (count == 0) {
        return true;
    }

    //4. Start crunching the sets
    // _activeIOHandlers 是以IOHandler為父類的類集合。這些類在構造函數裡将自己
  //添加到_activeIOHandlers 中。沒有外來連接配接請求來時 _activeIOHandlers
   //已經存在IOHandler子類,要不下面的内容永遠執行不到。
  //這些IOHandler子類時在configure moudle建立的

    FOR_MAP(_activeIOHandlers, uint32_t, IOHandler *, i) {
        if (FD_ISSET(MAP_VAL(i)->GetInboundFd(), &_readFdsCopy)) {
            _currentEvent.type = SET_READ;
            if (!MAP_VAL(i)->OnEvent(_currentEvent))
                EnqueueForDelete(MAP_VAL(i));
        }
        if (FD_ISSET(MAP_VAL(i)->GetOutboundFd(), &_writeFdsCopy)) {
            _currentEvent.type = SET_WRITE;
            if (!MAP_VAL(i)->OnEvent(_currentEvent))
                EnqueueForDelete(MAP_VAL(i));
        }
    }

    return true;
}      

   configure module

bool Module::BindAcceptors() {
//accpetors 來自lua腳本。如下圖1所示,腳本中有3個acceptor。
    FOR_MAP(config[CONF_ACCEPTORS], string, Variant, i) {
        if (!BindAcceptor(MAP_VAL(i))) {
            FATAL("Unable to configure acceptor:\n%s", STR(MAP_VAL(i).ToString()));
            return false;
        }
    }
    return true;
}

bool Module::BindAcceptor(Variant &node) {
    //1. Get the chain
    vector<uint64_t> chain;
  // CONF_PROTOCOL 表示 "protocol",ResolveProtocolChain代碼在下面,該方法
   //執行後的傳回值chain 包含的内容是PT_TCP,PT_INBOUND_RTMP.
    chain = ProtocolFactoryManager::ResolveProtocolChain(node[CONF_PROTOCOL]);
    if (chain.size() == 0) {
        WARN("Invalid protocol chain: %s", STR(node[CONF_PROTOCOL]));
        return true;
    }

    //2. Is it TCP or UDP based?
    if (chain[0] == PT_TCP) {
        //3. This is a tcp acceptor. Instantiate it and start accepting connections
     //建立TCP Acceptor,以圖1第一組資料來看,ip為0.0.0.0,port 為1935,node為圖1第一組資料
        //chain包含的内容有PT_TCP,PT_INBOUND_RTM。
        TCPAcceptor *pAcceptor = new TCPAcceptor(node[CONF_IP],
                node[CONF_PORT], node, chain);
       //調用Bind方法,具體方法内容在下面
        if (!pAcceptor->Bind()) {
            FATAL("Unable to fire up acceptor from this config node: %s",
                    STR(node.ToString()));
            return false;
        }
        ADD_VECTOR_END(acceptors, pAcceptor);
        return true;
    } else if (chain[0] == PT_UDP) {
        //4. Ok, this is an UDP acceptor. Because of that, we can instantiate
        //the full stack. Get the stack first
        BaseProtocol *pProtocol = ProtocolFactoryManager::CreateProtocolChain(
                chain, node);
        if (pProtocol == NULL) {
            FATAL("Unable to instantiate protocol stack %s", STR(node[CONF_PROTOCOL]));
            return false;
        }

        //5. Create the carrier and bind it
        UDPCarrier *pUDPCarrier = UDPCarrier::Create(node[CONF_IP], node[CONF_PORT],
                pProtocol);
        if (pUDPCarrier == NULL) {
            FATAL("Unable to instantiate UDP carrier on %s:%hu",
                    STR(node[CONF_IP]), (uint16_t) node[CONF_PORT]);
            pProtocol->EnqueueForDelete();
            return false;
        }
        pUDPCarrier->SetParameters(node);
        ADD_VECTOR_END(acceptors, pUDPCarrier);

        //6. We are done
        return true;
    } else {
        FATAL("Invalid carrier type");
        return false;
    }
}
      

vector<uint64_t> DefaultProtocolFactory::ResolveProtocolChain(string name) {

 vector<uint64_t> result;

 if (false) {

 }

#ifdef HAS_PROTOCOL_DNS

 else if (name == CONF_PROTOCOL_INBOUND_DNS) {

  ADD_VECTOR_END(result, PT_TCP);

  ADD_VECTOR_END(result, PT_INBOUND_DNS);

 } else if (name == CONF_PROTOCOL_OUTBOUND_DNS) {

  ADD_VECTOR_END(result, PT_OUTBOUND_DNS);

#endif /* HAS_PROTOCOL_DNS */

#ifdef HAS_PROTOCOL_RTMP

 else if (name == CONF_PROTOCOL_INBOUND_RTMP) {

//圖1的第一組資料對應添加的協定類型在這裡

  ADD_VECTOR_END(result, PT_INBOUND_RTMP);

 } else if (name == CONF_PROTOCOL_OUTBOUND_RTMP) {

  ADD_VECTOR_END(result, PT_OUTBOUND_RTMP);

 } else if (name == CONF_PROTOCOL_INBOUND_RTMPS) {

  ADD_VECTOR_END(result, PT_INBOUND_SSL);

  ADD_VECTOR_END(result, PT_INBOUND_RTMPS_DISC);

#ifdef HAS_PROTOCOL_HTTP

 else if (name == CONF_PROTOCOL_INBOUND_RTMPT) {

  ADD_VECTOR_END(result, PT_INBOUND_HTTP);

  ADD_VECTOR_END(result, PT_INBOUND_HTTP_FOR_RTMP);

#endif /* HAS_PROTOCOL_HTTP */

#endif /* HAS_PROTOCOL_RTMP */

#ifdef HAS_PROTOCOL_TS

 else if (name == CONF_PROTOCOL_INBOUND_TCP_TS) {

  ADD_VECTOR_END(result, PT_INBOUND_TS);

 } else if (name == CONF_PROTOCOL_INBOUND_UDP_TS) {

  ADD_VECTOR_END(result, PT_UDP);

#endif /* HAS_PROTOCOL_TS */

#ifdef HAS_PROTOCOL_RTP

 else if (name == CONF_PROTOCOL_INBOUND_RTSP) {

  ADD_VECTOR_END(result, PT_RTSP);

 } else if (name == CONF_PROTOCOL_RTSP_RTCP) {

  ADD_VECTOR_END(result, PT_RTCP);

 } else if (name == CONF_PROTOCOL_UDP_RTCP) {

 } else if (name == CONF_PROTOCOL_INBOUND_RTSP_RTP) {

  ADD_VECTOR_END(result, PT_INBOUND_RTP);

 } else if (name == CONF_PROTOCOL_INBOUND_UDP_RTP) {

 } else if (name == CONF_PROTOCOL_RTP_NAT_TRAVERSAL) {

  ADD_VECTOR_END(result, PT_RTP_NAT_TRAVERSAL);

#endif /* HAS_PROTOCOL_RTP */

 else if (name == CONF_PROTOCOL_OUTBOUND_HTTP) {

  ADD_VECTOR_END(result, PT_OUTBOUND_HTTP);

#ifdef HAS_PROTOCOL_LIVEFLV

 else if (name == CONF_PROTOCOL_INBOUND_LIVE_FLV) {

  ADD_VECTOR_END(result, PT_INBOUND_LIVE_FLV);

#endif /* HAS_PROTOCOL_LIVEFLV */

#ifdef HAS_PROTOCOL_VAR

 else if (name == CONF_PROTOCOL_INBOUND_XML_VARIANT) {

  ADD_VECTOR_END(result, PT_XML_VAR);

 } else if (name == CONF_PROTOCOL_INBOUND_BIN_VARIANT) {

  ADD_VECTOR_END(result, PT_BIN_VAR);

 } else if (name == CONF_PROTOCOL_OUTBOUND_XML_VARIANT) {

 } else if (name == CONF_PROTOCOL_OUTBOUND_BIN_VARIANT) {

 else if (name == CONF_PROTOCOL_INBOUND_HTTP_XML_VARIANT) {

 } else if (name == CONF_PROTOCOL_INBOUND_HTTP_BIN_VARIANT) {

 } else if (name == CONF_PROTOCOL_OUTBOUND_HTTP_XML_VARIANT) {

 } else if (name == CONF_PROTOCOL_OUTBOUND_HTTP_BIN_VARIANT) {

#endif /* HAS_PROTOCOL_VAR */

#ifdef HAS_PROTOCOL_CLI

 else if (name == CONF_PROTOCOL_INBOUND_CLI_JSON) {

  ADD_VECTOR_END(result, PT_INBOUND_JSONCLI);

 else if (name == CONF_PROTOCOL_INBOUND_HTTP_CLI_JSON) {

  ADD_VECTOR_END(result, PT_HTTP_4_CLI);

#endif /* HAS_PROTOCOL_CLI */

#ifdef HAS_PROTOCOL_MMS

 else if (name == CONF_PROTOCOL_OUTBOUND_MMS) {

  ADD_VECTOR_END(result, PT_OUTBOUND_MMS);

#endif /* HAS_PROTOCOL_MMS */

#ifdef HAS_PROTOCOL_RAWHTTPSTREAM

 else if (name == CONF_PROTOCOL_INBOUND_RAW_HTTP_STREAM) {

  ADD_VECTOR_END(result, PT_INBOUND_RAW_HTTP_STREAM);

 } else if (name == CONF_PROTOCOL_INBOUND_RAW_HTTPS_STREAM) {

#endif /* HAS_PROTOCOL_RAWHTTPSTREAM */

 else {

  FATAL("Invalid protocol chain: %s.", STR(name));

 return result;

}

bool TCPAcceptor::Bind() {

//建立socket

 _inboundFd = _outboundFd = (int) socket(PF_INET, SOCK_STREAM, 0);

 if (_inboundFd < 0) {

  int err = LASTSOCKETERROR;

  FATAL("Unable to create socket: %s(%d)", strerror(err), err);

  return false;

 if (!setFdOptions(_inboundFd)) {

  FATAL("Unable to set socket options");

//将建立socket綁定到ip 為0.0.0.0,port為1935的socket address中。

 if (bind(_inboundFd, (sockaddr *) & _address, sizeof (sockaddr)) != 0) {

  int error = LASTSOCKETERROR;

  FATAL("Unable to bind on address: tcp://%s:%hu; Error was: %s (%d)",

    inet_ntoa(((sockaddr_in *) & _address)->sin_addr),

    ENTOHS(((sockaddr_in *) & _address)->sin_port),

    strerror(error),

    error);

 if (_port == 0) {

  socklen_t tempSize = sizeof (sockaddr);

  if (getsockname(_inboundFd, (sockaddr *) & _address, &tempSize) != 0) {

   FATAL("Unable to extract the random port");

   return false;

  }

  _parameters[CONF_PORT] = (uint16_t) ENTOHS(_address.sin_port);

//監聽新建立的socket fd,什麼時候執行Accept是通過select模型來實作的,但需要将fd添加到select監控

//的fd集合中,這是在activate acceptor中完成的。具體代碼在見下面。

 if (listen(_inboundFd, 100) != 0) {

  FATAL("Unable to put the socket in listening mode");

 _enabled = true;

 return true;

bool BaseClientApplication::ActivateAcceptor(IOHandler *pIOHandler) {

 switch (pIOHandler->GetType()) {

  case IOHT_ACCEPTOR:

  {

   TCPAcceptor *pAcceptor = (TCPAcceptor *) pIOHandler;

   pAcceptor->SetApplication(this);

   return pAcceptor->StartAccept();

  case IOHT_UDP_CARRIER:

   UDPCarrier *pUDPCarrier = (UDPCarrier *) pIOHandler;

   pUDPCarrier->GetProtocol()->GetNearEndpoint()->SetApplication(this);

   return pUDPCarrier->StartAccept();

  default:

   FATAL("Invalid acceptor type");

bool TCPAcceptor::StartAccept() {

//該方法将TCPAcceptor建立的fd添加到全局的fd集合中

//當有連接配接請求進來時,端口号1935上來了連接配接請求,主循環中select方法會傳回,

//在fd集合中根據相應的fd找到它歸屬的TCPAcceptor,并調用TCPAcceptor的ONEvent方法

 return IOHandlerManager::EnableAcceptConnections(this);

bool TCPAcceptor::OnEvent(select_event &event) {

 if (!OnConnectionAvailable(event))

  return IsAlive();

 else

  return true;

bool TCPAcceptor::OnConnectionAvailable(select_event &event) {

 if (_pApplication == NULL)

  return Accept();

 return _pApplication->AcceptTCPConnection(this);

bool TCPAcceptor::Accept() {

 sockaddr address;

 memset(&address, 0, sizeof (sockaddr));

 socklen_t len = sizeof (sockaddr);

 int32_t fd;

 int32_t error;

 //1. Accept the connection

//OnEvent方法會調用這個間接調用改方法

 fd = accept(_inboundFd, &address, &len);

 error = LASTSOCKETERROR;

 if (fd < 0) {

  FATAL("Unable to accept client connection: %s (%d)", strerror(error), error);

 if (!_enabled) {

  CLOSE_SOCKET(fd);

  _droppedCount++;

  WARN("Acceptor is not enabled. Client dropped: %s:%hu -> %s:%hu",

    inet_ntoa(((sockaddr_in *) & address)->sin_addr),

    ENTOHS(((sockaddr_in *) & address)->sin_port),

    STR(_ipAddress),

    _port);

 INFO("Client connected: %s:%hu -> %s:%hu",

   inet_ntoa(((sockaddr_in *) & address)->sin_addr),

   ENTOHS(((sockaddr_in *) & address)->sin_port),

   STR(_ipAddress),

   _port);

 if (!setFdOptions(fd)) {

 //4. Create the chain

//建立協定,以圖1中第一組資料為例,_protocolChain中包含PT_TCP,PT_INBOUND_RTMP

//這裡共建立兩個協定,tcp Protocol和 inbound rtmp,且tcp協定的near Protocol指向

//inbound rtmp 傳回 inbound rtmp協定

 BaseProtocol *pProtocol = ProtocolFactoryManager::CreateProtocolChain(

   _protocolChain, _parameters);

 if (pProtocol == NULL) {

  FATAL("Unable to create protocol chain");

 //5. Create the carrier and bind it

 TCPCarrier *pTCPCarrier = new TCPCarrier(fd);

//pProtocol->GetFarEndpoint()指向tcp protocol。

 pTCPCarrier->SetProtocol(pProtocol->GetFarEndpoint());

 pProtocol->GetFarEndpoint()->SetIOHandler(pTCPCarrier);

 //6. Register the protocol stack with an application

 if (_pApplication != NULL) {

  pProtocol = pProtocol->GetNearEndpoint();

  pProtocol->SetApplication(_pApplication);

//調用tcp protocol 相應的方法

 if (pProtocol->GetNearEndpoint()->GetOutputBuffer() != NULL)

  pProtocol->GetNearEndpoint()->EnqueueForOutbound();

 _acceptedCount++;

 //7. Done

Variant & TCPAcceptor::GetParameters() {

 return _parameters;

Crtmp 源碼分析

           圖1 acceptors

 有需要讨論的加群 流媒體/Ffmpeg/音視訊 127903734,QQ350197870

繼續閱讀