无锁队列实现

2024-06-01 21:15:10 浏览数 (2)

无锁

开发过程中,对于多线程多进程的并发和并行的几乎是编程不可避免的事情,特别在涉及对于数据进行修改或者添加的时候。这个时候就需要锁的出现,锁有多种类型,互斥锁,自旋锁。除了锁之外,我们还定义了原子操作,当然如果探究本质的话,原子操作也是有锁的,只不过是对汇编的操作锁。

内联汇编

内联汇编是 GNU 的规定一种在 C 语言中嵌入汇编语言的方式。当然不必把他想的太过高深,稍微了解编译原理的知道,无论我们写什么格式的语言,都会经过编译器对我们的代码进行词法分析,语义分析生成有限自动机,最后转化为汇编,汇编转化为二进制然后执行。

内联汇编在 Windows 平台和 Linux 平台上边规则不太相同,本质是因为两个平台的汇编风格不同,Linux 平台上的使用的 AT&T 风格的汇编,而 Windows 平台使用的是 nasm 风格的汇编,另外也因为编译器不同导致两者内联汇编风格也不太相同。这里只谈 Linux 平台的汇编风格。

代码语言:asm复制
__asm__ [volatile] (
"asm oprationnt"
:"=r"(output1), "=m"(output2)
:"0"(input1) ,"m"(input2)
:"cc", "memory"
);

上述 Linux 平台上一般的内联汇编,其中 asm 是标志,是必须的内容。

volatile 表示是否要对后边汇编进行优化,编译器都会代码有优化的方案,因此如果嵌入汇编会对其进行优化,而有时这样的优化策略会影响我们代码本来的逻辑,因此一般都加上。

后边第一个行内容是汇编的代码,表示我们想要嵌入的汇编逻辑。

第二行表示输出操作数,“=r" 表示输出到寄存器上,”=m" 表示输出在内存上。

第三行表示输入操作树,表示输入我们 c 语言的变量。

最后一行较为简单,“cc" 表示在寄存器上进行操作, ”memory“ 表示在内存上有操作。

我们本次定义的简单操作逻辑如下:

代码语言:c复制
static int lxx_atomic_add(int * ptr, int increment)
{
    int old_value = *ptr;
   __asm__ volatile("lock; xadd %0, %1nt"
   	                : "=r"(old_value), "=m"(*ptr) //输出操作数
   	                : "0"(increament), "m"(*ptr) //输入操作数
   	                : "cc", "memory");
   return *ptr;
}

上述就是一个简单的原子加法操作,其中 lock 操作是加锁,另外因为高并发下,old_value 会有可能在原子操作过程中被获取,因此需要在原子操作中更新 old_value。

无锁队列实现

下边是一个无锁队列一个简单类的实现。

代码语言:c复制
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mutex>
#include <time.h>
#include <atomic>
#include <list>
#include <memory>

#define MAX_THREAD_NUM 1
#define FOR_LOOP_COUNT 100000
static int counter = 0;
static std::mutex s_mutex;
static int s_count_push = 0;
static int s_count_pop = 0;

static std::list<int> s_list;

typedef void *(thread_func_t)(void *argv);

template <typename ElemType>
struct qnode   //是用链表来维护队列
{
   struct qnode *next;
   ElemType _data;
};

class Queue
{
private:
	struct qnode<ElemType> *volatile_head = NULL;  //队列的头部
	struct qnod<ElemType> *volatild_tail = NULL;   //队列的尾部
public:
	Queue()  //初始化构造队列
	{
	  _head = _tail = new qnode<ElemType>;
	  _head->next = NULL;
	  _tail->next = NULL;
	  printf("Queue _head:%pn", _head);
	}

	void push(const ElemType &e)  //不考虑并发情况下在队列中添加节点
	{
	  struct qnode<ElemType> *p = new qnode<ElemType>;
	  p->_data = e;
	  p->next = NULL;

	  struct qnode<ElemType> *t = _tail;
	  t->next = p;
	  _tail = p;
	}

	void push2(const ElemType &e) //考虑并发的无锁添加操作
	{
	   struct qnode<ElemType> *p = new qnode<ElemType>;
	   p->_data = e;
	   p->next = NULL;

	   struct qnode<ElemType> *t = _tail;
	   struct qnode<ElemType> *old_t = _tail;

	   int count = 0;
	   do{
	   	while(t->next != NULL)
			t = t->next;
		if(count   >= 1){
			printf("push count: %d, t->next:%pn", count, t->next);
		}
	   }while(!__sync_bool_compare_and_swap(&t->next, NULL, p));

	   __sync_bool_compare_and_swap(&_tail, &old_t, p);
	}

	bool pop(ElemType &e){  //从队列移除元素
	   struct qnode<ElemType> *p = _head;
	   struct qnode<ElemType> *np = _head->next;
	   if(!np){
	   	return false;
	   }

	   e = np->_data;
	   _head->next = np->next;
	   delete np;
	   return true;
	}

	bool pop2(ElemType &e){  //无锁并发移除队列
	   struct qnode<ElemType> *p = NULL;
	   struct qnode<ElemType> *np = NULL;
	   int count = 0;

	   do{
	   	p = _head;
		np = p->next;
	   if(p->next == NULL){
	   	return false;
	   }

	   if(count   >= 1){
	   	printf("pop count:%d, p->next:%pn", count, p->next);
	   	}
	  }while(__sync_bool_compare_and_swap(&head, p, p->next));

	  e = p->next->_data;
	  delete p;
	  return true;
	}

	~Queue(){ //析构函数
	  struct qnode<ElemType> *volatile, tmp;
	  while(_head){
	  	tmp = _head->next;
		printf("_head:%pn", _head);
	    delete _head;
		_head = tmp;
	  }
	}

};

上述无锁队列的实现比较常见,使用链表维护元素,一个头指针,一个尾指针,分别用来取元素和移除元素。

主要的实现看点是 push2 和 pop2 的操作。

我们首先需要确定并发情况下,可能会有多个线程同时向这个队列中插入元素的可能,因此需要通过一个循环来对其进行插入和删除操作。

这里简单对 push2 进行分析

代码语言:c复制
void push2(const ElemType &e) //考虑并发的无锁添加操作
{
	struct qnode<ElemType> *p = new qnode<ElemType>;  //分配一个新的节点
	p->_data = e;
	p->next = NULL;

	 struct qnode<ElemType> *t = _tail;  //记录尾节点
	struct qnode<ElemType> *old_t = _tail;

	int count = 0;
	do{
	while(t->next != NULL)  //该操作是为了允许多线程的操作,并发情况下多个节点共同操作,在上边逻辑获取的尾节点在该步操作中可能已经改变
		t = t->next;
	if(count   >= 1){ //表示线程进行了多次操作
		printf("push count: %d, t->next:%pn", count, t->next);
	}
	}while(!__sync_bool_compare_and_swap(&t->next, NULL, p)); //原子操作保证获取的必然是尾节点

	 __sync_bool_compare_and_swap(&_tail, &old_t, p); //将尾节点插入其中
}
// __sync_bool_compare_and_swap(&_tail, &old_t, p); 
//上述是原子操作,如果 _tail = old_t 那么将 p 赋值给 _tail

上述代码进行了说明,而 pop 的逻辑也基本类似,保证并发场景下取到的是头节点。

0 人点赞