java并发:线程并发控制机制之ReentrantLock
一、初识Lock
Lock是一个接口,提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有加锁和解锁的方法都是显式的,其包路径是:java.util.concurrent.locks.Lock,其核心方法是lock()、unlock()、tryLock(),实现类有ReentrantLock、ReentrantReadWriteLock.ReadLock、ReentrantReadWriteLock.WriteLock,下图展示了Lock接口中定义的方法:
二、ReentrantLock
初识ReentrantLock
Java在过去很长一段时间只能通过synchronized关键字来实现互斥,它有一些缺点,比如你不能扩展锁之外的方法或者块边界,尝试获取锁时不能中途取消等。Java5通过Lock接口提供了更复杂的控制来解决这些问题,《Java并发编程实战》一书有如下描述:
示例
此处我们看下面这两个例子,请注意其中ReentrantLock使用方式的区别:
(1)此处两个方法之间的锁是独立的
package com.test; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockDemo { public static void main(String[] args) { final Countx ct = new Countx(); for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.get(); } }.start(); } for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.put(); } }.start(); } } } class Countx { public void get() { final ReentrantLock lock = new ReentrantLock(); try { lock.lock();// 加锁 System.out.println(Thread.currentThread().getName() + "get begin"); Thread.sleep(1000L);// 模仿干活 System.out.println(Thread.currentThread().getName() + "get end"); lock.unlock(); // 解锁 } catch (InterruptedException e) { e.printStackTrace(); } } public void put() { final ReentrantLock lock = new ReentrantLock(); try { lock.lock();// 加锁 System.out.println(Thread.currentThread().getName() + "put begin"); Thread.sleep(1000L);// 模仿干活 System.out.println(Thread.currentThread().getName() + "put end"); lock.unlock(); // 解锁 } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果如下(每次运行结果都是不一样的,仔细体会一下):
Thread-1get begin Thread-0get begin Thread-2put begin Thread-3put begin Thread-0get end Thread-3put end Thread-1get end Thread-2put end
(2)此处两个方法之间使用相同的锁
package com.test; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockDemo { public static void main(String[] args) { final Countx ct = new Countx(); for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.get(); } }.start(); } for (int i = 0; i < 2; i++) { new Thread() { @Override public void run() { ct.put(); } }.start(); } } } class Countx { final ReentrantLock lock = new ReentrantLock(); public void get() { // final ReentrantLock lock = new ReentrantLock(); try { lock.lock();// 加锁 System.out.println(Thread.currentThread().getName() + "get begin"); Thread.sleep(1000L);// 模仿干活 System.out.println(Thread.currentThread().getName() + "get end"); lock.unlock(); // 解锁 } catch (InterruptedException e) { e.printStackTrace(); } } public void put() { // final ReentrantLock lock = new ReentrantLock(); try { lock.lock();// 加锁 System.out.println(Thread.currentThread().getName() + "put begin"); Thread.sleep(1000L);// 模仿干活 System.out.println(Thread.currentThread().getName() + "put end"); lock.unlock(); // 解锁 } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果如下(每次运行结果都是一样的):
Thread-0get begin Thread-0get end Thread-1get begin Thread-1get end Thread-2put begin Thread-2put end Thread-3put begin Thread-3put end
解读:
ReentrantLock 是可重入的独占锁,某一时刻只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。
三、源码解读
下面这张图比较全面的展示了ReentrantLock的结构:
从类图可以看到,ReentrantLock 最终还是使用 AQS 来实现的,并且根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。
构造函数
其构造函数如下:
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
解读:
从前面的类图知道Sync 类继承自 AQS,它的子类 NonfairSync 和 FairSync 分别实现了获取锁的非公平与公平策略。
获取锁
其获取锁的代码如下:
/** * Acquires the lock. * *Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * *
If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * *
If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one.
*/ public void lock() { sync.acquire(1); }
解读:
ReentrantLock 的 lock()委托给了 sync 类(即NonfairSync或者FairSync的实例)。
此处实际上是调用了AbstractQueuedSynchronizer的acquire方法,该方法的代码如下:
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
此方法调用了tryAcquire方法,其定义如下:
/** * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. * *This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {
@link Lock#tryLock()}. * *The default * implementation throws {
@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
至此可知,而tryAcquire方法需要被子类重写,于是我们来看一下NonfairSync、FairSync中是如何重写该方法的。
针对NonfairSync
其定义如下:
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
解读:
从NonfairSync的定义可知,NonfairSync并没有自己实现tryAcquire方法,而是调用了父类Sync中的nonfairTryAcquire方法。
Sync的定义如下:
/** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ @ReservedStackAccess final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
解读:
nonfairTryAcquire方法首先查看当前锁的状态值是否为 0:
- 如果当前状态值不为为 0,则说明当前该锁空闲,那么就尝试 CAS 获取该锁,将 AQS 的状态值从 0 设置为 1,并设置当前锁的持有者为当前线程,然后返回 true;
- 如果当前状态值不为 0,则说明该锁已经被某个线程持有,所以查看当前线程是否是该锁的持有者,如果当前线程是该锁的持有者,则状态值加 1,然后返回 true;
- 如果当前线程不是锁的持有者则返回 false,该线程被放入 AQS 阻塞队列。
Note:
该方法中 nextc < 0 说明可重入次数溢出了。
针对FairSync
其定义如下:
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ @ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
解读:
公平锁的 tryAcquire 方法在尝试 CAS 获取锁之前调用了 hasQueuedPredecessors 方法,该方法定义在AbstractQueuedSynchronizer中,是实现公平性的核心代码,代码如下:
/** * Queries whether any threads have been waiting to acquire longer * than the current thread. * ** *An invocation of this method is equivalent to (but may be * more efficient than): *
{@code * getFirstQueuedThread() != Thread.currentThread() * && hasQueuedThreads()}
Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* This method is designed to be used by a fair synchronizer to
* avoid barging.
* Such a synchronizer's { {
@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}
tryLock
尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁井返回 true,否则返回 false。
Note:
该方法不会引起当前线程阻塞。
代码如下:
/** * Acquires the lock only if it is not held by another thread at the time * of invocation. * *Acquires the lock if it is not held by another thread and * returns immediately with the value {
@code true}, setting the * lock hold count to one. Even when this lock has been set to use a * fair ordering policy, a call to {@code tryLock()} will * immediately acquire the lock if it is available, whether or not * other threads are currently waiting for the lock. * This "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. If you want to honor * the fairness setting for this lock, then use * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS)} * which is almost equivalent (it also detects interruption). * *If the current thread already holds this lock then the hold * count is incremented by one and the method returns {
@code true}. * *If the lock is held by another thread then this method will return * immediately with the value {
@code false}. * * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} otherwise */ public boolean tryLock() { return sync.nonfairTryAcquire(1); }
思考:其与lock方法的异同?
回忆提示:非公平锁在tryAcquire中调用的也是这个方法,且调用了其他方法
释放锁
其释放锁的代码如下:
/** * Attempts to release this lock. * *If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {
@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
解读:
ReentrantLock 的 unlock()委托给了 sync 类(即NonfairSync或者FairSync的实例)。
此处实际上是调用了AbstractQueuedSynchronizer的release方法,该方法的代码如下:
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
此方法调用了tryRelease方法,其定义如下:
/** * Attempts to set the state to reflect a release in exclusive * mode. * *This method is always invoked by the thread performing release. * *
The default implementation throws * {
@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
至此可知,而tryRelease方法需要被子类重写,可以查看前面描述中Sync是如何重写该方法的。
其具体过程如下:
如果当前线程不是该锁持有者则直接抛出异常,否则查看状态值是否为 0。
如果状态值为 0,则说明当前线程要放弃对该锁的持有权,则把当前锁的持有者设置为 null。
如果状态值不为 0,则仅仅让当前线程对该锁的可重入次数减 1。
参考资料:
(1)https://www.baeldung.com/java-concurrent-locks