天天看点

linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】

操作系统:ubuntu10.04

前言:

    在嵌入式开发中,只要是带操作系统的,在其上开发产品应用,基本都需要用到多线程。

    为了提高效率,尽可能的提高并发率。因此,线程之间的通信就是问题的核心。

    根据当前产品需要,使用 环形缓冲区 解决。

一,环形缓冲区的实现

    1,cbuf.h

点击(此处)折叠或打开

#ifndef __CBUF_H__

#define __CBUF_H__

#ifdef __cplusplus

extern "C" {

#endif

/* Define to prevent recursive inclusion 

-------------------------------------*/

#include "types.h"

#include "thread.h"

typedef    struct _cbuf

{

    int32_t        size;            /* 当前缓冲区中存放的数据的个数 */

    int32_t        next_in;        /* 缓冲区中下一个保存数据的位置 */

    int32_t        next_out;        /* 从缓冲区中取出下一个数据的位置 */

    int32_t        capacity;        /* 这个缓冲区的可保存的数据的总个数 */

    mutex_t        mutex;            /* Lock the structure */

    cond_t        not_full;        /* Full -> not full condition */

    cond_t        not_empty;        /* Empty -> not empty condition */

    void        *data[CBUF_MAX];/* 缓冲区中保存的数据指针 */

}cbuf_t;

/* 初始化环形缓冲区 */

extern    int32_t        cbuf_init(cbuf_t *c);

/* 销毁环形缓冲区 */

extern    void        cbuf_destroy(cbuf_t    *c);

/* 压入数据 */

extern    int32_t        cbuf_enqueue(cbuf_t *c,void *data);

/* 取出数据 */

extern    void*        cbuf_dequeue(cbuf_t *c);

/* 判断缓冲区是否为满 */

extern    bool        cbuf_full(cbuf_t    *c);

/* 判断缓冲区是否为空 */

extern    bool        cbuf_empty(cbuf_t *c);

/* 获取缓冲区可存放的元素的总个数 */

extern    int32_t        cbuf_capacity(cbuf_t *c);

}

/* END OF FILE 

---------------------------------------------------------------*/

    2,cbuf.c

#include "cbuf.h"

int32_t        cbuf_init(cbuf_t *c)

    int32_t    ret = OPER_OK;

    if((ret = mutex_init(&c->mutex)) != OPER_OK)    

    {

#ifdef DEBUG_CBUF

    debug("cbuf init fail ! mutex init fail !\n");

        return ret;

    }

    if((ret = cond_init(&c->not_full)) != OPER_OK)    

    debug("cbuf init fail ! cond not full init fail !\n");

        mutex_destroy(&c->mutex);

    if((ret = cond_init(&c->not_empty)) != OPER_OK)

    debug("cbuf init fail ! cond not empty init fail !\n");

        cond_destroy(&c->not_full);

    c->size     = 0;

    c->next_in    = 0;

    c->next_out = 0;

    c->capacity    = CBUF_MAX;

    debug("cbuf init success !\n");

    return ret;

void        cbuf_destroy(cbuf_t    *c)

    cond_destroy(&c->not_empty);

    cond_destroy(&c->not_full);

    mutex_destroy(&c->mutex);

    debug("cbuf destroy success \n");

int32_t        cbuf_enqueue(cbuf_t *c,void *data)

    if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return ret;

    /*

     * Wait while the buffer is full.

     */

    while(cbuf_full(c))

    debug("cbuf is full !!!\n");

        cond_wait(&c->not_full,&c->mutex);

    c->data[c->next_in++] = data;

    c->size++;

    c->next_in %= c->capacity;

    mutex_unlock(&c->mutex);

     * Let a waiting consumer know there is data.

    cond_signal(&c->not_empty);

//    debug("cbuf enqueue success ,data : %p\n",data);

    debug("enqueue\n");

void*        cbuf_dequeue(cbuf_t *c)

    void     *data     = NULL;

    int32_t    ret     = OPER_OK;

    if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return NULL;

       /*

     * Wait while there is nothing in the buffer

    while(cbuf_empty(c))

    debug("cbuf is empty!!!\n");

        cond_wait(&c->not_empty,&c->mutex);

    data = c->data[c->next_out++];

    c->size--;

    c->next_out %= c->capacity;

     * Let a waiting producer know there is room.

     * 取出了一个元素,又有空间来保存接下来需要存储的元素

    cond_signal(&c->not_full);

//    debug("cbuf dequeue success ,data : %p\n",data);

    debug("dequeue\n");

    return data;

bool        cbuf_full(cbuf_t    *c)

    return (c->size == c->capacity);

bool        cbuf_empty(cbuf_t *c)

    return (c->size == 0);

int32_t        cbuf_capacity(cbuf_t *c)

    return c->capacity;

二,辅助文件

    为了提高程序的移植性,对线程相关进行封装。

    1,thread.h

#ifndef __THREAD_H__

#define __THREAD_H__

typedef    struct _mutex

    pthread_mutex_t        mutex;

}mutex_t;

typedef    struct _cond

    pthread_cond_t        cond;

}cond_t;

typedef    pthread_t        tid_t;

typedef    pthread_attr_t    attr_t;

typedef    void*    (* thread_fun_t)(void*);

typedef    struct _thread

    tid_t            tid;

    cond_t            *cv;

    int32_t            state;

    int32_t            stack_size;

    attr_t         attr;

    thread_fun_t    fun;

}thread_t;

/* mutex */

extern    int32_t        mutex_init(mutex_t    *m);

extern    int32_t        mutex_destroy(mutex_t    *m);

extern    int32_t        mutex_lock(mutex_t    *m);

extern    int32_t        mutex_unlock(mutex_t    *m);

/* cond */

extern    int32_t        cond_init(cond_t    *c);

extern    int32_t        cond_destroy(cond_t    *c);

extern    int32_t        cond_signal(cond_t *c);

extern    int32_t        cond_wait(cond_t    *c,mutex_t *m);

/* thread */

/* 线程的创建,其属性的设置等都封装在里面 */

extern    int32_t        thread_create(thread_t *t);

//extern    int32_t        thread_init(thread_t    *t);

#define    thread_join(t, p)     pthread_join(t, p)

#define    thread_self()        pthread_self()

#define    thread_sigmask        pthread_sigmask

    2,thread.c

int32_t        mutex_init(mutex_t    *m)

    int32_t        ret = OPER_OK;

    if((ret = pthread_mutex_init(&m->mutex, NULL)) != 0)

        ret = -THREAD_MUTEX_INIT_ERROR;

int32_t        mutex_destroy(mutex_t    *m)

    if((ret = pthread_mutex_destroy(&m->mutex)) != 0)

        ret = -MUTEX_DESTROY_ERROR;

int32_t        mutex_lock(mutex_t    *m)

    if((ret = pthread_mutex_lock(&m->mutex)) != 0)

        ret = -THREAD_MUTEX_LOCK_ERROR;

int32_t        mutex_unlock(mutex_t    *m)

    if((ret = pthread_mutex_unlock(&m->mutex)) != 0)

        ret = -THREAD_MUTEX_UNLOCK_ERROR;

int32_t        cond_init(cond_t    *c)

    if((ret = pthread_cond_init(&c->cond, NULL)) != 0)

        ret = -THREAD_COND_INIT_ERROR;

int32_t        cond_destroy(cond_t    *c)

    if((ret = pthread_cond_destroy(&c->cond)) != 0)

        ret = -COND_DESTROY_ERROR;

int32_t        cond_signal(cond_t *c)

    if((ret = pthread_cond_signal(&c->cond)) != 0)

        ret = -COND_SIGNAL_ERROR;

int32_t        cond_wait(cond_t    *c,mutex_t *m)

    if((ret = pthread_cond_wait(&c->cond, &m->mutex)) != 0)

        ret = -COND_WAIT_ERROR;    

三,测试

    1,测试代码

/* 

 * cbuf begin

 */

#define        OVER    (-1)

static        cbuf_t    cmd;

static        int        line_1[200];

static        int        line_2[200];

//static        int        temp = 0;

static        bool    line1_finish = false;

static        bool    line2_finish = false;

void*    producer_1(void *data)

    int32_t    i = 0;

    for(i = 0; i < 200; i++)

        line_1[i] = i+1000;

        cbuf_enqueue(&cmd, &line_1[i]);

        if(0 == (i % 9)) sleep(1);

    line1_finish = true;

    return NULL;

void*    producer_2(void *data)

        line_2[i] = i+20000;

        cbuf_enqueue(&cmd, &line_2[i]);

    line2_finish = true;

void*    consumer(void *data)

    int32_t        *ptr = NULL;

    while(1)

        ptr = cbuf_dequeue(&cmd);

        printf("%d\n",*ptr);

        if(cbuf_empty(&cmd) && line2_finish && line1_finish)

        {

            printf("quit\n");

            break;

        }

void    test_cbuf_oper(void)

    pthread_t    l_1;

    pthread_t    l_2;

    pthread_t    c;

    cbuf_init(&cmd);

    pthread_create(&l_1,NULL,producer_1,0);

    pthread_create(&l_2,NULL,producer_2,0);

    pthread_create(&c,NULL,consumer,0);

    pthread_join(l_1,NULL);

    pthread_join(l_2,NULL);

    pthread_join(c,NULL);

    cbuf_destroy(&cmd);

void    test_cbuf(void)

    test_cbuf_oper();

 * cbuf end

    2,测试结果

linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】

四,参考文件

1,《bareos-master》源码

2,《nginx》源码

继续阅读