java并发:线程并发控制机制之ReentrantLock


一、初识Lock

  Lock是一个接口,提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有加锁和解锁的方法都是显式的,其包路径是:java.util.concurrent.locks.Lock,其核心方法是lock()、unlock()、tryLock(),实现类有ReentrantLock、ReentrantReadWriteLock.ReadLock、ReentrantReadWriteLock.WriteLock,下图展示了Lock接口中定义的方法:

二、ReentrantLock

初识ReentrantLock

  Java在过去很长一段时间只能通过synchronized关键字来实现互斥,它有一些缺点,比如你不能扩展锁之外的方法或者块边界,尝试获取锁时不能中途取消等。Java5通过Lock接口提供了更复杂的控制来解决这些问题,《Java并发编程实战》一书有如下描述:

示例

此处我们看下面这两个例子,请注意其中ReentrantLock使用方式的区别:

(1)此处两个方法之间的锁是独立

package com.test;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    public static void main(String[] args) {
        final Countx ct = new Countx();
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.get();
                }
            }.start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.put();
                }
            }.start();
        }
    }
}

class Countx {

    public void get() {
        final ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();// 加锁
            System.out.println(Thread.currentThread().getName() + "get begin");
            Thread.sleep(1000L);// 模仿干活
            System.out.println(Thread.currentThread().getName() + "get end");
            lock.unlock(); // 解锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void put() {
        final ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();// 加锁
            System.out.println(Thread.currentThread().getName() + "put begin");
            Thread.sleep(1000L);// 模仿干活
            System.out.println(Thread.currentThread().getName() + "put end");
            lock.unlock(); // 解锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果如下(每次运行结果都是不一样的,仔细体会一下):

Thread-1get begin
Thread-0get begin
Thread-2put begin
Thread-3put begin
Thread-0get end
Thread-3put end
Thread-1get end
Thread-2put end

(2)此处两个方法之间使用相同的锁

package com.test;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    public static void main(String[] args) {
        final Countx ct = new Countx();
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.get();
                }
            }.start();
        }
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    ct.put();
                }
            }.start();
        }
    }
}

class Countx {
    final ReentrantLock lock = new ReentrantLock();

    public void get() {
        // final ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();// 加锁
            System.out.println(Thread.currentThread().getName() + "get begin");
            Thread.sleep(1000L);// 模仿干活
            System.out.println(Thread.currentThread().getName() + "get end");
            lock.unlock(); // 解锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void put() {
        // final ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();// 加锁
            System.out.println(Thread.currentThread().getName() + "put begin");
            Thread.sleep(1000L);// 模仿干活
            System.out.println(Thread.currentThread().getName() + "put end");
            lock.unlock(); // 解锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果如下(每次运行结果都是一样的):

Thread-0get begin
Thread-0get end
Thread-1get begin
Thread-1get end
Thread-2put begin
Thread-2put end
Thread-3put begin
Thread-3put end

解读:

ReentrantLock 是可重入的独占锁,某一时刻只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。

三、源码解读

下面这张图比较全面的展示了ReentrantLock的结构:

从类图可以看到,ReentrantLock 最终还是使用 AQS 来实现的,并且根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。

构造函数

其构造函数如下:

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

解读:

从前面的类图知道Sync 类继承自 AQS,它的子类 NonfairSync 和 FairSync 分别实现了获取锁的非公平与公平策略。

获取锁

其获取锁的代码如下:

    /**
     * Acquires the lock.
     *
     * 

Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * *

If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * *

If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.acquire(1); }

解读:

ReentrantLock 的 lock()委托给了 sync 类(即NonfairSync或者FairSync的实例)。

此处实际上是调用了AbstractQueuedSynchronizer的acquire方法,该方法的代码如下:

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

此方法调用了tryAcquire方法,其定义如下:

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * 

This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * *

The default * implementation throws {@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }

至此可知,而tryAcquire方法需要被子类重写,于是我们来看一下NonfairSync、FairSync中是如何重写该方法的。

针对NonfairSync

其定义如下:

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

解读:

从NonfairSync的定义可知,NonfairSync并没有自己实现tryAcquire方法,而是调用了父类Sync中的nonfairTryAcquire方法。

Sync的定义如下:

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

解读:

nonfairTryAcquire方法首先查看当前锁的状态值是否为 0:

  • 如果当前状态值不为为 0,则说明当前该锁空闲,那么就尝试 CAS 获取该锁,将 AQS 的状态值从 0 设置为 1,并设置当前锁的持有者为当前线程,然后返回 true;
  • 如果当前状态值不为 0,则说明该锁已经被某个线程持有,所以查看当前线程是否是该锁的持有者,如果当前线程是该锁的持有者,则状态值加 1,然后返回 true;
  • 如果当前线程不是锁的持有者则返回 false,该线程被放入 AQS 阻塞队列。

Note:

该方法中 nextc < 0 说明可重入次数溢出了。

针对FairSync

其定义如下:

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

解读:

公平锁的 tryAcquire 方法在尝试 CAS 获取锁之前调用了 hasQueuedPredecessors 方法,该方法定义在AbstractQueuedSynchronizer中,是实现公平性的核心代码,代码如下:

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * 

An invocation of this method is equivalent to (but may be * more efficient than): *

 {@code
     * getFirstQueuedThread() != Thread.currentThread()
     *   && hasQueuedThreads()}
* *

Note that because cancellations due to interrupts and * timeouts may occur at any time, a {@code true} return does not * guarantee that some other thread will acquire before the current * thread. Likewise, it is possible for another thread to win a * race to enqueue after this method has returned {@code false}, * due to the queue being empty. * *

This method is designed to be used by a fair synchronizer to * avoid barging. * Such a synchronizer's {@link #tryAcquire} method should return * {@code false}, and its {@link #tryAcquireShared} method should * return a negative value, if this method returns {@code true} * (unless this is a reentrant acquire). For example, the {@code * tryAcquire} method for a fair, reentrant, exclusive mode * synchronizer might look like this: * *

 {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}
* *
@return {@code true} if there is a queued thread preceding the * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty * @since 1.7 */ public final boolean hasQueuedPredecessors() { Node h, s; if ((h = head) != null) { if ((s = h.next) == null || s.waitStatus > 0) { s = null; // traverse in case of concurrent cancellation for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } if (s != null && s.thread != Thread.currentThread()) return true; } return false; }

tryLock

尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁井返回 true,否则返回 false。

Note:

该方法不会引起当前线程阻塞。

代码如下:

    /**
     * Acquires the lock only if it is not held by another thread at the time
     * of invocation.
     *
     * 

Acquires the lock if it is not held by another thread and * returns immediately with the value {@code true}, setting the * lock hold count to one. Even when this lock has been set to use a * fair ordering policy, a call to {@code tryLock()} will * immediately acquire the lock if it is available, whether or not * other threads are currently waiting for the lock. * This "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. If you want to honor * the fairness setting for this lock, then use * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS)} * which is almost equivalent (it also detects interruption). * *

If the current thread already holds this lock then the hold * count is incremented by one and the method returns {@code true}. * *

If the lock is held by another thread then this method will return * immediately with the value {@code false}. * * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} otherwise */ public boolean tryLock() { return sync.nonfairTryAcquire(1); }

思考:其与lock方法的异同?

回忆提示:非公平锁在tryAcquire中调用的也是这个方法,且调用了其他方法

释放锁

其释放锁的代码如下:

    /**
     * Attempts to release this lock.
     *
     * 

If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }

解读:

ReentrantLock 的 unlock()委托给了 sync 类(即NonfairSync或者FairSync的实例)。

此处实际上是调用了AbstractQueuedSynchronizer的release方法,该方法的代码如下:

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

此方法调用了tryRelease方法,其定义如下:

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * 

This method is always invoked by the thread performing release. * *

The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }

至此可知,而tryRelease方法需要被子类重写,可以查看前面描述中Sync是如何重写该方法的。

其具体过程如下:

如果当前线程不是该锁持有者则直接抛出异常,否则查看状态值是否为 0。

如果状态值为 0,则说明当前线程要放弃对该锁的持有权,则把当前锁的持有者设置为 null。

如果状态值不为 0,则仅仅让当前线程对该锁的可重入次数减 1。 

参考资料:

(1)https://www.baeldung.com/java-concurrent-locks

相关