AbstractQueuedSynchronizer、Unsafe概述

2024-10-03 23:10:51 浏览数 (2)

概述

AbstractQueuedSynchronizer,抽象队列同步器,简称AQS,用于构建同步器,抽象类,是JUC包下CountDownLatch、ReentrantLock、RenntrantReadWriteLock、Semaphore、ThreadPoolExecutor实现的基础。支持独占模式和共享模式,并通过内部的FIFO(First InFirst Out,先进先出)双向队列管理等待线程。

AQS维护FIFO线程等待队列(使用双向链表实现的,当多线程争用资源被阻塞时会进入此队列)。只有当Head结点持有的线程释放资源后,下一个线程才能获得资源。在这个工作模型中,state即是AQS的同步状态标量,也被称为资源。

AQS通过内部实现的FIFO同步等待队列来完成资源获取线程的等待工作,如果当前线程获取资源失败,AQS则会将当前线程以及等待状态等信息构造成一个Node结构的节点,并将其加入等待队列中,同时会阻塞当前线程;当其它获取到资源的线程释放持有的资源时,则会把等待队列节点中的线程唤醒,使其再次尝试获取对应资源。

源码

本文源码基于JDK22。AQS的内部类Node:

代码语言:java复制
abstract static class Node {
    // 前驱节点
    volatile Node prev; // initially attached via casTail
    // 后继节点
    volatile Node next; // visibly nonnull when signallable
    Thread waiter; // visibly nonnull when enqueued
	// 其值只能为CANCELLED、WAITING、COND,初始值为0
    volatile int status; // written by owner, atomic bit ops by others

	// methods for atomic operations
    final boolean casPrev(Node c, Node v) {  // for cleanQueue
        return U.weakCompareAndSetReference(this, PREV, c, v);
    }
    final boolean casNext(Node c, Node v) {  // for cleanQueue
        return U.weakCompareAndSetReference(this, NEXT, c, v);
    }
    final int getAndUnsetStatus(int v) {     // for signalling
        return U.getAndBitwiseAndInt(this, STATUS, ~v);
    }
    final void setPrevRelaxed(Node p) {      // for off-queue assignment
        U.putReference(this, PREV, p);
    }
    final void setStatusRelaxed(int s) {     // for off-queue assignment
        U.putInt(this, STATUS, s);
    }
    final void clearStatus() {               // for reducing unneeded signals
        U.putIntOpaque(this, STATUS, 0);
    }

    private static final long STATUS = U.objectFieldOffset(Node.class, "status");
    private static final long NEXT = U.objectFieldOffset(Node.class, "next");
    private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}

基于Node有两个空的实现类:ExclusiveNode、SharedNode,见名知义,分别用于独占式和共享式场景。另有一个实现类ConditionNode:

代码语言:java复制
static final class ConditionNode extends Node
    implements ForkJoinPool.ManagedBlocker {
    ConditionNode nextWaiter; // link to next waiting node
    /**
     * Allows Conditions to be used in ForkJoinPools without
     * risking fixed pool exhaustion. This is usable only for
     * untimed Condition waits, not timed versions.
     */
    public final boolean isReleasable() {
        return status <= 1 || Thread.currentThread().isInterrupted();
    }

    public final boolean block() {
        while (!isReleasable()) LockSupport.park();
        return true;
    }
}

Node.status可能的三种状态:

代码语言:java复制
// Node status bits, also used as argument and return values
static final int WAITING   = 1;          // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND      = 2;          // in a condition wait

ConditionObject类:

AQS的三个重要属性:

代码语言:java复制
// 等待队列的头结点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
// 线程同步状态
private volatile int state;

state用来表示线程争抢的资源。state=0,表示没有线程正在等待资源,state>0,则有n个线程正在等待资源释放。

AQS的三个final方法用于操作同步状态,都是protected修饰的,说明只能在子类中使用这些方法。

代码语言:java复制
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
// 使用CAS设置同步状态,确保线程安全
protected final boolean compareAndSetState(int expect, int update) {
    return U.compareAndSetInt(this, STATE, expect, update);
}

AQS提供的用于获取和释放同步状态的protect方法:

  • tryAcquire:独占式的获取同步状态,获取成功返回true,否则false
  • tryRelease:独占式的释放同步状态,释放成功返回true,否则false
  • tryAcquireShared:共享式的获取同步状态,获取成功返回true,否则false
  • tryReleaseShared:共享式的释放同步状态,释放成功返回true,否则false
  • isHeldExclusively:在独占模式下,如果当前线程已经获取到同步状态,则返回true;其他情况则返回false

AQS提供默认的同步实现,但是获取锁和释放锁的实现被定义为抽象方法,由子类实现。这样做的目的是使开发人员可以自由定义锁的获取与释放方式。

这几个方法都是空实现,并抛出UnsupportedOperationException。不同的同步工具针对不同的具体并发场景,如何获取、释放同步状态需要在AQS的自定义子类中实现。如果需要在独占模式下工作,则重写tryAcquire、tryRelease和isHeldExclusively方法;共享模式,则重写tryAcquireShared和tryReleaseShared方法。

代码语言:java复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

AQS提供的队列相关方法:

  • hasQueuedThreads:判断是否有正在等待获取同步状态的线程
  • hasContended:判断是否有任何线程曾试图获取此同步器;即acquire方法是否被阻塞过
  • getFirstQueuedThread:返回队列中第一个(等待时间最长的)线程,如果目前没有将任何线程加入队列,则返回null
  • isQueued:判断给定线程是否已加入同步队列,是则返回true

AQS提供的监控方法:

  • getQueueLength:返回等待获取同步状态的线程数估计值,因为在构造该结果时,多线程环境下实际线程集合可能发生大的变化
  • getQueuedThreads:返回包含可能正在等待获取的线程集合,因为在构造该结果时多线程环境下实际线程集合可能发生大的变化
  • getExclusiveQueuedThreads:返回独占式线程列表
  • getSharedQueuedThreads:返回共享式线程列表

可在自定义的同步工具中使用上面提到的这些方法

双向链表

双向链表的优势:

  • 双向链表提供双向指针,可在任何一个节点向前或向后进行遍历
  • 双向链表可以在任意节点位置实现数据的插入和删除,这些操作的时间复杂度都是O(1),不受链表长度的影响

使用双向链表来管理等待队列中的线程。原因:

  • 存储在双向链表中的线程,有可能这个线程出现异常不再需要竞争锁,所以需要把这些异常节点从链表中删除,而删除操作需要找到这个节点的前驱节点,如果不采用双向链表,就必须从头节点开始遍历,时间复杂度是O(n)
  • 新加入链表中的线程,在进入到阻塞状态之前,需要判断前驱节点的状态,只有前驱节点是Sign状态的时候才会让当前线程阻塞,所以这里也会涉及前驱节点的查找,采用双向链表能够更好地提升查找效率
  • 线程在加入链表中后,会通过自旋的方式去尝试竞争锁来提升性能,在自旋竞争锁的时候为保证锁竞争的公平性,需要先判断当前线程所在节点的前驱节点是否是头节点。这个判断也需要获取当前节点的前驱节点,同样采用双向链表能提高查找效率
  • FIFO排队: AQS通过双向链表实现一个FIFO等待队列。当线程尝试获取锁但失败时,它会被放入等待队列的尾部,并在合适的时候被唤醒。FIFO排队有助于防止线程饥饿,保证公平性
  • 方便节点删除: 使用双向链表可以方便地从中间删除节点。如果一个线程在等待过程中被中断或超时。双向链表中每个节点都持有对前驱和后继节点的引用,使得移除节点的操作时间复杂度为O(1)
  • 支持Condition变量: AQS中的ConditionObject也依赖于AQS的队列机制。ConditionObject维护一个单独的等待队列,当调用await()方法时,线程会被放入这个等待队列,直到被signal()signalAll()唤醒。双向链表的使用同样有助于管理这些Condition变量的等待队列

子类

即AQS的实现类,前面提到JDK包下有CountDownLatch、ReentrantLock、RenntrantReadWriteLock、Semaphore、ThreadPoolExecutor这5个。其他开源框架,如Tomcat、Elasticsearch、Jersey、Sentry等,很多。

在CountDownLatch里,内部类Sync继承AQS。

在Semaphore里,抽象类Sync继承自AQS,Sync有两个子类NonfairSync和FairSync,分别表示非公平锁和公平锁。

在ThreadPoolExecutor里,表示线程任务的类Worker继承AQS。

在ReentrantLock里,Sync作为内部抽象类,实现简单的锁的获取与释放。Sync的两个子类NonfairSync和FairSync,都是ReentrantLock的内部类。ReentrantLock实现Lock接口的lock-unlock方法,这个方法会根据fair参数决定使用NonfairSync还是FairSync。

ReentrantReadWriteLock是Lock的另一种实现方式,ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。与排他锁相比,能提供更高的并发性。

Unsafe

jdk.internal.misc.Unsafe是一个特殊类,严格来说它并不属于J2SE标准。在使用Java时,往往会强调Java的安全性。而Unsafe恰恰破坏Java的安全性,提供功能强大的方法可以直接操作内存和线程;使用不慎,可能会导致严重的后果。

使用到Unsafe提供的如下几个方法:

  • compareAndSetInt:与Compare-And-Swap类似,是一个更高层的抽象,提供给Java开发者使用,通常用于实现乐观锁等无锁算法
  • weakCompareAndSetReference:用于弱引用
  • compareAndSetReference:用于引用
  • getAndBitwiseAndInt:原子地将所给对象中的某个字段或数组元素的当前值替换为当前值与掩码的按位与结果
  • putReference:
  • putInt:
  • putIntOpaque:
  • objectFieldOffset:
  • park:挂起线程

其他未使用到的,但Unsafe提供的方法:

  • 获取底层内存信息,如addressSize()pageSize()
  • 获取类相关的信息,如分配实例内存allocateInstance(Class),获取静态域偏移量staticFieldOffset(Field)
  • 数组相关,如获取索引范围arrayIndexScale(Class)
  • CAS操作,compareAndSetChar
  • 直接访问内存,如申请内存allocateMemory(long)

拓展

AbstractOwnableSynchronizer

AbstractOwnableSynchronizer,简称AOS,是AQS和AQLS的抽象父类,为实现独占模式(exclusive mode)的同步器提供基础,提供获取和设置当前拥有独占锁的线程的方法。

AbstractQueuedLongSynchronizer

简称AQLS,与AQS类似,使用long类型的状态变量来支持更大范围的状态值。

参考

  • AQS介绍与源码剖析
  • 35张图深入AQS

0 人点赞