Java并发编程系列20 | StampedLock源码解析

2020-05-16 14:15:36 浏览数 (1)

本文为何适原创并发编程系列第 20 篇,文末有本系列文章汇总。

上一篇介绍了StampedLock存在的意义以及如何使用StampedLock,按照这个系列的风格大家也应该猜到了,这一篇就是StampedLock的源码分析。这里说明一点,本文的源码分析重点在于锁获取与释放过程中的状态改变,线程入队出队以及等待操作不再做详细介绍。

  1. 锁状态
  2. StampedLock属性
  3. 写锁获取与释放
  4. 悲观读锁读取与释放
  5. 乐观读锁读取

1. 锁状态

StampedLock提供了写锁、悲观读锁、乐观读锁三种模式的锁,如何维护锁状态呢?StampedLock的锁状态用 long 类型的 state 表示,类似ReentrantReadWriteLock,通过将 state 按位切分的方式表示不同的锁状态。

悲观读锁:state 的前 7 位(0-7 位)表示获取读锁的线程数,如果超过 0-7 位的最大容量 126,则使用一个名为 readerOverflow 的 int 整型保存超出数。

写锁:state 第 8 位为写锁标志,0 表示未被占用,1 表示写锁被占用。state 第 8-64 位表示写锁的获取次数,次数超过 64 位最大容量则重新从 1 开始。

乐观读锁:不需要维护锁状态,但是在具体操作数据前要检查一下自己操作的数据是否经过修改操作,也就是验证是否有线程获取过写锁。

你有没有想过为什么 state 要记录写锁的获取次数呢?写锁是不能重入的,如果只是修改第 8 位的状态,获取写锁时 state 第 8 位变为 1,释放写锁时 state 第 8 位变回 0 不是更方便?

如果只用第 8 位来标志写锁,那么来看乐观写锁的使用过程:

① 检查是否有写锁,state 第 8 位为 0,没有写锁,拷贝数据。

② 检查是否有线程获取过写锁,state 第 8 位为 0,没有线程获取过,直接使用原来拷贝的数据。

发现其中的问题了吗?第一次检查 state 第 8 位为 0 之后,有线程获取写锁修改数据并释放了写锁,那么之后在检查是否有线程获取过写锁时 state 第 8 位还是 0,认为没有线程获取过写锁,可能导致数据不一致。

也就是 ABA 问题,在【原创】Java 并发编程系列 12 | 揭秘 CAS文章中介绍过 ABA 问题的解决办法就是加版本号,将原来的 A->B->A 就变成了 1A->2B->3A。StampedLock同样采用这种方法,将获取写锁的次数作为版本号,也就是乐观读锁的票据,写锁释放时次数加 1,也就是 state 第 8 位加 1

代码语言:javascript复制
state原始状态为     //...0001 0000 0000
获取写锁            //...0001 1000 0000
释放写锁次数加1      //...0010 0000 0000
获取写锁           // ...0010 1000 0000
释放写锁次数加1     //...0011 0000 0000

JDK 设计的精妙之处还在于,获取写锁后 state 第 8 位为 1,释放写锁时 state 第 8 位加 1 使第 8 位变回 0,既记录了写锁次数,又可以保证 state 的第 8 位一个位置来标志写锁。

2. 属性

锁状态相关属性
代码语言:javascript复制
private static final long RUNIT = 1L;                   // 一个单位的读锁        0000... 0000 0000 0001
private static final long WBIT = 1L << LG_READERS;     // 一个单位的写锁        0000... 0000 1000 0000
private static final long RBITS = WBIT - 1L;            // 读状态标识            0000... 0000 0111 1111
private static final long RFULL = RBITS - 1L;           // 读锁最大数量          0000... 0000 0111 1110
private static final long ABITS = RBITS | WBIT;         // 用于获取读写状态      0000... 0000 1111 1111
private static final long SBITS = ~RBITS;               //                       1111... 1111 1000 0000

// 锁state初始值,0000... 0001 0000 0000
private static final long ORIGIN = WBIT << 1;

/** 锁队列状态, 当处于写模式时第8位为1,读模式时前7为为1-126(附加的readerOverflow用于当读者超过126时) */
private transient volatile long state;

/** 将state超过 RFULL=126的值放到readerOverflow字段中 */
private transient int readerOverflow;

给出这些常量的比特位,等下看源码过程中会频繁用到:

结点

StampedLock中,等待队列的结点要比 AQS 中简单些,仅仅三种状态。0:初始状态;-1:等待中;1:取消。结点的定义中有个 cowait 字段,该字段指向一个栈,用于保存读线程。

代码语言:javascript复制
// 结点状态
private static final int WAITING = -1;
private static final int CANCELLED = 1;

// 结点的读写模式
private static final int RMODE = 0;
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait; // 读模式使用该结点形成栈
    volatile Thread thread; // non-null while possibly parked
    volatile int status; // 0, WAITING, or CANCELLED
    final int mode; // RMODE or WMODE

    WNode(int m, WNode p) {
        mode = m;
        prev = p;
    }
}

/** CLH队头结点 */
private transient volatile WNode whead;
/** CLH队尾结点 */
private transient volatile WNode wtail;

3. 写锁的获取与释放

获取:

  1. 可以获取写锁的条件:没有线程占用悲观读锁和写锁;
  2. 获取写锁,state 写锁位加 1,此时写锁标志位变为 1,返回邮戳 stamp;
  3. 获取失败,加入同步队列等待被唤醒。

释放:

  1. 传入获取写锁时的 stamp 验证;
  2. stamp 值被修改,抛出异常;
  3. stamp 正确,state 写锁位加 1,此时写锁标志位变为 0;
  4. 唤醒同步队列等锁线程。
代码语言:javascript复制
/**
 * 获取写锁,如果获取失败,进入阻塞
 */
public long writeLock() {
    long s, next;
    return ((((s = state) & ABITS) == 0L && // 没有读写锁
             U.compareAndSwapLong(this, STATE, s, next = s   WBIT)) ? // cas操作尝试获取写锁
            next : acquireWrite(false, 0L)); // 获取成功后返回next,失败则进行后续处理,排队也在后续处理中
}

/**
 * 释放写锁
 */
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L) //stamp值被修改,或者写锁已经被释放,抛出错误
        throw new IllegalMonitorStateException();
    state = (stamp  = WBIT) == 0L ? ORIGIN : stamp; //加0000 1000 0000来记录写锁的变化,同时改变写锁状态
    if ((h = whead) != null && h.status != 0)
        release(h);// 唤醒等待队列的队首结点
}

可以简单看下 acquireWrite,不做详细讲解了。

代码语言:javascript复制
/**
 * 尝试自旋的获取写锁, 获取不到则阻塞线程
 *
 * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
 * @param deadline      如果非0, 则表示限时获取
 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
 */
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;

    /**
     * 自旋入队操作
     * 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
     * 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
     */
    for (int spins = -1; ; ) {
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {      // 没有任何锁被占用
            if (U.compareAndSwapLong(this, STATE, s, ns = s   WBIT))    // 尝试立即获取写锁
                return ns;                                                 // 获取成功直接返回
        } else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) {       // 队列为空, 则初始化队列, 构造队列的头结点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null)               // 将当前线程包装成写结点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {    // 链接结点至队尾
            p.next = node;
            break;
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 如果当前结点是队首结点, 则立即尝试获取写锁
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {      // 写锁未被占用
                    if (U.compareAndSwapLong(this, STATE, s,
                        ns = s   WBIT)) {               // CAS修改State: 占用写锁
                        // 将队首结点从队列移除
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 &&
                    --k <= 0)
                    break;
            }
        } else if (h != null) {  // 唤醒头结点的栈中的所有读线程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞当前调用线程
                long time;  // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                    U.park(false, time);    // emulate LockSupport.park
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

4. 悲观读锁的获取与释放

获取:

  1. 获取悲观读锁条件:没有线程占用写锁;
  2. 读锁标志位 1,返回邮戳 stamp;
  3. 获取失败加入同步队列。

释放:

  1. 传入邮戳 stamp 验证
  2. stamp 验证失败,抛异常
  3. stamp 验证成功,读锁标志位-1,唤醒同步队列等锁线程
代码语言:javascript复制
/**
 * 获取悲观读锁,如果写锁被占用,线程阻塞
 */
public long readLock() {
    long s = state, next;
    return ((whead == wtail && (s & ABITS) < RFULL && //队列为空,无写锁,同时读锁未溢出,尝试获取读锁
                U.compareAndSwapLong(this, STATE, s, next = s   RUNIT)) ?   //cas尝试获取读锁 1
            next : acquireRead(false, 0L));     //获取读锁成功,返回s   RUNIT,失败进入后续处理,类似acquireWrite
}

/**
 * 释放悲观读锁
 */
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {    //小于最大记录值(最大记录值127超过后放在readerOverflow变量中)
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {  //cas尝试释放读锁-1
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L) //readerOverflow - 1
            break;
    }
}

5. 乐观读锁的获取

乐观读锁因为实际上没有获取过锁,所以也就没有释放锁的过程。

代码语言:javascript复制
/**
 * 尝试获取乐观锁
 * 写锁被占用,返回state第8-64位的写锁记录;没被占用返回0
 */
public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

/**
 * 验证乐观锁获取之后是否有过写操作
 */
public boolean validate(long stamp) {
    U.loadFence(); // 之前的所有load操作在内存屏障之前完成,对应的还有storeFence()及fullFence()
    return (stamp & SBITS) == (state & SBITS);  //比较是否有过写操作
}

总结

StampedLock通过将 state 按位切分的方式表示不同的锁状态。

悲观读锁:state 的 0-7 位表示获取读锁的线程数,如果超过 0-7 位的最大容量 126,则使用一个名为 readerOverflow 的 int 整型保存超出数。

写锁:state 第 8 位为写锁标志,0 表示未被占用,1 表示写锁被占用。state 第 8-64 位表示写锁的获取次数,次数超过 64 位最大容量则重新从 1 开始。

并发系列文章汇总

【原创】01|开篇获奖感言 【原创】02|并发编程三大核心问题 【原创】03|重排序 【原创】04|Java 内存模型详解 【原创】05|深入理解 volatile 【原创】06|你不知道的 final 【原创】07|synchronized 原理 【原创】08|synchronized 锁优化 【原创】09|基础干货 【原创】10|线程状态 【原创】11|线程调度 【原创】12|揭秘 CAS 【原创】13|LockSupport 【原创】14|AQS 源码分析 【原创】15|重入锁 ReentrantLock 【原创】16|公平锁与非公平锁 【原创】17|读写锁八讲(上) 【原创】18|读写锁八讲(下)

0 人点赞