天天看點

實作一個通用的生産者消費者隊列(c語言版本)

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/voidreturn/article/details/78151898

背景:筆者之前一直從事嵌入式音視訊相關的開發工作,對于音視訊的資料的處理,生産者消費者隊列必不可少,而如何實作一個高效穩定的生産者消費者隊列則十分重要,不過按照筆者從業的經驗,所看到的現象,不容樂觀,很多知名大廠在這種基礎元件的開發能力上十分堪憂。

音視訊資料處理的特點:

  • 音視訊資料量大:音視訊資料特别是視訊資料,占據了計算機資料的很大一塊,不信就看看每個人的硬碟裡,去除電影,照片,mp3是不是很空蕩蕩的。
  • 實時性要求高:音視訊的延時如果大于200ms,使用體驗會十分糟糕。
  • 處理流程複雜:一幀資料從sensor捕獲到最終網傳輸出或者lcd顯示,都需要經過一系列的子產品進行處理。特别是網絡傳輸,一般要經過原始資料捕獲,視訊資料格式轉換,資料編碼壓縮,資料封裝打包,網絡傳輸。

生産者消費者隊列在視訊資料處理的必要性:

視訊資料的處理為什麼需要生産者消費者隊列?其實上面提到的音視訊資料處理的特點就是答案。

  • 一般對視訊資料處理的子產品都是運作在多線程中,每一個處理子產品運作在一個線程中(也是軟體工程分子產品的思想),互相之間通過生産者消費者隊列進行資料的互動,之是以用線程模型,而沒有用我一直推崇的程序模型是因為線程間的共享記憶體比較友善,而程序則要相對複雜的多。
  • 利用多線程/多程序的并行處理能力 ,如果采用單線程單程序的單線處理模式,一幀資料從采集到輸出線性經過幾個子產品,時效性無法保證。
  • 緩存資料,保持平滑,通過隊列緩存視訊資料,可以有效的去除一些資料抖動,幫助音視訊資料的平滑播放,同時這個資料的緩存又不易過多,否則加大了延時,損傷實時性,是以隊列大小的設定是一個平衡的藝術。

常見的生産者消費者隊列實作存在的問題:

不注重效率性能:

  • 對于buffer狀态的檢測采用loop輪詢方式。

    loop輪詢是任何有追求的程式員都要避免的處理方式,而筆者以自己經曆經常看到以下類似的代碼:

pthread_mutex_lock();
state = check_some_state();
if (state == xxx) {
    do_some_process();
} else {
    usleep(x);
}
pthread_mutex_unlock();           

以上代碼loop一個狀态,如果狀态成立做有效的處理,不成立則睡眠一定時間,之後再次調用該段代碼進行下一次的狀态檢測,而這個睡眠時間是一個随機經驗值,很有可能下次仍然是無效的檢測,接着睡眠再loop,多餘的loop是一種資源的浪費。

資料的傳遞采用copy方式:

資料的傳遞,采用copy的方式,一幀資料在一個完整的處理流程中經過n次copy(筆者見過一個系統中一幀資料copy了8次之多)

生産者消費者隊列和業務代碼混雜在一起,沒有分離:

對于開發者,都希望用最簡單的接口完成某個功能,而不關心内部實作,在這裡就是,隻需要生産者生産資料,消費者消費資料,而内部的處理(同步,資料的處理等)完全不關心,這樣開發者就不需要去弄很多鎖,降低了開發難度,也可以使代碼元件化,子產品化。

有經驗的開發者應該感覺以上都是基礎點,不會有人犯這樣的錯誤,不過筆者以自己的經曆肯定的說,以上兩種問題在某世界級大廠的視訊裝置上随處可見。

讓我悲哀的是,當我指出這些問題時,某些開發者完全無動于衷。而我更無力的是,現在的cpu,memory性能實在是高,在某些不太高端的嵌入式晶片上,優化過的資料并沒有十分明顯,在一個實際項目上,經過優化後cpu大概降低1%(原總系統cpu占用7%),有經驗的開發者又會說原7%的cpu占用率說明這個晶片做這個系統浪費了,不過也沒辦法其實已經用了比較低端的晶片了。。。

以上的吐槽主要是想說明:由于cpu,memory性能的提升,讓很多開發者感覺軟體的優化意義不大了,而我是一個理想主義者,對于某些設計ugly的代碼真的是零容忍啊。

如何優化:

以上說了這麼多,那如何操作呢?

  • 提高效率,去除多餘的loop輪詢:

    采用線程的同步機制,當狀态條件符合要求時,通過通知機制觸發後續處理,這樣去除了無效的loop輪詢檢測,linux系統下可以采用條件變量,信号量來實作,我更傾向于使用條件變量,因為它是linux系統原生支援的接口。

    提到條件變量,不得不提一個概念:同步互斥,這麼一個基本的作業系統概念,我最喜歡作為面試第一題,不過能用一句話切中要害的說出之間差別和各自特點的人不是很多,答不出這題的,基本上就pass了。

  • 減少多餘copy:

    采用預配置設定的方式,将buffer分為free和active兩大類,每一類buffer又切成幾個小buffer,然後通過指針将兩類buffer下的小buffer連結成兩個連結清單,使用者擷取buffer通過free連結清單擷取buffer,再将buffer put到active連結清單上,以上都是指針的操作,沒有資料的copy,極大的減少了copy操作。(再次強調指針是個好東西)

  • 子產品化元件化:

    将生産者消費者隊列的處理部分完全剝離成一個獨立的子產品元件,對外隻提供幾個基本的接口,内部完成同步通知的處理。

一個簡單的實作:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>

#include "sfifo.h"

//#define CONFIG_COND_FREE 1
#define CONFIG_COND_ACTIVE 1

#define MAX_SFIFO_NUM   32

struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM];
struct sfifo_des_s *my_sfifo_des;

struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p)
{
    static long empty_count = 0;
    struct sfifo_s *sfifo = NULL;

    pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
    while (sfifo_des_p->free_list.head == NULL) {
        pthread_cond_wait(&(sfifo_des_p->free_list.cond), &(sfifo_des_p->free_list.lock_mutex));
    }
#else
    if (sfifo_des_p->free_list.head == NULL) {
        if (empty_count++ % 120 == 0) {
            printf("free list empty\n");
        }
        goto EXIT;
    }
#endif
    sfifo = sfifo_des_p->free_list.head;
    sfifo_des_p->free_list.head = sfifo->next;

EXIT:
    pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));

    return sfifo;
}

int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{
    int send_cond = 0;

    pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
    if (sfifo_des_p->free_list.head == NULL) {
        sfifo_des_p->free_list.head = sfifo;
        sfifo_des_p->free_list.tail = sfifo;
        sfifo_des_p->free_list.tail->next = NULL;
        send_cond = 1;
    } else {
        sfifo_des_p->free_list.tail->next = sfifo;
        sfifo_des_p->free_list.tail = sfifo;
        sfifo_des_p->free_list.tail->next = NULL;
    }
    pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
    if (send_cond) {
        pthread_cond_signal(&(sfifo_des_p->free_list.cond));
    }
#endif
    return 0;
}

struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p)
{
    struct sfifo_s *sfifo = NULL;

    pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
    while (sfifo_des_p->active_list.head == NULL) {
        //pthread_cond_timedwait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex), &outtime);
        pthread_cond_wait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex));
    }
#else
    if (sfifo_des_p->active_list.head == NULL) {
        printf("active list empty\n");
        goto EXIT;
    }
#endif

    sfifo = sfifo_des_p->active_list.head;
    sfifo_des_p->active_list.head = sfifo->next;

EXIT:
    pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));

    return sfifo;
}

int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p)
{
    int send_cond = 0;

    pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
    if (sfifo_des_p->active_list.head == NULL) {
        sfifo_des_p->active_list.head = sfifo;
        sfifo_des_p->active_list.tail = sfifo;
        sfifo_des_p->active_list.tail->next = NULL;
        send_cond = 1;
    } else {
        sfifo_des_p->active_list.tail->next = sfifo;
        sfifo_des_p->active_list.tail = sfifo;
        sfifo_des_p->active_list.tail->next = NULL;
    }
    pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
    if (send_cond) {
        pthread_cond_signal(&(sfifo_des_p->active_list.cond));
    }
#endif
    return 0;
}

int dump_sfifo_list(struct sfifo_list_des_s *list)
{
    struct sfifo_s *sfifo = NULL;
    sfifo = list->head;
    do {
        printf("dump : %x\n", sfifo->buffer[0]);
        usleep(500 * 1000);
    } while (sfifo->next != NULL && (sfifo = sfifo->next));

    return 0;
}

struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num)
{
    int i = 0;
    struct sfifo_s *sfifo;

    struct sfifo_des_s *sfifo_des_p;
    sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s));

    sfifo_des_p->sfifos_num = sfifo_num;
    sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num;

    sfifo_des_p->free_list.sfifo_num = 0;
    sfifo_des_p->free_list.head = NULL;
    sfifo_des_p->free_list.tail = NULL;
    pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex, NULL);
    pthread_cond_init(&sfifo_des_p->free_list.cond, NULL);

    sfifo_des_p->active_list.sfifo_num = 0;
    sfifo_des_p->active_list.head = NULL;
    sfifo_des_p->active_list.tail = NULL;
    pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex, NULL);
    pthread_cond_init(&sfifo_des_p->active_list.cond, NULL);

    for (i = 0; i < sfifo_num; i++) {
        sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s));
        sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size);
        printf("sfifo_init: %x\n", sfifo->buffer);
        memset(sfifo->buffer, i, sfifo_buffer_size);
        sfifo->size = sfifo_buffer_size;
        sfifo->next = NULL;
        sfifo_put_free_buf(sfifo, sfifo_des_p);
    }

    return sfifo_des_p;
}

void *productor_thread_func(void *arg)
{
    struct sfifo_s *sfifo;

    while (1) {
        sfifo = sfifo_get_free_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);
            sfifo_put_active_buf(sfifo, my_sfifo_des);
        }
        //usleep(20*1000);
    }
}

void *comsumer_thread_func(void *arg)
{
    struct sfifo_s *sfifo;
    int count = 0;

    while (1) {
        sfifo = sfifo_get_active_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("---------------- get %x\n", sfifo->buffer[0]);
            sfifo_put_free_buf(sfifo, my_sfifo_des);
        }
        //usleep(10 * 1000);
        // if (count++ > 10000) {
        //  exit(-1);
        // }
    }
}

int main()
{
    int ret;
    static pthread_t productor_thread;
    static pthread_t consumer_thread;
    struct sfifo_s *r_sfifo;

    my_sfifo_des = sfifo_init(10, 4096, 5);

    ret = pthread_create(&productor_thread, NULL, productor_thread_func, NULL);
    ret = pthread_create(&consumer_thread, NULL, comsumer_thread_func, NULL);

    while (1) {
        sleep(1);
    }

    return 0;
}           

以上是一個簡單的生産者消費者隊列的c語言的實作,對應的頭檔案在本文底部(貼代碼太長看起來很崩潰)。

仔細的同學可能會發現,以上代碼sfifo_get_free_buf()中預設是loop輪詢檢測free buffer連結清單的,你前面不是說了一大堆不能loop嗎?怎麼還用loop呢?

這裡其實有兩個原因:

  • 生産者的loop是可以接受的,當發生多餘loop,無法命中時,說明生産者太快,消費者太慢,而其實對于一個生産者消費者模型出現以上問題時,說明整個業務流程要重新考慮,因為正常的情況是消費者總是要快于生産者,這個業務模型才能正常的運作下去。
  • 對于有些業務模型,生産者業務子產品部分是不能阻塞的,也就是說,如果free list沒有資料,我們采用pthread_cond_wait()阻塞後,會導緻生産者出現問題,這樣最好的處理方式就是生産者子產品接口傳回出錯,生産者業務方丢棄資料(此時就是丢幀了,這種情況如果頻繁發生是不能接受了,不過也說明了消費者要有足夠的能力處理生産者生産出的資料,否則整個業務都是有問題)

這個實作有那些優勢:

走讀和運作以上代碼的同學應該可以發現這裡做了一個簡單可運作的demo模拟了生産者和消費者雙方:

void *productor_thread_func(void *arg)
{
    struct sfifo_s *sfifo;

    while (1) {
        sfifo = sfifo_get_free_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);
            sfifo_put_active_buf(sfifo, my_sfifo_des);
        }
        //usleep(20*1000);
    }
}

void *comsumer_thread_func(void *arg)
{
    struct sfifo_s *sfifo;
    int count = 0;

    while (1) {
        sfifo = sfifo_get_active_buf(my_sfifo_des);
        if (sfifo != NULL) {
            printf("---------------- get %x\n", sfifo->buffer[0]);
            sfifo_put_free_buf(sfifo, my_sfifo_des);
        }
        //usleep(10 * 1000);
        // if (count++ > 10000) {
        //  exit(-1);
        // }
    }
}
           

這裡面對于使用者的優點有:

  1. 接口簡單:隻需要get free,put active;get active,put free。
  2. 沒有了資料copy,隻需要操作連結清單上的buffer就可以了,而這些buffer的參數控制通過init接口設定。
  3. 不用再控制sleep的時間值:前面提到,在loop模型下,如果狀态不成立需要sleep一段時間,再次檢查,這樣來控制同步狀态,而這個時間值很難确定,如果時間值過長,則會導緻狀态檢測不及時,延誤資料處理,如果時間值太短,則會增加狀态檢測miss cache的次數,耗費更多cpu資源。而采用本子產品的實作則完全不需要考慮這些問題,隻需要銜接業務處理,sleep,同步,yield cpu的操作都由這個子產品實作吧,完全不需要關心。
  4. 子產品化,完全和業務處理無關,可以毫無壓力的運用在不同的業務處理邏輯中,沒有剝離代碼的工作。

以上描述了一個生産者消費者隊列c語言的實作,為什麼是c語言版本的?因為其他進階語言,有很多成熟的庫提供了該功能,完全不用自己寫,而c就沒這麼完善了,不過這也說明了c的簡單靈活。但悲哀的是很多人是以進行了很ugly的實作。

多吐槽幾句,嵌入式行業由于各種技術原因,導緻開發語言還是采用c,這樣對開發人員有了不小的要求,而如何才能寫一些優雅的代碼,對人的素質有了要求,但現狀是優秀的開發者都被網際網路行業搶走了,導緻嵌入式行業開發人員的水準參差不齊,本來應該是一個對編碼能力要求很高的行業被一些水準低下的開發者占據。so,我離開了這個行業了。。。

附:子產品頭檔案,類linux使用者可通過gcc xxx.c指令build該demo,然後運作測試。

#ifndef SFIFO_H_
#define SFIFO_H_

struct sfifo_list_des_s {
    int sfifo_num;

    struct sfifo_s *head;
    struct sfifo_s *tail;

    pthread_mutex_t lock_mutex;
    pthread_cond_t cond;
};

struct sfifo_des_s {
    int sfifo_init;

    unsigned int sfifos_num;
    unsigned int sfifos_active_max_num;

    struct sfifo_list_des_s free_list;
    struct sfifo_list_des_s active_list;
};

struct sfifo_s {
    unsigned char *buffer;
    unsigned int size;
    struct sfifo_s *next;
};

extern struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num);

/* productor */
extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);

/* consumer */
extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);

#endif           

繼續閱讀