ringbuffer是什么_Buffer

2022-10-01 10:33:04 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

Ring Buffer的高级用法(类似内核KFIFO)

环形缓冲区(ring buffer),环形队列(ring queue) 多用于2个线程之间传递数据,是标准的先入先出(FIFO)模型。一般来说,对于多线程共享数据,需要使用mutex来同步,这样共享数据才不至于发生不可预测的修改/读取,然而,mutex的使用也带来了额外的系统开销,ring buffer/queue 的引入,就是为了有效地解决这个问题,因其特殊的结构及算法,可以用于2个线程中共享数据的同步,而且必须遵循1个线程push in,另一线程pull out的原则。

线程 A 媒介 线程 B data in –> ring buffer/queue –> data out

参考代码

代码参考Linux内核Kfifo.

数据结构

数据结构中定义的缓存区大小一定要是2的n,当然也可以用动态分配来分配缓存区的大小,但是使用该高级用法一定要遵循分配的缓存区大小是2的n次方;

代码语言:javascript复制
#define MIN(a, b) (((a) < (b)) ? (a) : (b)) /* 取a和b中最小值 */

#define RING_BUFFER_SIZE       4096	//大小一定要为2的n次方才能使用该高级用法 

typedef struct {
    char buffer[RING_BUFFER_SIZE];  /* 缓冲区 ,大小一定要为2的n次方才能使用该高级用法 */
    unsigned int size;              /* 大小  注意要用unsigned类型*/
    unsigned int in;                /* 入口位置  注意要用unsigned类型*/
    unsigned int out;               /* 出口位置  注意要用unsigned类型*/
} RingBuffer_t;

数据入队操作先来看数据入队的操作,分以下几种情况分析: 1、ring_buf_p->in 、ring_buf_p->out均小于size; 只有开始使用循环队列的阶段才会是这种情况, 先分析size = MIN(size, ring_buf_p->size – ring_buf_p->in ring_buf_p->out);这句代码; 代码补全为size = MIN(size, ring_buf_p->size – (ring_buf_p->in – ring_buf_p->out)); 由于ring_buf_p->in为入指针,ring_buf_p->out为出指针,则(ring_buf_p->in – ring_buf_p->out)即为循环缓存区已经被使用的大小,而 ring_buf_p->size – (ring_buf_p->in – ring_buf_p->out)即为循环缓存区剩余未使用的大小,与即将要写入的数据大小取二者中较小的,保证填入的数据不会出现越界或覆盖原有的数据。 我们再看看len = MIN(size, ring_buf_p->size – (ring_buf_p->in & (ring_buf_p->size – 1)));这条语句, 有的人可能不太理解 (ring_buf_p->in & (ring_buf_p->size – 1)))是什么意思, 其实就是和ring_buf_p->in % ring_buf_p->size 的作用是一样的,就是取余; 但是 (ring_buf_p->in & (ring_buf_p->size – 1)))的代码执行效率要比ring_buf_p->in % ring_buf_p->size高很多, 在一下对实时性要求很高的使用场景下,代码的执行效率是要求很苛刻的;

这是又要分两种情况讨论,

第一种size小于等于ring_buf_p->size – (ring_buf_p->in & (ring_buf_p->size – 1)); 这说明循环缓存区的后半部分的未使用大小足够放下要写入的数据大小,数据只要一次就能完全写完进循环缓存区; 第二种size大于ring_buf_p->size – (ring_buf_p->in & (ring_buf_p->size – 1)); 这说明循环缓存区的后半部分的未使用大小无法放下要写入的数据大小,数据只要分两次才能写入循环缓存区; 第一次写入将后半部分剩余的缓存区大小使用完,第二次写入将剩余的未写入的数据大小从循环缓存区的首地址开始写入 (这也就是循环缓冲区的作用,使用较小的实际物理内存实现了线性缓存);

2、ring_buf_p->in大于size 、ring_buf_p->out小于size; 或 ring_buf_p->in 、ring_buf_p->out均大于于size; 这种情况才是体现改高级用法的时候,数据的写入和读取导致入指针域出指针的大小超过size的大小,

先说明数据结构定义时为什么要要求指针和大小的数据类型一定要为unsigned,因为在 本高级用法中,没有用size的大小限制指针的大小的,入指针与出指针的大小均可以达到对于数据大小的最大值,

而我们知道无符号类型的数据,大小超过最大值时,会出现溢出,导致数值又会从零开始变化,

比如unsigned char, 254 = 1,就是255 ,而255在计算机中的二进制存储为11111111,所以255 1,

在计算机的存储就会变成100000000,而由于unsigned char只有八位,就会出现“溢出”的现象,所以255 1的结果为0, 高级用法就是利用了无符号类型的数据特性。而至于为什么要使用大小要使用2的n次方的原因也是因为, 所有的无符号数据类型的数值个数为2的n次方个, 例如我们使用的指针类型为unsigned char, size的大小也使用2的8次方,也就是256, unsigned char的数据范围为0~255正好与数据中的每个字节一一对应。 而当使用的size大小为2的7次方,也就是128时,size的也是可以整除unsigned char可以数据范围个数的, 所以unsigned char的是任一个数对size可以取余都会落在每一个直接所对应的所有上。 而且size使用2的n次方可以使用(ring_buf_p->in & (ring_buf_p->size – 1))取余这样的高级用法。

入队列

代码语言:javascript复制
unsigned int RingBufferPut(RingBuffer_t *ring_buf_p, void *buffer, hd_u32_t size)
{
   unsigned int len = 0;
   if(ring_buf_p == NULL ||  buffer == NULL) {
       return -1;
   }
   /*得到实际写入的数据长度*/
   size = MIN(size, ring_buf_p->size - ring_buf_p->in   ring_buf_p->out);
   /*min(空闲空间大小,从real_in开始到缓冲区结尾的空间) -------------*/
   /* first put the data starting from fifo->in to buffer end */
   len  = MIN(size, ring_buf_p->size - (ring_buf_p->in & (ring_buf_p->size - 1)));
   memcpy(ring_buf_p->buffer   (ring_buf_p->in & (ring_buf_p->size - 1)), buffer, len);
   /* then put the rest (if any) at the beginning of the buffer */
   if (size - len > 0) {
       memcpy(ring_buf_p->buffer, (char *)buffer   len, size - len);
   }
   ring_buf_p->in  = size;
   return size;
}

出队列

代码语言:javascript复制
unsigned int RingBufferGet(RingBuffer_t *ring_buf_p, void *buffer, hd_u32_t size)
{
  unsigned int len = 0;
  if(ring_buf_p == NULL || buffer == NULL) {
  	return -1;
  }
  size  = MIN(size, ring_buf_p->in - ring_buf_p->out);
  /* first get the data from fifo->out until the end of the buffer */
  len = MIN(size, ring_buf_p->size - (ring_buf_p->out & (ring_buf_p->size - 1)));
  memcpy(buffer, ring_buf_p->buffer   (ring_buf_p->out & (ring_buf_p->size - 1)), len);
  /* then get the rest (if any) from the beginning of the buffer */
  if (size - len > 0) {
      memcpy((char *)buffer   len, ring_buf_p->buffer, size - len);
  }
  ring_buf_p->out  = size;
  return size;
}

获取队列中可读数据的大小

代码语言:javascript复制
unsigned int RingBufferLen(const RingBuffer_t *ring_buf_p)
{
   return (ring_buf_p->in - ring_buf_p->out);
}

清空循环队列

代码语言:javascript复制
void RingBufferClear(RingBuffer_t *ring_buf_p)
{
  ring_buf_p->in = 0;
  ring_buf_p->out = 0;
}

1.ring buffer 数据结构

struct _RingB { SI32 r_cursor; SI32 w_cursor; SI32 tr_cursor; SI32 tw_cursor; SI32 length; char data[0]; }; ring buffer主要用于存储一段连续的数据块,例如网络接收的数据。 r_cursor 读指针,只在线程B中才能被修改,对于线程A,它是readonly的 tr_cursor 辅助读指针,只在线程B中才能被引用,用于计算当前有多少可读数据 w_cursor 写指针,只在线程A中才能被修改,对于线程B,它是readonly的 tw_cursor 辅助写指针,只在线程A中才能被引用,用于计算当前有多少空闲位置可以写入数据 length 缓冲区长度 data 缓冲区实体 需要注意的是,在data中,需要保留1byte的位置,因为: a. push数据时,理论上来说,我们应该是从 thiz->tw_cursor = (thiz->w_cursor 1) % thiz->length; 开始写的,如果此时 thiz->tw_cursor == thiz->r_cursor, 我们认为此时缓冲区已满 b. pull数据时,也是相同的道理,thiz->r_cursor == thiz->w_cursor,我们认为此时缓冲区无数据可读 也就是说,r_cursor == w_cursor 是一个特殊的位置,所以我们需要在data里面保留1byte。 具体原理可参考 IDirectSoundBuffer8

2. ring queue 数据结构

struct _RingQ { SI32 r_cursor; SI32 w_cursor; SI32 length; void* data[0]; };

ring buffer主要用于存储指针,这个指针指向共享数据的entry. r_cursor 读指针,只在线程B中才能被修改,对于线程A,它是readonly的 w_cursor 写指针,只在线程A中才能被修改,对于线程B,它是readonly的 length 缓冲区长度 data 缓冲区实体 与ring buffer,在data中,同样需要保留1byte的位置,原理与ring buffer相似,只是tw_cursor,tw_cursor 用临时变量代替而已。 3、can通信

在CAN通信卡设备驱动程序中,为了增强CAN通信卡的通信能力、提高通信效率,根据CAN的特点,使用两级缓冲区结构,即直接面向CAN通信卡的收发缓 冲区和直接面向系统调用的接收帧缓冲区。 通讯中的收发缓冲区一般采用环形队列(或称为FIFO队列),使用环形的缓冲区可以使得读写并发执行,读进程和写进程可以采用“生产者和消费者”的模型来 访问缓冲区,从而方便了缓存的使用和管理。然而,环形缓冲区的执行效率并不高,每读一个字节之前,需要判断缓冲区是否为空,并且移动尾指针时需要进行“折行处理”(即当指针指到缓冲区内存的末尾时,需要新将其定向到缓冲区的首地址);每写一个字节之前,需要判断缓区是否为,并且移动尾指针时同样需要进行“ 折行处理”。程序大部分的执行过程都是在处理个别极端的情况。只有小部分在进行实际有效的操作。这就是软件工程中所谓的“8比2”关系。结合CAN通讯实际情况,在本设计中对环形队列进行了改进,可以较大地提高数据的收发效率。 由于CAN通信卡上接收和发送缓冲器每次只接收一帧CAN数据,而且根据CAN的通讯协议,CAN控制器的发送数据由1个字节的标识符、一个字节的RTR 和DLC位及8个字节的数据区组成,共10个字节;接收缓冲器与之类似,也有10个字节的寄存器。所以CAN控制器收的数据是短小的定长帧(数据可以不满 8字节)。 于是,采用度为10字节的数据块业分配内存比较方便,即每次需要内存缓冲区时,直接分配10个字节,由于这10个字节的地址是线性的,故不需要进行“折行”处理。更重要的是,在向缓冲区中写数据时,只需要判断一次是否有空闲块并获取其块首指针就可以了,从而减少了重复性的条件判断,大大提高了程序的执行效率;同样在从缓冲队列中读取数据时,也是一次读取10字节的数据块,同样减少了重复性的条件判断。 在CAN卡驱动程序中采用如下所示的称为“Block_Ring_t”的数据结构作为收发数据的缓冲区:

typedef struct {

long signature;

unsigned char *head_p;

unsigned char *tail_p;

unsigned char *begin_p;

unsigned char *end_p;

unsigned char buffer [512];

int usedbytes;

}Block_Ring_t;

该数据结构在通用的环形队列上增加了一个数据成员usedbytes,它表示当前缓冲区中有多少字节的空间被占用了。使用usedbytes,可以比较方 便地进行缓冲区满或空的判断。当usedbytes=0时,缓冲区空;当usedbytes=BLOCK_RING_BUFFER_SIZE时,缓冲区 满。 本驱动程序除了收发缓冲区外,还有一个接收帧缓冲区,接收帧队列负责管理经Hilon A协议解包后得到的数据帧。由于有可能要同接收多个数据帧,而根据CAN总线遥通信协议,高优先级的报文将抢占总线,则有可能在接收一个低优先级且被分为 好几段发送的数据帧时,被一个优先级高的数据帧打断。这样会出现同时接收到多个数据帧中的数据包,因而需要有个接收队列对同时接收的数据帧进行管理。 当有新的数据包到来时,应根据addr(通讯地址),mode(通讯方式),index(数据包的序号)来判断是否是新的数据帧。如果是,则开辟新的 frame_node;否则如果已有相应的帧节点存地,则将数据附加到该帧的末尾;在插入数据的同时,应该检查接收包的序号是否正确,如不正确将丢弃这包 数据。 每次建立新的frame_node时,需要向frame_queue申请内存空间;当frame_queue已满时,释放掉队首的节点(最早接收的但未完 成的帧)并返回该节点的指针。 当系统调用读取了接收帧后,释放该节点空间,使设备驱动程序可以重新使用该节点。

代码语言:javascript复制
#ifndef _RING_BUF_H_
#define _RING_BUF_H_
/*环形缓冲区管理器*/
typedef struct
{
unsigned char *buf;    /*环形缓冲区        */
unsigned int size;     /*环形缓冲区        */
unsigned int front;    /*头指针            */
unsigned int rear;     /*尾指针            */
}ring_buf_t;
/*-------------------------外部接口声明----------------------------*/
int ring_buf_create(ring_buf_t *r,unsigned char *buf,unsigned int size);
void ring_buf_clr(ring_buf_t *r);
unsigned int ring_buf_len(ring_buf_t *r);
unsigned int ring_buf_put(ring_buf_t *r,unsigned char *buf,unsigned int len);
unsigned int ring_buf_get(ring_buf_t *r,unsigned char *buf,unsigned int len);
#endif
/******************************************************************************
*
* 文件名称: ringbuffer.c
* 摘     要:环形缓冲区           
* 参    考: linux/kfifo    
* 当前版本: 1.0
* 作     者: w
* 完成日期: 2016-05-30
*             
******************************************************************************/
#include "ringbuffer.h"
#include <string.h>
#include <stddef.h>
#include <assert.h>
#define min(a,b) ( (a) < (b) )? (a):(b)     
/******************************************************************************
*函数名   :ring_buf_init
*函数功能 :构造一个空环形缓冲区
*输入参数 :r 环形缓冲区控制块
*返回值   :非0表示成功 
*****************************************************************************/
int ring_buf_create(ring_buf_t *r,unsigned char *buf,unsigned int len)
{
r->buf = buf;
r->size = len;
r->front = r->rear = 0;
return buf == NULL;
}
/**********************************************************************
*函数名   :ring_buf_clr
*函数功能 :清空环形缓冲区 
*输入参数 :r - 待清空的环形缓冲区
*返回值   :None 
*********************************************************************/
void ring_buf_clr(ring_buf_t *r)
{
r->front = r->rear = 0;
}
/**********************************************************************
*函数名   :ring_buf_len
*函数功能 :计算环形缓冲区容量 (字节为单位)
*输入参数 :r.环形缓冲区控制块
*返回值   :环形缓冲区中有效字节数 
*********************************************************************/
unsigned int ring_buf_len(ring_buf_t *r)
{
return r->rear - r->front;
}
/**********************************************************************
*函数名   :ring_buf_put
*函数功能 :将指定长度的数据放到环形缓冲区中 
*输入参数 :buf - 数据缓冲区
*          len - 缓冲区长度 
*返回值   :实际放到中的数据 
*********************************************************************/
unsigned int ring_buf_put(ring_buf_t *r,unsigned char *buf,unsigned int len)
{
unsigned int i;
unsigned int space;                      
space = r->size   r->front - r->rear;
len = min(len , space);                  /*得到实际写入的数据长度*/
/*min(空闲空间大小,从real_in开始到缓冲区结尾的空间) -------------*/
i = min(len, r->size - r->rear % r->size);   
/*
* 当len > l时,拷贝buffer中剩余的内容
*而剩余的大小为len - l 
*/            
memcpy(r->buf   r->rear % r->size, buf, i); 
memcpy(r->buf, buf   i, len - i);       
r->rear  = len;     
return len;
}
/**********************************************************************
*函数名   :rueueGet
*函数功能 :从环形缓冲区中读取指定长度的数据 
*输入参数 :len - 读取长度 
*输出参数 :buf - 输出数据缓冲区
*返回值   :实际读取长度 
*********************************************************************/
unsigned int ring_buf_get(ring_buf_t *r,unsigned char *buf,unsigned int len)
{
unsigned int i;
unsigned int space;    
space = r->rear - r->front;                     
len = min(len , space);                                
i = min(len, r->size - r->front % r->size );    
memcpy(buf, r->buf   r->front % r->size, i);    
memcpy(buf   i, r->buf, len - i);   
r->front  = len;        
return len;
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/195065.html原文链接:https://javaforall.cn

0 人点赞