以下是RUST标准库mpsc的queue代码分析, 代码路径:library/std/src/mpsc/mpsc_queue.rs 几乎是见到的最简单的一个支持多线程写,单线程读的队列数据结构了。
代码语言:javascript复制//以下是简单的FIFO的队列实现
pub enum PopResult<T> {
//返回队列成员
Data(T),
//队列为空
Empty,
//队列的一致性出错
Inconsistent,
}
//节点结构
struct Node<T> {
//next指针,利用原子指针实现多线程的Sync,值得牢记
next: AtomicPtr<Node<T>>,
value: Option<T>,
}
/// 能够被多个线程操作的队列
pub struct Queue<T> {
//利用原子指针操作实现多线程的Sync,极大简化了代码
head: AtomicPtr<Node<T>>,
//从后面的代码看,这里实际上是队列的头部,这个Queue的代码搞得奇怪
tail: UnsafeCell<*mut Node<T>>,
}
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Node<T> {
//申请堆内存后,将堆内存的指针提取出来
Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
}
}
impl<T> Queue<T> {
pub fn new() -> Queue<T> {
let stub = unsafe { Node::new(None) };
//生成一个空元素的节点列表
Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
//在头部
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));
//换成C的话,就是head->next = n; head = n
//对于空队列来说,是tail = head; head->next = n; head = n;
//现在tail实际上是队列头部,head是尾部。tail的next是第一个有意义的成员
let prev = self.head.swap(n, Ordering::AcqRel);
//要考虑在两个赋值中间加入了其他线程的操作是否会出问题,
//这里面有一个复杂的分析,
//假设原队列为head, 有两个线程分别插入新节点n,m
//当n先执行,而m在这个代码位置插入,则m插入前prev_n = pre_head, head = n
//m插入后,prev_m = n, head = m。如果n先执行下面的语句,执行完后
// pre_head->next = n, n->next = null,然后m执行完下面语句
// pre_head->next = n, n->next = m, head = m,队列是正确的。
// 如果m先执行,执行完后 pre_head->next = null, n->next = m, head = m;
// 然后n执行,执行完成后 pre_head->next = n, n->next = m, head =m, 队列是正确的。
// 换成多个线程实际上也一样是正确的。这个地方处理十分巧妙,这是系统级编程语言的魅
//力, 当然,实际上是裸指针编程的魅力
(*prev).next.store(n, Ordering::Release);
}
}
//仅有一个线程在pop
pub fn pop(&self) -> PopResult<T> {
unsafe {
//tail实际上是队列头,value是None
let tail = *self.tail.get();
//tail的next是第一个有意义的成员
let next = (*tail).next.load(Ordering::Acquire);
//next如果为空,说明队列是空队列
if !next.is_null() {
//此处原tail会被drop,tail被赋成next
//因为push只可能改变next,所以这里不会有线程冲突问题
//这个语句完成后,队列是完整及一致的
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
//将value的所有权转移出来,*next的value又重新置为None
//当tail == head的时候 就又都是stub了
let ret = (*next).value.take().unwrap();
//恢复Box,以便以后释放堆内存
let _: Box<Node<T>> = Box::from_raw(tail);
return Data(ret);
}
//判断是否出错
if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
//空队列的stub也要释放
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
//恢复Box并消费掉,释放堆内存
let _: Box<Node<T>> = Box::from_raw(cur);
cur = next;
}
}
}
}
以上摘录自:深入RUST标准库内核
https://github.com/Warrenren/inside-rust-std-library/