天天看點

MJPG-streamer源碼分析-輸出部分

MJPG-streamer可以建立多個輸出,簡單的說,根據主函數中輸入的參數解析的結果,确定輸出通道的個數,至少保證有一個輸出通道在程式運作時存在。從參數解析結果确定每個輸出通道的參數,并以這些參數為每個通道建立發送線程。在每個發送線程上,不斷偵聽是否有連接配接請求。每當有連接配接請求,在未達到最高連接配接數目時,為每個連接配接請求建立連接配接線程。在連接配接線程中,根據參數,确實發送方式是stream?snapshot?或者是其他方式。在連接配接請求不關閉時,連接配接線程一直存在,連接配接請求退出時,線程随之退出,并有系統自動釋放其所配置設定的資源。當程式運作終止信号stop不為1時,輸出通道線程不斷偵聽連接配接和發送資料,當終止信号被置1時,退出輸出線程,同時釋放其所配置設定的資源。

輸出通道的初始化部分僅僅是将輸出通道的參數分别解析和配置設定到各自通道指定的變量中,為後續每個通道的run函數提供參數。其餘的,都是run函數所執行部分,包括其執行的子函數和操作等。

需要說明的是,由于有多個輸出通道,故而需要對每個通道都進行初始化和執行輸出,這個過程通過一個循環體完成,循環體的控制變量就是輸出通道的個數。

本文根據上述思路,把關鍵部分源碼進行解釋,具體的還得參考源碼,如有錯誤,歡迎指出以便改正。

===============================================初始化部分====================================================

1、定義和初始化部分參數,這些參數用于在對輸出通道參數解析過程中的臨時變量;

char *argv[MAX_ARGUMENTS]={NULL};
  int  argc=1, i;
  int  port;
  char *credentials, *www_folder;
  char nocommands;

  port = htons(8080);
  credentials = NULL;
  www_folder = NULL;
  nocommands = 0;
  argv[0] = OUTPUT_PLUGIN_NAME;
           

2、【解析參數】

将傳入的單串參數param->parameter_string轉換為字元串數組,存放在argv數組中,調用c = getopt_long_only(argc, argv, "", long_options, &option_index),當c==-1時,說明解析完畢,退出解析過程,否則根據option_index參數設定port,credentials,www_folder,nocommands等參數

這部分的代碼與前面主程式和輸入通道程式的解析過程類似,不再詳述,都是把對應參數填寫到指定的變量上。

3、根據param->id配置每個伺服器線程參數(servers全局)

程式為每個輸出通道都定義了一個對應的參數,用于配置和存儲每個輸出通道的資訊。其結構體如下所示:

/* context of each server thread */
typedef struct {
  int sd;
  int id;
  globals *pglobal;
  pthread_t threadID;

  config conf;
} context;
           

經過上述的參數解析過程之後,使用者輸入的參數都被識别,之後再被存放到每個輸出通道伺服器線程參數變量上。即servers[param->id]。

servers[param->id].id = param->id;
  servers[param->id].pglobal = param->global;
  servers[param->id].conf.port = port;
  servers[param->id].conf.credentials = credentials;
  servers[param->id].conf.www_folder = www_folder;
  servers[param->id].conf.nocommands = nocommands;
           

至此,輸出通道的初始化完畢,等待run函數的執行。

===============================================輸出通道執行部分====================================================

 在每個輸出通道上,先為每個通道建立服務線程,調用線程處理函數進行連接配接請求的偵聽和處理

int output_run(int id) {
  DBG("launching server thread #%02d\n", id);

  //server_thread位于httpd.c頭檔案中
  pthread_create(&(servers[id].threadID), NULL, server_thread, &(servers[id]));
  //将狀态改為unjoinable狀态,確定資源的釋放,無須調用join函數釋放資源
  pthread_detach(servers[id].threadID);

  return 0;
}
           

線程處理函數建立打開一個socket并等待連接配接請求,在未達到最大連接配接請求時,為每個請求建立相對應的用戶端服務線程client_thread(),并使用通道線程相同的處理方式,将該子線程detach處理。子線程根據不同的請求方式,進行資料的分發。

void *server_thread( void *arg ) {
  //定義線程參數:服務端和用戶端位址變量,用戶端服務子線程
  struct sockaddr_in addr, client_addr;
  int on;
  pthread_t client;
  socklen_t addr_len = sizeof(struct sockaddr_in);

  //每個伺服器線程的參數
  context *pcontext = arg;
  pglobal = pcontext->pglobal;

  /* set cleanup handler to cleanup ressources */
  pthread_cleanup_push(server_cleanup, pcontext);

  /* 打開通道服務端線程 */
  pcontext->sd = socket(PF_INET, SOCK_STREAM, 0);
  if ( pcontext->sd < 0 ) {
    fprintf(stderr, "socket failed\n");
    exit(EXIT_FAILURE);
  }

  //預設情況下,server重新開機,調用socket,bind,然後listen,會失敗.因為該端口正在被使用.
  //一個端口釋放後會等待兩分鐘之後才能再被使用,SO_REUSEADDR是讓端口釋放後立即就可以被再次使用。
  on = 1;
  if (setsockopt(pcontext->sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
    perror("setsockopt(SO_REUSEADDR) failed");
    exit(EXIT_FAILURE);
  }

  /* 配置伺服器參數,用于監聽 */
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = pcontext->conf.port; /* is already in right byteorder */
  //宏INADDR_ANY代替本機的IP,無需手動選擇,自動替換,當存在多個網卡情況下,自動選擇
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  if ( bind(pcontext->sd, (struct sockaddr*)&addr, sizeof(addr)) != 0 ) {
    perror("bind");
    OPRINT("%s(): bind(%d) failed", __FUNCTION__, htons(pcontext->conf.port));
    closelog();
    exit(EXIT_FAILURE);
  }

  //socket()函數建立的socket預設是一個主動類型的,listen函數将socket變為被動類型的,等待客戶的連接配接請求。
  if ( listen(pcontext->sd, 10) != 0 ) {
    fprintf(stderr, "listen failed\n");
    exit(EXIT_FAILURE);
  }

  //等待用戶端連接配接,有連接配接則建立新的線程進行處理
  while ( !pglobal->stop ) {
	//cfd結構體定義在httpd.d
    cfd *pcfd = malloc(sizeof(cfd));

    if (pcfd == NULL) {
      fprintf(stderr, "failed to allocate (a very small amount of) memory\n");
      exit(EXIT_FAILURE);
    }

    DBG("waiting for clients to connect\n");
    pcfd->fd = accept(pcontext->sd, (struct sockaddr *)&client_addr, &addr_len);
    pcfd->pc = pcontext;

    /* 建立用戶端子線程處理連接配接請求 */
    DBG("create thread to handle client that just established a connection\n");
    syslog(LOG_INFO, "serving client: %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

    if( pthread_create(&client, NULL, &client_thread, pcfd) != 0 ) {
      DBG("could not launch another client thread\n");
      close(pcfd->fd);
      free(pcfd);
      continue;
    }
    pthread_detach(client);
  }

  DBG("leaving server thread, calling cleanup function now\n");
  pthread_cleanup_pop(1);

  return NULL;
}
           

其中,用于儲存用戶端子線程參數的結構體定義如下:

typedef struct {
  context *pc;  //保留父線程的變量
  int fd;       //儲存監聽accept傳回的套接字描述符
} cfd;
           

每個連接配接請求的處理都是在void *client_thread( void *arg )中進行。函數首先對連接配接請求的參數進行解析配置,根據req.type确定發送資料的方式:

/* thread for clients that connected to this server */
void *client_thread( void *arg ) {
  int cnt;
  char buffer[BUFFER_SIZE]={0}, *pb=buffer;
  iobuffer iobuf;
  request req;
  //本地連接配接請求檔案描述變量
  cfd lcfd;          

  //如果傳入參數不為空,則将參數的内容拷貝到 lcfd 中(參數為 pcfd ,不為空)
  if (arg != NULL) {
    memcpy(&lcfd, arg, sizeof(cfd));
    free(arg);
  }
  else
    return NULL;

  /* 初始化結構體 */
  // 把iobuf清為0,iobuf變量在_readline函數中被使用,起一個臨時緩存的作用
  // iobuf的level成員表示buffer中還剩多少位元組的空間,而buffer成員用于存放資料
  init_iobuffer(&iobuf);
  //http協定,需要客服端給伺服器發送一個請求,而request就是這個請求
  init_request(&req);

  //從客服端接收資料,表示客服端發來的請求,确定給客服端發什麼資料
  memset(buffer, 0, sizeof(buffer));
  //_readline()函數:從客服端中讀取一行的資料,以換行符結束
  if ( (cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer)-1, 5)) == -1 ) {
    close(lcfd.fd);
    return NULL;
  }

  /* 解析參數 */
  if ( strstr(buffer, "GET /?action=snapshot") != NULL ) {
	//請求字元串中含有"GET /?action=snapshot",則請求類型為 A_SNAPSHOT(拍照類型)
    req.type = A_SNAPSHOT;
  }
  else if ( strstr(buffer, "GET /?action=stream") != NULL ) {
	//如果請求字元串中含有"GET /?action=stream",則請求類型為 A_STREAM(發送視訊流類型)
    req.type = A_STREAM;
  }
  else if ( strstr(buffer, "GET /?action=command") != NULL ) {
	//指令請求,将請求後面的參數儲存到 req.parameter
    int len;
    req.type = A_COMMAND;

    /* advance by the length of known string */
    if ( (pb = strstr(buffer, "GET /?action=command")) == NULL ) {
      DBG("HTTP request seems to be malformed\n");
      send_error(lcfd.fd, 400, "Malformed HTTP request");
      close(lcfd.fd);
      return NULL;
    }
    pb += strlen("GET /?action=command");

    /* only accept certain characters */
    len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-=&1234567890"), 0), 100);
    req.parameter = malloc(len+1);
    if ( req.parameter == NULL ) {
      exit(EXIT_FAILURE);
    }
    memset(req.parameter, 0, len+1);
    strncpy(req.parameter, pb, len);

    DBG("command parameter (len: %d): \"%s\"\n", len, req.parameter);
  }
  else {
    int len;

    DBG("try to serve a file\n");
    req.type = A_FILE;

    if ( (pb = strstr(buffer, "GET /")) == NULL ) {
      DBG("HTTP request seems to be malformed\n");
      send_error(lcfd.fd, 400, "Malformed HTTP request");
      close(lcfd.fd);
      return NULL;
    }

    pb += strlen("GET /");
    len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._-1234567890"), 0), 100);
    req.parameter = malloc(len+1);
    if ( req.parameter == NULL ) {
      exit(EXIT_FAILURE);
    }
    memset(req.parameter, 0, len+1);
    strncpy(req.parameter, pb, len);

    DBG("parameter (len: %d): \"%s\"\n", len, req.parameter);
  }

  /*
   * parse the rest of the HTTP-request
   * the end of the request-header is marked by a single, empty line with "\r\n"
   */
  do {
    memset(buffer, 0, sizeof(buffer));

	//從用戶端再次讀取一次字元串
    if ( (cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer)-1, 5)) == -1 ) {
      free_request(&req);
      close(lcfd.fd);
      return NULL;
    }

    if ( strstr(buffer, "User-Agent: ") != NULL ) {
	  //如果buffer(客服端)中存有(發送了)使用者名,則将使用者名儲存到 req.client 中
      req.client = strdup(buffer+strlen("User-Agent: "));
    }
	//如果buffer(客服端)中存有(發送了)密碼,則将密碼儲存到 req.credentials 中
    else if ( strstr(buffer, "Authorization: Basic ") != NULL ) {
      req.credentials = strdup(buffer+strlen("Authorization: Basic "));
	  //對密碼進行解碼
      decodeBase64(req.credentials);
      DBG("username:password: %s\n", req.credentials);
    }

  } while( cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n') );

  //如果支援密碼功能,則要檢查使用者名和密碼是否比對
  if ( lcfd.pc->conf.credentials != NULL ) {
    if ( req.credentials == NULL || strcmp(lcfd.pc->conf.credentials, req.credentials) != 0 ) {
      DBG("access denied\n");
      send_error(lcfd.fd, 401, "username and password do not match to configuration");
      close(lcfd.fd);
      if ( req.parameter != NULL ) free(req.parameter);
      if ( req.client != NULL ) free(req.client);
      if ( req.credentials != NULL ) free(req.credentials);
      return NULL;
    }
    DBG("access granted\n");
  }

  //根據請求的類型,采取相應的行動 
  switch ( req.type ) {
    case A_SNAPSHOT:
      DBG("Request for snapshot\n");
      send_snapshot(lcfd.fd);
      break;
    case A_STREAM:
      DBG("Request for stream\n");
      send_stream(lcfd.fd);
      break;
    case A_COMMAND:
      if ( lcfd.pc->conf.nocommands ) {
        send_error(lcfd.fd, 501, "this server is configured to not accept commands");
        break;
      }
      command(lcfd.pc->id, lcfd.fd, req.parameter);
      break;
    case A_FILE:
      if ( lcfd.pc->conf.www_folder == NULL )
        send_error(lcfd.fd, 501, "no www-folder configured");
      else
        send_file(lcfd.pc->id, lcfd.fd, req.parameter);
      break;
    default:
      DBG("unknown request\n");
  }

  close(lcfd.fd);
  free_request(&req);

  DBG("leaving HTTP client thread\n");
  return NULL;
}
           

其中_readline()函數通過調用_read()函數從客服端中讀取一行的資料,将其逐個防止在buffer上,并将其buffer傳回,用于參數解析;

/* read just a single line or timeout */
int _readline(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout) {
  char c='\0', *out=buffer;
  int i;

  memset(buffer, 0, len);

  //從iobuf.buf[]中逐個位元組的将資料取出存放到buffer中,直到遇見換行符'\n'或者長度達到了
  for ( i=0; i<len && c != '\n'; i++ ) {
    if ( _read(fd, iobuf, &c, 1, timeout) <= 0 ) {
      /* timeout or error occured */
      return -1;
    }
    *out++ = c;
  }

  return i;
}

int _read(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout) {
  int copied=0, rc, i;
  fd_set fds;
  struct timeval tv;

  memset(buffer, 0, len);

  while ( (copied < len) ) {
	//第一次,i=0(iobuf->level初始化為0,len-copied,其中len為1,copied為0),以後,i=1  
    //i=0相當于下面的不拷貝
	//iobuf->level表示iobuf->buffer中的位元組數,初始時為0
    i = MIN(iobuf->level, len-copied);
    memcpy(buffer+copied, iobuf->buffer+IO_BUFFER-iobuf->level, i);

    iobuf->level -= i;
    copied += i;
    if ( copied >= len )
      return copied;

    //當客服端發有資料或者逾時的時候,select函數就傳回,目的防止while循環永不退出
    tv.tv_sec = timeout;
    tv.tv_usec = 0;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);
    if ( (rc = select(fd+1, &fds, NULL, NULL, &tv)) <= 0 ) {
      if ( rc < 0)
        exit(EXIT_FAILURE);

      /* this must be a timeout */
      return copied;
    }

    init_iobuffer(iobuf);

    /*
     * there should be at least one byte, because select signalled it.
     * But: It may happen (very seldomly), that the socket gets closed remotly between
     * the select() and the following read. That is the reason for not relying
     * on reading at least one byte.
     */
	 //調用read函數,從客服端讀取最多 256 位元組的資料,存放到iobuf->buffer
    if ( (iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0 ) {
      /* an error occured */
      return -1;
    }
	
	//拷貝iobuf->buffer中的資料到位址iobuf->buffer+(IO_BUFFER-iobuf->level)
    memmove(iobuf->buffer+(IO_BUFFER-iobuf->level), iobuf->buffer, iobuf->level);
  }

  return 0;
}
           

經過對http請求的參數解析,确定伺服器發送資料的類型,常用的是snapshot和stream模式,簡單的說,就是發送單幀圖像或者實時視訊(其實是通過一幀幀圖像實作的)。

/******************************************************************************
Description.: Send a complete HTTP response and a single JPG-frame.
Input Value.: fildescriptor fd to send the answer to
Return Value: -
******************************************************************************/
void send_snapshot(int fd) {
  unsigned char *frame=NULL;
  int frame_size=0;
  char buffer[BUFFER_SIZE] = {0};

  //等待輸入通道input_uvc.c裡發送資料更新請求 */
  //輸入通道:攝像頭源源不斷采集資料,每采集完一幀資料就會往倉庫裡存放資料,
  //存放好後,通過條件變量發出一個資料更新信号(通過phread_cond_broadcast函數)
  //得到資料更新信号後鎖定互斥鎖
  pthread_cond_wait(&pglobal->db_update, &pglobal->db);

  //獲得一幀圖像的大小,經過輸入通道采集并複制到全局緩沖區後,該全局變量會随之更新
  frame_size = pglobal->size;

  //根據一幀資料的大小,配置設定一個本地 frame 緩沖區,如配置設定記憶體出錯,釋放記憶體并解鎖互斥鎖
  if ( (frame = malloc(frame_size+1)) == NULL ) {
    free(frame);
    pthread_mutex_unlock( &pglobal->db );
    send_error(fd, 500, "not enough memory");
    return;
  }

  //從倉庫(pglobal->buf)中取出一幀資料,并将其放置在frame中
  memcpy(frame, pglobal->buf, frame_size);
  DBG("got frame (size: %d kB)\n", frame_size/1024);

  pthread_mutex_unlock( &pglobal->db );

  /* write the response */
  //buffer的字元串為HTTP/1.0 200 OK\r\n" STD_HEADER \"Content-type: image/jpeg 
  //HTTP/1.0 表明http協定所用版本1.0
  sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
                  STD_HEADER \
                  "Content-type: image/jpeg\r\n" \
                  "\r\n");

  
  //将buffer中的字元串發送給客服端 */
  //對于mjpeg-streamer,輸出通道是通過socket程式設計來模拟http協定,
  //而對于我們的http協定來說,它需要先讓用戶端發送一個請求,
  //當伺服器收到這個請求以後,接下來會發送應答,首先會發送一個頭部資訊,
  //http應答中的頭部資訊儲存在buffer(封包),會報告http協定所用的版本
  if( write(fd, buffer, strlen(buffer)) < 0 ) {
    free(frame);
    return;
  }
  //發送一幀圖像
  write(fd, frame, frame_size);

  free(frame);
}
           
/******************************************************************************
Description.: Send a complete HTTP response and a stream of JPG-frames.
Input Value.: fildescriptor fd to send the answer to
Return Value: -
******************************************************************************/
void send_stream(int fd) {
  unsigned char *frame=NULL, *tmp=NULL;
  int frame_size=0, max_frame_size=0;
  char buffer[BUFFER_SIZE] = {0};

  DBG("preparing header\n");

  sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
                  STD_HEADER \
                  "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \
                  "\r\n" \
                  "--" BOUNDARY "\r\n");

  // 将 buffer 中的字元串發送出去(封包)
  if ( write(fd, buffer, strlen(buffer)) < 0 ) {
    free(frame);
    return;
  }

  DBG("Headers send, sending stream now\n");

  //循環發送圖檔形成視訊流
  //進入循環,pglobal->stop為1時終止,按ctrl+c時pglobal->stop為1
  while ( !pglobal->stop ) {

    /* 等待輸入通道發出資料更新的信号 */
    pthread_cond_wait(&pglobal->db_update, &pglobal->db);

    /* 讀取一幀圖像的大小 */
    frame_size = pglobal->size;

    /* 檢查我們之前配置設定的緩存是否夠大,如果不夠,則重新配置設定 */
    if ( frame_size > max_frame_size ) {
      DBG("increasing buffer size to %d\n", frame_size);

      max_frame_size = frame_size+TEN_K;
      if ( (tmp = realloc(frame, max_frame_size)) == NULL ) {
        free(frame);
        pthread_mutex_unlock( &pglobal->db );
        send_error(fd, 500, "not enough memory");
        return;
      }

      frame = tmp;
    }

	//從倉庫中取出一幀資料放在 frame 
    memcpy(frame, pglobal->buf, frame_size);
    DBG("got frame (size: %d kB)\n", frame_size/1024);

    pthread_mutex_unlock( &pglobal->db );

	 //讓 buffer = ""封包,告訴客服端即将發送的圖檔的大小
    sprintf(buffer, "Content-Type: image/jpeg\r\n" \
                    "Content-Length: %d\r\n" \
                    "\r\n", frame_size);
    DBG("sending intemdiate header\n");
	//發送封包
    if ( write(fd, buffer, strlen(buffer)) < 0 ) break;

	//發送一幀圖像
    DBG("sending frame\n");
    if( write(fd, frame, frame_size) < 0 ) break;

	//發送這個字元串是因為對于用戶端來說,接收一幀資料怎麼知道接收一幀資料接收完。
	//一種是根據frame_size來判斷一幀資料是否接收完,
	//另一種是接收的資料是字元串boundarydonotcross(每兩幀圖檔的邊界值)的時候,
	//就表示一幀圖檔接收完了,即将接收的是第二幀圖檔
    DBG("sending boundary\n");
    sprintf(buffer, "\r\n--" BOUNDARY "\r\n");
    if ( write(fd, buffer, strlen(buffer)) < 0 ) break;
  }

  free(frame);
}
           

繼續閱讀