AQS-Exchanger源码学习

2023-02-28 16:21:43 浏览数 (1)

上文:AQS-semaphore&CyclicBarrier&CountDownLatch源码学习

源码下载:https://gitee.com/hong99/jdk8

Exchanger是什么?

exchanger是一个极少使用到的交换类,主要用于线程阻塞或者因为阻塞引起但任务又急于执行,这里候就可以进行交换。但是有一个非常的复杂点就是两个并发任务执行过程中交换数据,这一点是非常厉害的,可以看下下面的一些基础实现。

基础功能的学习

代码语言:javascript复制
package com.aqs;

import java.util.concurrent.Exchanger;

/**
 * @author: csh
 * @Date: 2022/12/17 10:43
 * @Description:Exchanger 学习
 */
public class ExchangerStudy {

    public static void main(String[] args) {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        for (int i = 0; i < 10; i  ) {
            final int index=i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:Thread_" Thread.currentThread().getName() "下标为:" index);
                    try {
                        int newIndex = exchanger.exchange(index);
                        //等待100毫秒
                        Thread.sleep(100);
                        System.out.println("当前线程:Thread_" Thread.currentThread().getName() "原来下标为:" index "交换后下标为:" newIndex);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

    }
}

结果

代码语言:javascript复制
当前线程:Thread_Thread-2下标为:2
当前线程:Thread_Thread-1下标为:1
当前线程:Thread_Thread-4下标为:4
当前线程:Thread_Thread-3下标为:3
当前线程:Thread_Thread-5下标为:5
当前线程:Thread_Thread-6下标为:6
当前线程:Thread_Thread-0下标为:0
当前线程:Thread_Thread-7下标为:7
当前线程:Thread_Thread-8下标为:8
当前线程:Thread_Thread-9下标为:9
当前线程:Thread_Thread-4原来下标为:4交换后下标为:3
当前线程:Thread_Thread-6原来下标为:6交换后下标为:5
当前线程:Thread_Thread-0原来下标为:0交换后下标为:7
当前线程:Thread_Thread-9原来下标为:9交换后下标为:8
当前线程:Thread_Thread-8原来下标为:8交换后下标为:9
当前线程:Thread_Thread-7原来下标为:7交换后下标为:0
当前线程:Thread_Thread-5原来下标为:5交换后下标为:6
当前线程:Thread_Thread-2原来下标为:2交换后下标为:1
当前线程:Thread_Thread-1原来下标为:1交换后下标为:2
当前线程:Thread_Thread-3原来下标为:3交换后下标为:4

可以看到两个线程可以交换执行的下标。是比较厉害。接下来我们看看底层的源码实现。

源码学习

java.util.concurrent.Exchanger 源码实现时

代码语言:javascript复制
//交换机
public class Exchanger<V> {
    //避免伪共享 左移数据下标(获取内存偏移量)
    private static final int ASHIFT = 7;

    //节点最大的数组下标
    private static final int MMASK = 0xff;

    //用于递增,每次加一个seq
    private static final int SEQ = MMASK   1;

    //获取cpu的核数
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    //实际组数长度(线程数)
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    //自旋次数 当cpu为单核时,该参为禁用
    private static final int SPINS = 1 << 10;

    //用于提供给其他线程的交换对象
    private static final Object NULL_ITEM = new Object();

    //用于超时的传递对象
    private static final Object TIMED_OUT = new Object();

    //交换节点 保存交换的数据
    @sun.misc.Contended static final class Node {
        int index; // 多槽数据索引
        int bound; // 上一次的边界
        int collides; // 记录边界范围内cas失败次数
        int hash; // 代表hash值 用于自旋优化
        Object item; // 节点带的数据
        volatile Object match; // 未来 配对成功 交换的数据
        volatile Thread parked; // 匹配的线程
    }
    //本地线程类实现初始化值
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    // 存放node节点 保障线程安全
    private final Participant participant;

   //多槽数组
    private volatile Node[] arena;

    /**
     * 交换的槽位
     */
    private volatile Node slot;

    //上次记录
    private volatile int bound;

    //多槽位的交换实现(带过期时间)
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        //获取多槽位数组
        Node[] a = arena;
        //获取当前线程node对象
        Node p = participant.get();
        for (int i = p.index;;) { // access slot at i
            int b, m, c; long j; // 初始化相关的交换变量
            //获取交换节点信息(先获取偏移地址)
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT)   ABASE);
            //如果不为空(证明已经有线程) 进行交换
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                //获取交换q的内容
                Object v = q.item; // release
                //将当前线程的内容赋值给q的match
                q.match = item;
                //获取被交换线程
                Thread w = q.parked;
                //不为空进行唤醒
                if (w != null)
                    U.unpark(w);
                //这个是交换后的值
                return v;
            }
            //槽位还没被占的场景
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                //交换对象值
                p.item = item; // offer
                //cas交换
                if (U.compareAndSwapObject(a, j, null, p)) {
                    //计算超时时间
                    long end = (timed && m == 0) ? System.nanoTime()   ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        //用来携带交换线程的数
                        Object v = p.match;
                        //已被交换 则清标识
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null; // clear for next use
                            p.hash = h;
                            //交换成功
                            return v;
                        }
                        //判断自旋是否大于0
                        else if (spins > 0) {
                            //太复杂了这里 反正就是各各自旋次数获取 然后释放线程资源
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0) // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 && // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield(); // two yields per wait
                        }
                        //已有交换线程 准备数据中
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS; // releaser hasn't set match yet
                        // //线程不挂起 不是 多槽 时间没结束
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t; // minimize window
                            //注意这里 park则挂起线程
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        //换槽位(原因可能一直没有线程) 逻辑有点复杂
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0) // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b   SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1; // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break; // expired; restart
                        }
                    }
                }
                else
                    //获取槽位失败,先清空数据
                    p.item = null; // clear offer
            }
            //不在有效范围内,或者已经被其它线程抢了~
            else {
                //更新bound
                if (p.bound != b) { // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b   SEQ   1)) {
                    p.collides = c   1;
                    i = (i == 0) ? m : i - 1; // cyclically traverse
                }
                else
                    //往后挪动
                    i = m   1; // grow
                //更新下标
                p.index = i;
            }
        }
    }

    //单槽位交换方法
    private final Object slotExchange(Object item, boolean timed, long ns) {
        //获取当前线程携带的数据
        Node p = participant.get();
        //获取当前线程
        Thread t = Thread.currentThread();
        //判断中断不为中断
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;
        //自旋
        for (Node q;;) {
            //赋给q
            if ((q = slot) != null) {
                //cas交换对象并将slot置为null
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    //将当前的对象赋给交换对象的match
                    q.match = item;
                    //取出交换的线程
                    Thread w = q.parked;
                    //唤醒线程
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL   2) << ASHIFT];
            }
            //多槽交换不为空则退出
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                //把数据赋给当前node节点
                p.item = item;
                //进行交换数据 成功则退出
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                 //如果交换不成功则赋为空
                p.item = null;
            }
        }

        // await release
        //拿到节点hash
        int h = p.hash;
        //计算时间
        long end = timed ? System.nanoTime()   ns : 0L;
        //获取自旋次数
        int spins = (NCPU > 1) ? SPINS : 1;
        //返回值
        Object v;
        //判断不为空
        while ((v = p.match) == null) {
            //自旋次数
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    //让出线程权限
                    Thread.yield();
            }
            //优化操作 自旋线程准备中
            else if (slot != p)
                spins = SPINS;
            //线程不挂起 不是 多槽 时间没结束
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                //
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        //
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        //返回这个v是线程交换线程的。
        return v;
    }

    //构造方法
    public Exchanger() {
        participant = new Participant();
    }

    //交换的数据
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        //当前线程用于交换的数据
        Object v;
        //获取item值
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        //不是多槽
        if ((arena != null ||
              //slotExchange 单槽位交换实现的方法
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    //
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    //字段偏移量信息
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    //初始化信息
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            s = U.arrayIndexScale(ak);
            // ABASE absorbs padding in front of element 0
            ABASE = U.arrayBaseOffset(ak)   (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
    }

}

最后

exchanger默认也是使用了CAS park/unpark进行实现的,我这里是基于jdk8,jdk7与jdk8是有区别的。本身解决的问题是通过两个线程进行交换执行值,没想到这个exchanger代码不多但是非常复杂,有些可能写得不好,但是有想深入同学可以看看下面两个文章。

参考资料:

https://www.bilibili.com/video/BV17P4y177SD/?spm_id_from=333.788&vd_source=7d0e42b081e08cb3cefaea55cc1fa8b7

https://blog.csdn.net/weixin_30612769/article/details/97769773

0 人点赞