java实现无锁队列

2023-12-25 19:06:34 浏览数 (1)

写作目的

说到无锁,其实就是用cas,不过我在百度上搜java实现无锁队列的文章其实不多,所以自己用cas和volatile实现一下,线程安全那是必须的。

无锁队列

代码语言:javascript复制
package untils;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Data;
import sun.misc.Unsafe;

/**
 * Created on 2021-06-23
 */
public class NoLockQueue {


    private static final Unsafe unsafe;

    //头节点
    volatile Node head;
    //尾节点
    volatile Node tail;

    //头节点偏移量
    private static final Long headOffset;
    //尾节点偏移量
    private static final Long tailOffset;


    //当前队列的长度
    private AtomicInteger length = new AtomicInteger(0);

    //队列允许的最大长度
    private int maxSize = 0;


    static {
        try {
            //获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            unsafe = (Unsafe) field.get(null);
            //设置头节点变量在类中的偏移值
            headOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("head"));
            //设置尾节点变量在类中的偏移值
            tailOffset = unsafe.objectFieldOffset(NoLockQueue.class.getDeclaredField("tail"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }

    public NoLockQueue() {

        this.maxSize = Integer.MAX_VALUE;
        //初始化节点
        head = tail = new Node();
    }


    public NoLockQueue(int maxSize) {
        this.maxSize = maxSize;
        //初始化节点
        head = tail = new Node();
    }


    /**
     * 入队
     */
    public void enQueue(int value) {

        //创建新节点
        Node newNode = new Node();
        newNode.setValue(value);


        while (true) {

            //获取尾节点
            Node oldTail = this.tail;
            if (length.get() < maxSize && oldTail.casNext(null, newNode)) {
                System.out.println(Thread.currentThread().getName()   "进队列:"   value);
                unsafe.compareAndSwapObject(this, tailOffset, oldTail, newNode);
                length.incrementAndGet();
                break;
            }

        }


    }


    /**
     * 出队
     */
    public void dequeue() {

        while (true) {
            //如果没有数据
            if (length.get() <= 0) {
                continue;
            }

            //获取头节点
            Node oldHead = this.head;
            Node oldNext = oldHead.getNext();

            if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
                System.out.println(Thread.currentThread().getName()   "出队列:"   head.getValue());
                length.decrementAndGet();
                break;
            }


        }


    }


}

@Data
class Node {

    //Unsafe类
    private static final Unsafe unsafe;

    //next变量的偏移量
    private static final Long nextOffset;


    //值
    private volatile int value;

    //next节点
    private volatile Node next;

    static {
        try {
            //获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            unsafe = (Unsafe) field.get(null);
            //获取state变量在类中的偏移值
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }


    /**
     * cas的方式设置next的值
     * @param before 期望的值
     * @param after 修改的值
     * @return
     */
    public boolean casNext(Node before, Node after) {
        return unsafe.compareAndSwapObject(this, nextOffset, before, after);
    }


}

测试类

代码语言:javascript复制
package untils;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2021-06-23
 */
public class Main {
    public static void main(String[] args) {

        NoLockQueue queue = new NoLockQueue(10);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Random random = new Random();
        
        for (int i = 0; i < 10; i  ) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    queue.enQueue(random.nextInt());
                }
            });
        }
        
        //        //判断入队的顺序
        //        System.out.println("----------------");
        //        try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
        //        Node p = queue2.head.getNext();
        //        while (p != null){
        //            System.out.println(p.getValue());
        //            p = p.getNext();
        //        }
        //
        //        System.out.println("----------------");


        for (int i = 0; i < 10; i  ) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    queue.dequeue();
                }
            });
        }


        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        executorService.shutdown();


    }
}

小插曲

unsafe类的获取

其实当时参考的是AtomicInteger里获取unsafe方法

代码语言:javascript复制
private static final Unsafe unsafe = Unsafe.getUnsafe();

但是报错了,报错的原因竟然是 双亲委派模型

关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 - 言午12138 - 博客园

那怎么获取unsafe类呢,如下所示,固定格式

代码语言:javascript复制
//获取成员变量
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            //设置为可访问
            field.setAccessible(true);
            //是静态字段,用null来获取Unsafe实例
            Unsafe unsafe = (Unsafe) field.get(null);

打印顺序不对,影响了代码的正确性

代码语言:javascript复制
/**
     * 出队
     */
    public void dequeue() {
        while (true) {
            //如果没有数据
            if (length.get() <= 0) {
                continue;
            }
            //获取头节点
            Node oldHead = this.head;
            Node oldNext = oldHead.getNext();

            if (unsafe.compareAndSwapObject(this, headOffset, head, oldNext)) {
                //正确的位置
                //System.out.println(Thread.currentThread().getName()   "出队列:"   head.getValue());
                length.decrementAndGet();
                //错误的位置
                System.out.println(Thread.currentThread().getName()   "出队列:"   head.getValue());
                break;
            }
        }
    }

一开始我在错误的位置打印,发现入队和出队的顺序不一样,后来换了一个位置试了一下,好了。最后还是分析一下为什么吧。

比如此时此刻 队列里有2个元素A和B。现在2个线程按照下面的顺序执行,其实理论上出队顺序是没有问题的,只不过后面的先打印了,给了一种先出队的错觉。

收获

其实JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客 这个里面使用AtomicReference实现的,主要想用他的cas;但是我感觉有些绕,所以就自己用unsafe类实现cas。

断断续续写了一天才写了一个demo,其实不亏,至少写出来了。

参考

JAVA 无锁队列/栈_meiyongdesan的博客-CSDN博客

说说Java的Unsafe类 - 简书

关于通过Unsafe.getUnsafe()方法拿Unsafe对象抛出SecurityException异常的原因 - 言午12138 - 博客园

ConcurrentLinkedQueue源码

0 人点赞