AQS
一、什么是AQS
AbstractQueuedSynchronizer抽象同步队列,其定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch等。
二、作用
用于管理加锁导致的阻塞。当多个线程争用资源时,未抢到资源的线程则会进入此队列,在其内部进行管理。
如果共享资源被别的线程占用时,就需要一定的阻塞等待唤醒机制来保证锁的分配。这个机制主要是CLH队列(FIFO先进先出)的变体来实现,将暂时获取不到锁的线程加入队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS、自旋、LockSupport.park()的方式,来维护队列的同步状态state(volatile int类型),使并发达到同步控制的效果。
三、Node构造
static final class Node { // 共享模式 static final Node SHARED = new Node(); // 独占模式 static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * CANCELLED,值为1,表示当前的线程被取消 * SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark; * CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中; * PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行; * 值为0,表示当前节点在sync队列中,等待着获取锁。 */ volatile int waitStatus; // 前驱结点 volatile Node prev; // 后继结点 volatile Node next; // 与该结点绑定的线程 volatile Thread thread; // 存储condition队列中的后继节点 Node nextWaiter; // 是否为共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱结点 final Node predecessor() throwsNullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial heador SHARED marker } Node(Thread thread, Node mode) { // Used byaddWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { //Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
四、源码分析
1、acquire(int arg)
此方法是独占模式下线程获取共享资源的顶层入口。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1.1、tryAcquire(int arg)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
1.2、addWaiter(Node mode)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) { //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //尝试快速方式直接放到队尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //当队列没有元素时,则通过enq入队。 enq(node); return node; }
注:通过enq加入队列时,底层会默认创建一个new Node()作为伪节点,占在队列首位,第二位才是被传入的node节点。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) {
// Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
1.3、acquireQueue(final Node node, int arg)
内部会调用tryAcquire()方法,如果获取失败,会通过parkAndCheckInterrupt()方法将队列中的Node置为等待状态,直到占用线程调用unLock()方法或中断该线程。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }