Java多线程专题4: 锁的实现基础 AQS


    • Java多线程专题4: 锁的实现基础 AQS

对 AQS(AbstractQueuedSynchronizer)的理解

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

Each of these methods by default throws UnsupportedOperationException. Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All other methods are declared final because they cannot be independently varied.

For fair locks, it will call hasQueuedPredecessors() to check if there is a queued thread preceding the current thread. If yes, the tryAcquireShared() methods will return -1 immediately.

AQS是 AbstractQueuedSynchronizer 的缩写, 在同步组件的实现中, AQS是核心, 同步组件通过使用AQS提供的模板方法实现同步组件语义, AQS则实现了对同步状态的管理, 以及对阻塞线程进行排队, 等待通知等等一些底层的实现处理. AQS的核心也包括了这些方面: 同步队列, 独占式锁的获取和释放, 共享锁的获取和释放以及可中断锁, 超时等待锁获取这些特性的实现. 这些实际上则是AQS提供的模板方法.

AQS是用来构建锁和其他同步组件的基础框架, 它的实现主要依赖一个int成员变量来表示同步状态以及通过一个FIFO队列构成等待队列, 它的子类必须重写 AQS 的几个 protected 修饰的用来改变同步状态的方法, 其他方法主要是实现了排队和阻塞机制. 状态的更新使用getState, setState以及compareAndSetState这三个方法.

AQS内部实现了两个队列的抽象类: 同步队列和条件队列. 其中同步队列是一个双向链表, 里面储存的是处于等待状态的线程, 排队等待唤醒去获取锁; 而条件队列是一个单向链表, 里面储存的也是处于等待状态的线程, 只不过这些线程唤醒的结果是加入到了同步队列的队尾. AQS所做的就是管理这两个队列里面线程之间的等待状态-唤醒的工作.

在同步队列中还存在2种模式, 分别是独占模式和共享模式, 这两种模式的区别就在于AQS在唤醒线程节点的时候是不是传递唤醒, 这两种模式分别对应独占锁和共享锁.

AQS是一个抽象类, 不能直接实例化, 当我们需要实现一个自定义锁的时候可以去继承AQS然后重写获取锁的方式和释放锁的方式还有管理state.

独占式锁

  • void acquire(int arg): 独占式获取同步状态, 如果获取失败则插入同步队列进行等待;
  • void acquireInterruptibly(int arg): 与acquire方法相同, 但在同步队列中进行等待的时候可以检测中断
  • boolean tryAcquireNanos(int arg, long nanosTimeout): 在acquireInterruptibly基础上增加了超时等待功能, 在超时时间内没有获得同步状态返回false;
  • boolean release(int arg): 释放同步状态, 该方法会唤醒在同步队列中的下一个节点

共享式锁

  • void acquireShared(int arg): 共享式获取同步状态, 与独占式的区别在于同一时刻有多个线程获取同步状态
  • void acquireSharedInterruptibly(int arg): 在acquireShared方法基础上增加了能响应中断的功能;
  • boolean tryAcquireSharedNanos(int arg, long nanosTimeout): 在acquireSharedInterruptibly基础上增加了超时等待的功能;
  • boolean releaseShared(int arg): 共享式释放同步状态

当共享资源被某个线程占有, 其他请求该资源的线程将会阻塞, 从而进入同步队列. AQS中的同步队列则是通过链式方式进行实现. 同步队列是一个双向队列, AQS通过持有头尾指针管理同步队列.
在AQS有一个静态内部类Node, 其中有这样一些属性:

volatile int waitStatus //节点状态
volatile Node prev //当前节点/线程的前驱节点
volatile Node next; //当前节点/线程的后继节点
volatile Thread thread;//加入同步队列的线程引用
Node nextWaiter;//等待队列中的下一个节点

节点的状态有以下这些:

int CANCELLED =  1//节点从同步队列中取消
int SIGNAL    = -1//后继节点的线程处于等待状态, 如果当前节点释放同步状态会通知后继节点, 使得后继节点的线程能够运行;
int CONDITION = -2//当前节点进入等待队列中
int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
int INITIAL = 0;//初始状态

当线程获取独占式锁失败后就会将当前线程加入同步队列, 在当前线程是第一个加入同步队列时, 调用compareAndSetHead(new Node())方法, 完成链式队列的头结点的初始化; 如果不是, 则调用compareAndSetTail(pred, node); 以上操作, 如果失败则自旋不断尝试CAS尾插入节点直至成功为止.

获得独占式锁时, 首先获取当前节点的先驱节点, 如果先驱节点是头结点的并且成功获得同步状态时if (p == head && tryAcquire(arg)), 当前节点所指向的线程能够获取锁. 反之, 获取锁失败进入等待状态.

  • 线程获取锁失败, 线程被封装成Node进行入队操作, 核心方法在于addWaiter()和enq(), 同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;
  • 线程获取锁是一个自旋的过程, 当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时, 节点出队即该节点引用的线程获得锁, 否则, 当不满足条件时就会调用LockSupport.park()方法使得线程阻塞;
  • 释放锁的时候会唤醒后继节点;

总体来说: 在获取锁时, AQS维护一个同步队列, 获取锁失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了锁. 在释放锁时, 同步器会调用unparkSuccessor()方法唤醒后继节点.

参考

  • JDK8 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer.html
  • JDK9 https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer.html
  • JDK11 https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/AbstractQueuedSynchronizer.html