轉載自 https://blog.csdn.net/ManagerUser/article/details/73840130
SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動
一、前言
小卒最近看SRS源碼,随手寫下部落格,其一為了整理思路,其二也是為日後翻看友善。如果不足之處,請指教!
首先總結一下SRS源碼的優點:
1、輕量級,代碼結構清楚,目前SRS3.0代碼8萬行左右,但幾乎滿足直播業務的所有要求。
2、SRS采用State Threads,支援高并發量,高性能。
3、SRS支援rtmp和hls,滿足PC和移動直播要求。
4、SRS支援叢集部署。小叢集Forward,大叢集edge。
代碼分析可分為兩個階段:
一:分析代碼架構,理清楚組織流程
二:分析代碼細節,熟悉SRS工作原理
二、代碼分析
相關SRS源碼其他總結:
SRS(simple-rtmp-server)流媒體伺服器源碼分析--系統啟動
SRS(simple-rtmp-server)流媒體伺服器源碼分析--RTMP消息play
SRS(simple-rtmp-server)流媒體伺服器源碼分析--RTMP資訊Publish
SRS(simple-rtmp-server)流媒體伺服器源碼分析--HLS切片
現階段,我主要以代碼架構梳理為主。Srs源碼架構如下圖:
系統在啟動時,初始化相關類,監聽相關端口,若來一個通路請求,則為該連結建立一個線程,專門處理與該連結的操作。 main函數在srs_main_server.cpp這個檔案中。在main函數中,啟動參數在這裡不做過多介紹。直接從run()-> run_master()看起。
- int run_master()
- {
- int ret = ERROR_SUCCESS;
- if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
- return ret;
- }
- //将pid程序号寫進檔案
- if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
- return ret;
- }
- //用戶端監聽
- if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
- return ret;
- }
- return 0;
- }
進入客戶監聽
- if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
- return ret;
- }
監聽内容: 不同的連接配接請求,有不同的監聽 。
- int SrsServer::listen()
- {
- int ret = ERROR_SUCCESS;
- // 建立一個rtmp的Streamlistener
- if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_http_api()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
- return ret;
- }
- return ret;
- }
1、首先分析RTMP連接配接
- int SrsServer::listen_rtmp()
- {
- int ret = ERROR_SUCCESS;
- // stream service port.
- std::vector <std::string> ip_ports = _srs_config->get_listens();
- srs_assert((int)ip_ports.size() > 0);
- close_listeners(SrsListenerRtmpStream);
- for (int i = 0; i < (int)ip_ports.size(); i++) {
- SrsListener* listener = new SrsStreamListener( this, SrsListenerRtmpStream);
- listeners.push_back( listener);
- std::string ip;
- int port;
- srs_parse_endpoint( ip_ports[ i], ip, port);
- if (( ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
- srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- }
- return ret;
- }
這裡是listen_rtmp()函數,你也可以去看看listen_http_api()函數、listen_http_stream()函數,其實結構都很相似,隻是在建立SrsStreamListener對象時,傳入了不同的參數SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同類型的監聽對象。
- // listen_rtmp 中listen監聽走這裡了。
- int SrsStreamListener::listen(string i, int p)
- {
- int ret = ERROR_SUCCESS;
- ip = i;
- port = p;
- srs_freep(listener);
- listener = new SrsTcpListener(this, ip, port);
- if ((ret = listener->listen()) != ERROR_SUCCESS) {
- srs_error("tcp listen failed. ret=%d", ret);
- return ret;
- }
- srs_info("listen thread current_cid=%d, "
- "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
- _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
- srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
- return ret;
- }
注意,這裡有大量純虛函數,不要走錯路了。進入TCP監聽代碼
- // rtmp tcp監聽
- int SrsTcpListener::listen()
- {
- //C++ Socket程式設計
- int ret = ERROR_SUCCESS;
- // 1、建立套接字,流式Socket(SOCK_STREAM)
- if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- ret = ERROR_SOCKET_CREATE;
- srs_error("create linux socket error. port=%d, ret=%d", port, ret);
- return ret;
- }
- srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
- int reuse_socket = 1;
- if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
- ret = ERROR_SOCKET_SETREUSE;
- srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
- return ret;
- }
- srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
- sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = inet_addr(ip.c_str());
- // 2、綁定套接字到一個IP位址和一個端口上
- if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
- ret = ERROR_SOCKET_BIND;
- srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- // 3、将套接字設定為監聽模式等待連接配接請求
- if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
- ret = ERROR_SOCKET_LISTEN;
- srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
- ret = ERROR_ST_OPEN_SOCKET;
- srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- // 4、等到連接配接一個客戶之後,開啟一個新的線程
- if ((ret = pthread->start()) != ERROR_SUCCESS) {
- srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
- return ret;
- }
此代碼為C++ TCP Socket代碼,思路比較清晰,可以看到,每接受到一個rtmp通路請求,建立一個”線程“,這裡暫時将其稱為線程,後面再做具體介紹。建立線程代碼如下:
- int SrsReusableThread::start()
- {
- return pthread->start();
- }
- int SrsThread::start()
- {
- int ret = ERROR_SUCCESS;
- if(tid) {
- srs_info( "thread %s already running.", _name);
- return ret;
- }
- if((tid = st_thread_create(thread_fun, this, (_joinable? : ), )) == NULL){
- ret = ERROR_ST_CREATE_CYCLE_THREAD;
- srs_error( "st_thread_create failed. ret=%d", ret);
- return ret;
- }
- disposed = false;
- // we set to loop to true for thread to run.
- loop = true;
- // wait for cid to ready, for parent thread to get the cid.
- while (_cid < ) {
- st_usleep( * );
- }
- // now, cycle thread can run.
- can_run = true;
- return ret;
- }
來到了st_thread_create,這裡要注意,這是SRS開源項目具有高并發,高性能的重要一步。這裡建立的是協程,不是線程。協程是有别于程序和線程的一種元件,具有程序的獨立性和線程的輕量級,聽說微信能夠支援8億使用者量,也是采用協程這種網絡服務架構:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。
從這裡可以看出,srs是一個單線程的伺服器,采用協程,主持高并發,高性能。
建立協程,協程函數為:thread_fun()
- // 每連連結一個使用者,建立一個協程程,該函數為協程函數
- void* SrsThread::thread_fun(void* arg)
- {
- SrsThread* obj = (SrsThread*)arg;
- srs_assert(obj);
- // 進入線程循環
- obj->thread_cycle();
- // for valgrind to detect.
- SrsThreadContext* ctx = dynamic_cast <SrsThreadContext*>(_srs_context);
- if (ctx) {
- ctx->clear_cid();
- }
- st_thread_exit(NULL);
- return NULL;
- }
此時,真正進入了協程循環處理
- void SrsThread::thread_cycle()
- {
- int ret = ERROR_SUCCESS;
- _srs_context->generate_id();
- srs_info("thread %s cycle start", _name);
- _cid = _srs_context->get_id();
- srs_assert(handler);
- handler->on_thread_start();
- // thread is running now.
- really_terminated = false;
- // wait for cid to ready, for parent thread to get the cid.
- while (!can_run && loop) {
- st_usleep(10 * 1000);
- }
- while (loop) {
- if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
- srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
- goto failed;
- }
- srs_info("thread %s on before cycle success", _name);
- // 注意純虛函數的應用
- if ((ret = handler->cycle()) != ERROR_SUCCESS) {
- if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
- srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
- }
- goto failed;
- }
- srs_info("thread %s cycle success", _name);
- if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
- srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
- goto failed;
- }
- srs_info("thread %s on end cycle success", _name);
- failed:
- if (!loop) {
- break;
- }
- // to improve performance, donot sleep when interval is zero.
- // @see: https://github.com/ossrs/srs/issues/237
- if (cycle_interval_us != 0) {
- st_usleep(cycle_interval_us);
- }
- }
- // readly terminated now.
- really_terminated = true;
- handler->on_thread_stop();
- srs_info("thread %s cycle finished", _name);
- }
至此,一定要熟悉C++純虛函數的引用,本人剛學了幾天C++,對虛函數和純虛函數在SRS源碼中的應用很不習慣! 好了,進入循環ret = handler->cycle()
- int SrsConnection::cycle()
- {
- int ret = ERROR_SUCCESS;
- _srs_context->generate_id();
- id = _srs_context->get_id();
- ip = srs_get_peer_ip(st_netfd_fileno(stfd));
- //srs_trace("ip:%s", ip);
- ret = do_cycle();
- // if socket io error, set to closed.
- if (srs_is_client_gracefully_close(ret)) {
- ret = ERROR_SOCKET_CLOSED;
- }
- // success.
- if (ret == ERROR_SUCCESS) {
- srs_trace("client finished.");
- }
- // client close peer.
- if (ret == ERROR_SOCKET_CLOSED) {
- srs_warn("client disconnect peer. ret=%d", ret);
- }
- return ERROR_SUCCESS;
- }
進入ret=do_cycle();
- // TODO: return detail message when error for client.
- int SrsRtmpConn::do_cycle()
- {
- int ret = ERROR_SUCCESS;
- srs_trace( "RTMP client ip=%s", ip.c_str());
- rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
- rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
- if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
- srs_error( "rtmp handshake failed. ret=%d", ret);
- return ret;
- }
- srs_verbose( "rtmp handshake success");
- if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
- srs_error( "rtmp connect vhost/app failed. ret=%d", ret);
- return ret;
- }
- srs_verbose( "rtmp connect app success");
- // set client ip to request.
- req->ip = ip;
- // discovery vhost, resolve the vhost from config
- SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
- if (parsed_vhost) {
- req->vhost = parsed_vhost->arg0();
- }
- srs_info( "discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
- req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
- if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
- ret = ERROR_RTMP_REQ_TCURL;
- srs_error( "discovery tcUrl failed. "
- "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
- req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
- return ret;
- }
- // check vhost
- if ((ret = check_vhost()) != ERROR_SUCCESS) {
- srs_error( "check vhost failed. ret=%d", ret);
- return ret;
- }
- srs_verbose( "check vhost success.");
- srs_trace( "connect app, "
- "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
- req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
- req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
- req->app.c_str(), (req->args? "(obj)": "null"));
- // show client identity
- if(req->args) {
- std:: string srs_version;
- std:: string srs_server_ip;
- int srs_pid = ;
- int srs_id = ;
- SrsAmf0Any* prop = NULL;
- if ((prop = req->args->ensure_property_string( "srs_version")) != NULL) {
- srs_version = prop->to_str();
- }
- if ((prop = req->args->ensure_property_string( "srs_server_ip")) != NULL) {
- srs_server_ip = prop->to_str();
- }
- if ((prop = req->args->ensure_property_number( "srs_pid")) != NULL) {
- srs_pid = ( int)prop->to_number();
- }
- if ((prop = req->args->ensure_property_number( "srs_id")) != NULL) {
- srs_id = ( int)prop->to_number();
- }
- srs_info( "edge-srs ip=%s, version=%s, pid=%d, id=%d",
- srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
- if (srs_pid > ) {
- srs_trace( "edge-srs ip=%s, version=%s, pid=%d, id=%d",
- srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
- }
- }
- ret = service_cycle();
- http_hooks_on_close();
- return ret;
- }
行了,rtmp連接配接就到這裡,要不然都快到rtmp流接受代碼了,和系統啟動越走越遠了,rtmp流接受後面再分析。
2、再分析http-api連接配接,回到int SrsServer::listen()函數中,梳理http-api連結
- int SrsServer::listen_http_api()
- {
- int ret = ERROR_SUCCESS;
- #ifdef SRS_AUTO_HTTP_API
- close_listeners(SrsListenerHttpApi);
- if (_srs_config->get_http_api_enabled()) {
- SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);
- listeners.push_back(listener);
- std::string ep = _srs_config->get_http_api_listen();
- std::string ip;
- int port;
- srs_parse_endpoint(ep, ip, port);
- if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
- srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- }
- #endif
- return ret;
- }
listen_http_api()函數和listen_rtmp()函數内容非常像,再走到listener->listen()裡面看看,結果來到了
- int SrsStreamListener::listen(string i, int p)
- {
- int ret = ERROR_SUCCESS;
- ip = i;
- port = p;
- srs_freep(listener);
- listener = new SrsTcpListener(this, ip, port);
- if ((ret = listener->listen()) != ERROR_SUCCESS) {
- srs_error("tcp listen failed. ret=%d", ret);
- return ret;
- }
- srs_info("listen thread current_cid=%d, "
- "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
- _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
- srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
- return ret;
- }
和rtmp連結監聽機制完全一樣,隻是type不同而已
- enum SrsListenerType
- {
- // RTMP client,
- SrsListenerRtmpStream = 0,
- // HTTP api,
- SrsListenerHttpApi = 1,
- // HTTP stream, HDS/HLS/DASH
- SrsListenerHttpStream = 2,
- // UDP stream, MPEG-TS over udp.
- SrsListenerMpegTsOverUdp = 3,
- // TCP stream, RTSP stream.
- SrsListenerRtsp = 4,
- // TCP stream, FLV stream over HTTP.
- SrsListenerFlv = 5,
- };
我就看了兩個連結監聽,監聽到此為止。
3、http api回調注冊
回到run_master()函數中,從_srs_server->http_handle()看起。
- int SrsServer::http_handle()
- {
- int ret = ERROR_SUCCESS;
- #ifdef SRS_AUTO_HTTP_API
- srs_assert(http_api_mux);
- if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {
- return ret;
- }
- // test the request info.
- if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {
- return ret;
- }
- // test the error code response.
- if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
- return ret;
- }
- // test the redirect mechenism.
- if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {
- return ret;
- }
- // test the http vhost.
- if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
- return ret;
- }
- // TODO: FIXME: for console.
- // TODO: FIXME: support reload.
- std::string dir = _srs_config->get_http_stream_dir() + "/console";
- if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {
- srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);
- return ret;
- }
- srs_trace("http: api mount /console to %s", dir.c_str());
- #endif
- return ret;
- }
該函數注冊了http-api回調接口。可以參考:https://github.com/ossrs/srs/wiki/v2_CN_HTTPApi
比如我們可以通路http://ip:1985/api/v1 其中ip為SRS伺服器位址,就可以看到從該接口傳回srs伺服器參數。
4、ingest(拉流,SRS主動去拉流,和推流相反)處理
注意:SRS對拉流的處理比較特殊,SRS拉流是通過ffmpeg工具去實作的,SRS代碼隻是實作簡單的系統調用,這部分内容在後面的章節中詳細說明。
- int SrsIngester::start()
- {
- int ret = ERROR_SUCCESS;
- if ((ret = parse()) != ERROR_SUCCESS) {
- clear_engines();
- ret = ERROR_SUCCESS;
- return ret;
- }
- // even no ingesters, we must also start it,
- // for the reload may add more ingesters.
- // start thread to run all encoding engines.
- if ((ret = pthread->start()) != ERROR_SUCCESS) {
- srs_error("st_thread_create failed. ret=%d", ret);
- return ret;
- }
- srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
- return ret;
- }
到此,可以看出,和監聽過程一樣,進入int SrsThread::start()函數,隻是傳入對象不一樣而已。
5、SRS自服務
- int SrsServer::cycle()
- {
- int ret = ERROR_SUCCESS;
- srs_trace("SrsServer")
- ret = do_cycle();
- #ifdef SRS_AUTO_GPERF_MC
- destroy();
- // remark, for gmc, never invoke the exit().
- srs_warn("sleep a long time for system st-threads to cleanup.");
- st_usleep(3 * 1000 * 1000);
- srs_warn("system quit");
- #else
- // normally quit with neccessary cleanup by dispose().
- srs_warn("main cycle terminated, system quit normally.");
- dispose();
- srs_trace("srs terminated");
- // for valgrind to detect.
- srs_freep(_srs_config);
- srs_freep(_srs_log);
- exit(0);
- #endif
- return ret;
- }
- int SrsServer::do_cycle()
- {
- int ret = ERROR_SUCCESS;
- // find the max loop
- int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
- #ifdef SRS_AUTO_STAT
- max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
- max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
- #endif
- // for asprocess.
- bool asprocess = _srs_config->get_asprocess();
- // the deamon thread, update the time cache
- while (true) {
- if(handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS){
- srs_error("cycle handle failed. ret=%d", ret);
- return ret;
- }
- // the interval in config.
- int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
- // dynamic fetch the max.
- int temp_max = max;
- temp_max = srs_max(temp_max, heartbeat_max_resolution);
- for (int i = 0; i < temp_max; i++) {
- st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
- // asprocess check.
- if (asprocess && ::getppid() != ppid) {
- srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
- return ret;
- }
- // gracefully quit for SIGINT or SIGTERM.
- if (signal_gracefully_quit) {
- srs_trace("cleanup for gracefully terminate.");
- return ret;
- }
- // for gperf heap checker,
- // @see: research/gperftools/heap-checker/heap_checker.cc
- // if user interrupt the program, exit to check mem leak.
- // but, if gperf, use reload to ensure main return normally,
- // because directly exit will cause core-dump.
- #ifdef SRS_AUTO_GPERF_MC
- if (signal_gmc_stop) {
- srs_warn("gmc got singal to stop server.");
- return ret;
- }
- #endif
- // do reload the config.
- if (signal_reload) {
- signal_reload = false;
- srs_info("get signal reload, to reload the config.");
- if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
- srs_error("reload config failed. ret=%d", ret);
- return ret;
- }
- srs_trace("reload config success.");
- }
- // notice the stream sources to cycle.
- if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
- return ret;
- }
- // update the cache time
- if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
- srs_info("update current time cache.");
- srs_update_system_time_ms();
- }
- #ifdef SRS_AUTO_STAT
- if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
- srs_info("update resource info, rss.");
- srs_update_system_rusage();
- }
- if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
- srs_info("update cpu info, cpu usage.");
- srs_update_proc_stat();
- }
- if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
- srs_info("update disk info, disk iops.");
- srs_update_disk_stat();
- }
- if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
- srs_info("update memory info, usage/free.");
- srs_update_meminfo();
- }
- if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
- srs_info("update platform info, uptime/load.");
- srs_update_platform_info();
- }
- if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
- srs_info("update network devices info.");
- srs_update_network_devices();
- }
- if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
- srs_info("update network server kbps info.");
- resample_kbps();
- }
- #ifdef SRS_AUTO_HTTP_CORE
- if (_srs_config->get_heartbeat_enabled()) {
- if ((i % heartbeat_max_resolution) == 0) {
- srs_info("do http heartbeat, for internal server to report.");
- http_heartbeat->heartbeat();
- }
- }
- #endif
- #endif
- srs_info("server main thread loop");
- }
- }
- return ret;
- }
主線程,更新srs時間和緩存!!至此,系統啟動代碼結構梳理完了。
三、總結
- 啟動不同的業務。
- 監聽不同的用戶端類型。
- 每連結一個用戶端,SRS為其建立一個協程,專門負責該路連結資訊互動。
- SRS系統采用了協程網絡服務架構,使得系統具有高并發,高性能等有點。