天天看點

參考Linux核心kfifo設計高效環形緩沖區(隊列)

環形緩沖區

環形緩沖區即回環的隊列,隊列是一種先進先出(FIFO)的結構。排隊就是一種典型的隊列:首先輪到的是排在隊伍最前面的人,後來的人總是排在隊伍的最後面。

常見的環形緩沖區設計方式

1. in+out
struct ring_buffer
{
    unsigned char *buffer;
	unsigned int in;
    unsigned int out;
    unsigned int size;
}
           

in索引指向隊列尾的後一個元素,當有資料要寫入時,在in索引指的位置寫入,然後in+1。

out索引指向隊列首的第一個元素,當有資料要取出時,在out索引指的位置讀取,然後out+1。

buffer表示緩沖區,size表示緩沖區大小。

運作示意圖如下:

參考Linux核心kfifo設計高效環形緩沖區(隊列)

此種設計有一個弊端,即:緩沖區滿和緩沖區空,in和out索引均相等,無法判斷緩沖區的滿和空。

2. in+out+count
struct ring_buffer
{
    unsigned char *buffer;
	unsigned int in;
    unsigned int out;
    unsigned int size;
    unsigned int count;
}
           

針對1. 的弊端,引入一個count計數,代表目前緩沖區的使用情況。寫入一個資料count+1,讀取一個資料count-1。

此種設計解決了1. 的無法區分緩沖區滿和空的弊端,但是引入了一個新的問題:當一個消費者線程和一個生産者線程同時作用于緩沖區時,count便變成了臨界資料,此時需要加鎖做并發控制,降低了程式的并發性能。

3. "不完全填滿"技巧
struct ring_buffer
{
    unsigned char *buffer;
	unsigned int in;
    unsigned int out;
    unsigned int size;
}
           

方法2. 引入了一個并發上的問題,采用"不完全填滿"技巧可規避此問題。即:将緩沖區始終剩一個不填作為"滿"狀态。

運作示意圖如下:

參考Linux核心kfifo設計高效環形緩沖區(隊列)

即緩沖區還剩一個位置時,認為緩沖區已經填滿了。

當in==out,緩沖區為空;當(in+1)%size==out。緩沖區為滿。

此種設計方案可有效區分緩沖區的滿和空,且寫入隻改變in索引,讀取隻改變out索引,讀寫并發時無需加鎖。

上述方案已經很完美,但是是否還有可以改進的地方? 有!

我們先分享上述方案還可以優化的地方:

1、因為始終剩一個不填,是以會浪費一個存儲空間的位置。

2、in和out索引更新需要每次都進行in=(in+1)%size 或者 out=(out+1)%size操作,而取餘%運算符效率是不高的。

3、計算緩沖區内的資料個數時,out<=in計算比較簡單,in-out即為緩沖區内資料的個數。當in回環時,in<\out,此時為size-out+in,如下:

參考Linux核心kfifo設計高效環形緩沖區(隊列)

此時需要分兩種情況,導緻程式設計不是線性的。

4、一次隻能操作一個位元組,操作多個位元組時需要for循環一個一個位元組處理,且還需時時刻刻判斷緩沖區是否為空or滿。

參考kfifo設計環形緩沖區

程式設計

ring_buffer.h

#ifndef __RING_BUFFER_H_
#define __RING_BUFFER_H_

struct ring_buffer
{
	unsigned int in;
	unsigned int out;
	unsigned int size;
	unsigned char *buffer;
};

struct ring_buffer *ring_buffer_init(unsigned int buffer_size);
void ring_buffer_exit(struct ring_buffer *rb);
unsigned int ring_buffer_avail(struct ring_buffer *rb);
void ring_buffer_discard(struct ring_buffer *rb, unsigned int length);
unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_overwrite(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);

#endif

           

ring_buffer.c

#include "ring_buffer.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#define MIN(a, b) (((a) < (b)) ? (a) : (b))

struct ring_buffer *ring_buffer_init(unsigned int buffer_size)
{
	int i;
	struct ring_buffer *rb = NULL;
	
	rb = (struct ring_buffer *) malloc(sizeof(struct ring_buffer));
	if (NULL == rb)
	{
		printf("malloc struct ring_buffer failed! \n");
		return rb;
	}
	
	for (i=0; ; i++)
	{
		if (((1<<i) < buffer_size) && (buffer_size <= (1<<i+1)))
			break;
	}
	
	rb->buffer = (unsigned char *) malloc((1 << (i + 1)) * sizeof(unsigned char));
	if (NULL == rb->buffer)
	{
		printf("malloc buffer failed! \n");
		free(rb);
		rb = NULL;
		return rb;
	}
	
	printf("The ring buffer alloced is %d bytes. \n", 1 << (i + 1));
	
	rb->in = 0;
	rb->out = 0;
	rb->size = 1 << (i + 1);
	
	return rb;
}

void ring_buffer_exit(struct ring_buffer *rb)
{
	if (NULL == rb)
		return ;
	
	if (NULL != rb->buffer)
		free(rb->buffer);
	
	free(rb);
}

unsigned int ring_buffer_avail(struct ring_buffer *rb)
{
	return (rb->in - rb->out);
}

void ring_buffer_discard(struct ring_buffer *rb, unsigned int length)
{
	length = MIN(length, rb->in - rb->out);
	rb->out += length;
}

unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length)
{
	unsigned int len;
	
	length = MIN(length, rb->in - rb->out);
	
	/* first get the data from fifo->out until the end of the buffer */
	len = MIN(length, rb->size - (rb->out & (rb->size - 1)));
	memcpy(buffer, rb->buffer + (rb->out & (rb->size - 1)), len);
	
	/* then get the rest (if any) from the beginning of the buffer */
	memcpy(buffer + len, rb->buffer, length - len);
	
	rb->out += length;
	return length;
}

unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length)
{
	unsigned int len;
	
	length = MIN(length, rb->size - rb->in + rb->out);
	
	/* first put the data starting from fifo->in to buffer end */
	len  = MIN(length, rb->size - (rb->in & (rb->size - 1)));
	memcpy(rb->buffer + (rb->in & (rb->size - 1)), buffer, len);
	
	/* then put the rest (if any) at the beginning of the buffer */
	memcpy(rb->buffer, buffer + len, length - len);
	
	rb->in += length;
	
	return length;
}

unsigned int ring_buffer_overwrite(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length)
{
	unsigned int len;
	unsigned int unavail;
	unavail = rb->size - rb->in + rb->out;
	
	if (length < rb->size)
	{
		if (unavail < length)
		{
			ring_buffer_discard(rb, length - unavail);
		}
		
		/* first put the data starting from fifo->in to buffer end */
		len  = MIN(length, rb->size - (rb->in & (rb->size - 1)));
		memcpy(rb->buffer + (rb->in & (rb->size - 1)), buffer, len);
		
		/* then put the rest (if any) at the beginning of the buffer */
		memcpy(rb->buffer, buffer + len, length - len);
		
		rb->in += length;
	}
	else
	{
		memcpy(rb->buffer, buffer + length - rb->size, rb->size);
		rb->in = rb->size;
		rb->out = 0;
	}
	
	return length;
}

           
代碼解釋

運作示意圖如下:

參考Linux核心kfifo設計高效環形緩沖區(隊列)

1、隊列回環,索引不回環。(計算緩沖區有效數友善)

寫入一個資料,in隻加1;讀取一個資料,out隻加1;此時in始終>=out。即:索引不做in=(in+1)%size or out=(out+1)%size的回環操作(隻有資料實際寫入or讀出緩沖區時,才做隊列回環操作:buffer[(in+1)%size]=value,*value=buffer[(out+1)%size]),這樣緩沖區内有效資料的個數就直接為in-out,不需要分情況讨論。

2、取餘%操作轉化為與&操作。(&操作比%操作效率高得多)

設計緩沖區的大小始終為2的次幂,這樣a%size的取餘%操作可轉化為a&(size-1)的與&操作,&的運算效率比%高得多。

3、程式線性化,無任何if-else分支。(無任何分支,cache命中率更高)

環形緩沖區常用的兩個函數:

unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
           

無任何if-else分支,程式完全線性化,cache命中率更高。

4、使用memcpy而非逐位元組操作,且memcpy會針對處理器架構做優化,效率更高

可讀寫多個位元組,讀寫操作由memcpy實作;一般memcpy會針對處理器架構進行優化,甚至使用彙編實作,效率非常高。

5、并發無鎖(一個生産者線程和一個消費者線程同時作用于環形緩沖區時,無需加鎖)

unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
           

讀寫函數各隻操作自己的out、in索引;當隻有一個生産者線程和一個消費者線程同時作用于環形緩沖區時,可實作并發無鎖。

6、in和out溢出時會自回環到0,無需做溢出判斷處理

in、out索引随着寫入、讀取 雖然一直在增加,但是當其增加至unsigned int的上限時,會由0xFFFF FFFF自動溢出為0x0000 0000,形成溢出自回環至0,且上述所有跟in、out涉及的表達式均成立。

測試程式

附上測試程式如下:

#include "ring_buffer.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(int argc, char* arg[])
{
    unsigned int i = 0;
    unsigned int n, ret;
    char exit_q;
    unsigned char buff[100];
    
	printf("Please enter the size of ring buffer that you want to alloc: ");
    scanf("%d", &i);
    struct ring_buffer *ring_buffer_st = ring_buffer_init(i);
	if (NULL == ring_buffer_st)
		return -1;
	printf("\n");
    
    while (1)
    {
        memset(buff, 0, 100);
        i = 0;
        printf("Please enter the number of data you want to write: ");
        scanf("%d", &i);
        printf("Please input the data that you want to write: ");
        for (n = 0; n < i; n++)
            scanf("%d", &buff[n]);
        ret = ring_buffer_overwrite(ring_buffer_st, buff, i);
        printf("\t ring_buffer_overwrite return value = %d \n", ret);
		
		printf("\t ------------------------\n");
		printf("\t | ");
		for (i = 0; i < ring_buffer_st->size; i++)
			printf("%u ", ring_buffer_st->buffer[i]);
		printf("\n");
		printf("\t | out=%u, in=%u, \n", ring_buffer_st->out, ring_buffer_st->in);
		printf("\t ------------------------\n");
		        ret = ring_buffer_avail(ring_buffer_st);
        printf("\t ring_buffer_avail return value = %d \n\n", ret);
        
        memset(buff, 0, 100);
        i = 0;
        printf("Please enter the number of data you want to read: ");
        scanf("%d", &i);
        ret = ring_buffer_read(ring_buffer_st, buff, i);
        printf("The dats is: ");
        for (n = 0; n < ret; n++)
            printf("%d ", buff[n]);
        printf("\n");
        printf("\t ring_buffer_read return value = %d \n", ret);
        
		printf("\t ------------------------\n");
		printf("\t | ");
		for (i = 0; i < ring_buffer_st->size; i++)
			printf("%u ", ring_buffer_st->buffer[i]);
		printf("\n");
		printf("\t | out=%u, in=%u, \n", ring_buffer_st->out, ring_buffer_st->in);
		printf("\t ------------------------\n");
		        ret = ring_buffer_avail(ring_buffer_st);
        printf("\t ring_buffer_avail return value = %d \n\n", ret);
		
        printf("You can enter Q to exit the program ...\n");
        //fflush(stdin);    // For Windows
        setbuf(stdin, NULL);    // For Linux
        exit_q = getchar();
        if (exit_q == 'q' || exit_q == 'Q')
            break;
    }
    
    ring_buffer_exit(ring_buffer_st);
	ring_buffer_st = NULL;
}

           

繼續閱讀