JUC-ReentrantLock

2023-10-17 08:33:01 浏览数 (2)

AQS

学习ReentrantLock就不得不知道AQS,因为ReentrantLock就是基于了AQS对象的

特点
  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
  • getState - 获取 state 状态
  • setState - 设置 state 状态
  • compareAndSetState - cas 机制设置 state 状态
  • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒

原理

非公平锁实现原理

加锁流程

先看构造器,默认为非公平锁

代码语言:javascript复制
    public ReentrantLock() {
        sync = new NonfairSync();
    }

没有竞争时

第一个竞争出现时

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列

图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 Node 的创建是懒惰的 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
  3. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

再次有多个线程经历上述过程竞争失败,变成这个样子

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程

如果加锁成功(没有竞争),会设置 exclusiveOwnerThread 为 Thread-1,state = 1 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
代码语言:javascript复制
 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        // 加锁实现
        final void lock() {
        	// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	//如果失败
                acquire(1);
        }
代码语言:javascript复制
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //进入tryAcquire
        	//当tryAcquire返回false时候,先调用addWaiter,再调用acquireQueued
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire中的逻辑

代码语言:javascript复制
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

 		final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
			
			// 如果还没有获得锁
            if (c == 0) {
				//这里直接cas体现了非公平不去检查AQS
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果已经获得锁并且还是自己那么发送了锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c   acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //获取失败
            return false;
        }

addWaiter逻辑

代码语言:javascript复制
    private Node addWaiter(Node mode) {
        //将当前线程关联到一个Node对象,模式为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        //如果tail不为空cas尝试将Node对象加入AQS队列尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //队列为空
        enq(node);
        return node;
    }
	
	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
            	//还没有设置head 为哨兵节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	//cas将Node加入AQS尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued中的流程

代码语言:javascript复制
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//判断这个节点的上一个节点
                final Node p = node.predecessor();
				//上一个节点是head,去尝试一次
                if (p == head && tryAcquire(arg)) {
                    //获取成功设置自己为head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //返回中断标记
                    return interrupted;
                }
                //尝试失败 / 上一个节点不是head
                if (shouldParkAfterFailedAcquire(p, node) &&  // 判断是否需要等待
                    parkAndCheckInterrupt()) 
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //上一个节点的状态
        int ws = pred.waitStatus;
        //上一个节点在阻塞
        if (ws == Node.SIGNAL)
            return true;
        
        //表示取消状态
        if (ws > 0) {
            //重构前面所有取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
	        // 需要设置上一个节点的值为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

注意:是否需要unpark是由当前节点的前驱节点的waitStatus == Node.SIGNAL来决定,而不是本节点的waitStatus

解锁源码
代码语言:javascript复制
    public void unlock() {
        sync.release(1);
    }
    
	public final boolean release(int arg) {
		//尝试释放锁
        if (tryRelease(arg)) {
			//队头节点
            Node h = head;
           
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease

代码语言:javascript复制
        protected final boolean tryRelease(int releases) {
        	//state减去释放的
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
			//如果state为01则设置owner为null
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

unparkSuccessor

代码语言:javascript复制
private void unparkSuccessor(Node node) {
		
		
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 不考虑已取消的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从队尾先前找需要unpark的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	//启动线程
            LockSupport.unpark(s.thread);
    }
可重入原理
代码语言:javascript复制
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            
			// 原理
			
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c   acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
可打断原理
代码语言:javascript复制
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
					// 直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
公平锁实现原理
代码语言:javascript复制
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	//判断是否有前驱节点没有才去竞争
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c   acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
条件变量实现原理

每个条件变量对应一个等待队列,其实先类是ConditionObject

await

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0

signal 流程

假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1

addConditionWaiter
代码语言:javascript复制
private Node addConditionWaiter() {
            Node t = lastWaiter;
            
            if (t != null && t.waitStatus != Node.CONDITION) {
            	//从队列中删除
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建一个节点放入
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
doSignal
代码语言:javascript复制
		//将没取消的第一个节点转移到AQS队列
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&  //将队列中的Node转移到AQS队列,不成功就继续循环
                     (first = firstWaiter) != null);
        }
代码语言:javascript复制
final boolean transferForSignal(Node node) {
		  
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        //加入AQS队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
		
		
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            
            LockSupport.unpark(node.thread);
            
        return true;
    }

0 人点赞