写作目的
说到无锁,其实就是用cas,不过我在百度上搜java实现无锁队列的文章其实不多,所以自己用cas和volatile实现一下,线程安全那是必须的。
无锁队列
代码语言:javascript复制package untils;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import sun.misc.Unsafe;
/**
* Created on 2021-06-23
*/
public class NoLockQueue {
private static final Unsafe unsafe;
//头节点
volatile Node head;
//尾节点
volatile Node tail;
//头节点偏移量
private static final Long headOffset;
//尾节点偏移量
private static final Long tailOffset;
//当前队列的长度
private AtomicInteger length = new AtomicInteger(0);
//队列允许的最大长度
private int maxSize = 0;
static {
try {
//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
unsafe = (Unsafe) field.get(null);
//设置头节点变量在类中的偏移值
headOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("head"));
//设置尾节点变量在类中的偏移值
tailOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("tail"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
throw new Error(e);
}
}
public NoLockQueue() {
this.maxSize = Integer.MAX_VALUE;
//初始化节点
head = tail = new Node();
}
public NoLockQueue(int maxSize) {
this.maxSize = maxSize;
//初始化节点
head = tail = new Node();
}
/**
* 入队
*/
public void enQueue(int value) {
//创建新节点
Node newNode = new Node();
newNode.setValue(value);
while (true) {
//获取尾节点
Node oldTail = this.tail;
if (length.get() < maxSize && oldTail.casNext(null, newNode)) {
System.out.println(Thread.currentThread().getName() "进队列:" value);
unsafe.compareAndSwapObject(this, tailOffset, oldTail, newNode);
length.incrementAndGet();
break;
}
}
}
/**
* 出队
*/
public void dequeue() {
while (true) {
//如果没有数据
if (length.get() <= 0) {
continue;
}
//获取头节点
Node oldHead = this.head;
Node oldNext = oldHead.getNext();
if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
System.out.println(Thread.currentThread().getName() "出队列:" head.getValue());
length.decrementAndGet();
break;
}
}
}
}
@Data
class Node {
//Unsafe类
private static final Unsafe unsafe;
//next变量的偏移量
private static final Long nextOffset;
//值
private volatile int value;
//next节点
private volatile Node next;
static {
try {
//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
unsafe = (Unsafe) field.get(null);
//获取state变量在类中的偏移值
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
throw new Error(e);
}
}
/**
* cas的方式设置next的值
* @param before 期望的值
* @param after 修改的值
* @return
*/
public boolean casNext(Node before, Node after) {
return unsafe.compareAndSwapObject(this, nextOffset, before, after);
}
}
测试类
代码语言:javascript复制package untils;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created on 2021-06-23
*/
public class Main {
public static void main(String[] args) {
NoLockQueue queue = new NoLockQueue(10);
ExecutorService executorService = Executors.newFixedThreadPool(20);
Random random = new Random();
for (int i = 0; i < 10; i ) {
executorService.submit(new Runnable() {
@Override
public void run() {
queue.enQueue(random.nextInt());
}
});
}
// //判断入队的顺序
// System.out.println("----------------");
// try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
// Node p = queue2.head.getNext();
// while (p != null){
// System.out.println(p.getValue());
// p = p.getNext();
// }
//
// System.out.println("----------------");
for (int i = 0; i < 10; i ) {
executorService.submit(new Runnable() {
@Override
public void run() {
queue.dequeue();
}
});
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
executorService.shutdown();
}
}
小插曲
unsafe类的获取
其实当时参考的是AtomicInteger里获取unsafe方法
代码语言:javascript复制private static final Unsafe unsafe = Unsafe.getUnsafe();
但是报错了,报错的原因竟然是 双亲委派模型
关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 - 言午12138 - 博客园
那怎么获取unsafe类呢,如下所示,固定格式
代码语言:javascript复制//获取成员变量
Field field = Unsafe.class.getDeclaredField("theUnsafe");
//设置为可访问
field.setAccessible(true);
//是静态字段,用null来获取Unsafe实例
Unsafe unsafe = (Unsafe) field.get(null);
打印顺序不对,影响了代码的正确性
代码语言:javascript复制/**
* 出队
*/
public void dequeue() {
while (true) {
//如果没有数据
if (length.get() <= 0) {
continue;
}
//获取头节点
Node oldHead = this.head;
Node oldNext = oldHead.getNext();
if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
//正确的位置
//System.out.println(Thread.currentThread().getName() "出队列:" head.getValue());
length.decrementAndGet();
//错误的位置
System.out.println(Thread.currentThread().getName() "出队列:" head.getValue());
break;
}
}
}
一开始我在错误的位置打印,发现入队和出队的顺序不一样,后来换了一个位置试了一下,好了。最后还是分析一下为什么吧。
比如此时此刻 队列里有2个元素A和B。现在2个线程按照下面的顺序执行,其实理论上出队顺序是没有问题的,只不过后面的先打印了,给了一种先出队的错觉。
收获
其实JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客 这个里面使用AtomicReference实现的,主要想用他的cas;但是我感觉有些绕,所以就自己用unsafe类实现cas。
断断续续写了一天才写了一个demo,其实不亏,至少写出来了。
参考
JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客
说说Java的Unsafe类 - 简书
关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 - 言午12138 - 博客园
ConcurrentLinkedQueue源码