从源码来看ReentrantLock和ReentrantReadWriteLockReentrantLockReentrantReadWriteLock

2018-04-16 16:02:56 浏览数 (2)

上一篇花了点时间将同步器看了一下,心中对锁的概念更加明确了一点,知道我们所使用到的锁是怎么样获取同步状态的,我们也写了一个自定义同步组件Mutex,讲到了它其实就是一个简版的ReentrantLock,本篇文章我们就来看看ReentrantLock底层是怎么样的!

目录结构:

  • ReentrantLock
  • 公平锁与非公平锁
  • ReentrantReadWriteLock

ReentrantLock

ReentrantLock我们叫做可重入锁,也就是我们可以重复进入的意思,也就是表示,一个线程可以对指定资源进行重复加锁,并且还能够选择公平锁和非公平锁。 公平锁:先请求获取加锁的线程先被满足 非公平锁:XXXX

可重入锁可以对一个资源重复加锁,同时,在释放锁时也要进行多次release,所以不难想到,ReentrantLock只要维持一个值,用来控制这个资源加锁的次数就Ok了,初始化为零,当加锁时对这个值 1,release时-1。

代码语言:javascript复制
public class ReentrantLock implements Lock, java.io.Serializable{
    .
    .
    .
}

这个是ReentrantLock的定义,上一节我就无耻的把它贴出来了→.→

接下来我们来看看ReentrantLock是如何对资源进行加锁的!

首先肯定要定义一个继承自AbstractQueuedSynchronizer的内部类:

代码语言:javascript复制
abstract static class Sync extends AbstractQueuedSynchronizer {
    .
    .
    .

}

构造函数的话,因为它可以选择公平和非公平锁,所以是否公平就是由构造函数决定的:

代码语言:javascript复制
 public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

FairSync()和NonfairSync()都是构造函数,当然都是内部类啦,他们可以分别创建公平锁和非公平锁。 我自己粗略的看了一下源码,将ReentrantLock的大概结构画了一下:

刚才我们也说了,根据参数ReentrantLock会根据我们的需要创建对应的锁,当没有参数的时候我们来看看它默认的是什么?

代码语言:javascript复制
 /**
     * Creates an instance of {@code ReentrantLock}.
     * 创建一个实例
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();//sync是Reentrant内的Sync类型的成员变量
    }

从代码中我们可以知道,当我们使用无参构造的时候ReentrantLock会为我们创建一个非公平锁(事实上,大大多数情况下非公平锁的效率会更好); 那么我们先来看一下非公平锁是怎么获得同步状态的:

代码语言:javascript复制
//非公平锁是继承Sync是毋庸置疑的
  static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //这个是初次加锁的时候,判断同步状态是否被占用
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
           //当被占用后开始调用这个方法了
           //(当然后面的方法还会对这占用同步状态的线程是否是当前线程,
           //因为这个锁是独占的,仅仅允许一个线程多次加锁)
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

(一) 上面的compareAndSetState我们已经不陌生了,上一篇文章我们在写叫Mutex的自定义同步组件的时候看到过,它的功能是:判断当前的state是否为0,如果为0那么获取锁,如果不为0,那么将state设置为1,:

代码语言:javascript复制
//这个方法是同步器提供的方法,参数是expect和update,它继续调用了unsafe内的方法(内部是通过CAS来实现原子操作),
//将this(当前状态)和expect比较,如果相同返回true,如果不同则将state设置为update
 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

(二) 接着便是setExclusiveOwnerThread(Thread thread),这个方法比较陌生,我们继续跟源码,

代码语言:javascript复制
//这个方法是在AbstractOwnableSynchronizer内,从名字估计大家应该知道是干嘛的了,
//我们看看这个类的注释
/**
 * A synchronizer that may be exclusively owned by a thread.  This
 * class provides a basis for creating locks and related synchronizers
 * that may entail a notion of ownership.  The
 * {@code AbstractOwnableSynchronizer} class itself does not manage or
 * use this information. However, subclasses and tools may use
 * appropriately maintained values to help control and monitor access
 * and provide diagnostics.
 *博主英语不太好,勉强知道意思是:这个类是一个线程独占的同步器,
 *这个类提供创建锁和相关同步器的基础(伴随着所有权的概念),
 *这个类本身不控制和使用信息,子类和和工具可以适当使用维持的值去帮助控制和监视访问,提  * 供诊断(额,好几个单词不会0.0)
 */
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

所以我们可以看出这个是干嘛的了:

代码语言:javascript复制
//同步器继承自AbstractOwnableSynchronizer,Sync继承自同步器,
//NonfairSync 继承自Sync,所以这下该该明白了!
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    }

NonfairSync 间接继承AbstractOwnableSynchronized,所以他可以调用它使用的方法,将exclusiveOwnerThread 变量设置为当前线程

代码语言:javascript复制
//这个变量是AbstractOwnableSynchronized的成员变量
private transient Thread exclusiveOwnerThread;

所以为我们知道这个设置是用来帮助控制与监视用的!


(三) 接下来我们继续向下看,如果无法第一次无法获取同步状态,调用acquire方法,之前我们也见过这个方法,这个方法尝试获取同步状态,否则将当前线程组装成一个节点放入同步队列中,但是在这里,ReentrantLock对它进行了重写:

代码语言:javascript复制
 public final void acquire(int arg) {
         //其他的都没变,我们继续跟tryAcquire
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这里的tryAcquire不再是 throw new UnsupportedOperationException();, 而是:

代码语言:javascript复制
 protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

所以最终调用nonfairTryAcquire,至此我们真正的主角才上场!(鼓掌!!)

代码语言:javascript复制
   final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //再次判断当前状态
            if (c == 0) {
            //下面这行代码我就不用说了吧(参考上面的lock方法)
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前的线程为我们之前存的线程(就是前面已经获取同步状态的线程)
            else if (current == getExclusiveOwnerThread()) {
                //c为当前的状态,acquires为参数为1
                //当前状态最小值为0,表示当前无线程获取同步状态
                int nextc = c   acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //将当前状态设置为nextc,也就是在原来c的基础上加1了
                setState(nextc);
                return true;
            }
            return false;
        }

以上就是非公平锁的加锁方法,如果彻底明白了加锁的方法,那么release方法也不成问题了,接着看释放方法:


我们都知道ReentrantLock是同过unlock来释放锁的:

代码语言:javascript复制
public void unlock() {
        sync.release(1);
    }

继续往下面跟release方法(记得参数是1):

代码语言:javascript复制
//tryRelease方法就是不同的地方下面我们说
 public final boolean release(int arg) {
 //这个tryRelease方法被我们的ReentrantLock重写过,
 //不再是抛出UnsupportedOperationException错误了
 //下面我们详解tryRelease
        if (tryRelease(arg)) {
            //获取要释放的节点
            Node h = head;
            //如果节点不为空,且waitStatus不为0(0为初始状态)
            if (h != null && h.waitStatus != 0)
                //这个方法使用LockSupport来唤醒下一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

如果tryRelease释放成功(即不同步状态未0),那么获取要释放的节点,并判断当前节点是否存在,这个等待状态不能为0,只有这样才能进行下一步的唤醒操作! 记得在上一篇文章中,我就提到了release方法,但是没有详细说明,只是说unparkSuccessor方法是用来唤醒下一个节点的,这次来看看unparkSuccessor方法:

代码语言:javascript复制
 private void unparkSuccessor(Node node) {
        //获取当前节点的waitStatus
        int ws = node.waitStatus;
        //如果ws小于零即表示当前节点满足可以通知下一个节点
        if (ws < 0)
            //CAS操作,将waitStatus设置为0(node的waitStatus一定是相等的)
            compareAndSetWaitStatus(node, ws, 0);
            //获取释放节点的下一个节点
        Node s = node.next;
        //如果s节点为null或者waitStatus > 0(即不是初始状态)
        //那么s是空的
        if (s == null || s.waitStatus > 0) {
            s = null;
            //以下为同步队列的变动
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //如果s不为空,那么使用LockSupport来唤醒s
        //LockSupport是用来唤醒阻塞中断的线程的,后面一章我们详细来讲解
        if (s != null)
            LockSupport.unpark(s.thread);
    }

以上就完成了锁的释放和唤醒下一个节点,貌似我们少说了什么,对就是tryRelease

代码语言:javascript复制
//boolean类型,如果释放成功则返回true,否则返回false
   protected final boolean tryRelease(int releases) {
            //我们之前说过了这个参数为1
            //获取当前同步状态并减一
            int c = getState() - releases;
            //判断当前线程是否为独占线程,如果不是抛出错误
            //(错误释放错误,都不是你加锁的,你来凑什么热闹)
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //如果c为0,即getState为1,那么free为true,同步状态未空闲
            if (c == 0) {
                free = true;
                //将独占线程成员变量设置为空
                setExclusiveOwnerThread(null);
            }
            //将同步状态设置为0
            setState(c);
            这个时候返回free
            return free;
        }

以上便是ReentrantLock的解锁代码,因为是可重入的,所以当同步状态部位0的时候(大于零),我们可以多次调用unlock方法来调用释放同步状态!

以上就是非公平锁的的基本操作,接下来看看非公平锁是怎么样的:

代码语言:javascript复制
static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            //获得当前线程
            final Thread current = Thread.currentThread();
            //获得同步状态
            int c = getState();
            //如果同步状态为0(同步状态空闲)
            if (c == 0) {
                //如果空闲,那么判断当前线程是否有前驱(意思是让当前线程不能插队)
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c   acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

非公平锁里面同样头lock方法并重写了tryAcquire方法,lock方法里面调用acquire方法,acquire方法和之前的一样:

代码语言:javascript复制
 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

所以唯一不同的还是这个tryAcquire方法:这个方法和非公平锁不同在哪里?就是这个方法里多了一个判断hasQueuedPredecessors,这个方法判断同步队列中是否有前驱节点,如果这个方法返回true,表示有前驱节点,有线程比当前线程更早的获取锁,所以要前驱线程获取锁后释放才能继续获取锁,其他的代码都和上面的相同,我们不必纠结!


ReentrantReadWriteLock

在前面我写了一篇文章是关于读写锁的应用,主要的内容是:读读共享,读写互斥,写写互斥,读写锁维护了两把锁,一把读锁和一把写锁,通过分离读锁和写锁来提高并发性,因为在大多数并发情况下都是读数据,所以这样可以提升并发处理的效率。

我们先看下简单的结构图:

代码语言:javascript复制
//ReentrantReadWriteLock 实现ReadWriteLock接口
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    .
    .
    .
}

ReadWriteLock的接口其实很简单,只是规范了两个方法:

代码语言:javascript复制
public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     */
    Lock readLock();
     /**
     * Returns the lock used for writing.
     */
    Lock readLock();
}

看看ReentrantReadWriteLock有哪些字段:

代码语言:javascript复制
//这个是读锁ReadLock,这个类是ReentrantReadWriteLock的内部类
private final ReentrantReadWriteLock.ReadLock readerLock;

这个是读锁WriteLock,这个类是ReentrantReadWriteLock的内部类
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

构造方法:

代码语言:javascript复制
//这个构造函数通过this调用下面的这个构造函数
 public ReentrantReadWriteLock() {
        this(false);
    }
代码语言:javascript复制
public ReentrantReadWriteLock(boolean fair) {
        //读写锁同样有公平锁和非公平锁
        //通过无参构造和当前的构造方法我们可以看出默认的是new一个非公平锁
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

实现接口的方法:

代码语言:javascript复制
//分别返回读锁和写锁的方法
    public ReentrantReadWriteLock.WriteLock writeLock()  {
     return writerLock;
 }
    public ReentrantReadWriteLock.ReadLock  readLock()  { 
    return readerLock; 
}

我们上面看到了final类型的 sync,当然毋庸置疑,ReentrantReadWriteLock里面也有继承同步器的内部类:

代码语言:javascript复制
//这里的内部类是抽象类型的哟
 abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        //下面定义了四个常亮
        //这个常亮表示共享位移为16
        static final int SHARED_SHIFT   = 16;
        //共享单元
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        //最大数目
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        //独占掩码
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        //共享数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //独占数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

        
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        //为当前类的内部类,继承ThreadLocal,以调用线程为key,
        //HoldCounter为value的对象
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        //一系列的字段
        
        //当前线程读锁的持有量
        private transient ThreadLocalHoldCounter readHolds;
        
        //最后一个线程成功获取读锁的持有量
        private transient HoldCounter cachedHoldCounter;
        
        //第一个获取读锁的线程
        private transient Thread firstReader = null;
        
        //第一个获取读锁线程的锁持有量
        private transient int firstReaderHoldCount;
        
        //无参构造
        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }

       

        abstract boolean readerShouldBlock();

        abstract boolean writerShouldBlock();

        
        //这个和ReentrantLock中的相同(因为继承了同步器,所以这些方法方法要实现)
        //以下如果我们看见和之前和同步器内的方法名相同,那么你不用怀疑,它就是一样的
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
        
        protected final boolean tryAcquire(int acquires) {
          
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w   exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c   acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c   acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
        //这个是读写锁特有的方法:释放共享锁(继承同步器后自定义的方法)
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }
        //这个是读写锁特有的方法:获得共享锁(继承同步器后自定义的方法)
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c   SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount  ;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count  ;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

        
        .
        .
        .
        //这里省略了一些代码

上面我代码做了简单的注释,也pass了一些方法,为了代码的完整性所以将Sync所有的的代码都贴了出来,整体上看着有点乱,我们理一下,首先ReentrantReadWriteLock和ReentrantLock结构基本相同,都有公平锁和非公平锁,实现方式是一样的,不同的地方是读写锁内部维护了两把锁,读锁和写锁,ReentrantLock同步状态表示被同一个线程获取的次数,ReentrantReadWriteLock同样需要有一个同步状态值来表示当前锁(读写锁),被同一个线程获取的次数,在读写锁中如果在一个整型变量中维护多种状态,就需要“按位切割使用”这个变量,它的高16位用来记录读状态,低16位用来记录写状态:

我们来看看具体是如何操作的: 写锁: 当重进入时候仅需加1,当释放的时候减1,获取当前状态的时候进行与(&)操作(0000000000000000 1111111111111111)将高的16位置为0即可获得当前的写状态。

读锁: 当重进入的时候仅需加1<<16,释放的时候减去1>>>16(无符号补零右移16位),获取当前状态时整体右移16位(左边补零)。

写所得获取与释放,在读写锁中,同样是重写了同步器的tryAcquire方法,和ReentrantLock不同的是在获取之前需要判定一下读锁是否存在,如果读锁存在那么获取失败!写锁的释放操作和ReentrantLock基本一致,无其他特别。

读锁的获取与释放,在获取读锁的时候判断当前读锁容量是否充足(因为存储16位,所以这个读锁会有一个最大值),如果充足还要判断当前状态是否大于零,如果大于零,那么无非三种情况, ①读锁状态位为0,写锁状态位不为→当前有写锁占用读锁进入等待(01) ②读锁状态位不为0,写锁状态位0→当前读锁占用读锁可以获得锁(10) ③读状态位和写状态位都为0→读锁可以获取(00) 以上的线程安全靠CAS进行保证!

读锁的每次释放是线程安全的,每次状态位减1<<16.

读写锁还有一个特性就是锁降级,指的是将写锁降级为读锁,是指当前线程获取的是写锁,先获取读锁然后释放写锁,保留读锁。

本章到此结束!还有很多分析不到的地方,望指正,不胜感激! 2018 3.29 10:34

0 人点赞