天天看點

SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動

轉載自 https://blog.csdn.net/ManagerUser/article/details/73840130

SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動

一、前言

       小卒最近看SRS源碼,随手寫下部落格,其一為了整理思路,其二也是為日後翻看友善。如果不足之處,請指教!

首先總結一下SRS源碼的優點:

       1、輕量級,代碼結構清楚,目前SRS3.0代碼8萬行左右,但幾乎滿足直播業務的所有要求。

       2、SRS采用State Threads,支援高并發量,高性能。

       3、SRS支援rtmp和hls,滿足PC和移動直播要求。

       4、SRS支援叢集部署。小叢集Forward,大叢集edge。

代碼分析可分為兩個階段:

       一:分析代碼架構,理清楚組織流程

       二:分析代碼細節,熟悉SRS工作原理

二、代碼分析

相關SRS源碼其他總結:

       SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動

       SRS(simple-rtmp-server)流媒體伺服器源碼分析--RTMP消息play

       SRS(simple-rtmp-server)流媒體伺服器源碼分析--RTMP資訊Publish

           SRS(simple-rtmp-server)流媒體伺服器源碼分析--HLS切片

現階段,我主要以代碼架構梳理為主。Srs源碼架構如下圖:

SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動

                 系統在啟動時,初始化相關類,監聽相關端口,若來一個通路請求,則為該連結建立一個線程,專門處理與該連結的操作。          main函數在srs_main_server.cpp這個檔案中。在main函數中,啟動參數在這裡不做過多介紹。直接從run()-> run_master()看起。

  1. int run_master()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
  5. return ret;
  6. }
  7. if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
  8. return ret;
  9. }
  10. //将pid程序号寫進檔案
  11. if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
  12. return ret;
  13. }
  14. //用戶端監聽
  15. if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
  16. return ret;
  17. }
  18. if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
  19. return ret;
  20. }
  21. if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
  22. return ret;
  23. }
  24. if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
  25. return ret;
  26. }
  27. if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
  28. return ret;
  29. }
  30. return 0;
  31. }

進入客戶監聽

  1. if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
  2. return ret;
  3. }

  監聽内容:  不同的連接配接請求,有不同的監聽 。

  1. int SrsServer::listen()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. // 建立一個rtmp的Streamlistener
  5. if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
  6. return ret;
  7. }
  8. if ((ret = listen_http_api()) != ERROR_SUCCESS) {
  9. return ret;
  10. }
  11. if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
  12. return ret;
  13. }
  14. if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
  15. return ret;
  16. }
  17. return ret;
  18. }

1、首先分析RTMP連接配接 

  1. int SrsServer::listen_rtmp()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. // stream service port.
  5. std::vector <std::string> ip_ports = _srs_config->get_listens();
  6. srs_assert((int)ip_ports.size() > 0);
  7. close_listeners(SrsListenerRtmpStream);
  8. for (int i = 0; i < (int)ip_ports.size(); i++) {
  9. SrsListener* listener = new SrsStreamListener( this, SrsListenerRtmpStream);
  10. listeners.push_back( listener);
  11. std::string ip;
  12. int port;
  13. srs_parse_endpoint( ip_ports[ i], ip, port);
  14. if (( ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
  15. srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
  16. return ret;
  17. }
  18. }
  19. return ret;
  20. }

         這裡是listen_rtmp()函數,你也可以去看看listen_http_api()函數、listen_http_stream()函數,其實結構都很相似,隻是在建立SrsStreamListener對象時,傳入了不同的參數SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同類型的監聽對象。

  1. // listen_rtmp 中listen監聽走這裡了。
  2. int SrsStreamListener::listen(string i, int p)
  3. {
  4. int ret = ERROR_SUCCESS;
  5. ip = i;
  6. port = p;
  7. srs_freep(listener);
  8. listener = new SrsTcpListener(this, ip, port);
  9. if ((ret = listener->listen()) != ERROR_SUCCESS) {
  10. srs_error("tcp listen failed. ret=%d", ret);
  11. return ret;
  12. }
  13. srs_info("listen thread current_cid=%d, "
  14. "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
  15. _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
  16. srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
  17. return ret;
  18. }

注意,這裡有大量純虛函數,不要走錯路了。進入TCP監聽代碼

  1. // rtmp tcp監聽
  2. int SrsTcpListener::listen()
  3. {
  4. //C++ Socket程式設計
  5. int ret = ERROR_SUCCESS;
  6. // 1、建立套接字,流式Socket(SOCK_STREAM)
  7. if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
  8. ret = ERROR_SOCKET_CREATE;
  9. srs_error("create linux socket error. port=%d, ret=%d", port, ret);
  10. return ret;
  11. }
  12. srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
  13. int reuse_socket = 1;
  14. if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
  15. ret = ERROR_SOCKET_SETREUSE;
  16. srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
  17. return ret;
  18. }
  19. srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
  20. sockaddr_in addr;
  21. addr.sin_family = AF_INET;
  22. addr.sin_port = htons(port);
  23. addr.sin_addr.s_addr = inet_addr(ip.c_str());
  24. // 2、綁定套接字到一個IP位址和一個端口上
  25. if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
  26. ret = ERROR_SOCKET_BIND;
  27. srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  28. return ret;
  29. }
  30. srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  31. // 3、将套接字設定為監聽模式等待連接配接請求
  32. if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
  33. ret = ERROR_SOCKET_LISTEN;
  34. srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  35. return ret;
  36. }
  37. srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  38. if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
  39. ret = ERROR_ST_OPEN_SOCKET;
  40. srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  41. return ret;
  42. }
  43. srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  44. // 4、等到連接配接一個客戶之後,開啟一個新的線程
  45. if ((ret = pthread->start()) != ERROR_SUCCESS) {
  46. srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  47. return ret;
  48. }
  49. srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
  50. return ret;
  51. }

       此代碼為C++ TCP  Socket代碼,思路比較清晰,可以看到,每接受到一個rtmp通路請求,建立一個”線程“,這裡暫時将其稱為線程,後面再做具體介紹。建立線程代碼如下:

  1. int SrsReusableThread::start()
  2. {
  3. return pthread->start();
  4. }
  1. int SrsThread::start()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. if(tid) {
  5. srs_info( "thread %s already running.", _name);
  6. return ret;
  7. }
  8. if((tid = st_thread_create(thread_fun, this, (_joinable? : ), )) == NULL){
  9. ret = ERROR_ST_CREATE_CYCLE_THREAD;
  10. srs_error( "st_thread_create failed. ret=%d", ret);
  11. return ret;
  12. }
  13. disposed = false;
  14. // we set to loop to true for thread to run.
  15. loop = true;
  16. // wait for cid to ready, for parent thread to get the cid.
  17. while (_cid < ) {
  18. st_usleep( * );
  19. }
  20. // now, cycle thread can run.
  21. can_run = true;
  22. return ret;
  23. }

       來到了st_thread_create,這裡要注意,這是SRS開源項目具有高并發,高性能的重要一步。這裡建立的是協程,不是線程。協程是有别于程序和線程的一種元件,具有程序的獨立性和線程的輕量級,聽說微信能夠支援8億使用者量,也是采用協程這種網絡服務架構:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。

從這裡可以看出,srs是一個單線程的伺服器,采用協程,主持高并發,高性能。

建立協程,協程函數為:thread_fun()

  1. // 每連連結一個使用者,建立一個協程程,該函數為協程函數
  2. void* SrsThread::thread_fun(void* arg)
  3. {
  4. SrsThread* obj = (SrsThread*)arg;
  5. srs_assert(obj);
  6. // 進入線程循環
  7. obj->thread_cycle();
  8. // for valgrind to detect.
  9. SrsThreadContext* ctx = dynamic_cast <SrsThreadContext*>(_srs_context);
  10. if (ctx) {
  11. ctx->clear_cid();
  12. }
  13. st_thread_exit(NULL);
  14. return NULL;
  15. }

此時,真正進入了協程循環處理

  1. void SrsThread::thread_cycle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. _srs_context->generate_id();
  5. srs_info("thread %s cycle start", _name);
  6. _cid = _srs_context->get_id();
  7. srs_assert(handler);
  8. handler->on_thread_start();
  9. // thread is running now.
  10. really_terminated = false;
  11. // wait for cid to ready, for parent thread to get the cid.
  12. while (!can_run && loop) {
  13. st_usleep(10 * 1000);
  14. }
  15. while (loop) {
  16. if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
  17. srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
  18. goto failed;
  19. }
  20. srs_info("thread %s on before cycle success", _name);
  21. // 注意純虛函數的應用
  22. if ((ret = handler->cycle()) != ERROR_SUCCESS) {
  23. if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
  24. srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
  25. }
  26. goto failed;
  27. }
  28. srs_info("thread %s cycle success", _name);
  29. if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
  30. srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
  31. goto failed;
  32. }
  33. srs_info("thread %s on end cycle success", _name);
  34. failed:
  35. if (!loop) {
  36. break;
  37. }
  38. // to improve performance, donot sleep when interval is zero.
  39. // @see: https://github.com/ossrs/srs/issues/237
  40. if (cycle_interval_us != 0) {
  41. st_usleep(cycle_interval_us);
  42. }
  43. }
  44. // readly terminated now.
  45. really_terminated = true;
  46. handler->on_thread_stop();
  47. srs_info("thread %s cycle finished", _name);
  48. }

       至此,一定要熟悉C++純虛函數的引用,本人剛學了幾天C++,對虛函數和純虛函數在SRS源碼中的應用很不習慣!  好了,進入循環ret = handler->cycle()

  1. int SrsConnection::cycle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. _srs_context->generate_id();
  5. id = _srs_context->get_id();
  6. ip = srs_get_peer_ip(st_netfd_fileno(stfd));
  7. //srs_trace("ip:%s", ip);
  8. ret = do_cycle();
  9. // if socket io error, set to closed.
  10. if (srs_is_client_gracefully_close(ret)) {
  11. ret = ERROR_SOCKET_CLOSED;
  12. }
  13. // success.
  14. if (ret == ERROR_SUCCESS) {
  15. srs_trace("client finished.");
  16. }
  17. // client close peer.
  18. if (ret == ERROR_SOCKET_CLOSED) {
  19. srs_warn("client disconnect peer. ret=%d", ret);
  20. }
  21. return ERROR_SUCCESS;
  22. }

進入ret=do_cycle();

  1. // TODO: return detail message when error for client.
  2. int SrsRtmpConn::do_cycle()
  3. {
  4. int ret = ERROR_SUCCESS;
  5. srs_trace( "RTMP client ip=%s", ip.c_str());
  6. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
  7. rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
  8. if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
  9. srs_error( "rtmp handshake failed. ret=%d", ret);
  10. return ret;
  11. }
  12. srs_verbose( "rtmp handshake success");
  13. if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
  14. srs_error( "rtmp connect vhost/app failed. ret=%d", ret);
  15. return ret;
  16. }
  17. srs_verbose( "rtmp connect app success");
  18. // set client ip to request.
  19. req->ip = ip;
  20. // discovery vhost, resolve the vhost from config
  21. SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
  22. if (parsed_vhost) {
  23. req->vhost = parsed_vhost->arg0();
  24. }
  25. srs_info( "discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
  26. req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
  27. if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
  28. ret = ERROR_RTMP_REQ_TCURL;
  29. srs_error( "discovery tcUrl failed. "
  30. "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
  31. req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
  32. return ret;
  33. }
  34. // check vhost
  35. if ((ret = check_vhost()) != ERROR_SUCCESS) {
  36. srs_error( "check vhost failed. ret=%d", ret);
  37. return ret;
  38. }
  39. srs_verbose( "check vhost success.");
  40. srs_trace( "connect app, "
  41. "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
  42. req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
  43. req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
  44. req->app.c_str(), (req->args? "(obj)": "null"));
  45. // show client identity
  46. if(req->args) {
  47. std:: string srs_version;
  48. std:: string srs_server_ip;
  49. int srs_pid = ;
  50. int srs_id = ;
  51. SrsAmf0Any* prop = NULL;
  52. if ((prop = req->args->ensure_property_string( "srs_version")) != NULL) {
  53. srs_version = prop->to_str();
  54. }
  55. if ((prop = req->args->ensure_property_string( "srs_server_ip")) != NULL) {
  56. srs_server_ip = prop->to_str();
  57. }
  58. if ((prop = req->args->ensure_property_number( "srs_pid")) != NULL) {
  59. srs_pid = ( int)prop->to_number();
  60. }
  61. if ((prop = req->args->ensure_property_number( "srs_id")) != NULL) {
  62. srs_id = ( int)prop->to_number();
  63. }
  64. srs_info( "edge-srs ip=%s, version=%s, pid=%d, id=%d",
  65. srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
  66. if (srs_pid > ) {
  67. srs_trace( "edge-srs ip=%s, version=%s, pid=%d, id=%d",
  68. srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
  69. }
  70. }
  71. ret = service_cycle();
  72. http_hooks_on_close();
  73. return ret;
  74. }

行了,rtmp連接配接就到這裡,要不然都快到rtmp流接受代碼了,和系統啟動越走越遠了,rtmp流接受後面再分析。

2、再分析http-api連接配接,回到int SrsServer::listen()函數中,梳理http-api連結

  1. int SrsServer::listen_http_api()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. #ifdef SRS_AUTO_HTTP_API
  5. close_listeners(SrsListenerHttpApi);
  6. if (_srs_config->get_http_api_enabled()) {
  7. SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);
  8. listeners.push_back(listener);
  9. std::string ep = _srs_config->get_http_api_listen();
  10. std::string ip;
  11. int port;
  12. srs_parse_endpoint(ep, ip, port);
  13. if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
  14. srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
  15. return ret;
  16. }
  17. }
  18. #endif
  19. return ret;
  20. }

listen_http_api()函數和listen_rtmp()函數内容非常像,再走到listener->listen()裡面看看,結果來到了

  1. int SrsStreamListener::listen(string i, int p)
  2. {
  3. int ret = ERROR_SUCCESS;
  4. ip = i;
  5. port = p;
  6. srs_freep(listener);
  7. listener = new SrsTcpListener(this, ip, port);
  8. if ((ret = listener->listen()) != ERROR_SUCCESS) {
  9. srs_error("tcp listen failed. ret=%d", ret);
  10. return ret;
  11. }
  12. srs_info("listen thread current_cid=%d, "
  13. "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
  14. _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
  15. srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
  16. return ret;
  17. }

和rtmp連結監聽機制完全一樣,隻是type不同而已

  1. enum SrsListenerType
  2. {
  3. // RTMP client,
  4. SrsListenerRtmpStream = 0,
  5. // HTTP api,
  6. SrsListenerHttpApi = 1,
  7. // HTTP stream, HDS/HLS/DASH
  8. SrsListenerHttpStream = 2,
  9. // UDP stream, MPEG-TS over udp.
  10. SrsListenerMpegTsOverUdp = 3,
  11. // TCP stream, RTSP stream.
  12. SrsListenerRtsp = 4,
  13. // TCP stream, FLV stream over HTTP.
  14. SrsListenerFlv = 5,
  15. };

我就看了兩個連結監聽,監聽到此為止。

3、http api回調注冊

回到run_master()函數中,從_srs_server->http_handle()看起。

  1. int SrsServer::http_handle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. #ifdef SRS_AUTO_HTTP_API
  5. srs_assert(http_api_mux);
  6. if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {
  7. return ret;
  8. }
  9. if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {
  10. return ret;
  11. }
  12. if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {
  13. return ret;
  14. }
  15. if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {
  16. return ret;
  17. }
  18. if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {
  19. return ret;
  20. }
  21. if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {
  22. return ret;
  23. }
  24. if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {
  25. return ret;
  26. }
  27. if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {
  28. return ret;
  29. }
  30. if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {
  31. return ret;
  32. }
  33. if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {
  34. return ret;
  35. }
  36. if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {
  37. return ret;
  38. }
  39. if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
  40. return ret;
  41. }
  42. if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
  43. return ret;
  44. }
  45. if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {
  46. return ret;
  47. }
  48. // test the request info.
  49. if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {
  50. return ret;
  51. }
  52. // test the error code response.
  53. if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
  54. return ret;
  55. }
  56. // test the redirect mechenism.
  57. if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {
  58. return ret;
  59. }
  60. // test the http vhost.
  61. if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
  62. return ret;
  63. }
  64. // TODO: FIXME: for console.
  65. // TODO: FIXME: support reload.
  66. std::string dir = _srs_config->get_http_stream_dir() + "/console";
  67. if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {
  68. srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);
  69. return ret;
  70. }
  71. srs_trace("http: api mount /console to %s", dir.c_str());
  72. #endif
  73. return ret;
  74. }

該函數注冊了http-api回調接口。可以參考:https://github.com/ossrs/srs/wiki/v2_CN_HTTPApi

比如我們可以通路http://ip:1985/api/v1  其中ip為SRS伺服器位址,就可以看到從該接口傳回srs伺服器參數。

4、ingest(拉流,SRS主動去拉流,和推流相反)處理

注意:SRS對拉流的處理比較特殊,SRS拉流是通過ffmpeg工具去實作的,SRS代碼隻是實作簡單的系統調用,這部分内容在後面的章節中詳細說明。

  1. int SrsIngester::start()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. if ((ret = parse()) != ERROR_SUCCESS) {
  5. clear_engines();
  6. ret = ERROR_SUCCESS;
  7. return ret;
  8. }
  9. // even no ingesters, we must also start it,
  10. // for the reload may add more ingesters.
  11. // start thread to run all encoding engines.
  12. if ((ret = pthread->start()) != ERROR_SUCCESS) {
  13. srs_error("st_thread_create failed. ret=%d", ret);
  14. return ret;
  15. }
  16. srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
  17. return ret;
  18. }

到此,可以看出,和監聽過程一樣,進入int SrsThread::start()函數,隻是傳入對象不一樣而已。

5、SRS自服務

  1. int SrsServer::cycle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. srs_trace("SrsServer")
  5. ret = do_cycle();
  6. #ifdef SRS_AUTO_GPERF_MC
  7. destroy();
  8. // remark, for gmc, never invoke the exit().
  9. srs_warn("sleep a long time for system st-threads to cleanup.");
  10. st_usleep(3 * 1000 * 1000);
  11. srs_warn("system quit");
  12. #else
  13. // normally quit with neccessary cleanup by dispose().
  14. srs_warn("main cycle terminated, system quit normally.");
  15. dispose();
  16. srs_trace("srs terminated");
  17. // for valgrind to detect.
  18. srs_freep(_srs_config);
  19. srs_freep(_srs_log);
  20. exit(0);
  21. #endif
  22. return ret;
  23. }
  1. int SrsServer::do_cycle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. // find the max loop
  5. int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
  6. #ifdef SRS_AUTO_STAT
  7. max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
  8. max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
  9. max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
  10. max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
  11. max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
  12. max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
  13. max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
  14. #endif
  15. // for asprocess.
  16. bool asprocess = _srs_config->get_asprocess();
  17. // the deamon thread, update the time cache
  18. while (true) {
  19. if(handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS){
  20. srs_error("cycle handle failed. ret=%d", ret);
  21. return ret;
  22. }
  23. // the interval in config.
  24. int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
  25. // dynamic fetch the max.
  26. int temp_max = max;
  27. temp_max = srs_max(temp_max, heartbeat_max_resolution);
  28. for (int i = 0; i < temp_max; i++) {
  29. st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
  30. // asprocess check.
  31. if (asprocess && ::getppid() != ppid) {
  32. srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
  33. return ret;
  34. }
  35. // gracefully quit for SIGINT or SIGTERM.
  36. if (signal_gracefully_quit) {
  37. srs_trace("cleanup for gracefully terminate.");
  38. return ret;
  39. }
  40. // for gperf heap checker,
  41. // @see: research/gperftools/heap-checker/heap_checker.cc
  42. // if user interrupt the program, exit to check mem leak.
  43. // but, if gperf, use reload to ensure main return normally,
  44. // because directly exit will cause core-dump.
  45. #ifdef SRS_AUTO_GPERF_MC
  46. if (signal_gmc_stop) {
  47. srs_warn("gmc got singal to stop server.");
  48. return ret;
  49. }
  50. #endif
  51. // do reload the config.
  52. if (signal_reload) {
  53. signal_reload = false;
  54. srs_info("get signal reload, to reload the config.");
  55. if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
  56. srs_error("reload config failed. ret=%d", ret);
  57. return ret;
  58. }
  59. srs_trace("reload config success.");
  60. }
  61. // notice the stream sources to cycle.
  62. if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
  63. return ret;
  64. }
  65. // update the cache time
  66. if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
  67. srs_info("update current time cache.");
  68. srs_update_system_time_ms();
  69. }
  70. #ifdef SRS_AUTO_STAT
  71. if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
  72. srs_info("update resource info, rss.");
  73. srs_update_system_rusage();
  74. }
  75. if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
  76. srs_info("update cpu info, cpu usage.");
  77. srs_update_proc_stat();
  78. }
  79. if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
  80. srs_info("update disk info, disk iops.");
  81. srs_update_disk_stat();
  82. }
  83. if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
  84. srs_info("update memory info, usage/free.");
  85. srs_update_meminfo();
  86. }
  87. if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
  88. srs_info("update platform info, uptime/load.");
  89. srs_update_platform_info();
  90. }
  91. if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
  92. srs_info("update network devices info.");
  93. srs_update_network_devices();
  94. }
  95. if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
  96. srs_info("update network server kbps info.");
  97. resample_kbps();
  98. }
  99. #ifdef SRS_AUTO_HTTP_CORE
  100. if (_srs_config->get_heartbeat_enabled()) {
  101. if ((i % heartbeat_max_resolution) == 0) {
  102. srs_info("do http heartbeat, for internal server to report.");
  103. http_heartbeat->heartbeat();
  104. }
  105. }
  106. #endif
  107. #endif
  108. srs_info("server main thread loop");
  109. }
  110. }
  111. return ret;
  112. }

主線程,更新srs時間和緩存!!至此,系統啟動代碼結構梳理完了。

三、總結

  • 啟動不同的業務。
  • 監聽不同的用戶端類型。
  • 每連結一個用戶端,SRS為其建立一個協程,專門負責該路連結資訊互動。
  • SRS系統采用了協程網絡服務架構,使得系統具有高并發,高性能等有點。

繼續閱讀