AQS


一、什么是AQS

AbstractQueuedSynchronizer抽象同步队列,其定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch等。

二、作用

用于管理加锁导致的阻塞。当多个线程争用资源时,未抢到资源的线程则会进入此队列,在其内部进行管理。

如果共享资源被别的线程占用时,就需要一定的阻塞等待唤醒机制来保证锁的分配。这个机制主要是CLH队列(FIFO先进先出)的变体来实现,将暂时获取不到锁的线程加入队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS、自旋、LockSupport.park()的方式,来维护队列的同步状态state(volatile int类型),使并发达到同步控制的效果。

三、Node构造

static final class Node {
 
    // 共享模式
    static final Node SHARED = new Node();
 
    // 独占模式
    static final Node EXCLUSIVE = null;
 
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
 
    /**
     * CANCELLED,值为1,表示当前的线程被取消
     * SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
     * CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
     * PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
     * 值为0,表示当前节点在sync队列中,等待着获取锁。
     */
    volatile int waitStatus;
 
    // 前驱结点
    volatile Node prev;
 
    // 后继结点
    volatile Node next;
 
    // 与该结点绑定的线程
    volatile Thread thread;
 
    // 存储condition队列中的后继节点
    Node nextWaiter;
 
    // 是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
 
    // 获取前驱结点
    final Node predecessor() throwsNullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
 
    Node() { // Used to establish initial heador SHARED marker
    }
 
    Node(Thread thread, Node mode) { // Used byaddWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
 
    Node(Thread thread, int waitStatus) { //Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

 

四、源码分析

1、acquire(int arg)

此方法是独占模式下线程获取共享资源的顶层入口。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1.1、tryAcquire(int arg)

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。

1.2、addWaiter(Node mode)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //当队列没有元素时,则通过enq入队。
    enq(node);
    return node;
}

注:通过enq加入队列时,底层会默认创建一个new Node()作为伪节点,占在队列首位,第二位才是被传入的node节点。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
          // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

 1.3、acquireQueue(final Node node, int arg)

 内部会调用tryAcquire()方法,如果获取失败,会通过parkAndCheckInterrupt()方法将队列中的Node置为等待状态,直到占用线程调用unLock()方法或中断该线程。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }