環形緩沖區
環形緩沖區即回環的隊列,隊列是一種先進先出(FIFO)的結構。排隊就是一種典型的隊列:首先輪到的是排在隊伍最前面的人,後來的人總是排在隊伍的最後面。
常見的環形緩沖區設計方式
1. in+out
struct ring_buffer
{
unsigned char *buffer;
unsigned int in;
unsigned int out;
unsigned int size;
}
in索引指向隊列尾的後一個元素,當有資料要寫入時,在in索引指的位置寫入,然後in+1。
out索引指向隊列首的第一個元素,當有資料要取出時,在out索引指的位置讀取,然後out+1。
buffer表示緩沖區,size表示緩沖區大小。
運作示意圖如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiclRnblN2XjlGcjAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90TUhxGeyIGashVYzZkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLmZjNmRmZzEzYxMmY1cTO5cDN2QzNzIzMkFWZiJTN3I2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
此種設計有一個弊端,即:緩沖區滿和緩沖區空,in和out索引均相等,無法判斷緩沖區的滿和空。
2. in+out+count
struct ring_buffer
{
unsigned char *buffer;
unsigned int in;
unsigned int out;
unsigned int size;
unsigned int count;
}
針對1. 的弊端,引入一個count計數,代表目前緩沖區的使用情況。寫入一個資料count+1,讀取一個資料count-1。
此種設計解決了1. 的無法區分緩沖區滿和空的弊端,但是引入了一個新的問題:當一個消費者線程和一個生産者線程同時作用于緩沖區時,count便變成了臨界資料,此時需要加鎖做并發控制,降低了程式的并發性能。
3. "不完全填滿"技巧
struct ring_buffer
{
unsigned char *buffer;
unsigned int in;
unsigned int out;
unsigned int size;
}
方法2. 引入了一個并發上的問題,采用"不完全填滿"技巧可規避此問題。即:将緩沖區始終剩一個不填作為"滿"狀态。
運作示意圖如下:
即緩沖區還剩一個位置時,認為緩沖區已經填滿了。
當in==out,緩沖區為空;當(in+1)%size==out。緩沖區為滿。
此種設計方案可有效區分緩沖區的滿和空,且寫入隻改變in索引,讀取隻改變out索引,讀寫并發時無需加鎖。
上述方案已經很完美,但是是否還有可以改進的地方? 有!
我們先分享上述方案還可以優化的地方:
1、因為始終剩一個不填,是以會浪費一個存儲空間的位置。
2、in和out索引更新需要每次都進行in=(in+1)%size 或者 out=(out+1)%size操作,而取餘%運算符效率是不高的。
3、計算緩沖區内的資料個數時,out<=in計算比較簡單,in-out即為緩沖區内資料的個數。當in回環時,in<\out,此時為size-out+in,如下:
此時需要分兩種情況,導緻程式設計不是線性的。
4、一次隻能操作一個位元組,操作多個位元組時需要for循環一個一個位元組處理,且還需時時刻刻判斷緩沖區是否為空or滿。
參考kfifo設計環形緩沖區
程式設計
ring_buffer.h
#ifndef __RING_BUFFER_H_
#define __RING_BUFFER_H_
struct ring_buffer
{
unsigned int in;
unsigned int out;
unsigned int size;
unsigned char *buffer;
};
struct ring_buffer *ring_buffer_init(unsigned int buffer_size);
void ring_buffer_exit(struct ring_buffer *rb);
unsigned int ring_buffer_avail(struct ring_buffer *rb);
void ring_buffer_discard(struct ring_buffer *rb, unsigned int length);
unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_overwrite(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
#endif
ring_buffer.c
#include "ring_buffer.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
struct ring_buffer *ring_buffer_init(unsigned int buffer_size)
{
int i;
struct ring_buffer *rb = NULL;
rb = (struct ring_buffer *) malloc(sizeof(struct ring_buffer));
if (NULL == rb)
{
printf("malloc struct ring_buffer failed! \n");
return rb;
}
for (i=0; ; i++)
{
if (((1<<i) < buffer_size) && (buffer_size <= (1<<i+1)))
break;
}
rb->buffer = (unsigned char *) malloc((1 << (i + 1)) * sizeof(unsigned char));
if (NULL == rb->buffer)
{
printf("malloc buffer failed! \n");
free(rb);
rb = NULL;
return rb;
}
printf("The ring buffer alloced is %d bytes. \n", 1 << (i + 1));
rb->in = 0;
rb->out = 0;
rb->size = 1 << (i + 1);
return rb;
}
void ring_buffer_exit(struct ring_buffer *rb)
{
if (NULL == rb)
return ;
if (NULL != rb->buffer)
free(rb->buffer);
free(rb);
}
unsigned int ring_buffer_avail(struct ring_buffer *rb)
{
return (rb->in - rb->out);
}
void ring_buffer_discard(struct ring_buffer *rb, unsigned int length)
{
length = MIN(length, rb->in - rb->out);
rb->out += length;
}
unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length)
{
unsigned int len;
length = MIN(length, rb->in - rb->out);
/* first get the data from fifo->out until the end of the buffer */
len = MIN(length, rb->size - (rb->out & (rb->size - 1)));
memcpy(buffer, rb->buffer + (rb->out & (rb->size - 1)), len);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + len, rb->buffer, length - len);
rb->out += length;
return length;
}
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length)
{
unsigned int len;
length = MIN(length, rb->size - rb->in + rb->out);
/* first put the data starting from fifo->in to buffer end */
len = MIN(length, rb->size - (rb->in & (rb->size - 1)));
memcpy(rb->buffer + (rb->in & (rb->size - 1)), buffer, len);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(rb->buffer, buffer + len, length - len);
rb->in += length;
return length;
}
unsigned int ring_buffer_overwrite(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length)
{
unsigned int len;
unsigned int unavail;
unavail = rb->size - rb->in + rb->out;
if (length < rb->size)
{
if (unavail < length)
{
ring_buffer_discard(rb, length - unavail);
}
/* first put the data starting from fifo->in to buffer end */
len = MIN(length, rb->size - (rb->in & (rb->size - 1)));
memcpy(rb->buffer + (rb->in & (rb->size - 1)), buffer, len);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(rb->buffer, buffer + len, length - len);
rb->in += length;
}
else
{
memcpy(rb->buffer, buffer + length - rb->size, rb->size);
rb->in = rb->size;
rb->out = 0;
}
return length;
}
代碼解釋
運作示意圖如下:
1、隊列回環,索引不回環。(計算緩沖區有效數友善)
寫入一個資料,in隻加1;讀取一個資料,out隻加1;此時in始終>=out。即:索引不做in=(in+1)%size or out=(out+1)%size的回環操作(隻有資料實際寫入or讀出緩沖區時,才做隊列回環操作:buffer[(in+1)%size]=value,*value=buffer[(out+1)%size]),這樣緩沖區内有效資料的個數就直接為in-out,不需要分情況讨論。
2、取餘%操作轉化為與&操作。(&操作比%操作效率高得多)
設計緩沖區的大小始終為2的次幂,這樣a%size的取餘%操作可轉化為a&(size-1)的與&操作,&的運算效率比%高得多。
3、程式線性化,無任何if-else分支。(無任何分支,cache命中率更高)
環形緩沖區常用的兩個函數:
unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
無任何if-else分支,程式完全線性化,cache命中率更高。
4、使用memcpy而非逐位元組操作,且memcpy會針對處理器架構做優化,效率更高
可讀寫多個位元組,讀寫操作由memcpy實作;一般memcpy會針對處理器架構進行優化,甚至使用彙編實作,效率非常高。
5、并發無鎖(一個生産者線程和一個消費者線程同時作用于環形緩沖區時,無需加鎖)
unsigned int ring_buffer_read(struct ring_buffer *rb, unsigned char *buffer, unsigned int length);
unsigned int ring_buffer_write(struct ring_buffer *rb, const unsigned char *buffer, unsigned int length);
讀寫函數各隻操作自己的out、in索引;當隻有一個生産者線程和一個消費者線程同時作用于環形緩沖區時,可實作并發無鎖。
6、in和out溢出時會自回環到0,無需做溢出判斷處理
in、out索引随着寫入、讀取 雖然一直在增加,但是當其增加至unsigned int的上限時,會由0xFFFF FFFF自動溢出為0x0000 0000,形成溢出自回環至0,且上述所有跟in、out涉及的表達式均成立。
測試程式
附上測試程式如下:
#include "ring_buffer.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char* arg[])
{
unsigned int i = 0;
unsigned int n, ret;
char exit_q;
unsigned char buff[100];
printf("Please enter the size of ring buffer that you want to alloc: ");
scanf("%d", &i);
struct ring_buffer *ring_buffer_st = ring_buffer_init(i);
if (NULL == ring_buffer_st)
return -1;
printf("\n");
while (1)
{
memset(buff, 0, 100);
i = 0;
printf("Please enter the number of data you want to write: ");
scanf("%d", &i);
printf("Please input the data that you want to write: ");
for (n = 0; n < i; n++)
scanf("%d", &buff[n]);
ret = ring_buffer_overwrite(ring_buffer_st, buff, i);
printf("\t ring_buffer_overwrite return value = %d \n", ret);
printf("\t ------------------------\n");
printf("\t | ");
for (i = 0; i < ring_buffer_st->size; i++)
printf("%u ", ring_buffer_st->buffer[i]);
printf("\n");
printf("\t | out=%u, in=%u, \n", ring_buffer_st->out, ring_buffer_st->in);
printf("\t ------------------------\n");
ret = ring_buffer_avail(ring_buffer_st);
printf("\t ring_buffer_avail return value = %d \n\n", ret);
memset(buff, 0, 100);
i = 0;
printf("Please enter the number of data you want to read: ");
scanf("%d", &i);
ret = ring_buffer_read(ring_buffer_st, buff, i);
printf("The dats is: ");
for (n = 0; n < ret; n++)
printf("%d ", buff[n]);
printf("\n");
printf("\t ring_buffer_read return value = %d \n", ret);
printf("\t ------------------------\n");
printf("\t | ");
for (i = 0; i < ring_buffer_st->size; i++)
printf("%u ", ring_buffer_st->buffer[i]);
printf("\n");
printf("\t | out=%u, in=%u, \n", ring_buffer_st->out, ring_buffer_st->in);
printf("\t ------------------------\n");
ret = ring_buffer_avail(ring_buffer_st);
printf("\t ring_buffer_avail return value = %d \n\n", ret);
printf("You can enter Q to exit the program ...\n");
//fflush(stdin); // For Windows
setbuf(stdin, NULL); // For Linux
exit_q = getchar();
if (exit_q == 'q' || exit_q == 'Q')
break;
}
ring_buffer_exit(ring_buffer_st);
ring_buffer_st = NULL;
}