深入分析ReentrantReadWriteLock读写锁

2022-07-04 14:55:54 浏览数 (1)

今天一起来聊聊ReentrantReadWriteLock,当我们有遇到一写多读的场景时,我们可以用它来提升并发性能。因为它最大的特点就是读读并发,也就是读锁不会阻塞另外的线程获取读锁。如果对ReentrantLock不了解可以先参考这篇文章(深入理解ReentrantLock和AQS),因为写锁的获取和释放就是排他锁,所以流程和ReentrantLock获取锁和释放锁的流程基本一致,本文不会再过多的篇章去分析。

一、特性

在阅读ReentrantReadWriteLock源码之前,我们先了解它的特性是非常必要的,因为它的源码相对来说比较难。我们知道了它的特性后,才能更好的分析源码。

1.并发特性

并发

不并发

不并发

并发

2.传播特性

虽然读读能并发,但是当读读节点之间有插入写的节点时,是无法传播过去。意思就是只能传播连续的读节点,下面用一个图来简单描述一下

上面一幅图中假如是在AQS同步队列中,如果节点是SHARED类型,则表示竞争读锁,节点类型是EXCLUSIVE则表示写锁(排他锁),此时如果当第一个Node获取到了读锁后,它会向后传播使得第二和第三个排队获取读锁的节点也能够获取读锁,但是最后一个SHARED的节点是无法在这一次获取到锁的,因为它前面插入了一个等待获取写锁的节点。

这两个特性非常的重要,能更好的帮助我们理解源码,下面我们用一段代码证明上述的两个特点

代码语言:javascript复制
package com.taolong.concurrent.ReentrantReadWriteLock;

import org.apache.commons.lang.time.DateFormatUtils;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** * @Author taolong.hong * @Date 2020/12/10 15:45 * @Version 1.0 * @Description 分析readWriteLock工作原理 * 主要是模拟读读并发,读写和写读会阻塞 * */
public class ReentrantReadWriteLockTest {
   

    public static void main(String[] args) {
   

        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();


        Thread t1 = new Thread(()->{
   
            writeLock.lock();
            try {
   
                //睡3s
                System.out.println(getTimeInfo()   "t1获取写锁成功....");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }finally {
   
                writeLock.unlock();
                System.out.println(getTimeInfo()  "t1获取写锁成功....");
            }
        },"t1");


        Thread t2 = new Thread(()->{
   
            readLock.lock();
            try {
   
                System.out.println(getTimeInfo()  "t2获取读锁成功....");
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                readLock.unlock();
                System.out.println(getTimeInfo()  "t2获取读锁成功....");
            }

        },"t2");

        Thread t3 = new Thread(()->{
   
            readLock.lock();
            try {
   
                System.out.println(getTimeInfo()  "t3获取读锁成功....");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                readLock.unlock();
                System.out.println(getTimeInfo()  "t3释放读锁成功....");
            }

        },"t3");


        Thread t4 = new Thread(()->{
   
            writeLock.lock();
            try {
   
                System.out.println(getTimeInfo()  "t4获取写锁成功....");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            } finally {
   
                writeLock.unlock();
                System.out.println(getTimeInfo()  "t4释放写锁成功....");
            }

        },"t4");


        Thread t5 = new Thread(()->{
   
            readLock.lock();
            try {
   
                System.out.println(getTimeInfo()  "t5获取读锁成功....");
            }  finally {
   
                readLock.unlock();
                System.out.println(getTimeInfo()  "t5释放读锁成功....");
            }

        },"t5");


        t1.start();
        try {
   
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }
        t2.start();
        t3.start();
        t4.start();
        t5.start();

    }


    private static String getTimeInfo(){
   
        return DateFormatUtils.format(new Date(),"HH:mm:ss")   "--";
    }
}

这段测试代码很简单,简单解释一下:

t1:获取写锁,第一个获取,获取后睡3s

t2:获取读锁,获取后睡3s

t3:获取读锁,获取后睡2s

t4:获取写锁,获取后睡2s

t5:获取读锁

这里代码写得不是很严谨,并没有严格保证t1-t5的执行顺序哦,我们期望是t1-t5是依次执行(但正常情况是这样的,大家可以优化),这个例子只是想简单的证明上述的描述的特性。根据我们上述特性的描述,那么上述代码期望的运行结果应该如下:

step1:t1获取写锁,其它线程全部阻塞等待t1睡了3s后释放锁再重新抢锁(写读不并发)

step2:t1释放锁,t2抢到读锁,此时t3也能并发拿到读锁(读读并发),但是t4阻塞(读写不并发),t5也阻塞(中间隔了一个等待的写锁)

step3:t3先释放锁,然后t2再释放锁,等t2把读锁全部释放后,t4获取写锁,但此时t5还是获取不到锁(写读不并发)

step4:t4释放写锁,t5获取读锁

我们最后运行下看看结果,再次说明上述代码不太严谨,但正常情况能证明我们刚才上述的特性

从上面的运行结果来说,确实跟我们期望的一样。下面我们开始深入源码吧

二、写锁的获取和释放

因为写锁的获取和释放比较简单,所以我们先分析,对我们后面分析读锁的获取和释放有帮助。其实写锁的获取和释放与ReentrantLock非常相似,相同部分可能就简单描述,如果不太懂的可以参考这篇文章(深入理解ReentrantLock和AQS)

正式分析之前,先简单介绍下ReentrantReadWriteLock的一个state如何同时记录读写锁的状态。一个int类型的state4个字节32位,那么它是用高16位记录读锁、低16位记录写锁

如上述图所示,左半部分是读锁记录,右半部分是写锁的记录,用不同颜色区分了。上述表示读锁被获取了3次,写锁被获取了1次

1.写锁的获取
代码语言:javascript复制
public final void acquire(int arg) {
   
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

通用的代码,直接接着往下看,注意这里是使用了模板模式,tryAcquire()方法是ReentrantReadWriteLock内部的方法

(1).ReentrantReadWriteLock.tryAcquire()

代码语言:javascript复制
protected final boolean tryAcquire(int acquires) {
   
            /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */
            Thread current = Thread.currentThread();
    		//读写锁总共锁记录
            int c = getState();
    		//写锁记录
            int w = exclusiveCount(c);
    		//1.判断当前是否有人持有锁,
            if (c != 0) {
   
                //进入到这个if则表示当前有人持有锁(不管是写锁还是读锁)
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //2.这里判断当前持有锁的是否是读锁,和判断是不是当前线程自己持有锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //3.到这里说明w!=0(有人获取写锁),并且是自己获取写锁->重入
                //超过限制16位的最大数
                if (w   exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c   acquires);
                return true;
            }
    		//4.c==0,表示没人持有锁,此时判断是否需要阻塞,如果不需要阻塞,则尝试cas加锁
            if (writerShouldBlock() ||
                !compareAndSetState(c, c   acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

总的来说上面的代码还是比较容易看懂,而且我加了比较详细的注释,下面也简单解释下

1.首先判断是否有人持有锁

​ 1.1.如果有人持有锁,那么这个持有锁的是否是读锁,根据读写不并发,并且读锁不能升级成写锁,所以如果当前有人持有读锁则直接获取锁失败

​ 1.2.如果当前获取锁的是写锁,那么判断是不是自己已经持有了锁,如果不是则说明别人持有写锁,谢谢互斥,则获取失败;如果是自己持有锁,则表示重入将写锁记录 1,返回获取锁成功

2.如果当前没有人获取锁,我们继续往下看代码

(2).writerShouldBlock()

这个方法在公平锁和非公平锁的实现不同,简单看一下

非公平锁:直接返回false,表示不需要阻塞

代码语言:javascript复制
final boolean writerShouldBlock() {
   
            return false; // writers can always barge
        }

公平锁:查看队列是否有人在等待锁

代码语言:javascript复制
final boolean writerShouldBlock() {
   
    return hasQueuedPredecessors();
}

这里跟ReentranLock一样,非公平锁上来就直接抢,公平锁则看看前面是否有人在等,如有则排队(直接返回false,表示获取锁失败),大家自行查看,不再啰嗦。如果不需要阻塞则尝试cas加锁,cas成功则表示加锁成功,否则返回false失败。到这tryAcquire()尝试获取锁的过程分析完了,如果没获取成功,我们接着看

addWaiter()和acquireQueued()方法。

(3).addWaiter()

这个方法在前面分析ReentrantLock和AQS时已经详细分析过了,不了解的可根据上面的链接去查看相应文章,其实就是封装成Node节点,插入到同步队列中。

(4).acquireQueued()

该方法前面的文章也详细分析过,不了解的请阅读(深入理解ReentrantLock和AQS),其实就是做下面的几个事情

1.判断前一个节点是否是Header,如果是则再次尝试加锁,成功则返回

2.如果前一个节点不是Header,则park当前线程,但是park时需要将前一个节点状态设置成SINGAL,表示前一个节点释放锁之后要唤醒我

细节部分大家自行阅读,到这里获取写锁的过程就分析完了,比较简单

2.写锁的释放

(1).AbstractQueueSynchronizer.release

代码语言:javascript复制
public final boolean release(int arg) {
   
    	//尝试释放锁
        if (tryRelease(arg)) {
   
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //释放成功则唤醒后面的节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

(2).tryRelease()

模板方法,调用的ReentrantReadWriteLock的方法

代码语言:javascript复制
protected final boolean tryRelease(int releases) {
   
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

很简单,跟ReentranLock几乎一样,唯一的区别就是这里的state操作的是后面16位,而ReentrantLock是整个32位,不清楚请参考(深入理解ReentrantLock和AQS)

三、读锁的获取和释放

1.读锁的获取

读锁的获取比较难,大家要耐心的看完,我也会尽量的描述通俗易懂,如果有理解的不对的地方也欢迎大家指正!

(1).acquireShared()

代码语言:javascript复制
public final void acquireShared(int arg) {
   
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

(2).tryAcquireShared()

代码语言:javascript复制
protected final int tryAcquireShared(int unused) {
   
            /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */
            Thread current = Thread.currentThread();
            int c = getState();
    		//1.存在写锁,但是持有写锁的线程不是当前线程,获取锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    		//获取读锁的次数
            int r = sharedCount(c);
    		/*** *代码走到这里,那可能性就很多了,比如我下面举出一些可能性 * (1)当前自己持有写锁,可以重入 * (2)当前没有人获取任何锁 * (3)当前有人获取读锁,但不是自己 * (4)当前有人获取读锁,并且是自己.... **/
    		//2.尝试获取读锁
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c   SHARED_UNIT)) {
   
                //if如果判断成功,则说明获取读锁成功,再次根据不同情况获取的读锁细化处理后续的操作
                //2.1.之前没人获取锁
                if (r == 0) {
   
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } 
                //2.2.之前自己第一个获取了读锁,相当于这一次时重入,firstReader表示第一个获取读锁的线程
                else if (firstReader == current) {
   
                    //第一次获取读锁的线程记录数  1
                    firstReaderHoldCount  ;
                } 
                //2.3.读锁传播--读读并发,或者其他线程重入等
                else {
   
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count  ;
                }
                return 1;
            }
    		//3.如果没获取成功则再次尝试
            return fullTryAcquireShared(current);
        }

readerShouldBlock()表示当前获取读锁是否需要阻塞,这里公平锁和非公平锁实现不同,我简单解释一下

(3).readerShouldBlock()

先看公平锁

代码语言:javascript复制
final boolean readerShouldBlock() {
   
            return hasQueuedPredecessors();
        }

很简单,如果同步队列中已经有人在排队,则需要阻塞

再看非公平锁

代码语言:javascript复制
final boolean readerShouldBlock() {
   
    /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */
    return apparentlyFirstQueuedIsExclusive();
}
代码语言:javascript复制
final boolean apparentlyFirstQueuedIsExclusive() {
   
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

这个地方有点意思,我的理解就是找到第一个排队获取锁的Node节点是否是排他锁(EXCLUSIVE),如果是排他锁,则需要阻塞当前节点,如果不是则不需要阻塞。如果不需要阻塞,并且锁获取数没有超过最大值,就是用cas尝试加锁

如果获取成功(进入注释2的if),这里面我再总结一下:

1.如果之前没有人获取锁,当前线程是第一个获取锁的线程

(1)设置firstReader为当前线程,并且记录锁次数firstReaderHoldCount=1

2.当前线程与第一次获取读锁的线程是同一个

(1)此时是第一个获取读锁的线程重入,则firstReaderHoldCount

3.其他情况,比如其他新线程获取锁,或者其他已经获取锁的线程重入等

这里稍微有点难懂,因为有用到了ThreadLocal,还有缓存最近依次的记录,我们先来缕缕

(1)我们需要记录每一个线程获取读锁的记录

这里始用的一个类HoldCounter来表示线程和获取锁记录,然后用ThreadLocalHoldCounter《HoldCounter》(继承了ThreadLocal)来做线程安全。当然如果我们非要用ConcurrentHashMap也是ok

(2)缓存最近一次的锁记录HoldCounter

这里我不是特别明白为什么需要加一个这个缓存,知道的可以告知一下

我把这段代码再贴一下来分析

代码语言:javascript复制
else {
   
    //最近缓存
    HoldCounter rh = cachedHoldCounter;
    //如果没有最近一次缓存,或者最近一次缓存存储的线程不是当前线程
    if (rh == null || rh.tid != getThreadId(current))
        //创建一个HoldCounter放到readHolds(其实就是threadLocal)中
        cachedHoldCounter = rh = readHolds.get();
    //这次进来的线程,和最近缓存的线程一致
    else if (rh.count == 0)
        readHolds.set(rh);
    //记录  
    rh.count  ;
}

如果上面的下面的if没有执行成功

代码语言:javascript复制
f (!readerShouldBlock() &&
    r < MAX_COUNT &&
    compareAndSetState(c, c   SHARED_UNIT)) {
   }

不管是,需要阻塞啦,超过锁记录啦,还是cas失败啦,都表示获取锁失败,那我们需要进入到fullTryAcquireShared()

(4).fullTryAcquireShared()

这个地方也比较难,我尽量没有删掉英文的注释,大家可以参考英文的翻译

代码语言:javascript复制
 /** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */
        final int fullTryAcquireShared(Thread current) {
   
            /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */
            HoldCounter rh = null;
            for (;;) {
   
                int c = getState();
                //1.如果写锁被持有
                if (exclusiveCount(c) != 0) {
   
                    //并且持有写锁的不是当前线程,则直接返回失败
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } 
                //2.如果需要阻塞,这个地方有点意思,其实可以防止死锁,而且写锁降级到读锁也会走到这个逻辑
                /** *之前分析过readerShouldBlock()公平锁时如果同步队列有Node节点则需要阻塞; *而非公平锁时则前面第一个排队的节点是EXCLUSIVE节点时需要阻塞。 *那假如我就那非公平锁来举例子,如果同步队列第一个排队的就是EXCLUSIVE了,而我现在需要获取重入读锁,那肯定要能获取,不能就死锁了 *比如线程t1已经获取了一次读锁,线程t2在排队获取写锁,此时t1重入获取读锁,就会走这里的逻辑 */
                else if (readerShouldBlock()) {
   
                    // Make sure we're not acquiring read lock reentrantly
                    //当前线程和第一次获取读锁的线程相同,可以理解为第一次获取读锁的线程重入
                    if (firstReader == current) {
   
                        // assert firstReaderHoldCount > 0;
                    } 
                    //其他情况,下面的逻辑就是跟之前上面的tryAcquireShared逻辑相似了,多了判断就是当前线程如果前面获取的锁释放了,则返回-1获取锁失败
                    else {
   
                        if (rh == null) {
   
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
   
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        //如果之前当前获取的锁释放了,就返回-1,获取失败
                        if (rh.count == 0)
                            return -1;
                    }
                }
                ///有没有超过限制
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //剩下的部分就是和tryAcquireShared后半部分一样了
                if (compareAndSetState(c, c   SHARED_UNIT)) {
   
                    if (sharedCount(c) == 0) {
   
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
   
                        firstReaderHoldCount  ;
                    } else {
   
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count  ;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

到这里如果如果返回的结果<0则表示获取锁失败,否则就成功

代码语言:javascript复制
public final void acquireShared(int arg) {
   
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

咱们再回到这个方法,当获取失败时,则需要入队列呀,我们看doAcquireShared()

(5).doAcquireShared

代码语言:javascript复制
//这个方法跟ReentrantLock里面入队列的方法基本类似,不同的就是当尝试获取锁成功时,需要向后传播
private void doAcquireShared(int arg) {
   
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
   
            boolean interrupted = false;
            for (;;) {
   
                final Node p = node.predecessor();
                if (p == head) {
   
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
   
                        //1.区别在这里,如果获取成功,则需要向后传播,其实就是让后面阻塞获取读锁的节点获取读锁
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
   
            if (failed)
                cancelAcquire(node);
        }
    }

跟我我们第一节分析的特性,就是前一个获取读锁的节点如果获取到了锁,读读并发,它后面连续的阻塞获取读锁的节点(中间不能插入写锁)都可以获取读锁,这就需要向后传播,逻辑就在这里

(6).setHeadAndPropagate()

代码语言:javascript复制
private void setHeadAndPropagate(Node node, int propagate) {
   
        Node h = head; // Record old head for check below
    //1.获取到锁的第一个节点设置成Header 
    setHead(node);
        /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */
    	//如果满足唤醒下一个节点的条件
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
   
            Node s = node.next;
            if (s == null || s.isShared())
                //如果下一个共享模式的node则唤醒
                doReleaseShared();
        }
    }

(7).doReleaseShared()

代码语言:javascript复制
private void doReleaseShared() {
   
        /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
        for (;;) {
   
            Node h = head;
            //存在队列,并且队列里面有节点
            if (h != null && h != tail) {
   
                int ws = h.waitStatus;
                //之前分析过,只有节点的waitstatus状态=SIGNAL时有资格唤醒后继节点
                if (ws == Node.SIGNAL) {
   
                    //先改状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //1.唤醒后面的节点
                    unparkSuccessor(h);
                }
                //2.暂时唤醒后面的节点,但是要将状态设置成可以向后传播的状态
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这个地方有一个地方需要解释一下

(1).唤醒的时候,如何向后面传递唤醒

当header唤醒后继节点时,后继节点会从它阻塞的地方继续执行,我贴下代码

代码语言:javascript复制
private void doAcquireShared(int arg) {
   
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
   
            boolean interrupted = false;
            for (;;) {
   
                final Node p = node.predecessor();
                //2.当唤醒后再到这个for循环这里时,此时的p就是head了
                if (p == head) {
   
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
   
                        //3.传播的行为在这里....
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //1.park的线程会从这里继续执行,然后回到上面的for循环
                    interrupted = true;
            }
        } finally {
   
            if (failed)
                cancelAcquire(node);
        }
    }
2.读锁的释放

(1).releaseShared()

代码语言:javascript复制
public final boolean releaseShared(int arg) {
   
    //尝试释放锁 
    if (tryReleaseShared(arg)) {
   
        //如果释放成功
            doReleaseShared();
            return true;
        }
        return false;
    }

(2).tryReleaseShared

代码语言:javascript复制
protected final boolean tryReleaseShared(int unused) {
   
            Thread current = Thread.currentThread();
    //第一个获取读锁的释放锁 
    if (firstReader == current) {
   
                // assert firstReaderHoldCount > 0;
        		//如果第一个获取读锁的线程只获取了一次,那么这次取消就彻底释放了锁,所以需要清除firstReader
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    //如果第一次获取锁的线程重入了多次,则-1就好了
                    firstReaderHoldCount--;
    }
    //其他情况,也比较简单,无非就是操作当前线程获取获取锁的记录数(-1操作),如果值这一次可以减到0,则从threadLocal中删除
    else {
   
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
   
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
   
                int c = getState();
                int nextc = c - SHARED_UNIT;
                //最终就是操作公共的锁状态了,state,始用cas,cas成功则返回,只有当全部释放完了,结果返回true,才表示释放成功
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

释放锁总体来说,还是比较简单,难点是在获取读锁的地方。这篇文章暂时分享到这里吧,如有分析的不对的地方,欢迎指正!

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/111196.html原文链接:https://javaforall.cn

0 人点赞