java.util.concurrent.locks.LockSupport用法
在看AQS内部的时候发现很多使用java.util.concurrent.locks.LockSupport类的东西。 比如CountDownLatch.await 阻塞的时候以及使用阻塞队列进行take、take 方法在线程阻塞的时候也是使用的该类。下面研究其主要的使用方法。
1. 线程状态简单理解
一开始学习线程的时候线程的状态如下:
1、新建状态NEW new了线程但是没有开始执行,比如: Thread t1 = new Thread();t1就是一个新建状态的线程。 2、可运行状态RUNNABLE new出来线程,调用start()方法即处于RUNNABLE状态了。处于RUNNABLE状态的线程可能正在Java虚拟机中运行,也可能正在等待处理器的资源,因为一个线程必须获得CPU的资源后,才可以运行其run()方法中的内容,否则排队等待 3、阻塞BLOCKED 如果某一线程正在等待监视器锁,以便进入一个同步的块/方法,那么这个线程的状态就是阻塞BLOCKED 4、等待WAITING 某一线程因为调用不带超时的Object的wait()方法、不带超时的Thread的join()方法、LockSupport的park()方法,就会处于等待WAITING状态 5、超时等待TIMED_WAITING 某一线程因为调用带有指定正等待时间的Object的wait()方法、Thread的join()方法、Thread的sleep()方法、LockSupport的parkNanos()方法、LockSupport的parkUntil()方法,就会处于超时等待TIMED_WAITING状态 6、终止状态TERMINATED 线程调用终止或者run()方法执行结束后,线程即处于终止状态。处于终止状态的线程不具备继续运行的能力。
可以看到当调用park 方法之后进入WAITING 状态。
2. 主要API
1. 主要API如下:
源码如下:
/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * * * * * * * * * * * * * * * * * * * * */ /* * * * * * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.locks; import sun.misc.Unsafe; /** * Basic thread blocking primitives for creating locks and other * synchronization classes. * ** * where neither {@code canProceed} nor any other actions prior to the * call to {@code park} entail locking or blocking. Because only one * permit is associated with each thread, any intermediary uses of * {@code park} could interfere with its intended effects. * *This class associates, with each thread that uses it, a permit * (in the sense of the {
@link java.util.concurrent.Semaphore * Semaphore} class). A call to {@code park} will return immediately * if the permit is available, consuming it in the process; otherwise * it may block. A call to {@code unpark} makes the permit * available, if it was not already available. (Unlike with Semaphores * though, permits do not accumulate. There is at most one.) * *Methods {
@code park} and {@code unpark} provide efficient * means of blocking and unblocking threads that do not encounter the * problems that cause the deprecated methods {@code Thread.suspend} * and {@code Thread.resume} to be unusable for such purposes: Races * between one thread invoking {@code park} and another thread trying * to {@code unpark} it will preserve liveness, due to the * permit. Additionally, {@code park} will return if the caller's * thread was interrupted, and timeout versions are supported. The * {@code park} method may also return at any other time, for "no * reason", so in general must be invoked within a loop that rechecks * conditions upon return. In this sense {@code park} serves as an * optimization of a "busy wait" that does not waste as much time * spinning, but must be paired with an {@code unpark} to be * effective. * *The three forms of {
@code park} each also support a * {@code blocker} object parameter. This object is recorded while * the thread is blocked to permit monitoring and diagnostic tools to * identify the reasons that threads are blocked. (Such tools may * access blockers using method {@link #getBlocker(Thread)}.) * The use of these forms rather than the original forms without this * parameter is strongly encouraged. The normal argument to supply as * a {@code blocker} within a lock implementation is {@code this}. * *These methods are designed to be used as tools for creating * higher-level synchronization utilities, and are not in themselves * useful for most concurrency control applications. The {
@code park} * method is designed for use only in constructions of the form: * *{@code * while (!canProceed()) { ... LockSupport.park(this); }}
Sample Usage. Here is a sketch of a first-in-first-out * non-reentrant lock class: *
{@code * class FIFOMutex { * private final AtomicBoolean locked = new AtomicBoolean(false); * private final Queue
If the permit is available then it is consumed and the call returns * immediately; otherwise * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * *
-
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. * *
If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * *
-
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the elapsed time * upon return. * *
If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * *
-
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the current time * upon return. * *
If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of three * things happens: * *
-
*
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return.
If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * *
-
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the elapsed time * upon return. * *
If the permit is available then it is consumed and the call * returns immediately; otherwise the current thread becomes disabled * for thread scheduling purposes and lies dormant until one of four * things happens: * *
-
*
- Some other thread invokes {
This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread, or the current time * upon return. * *
2. 使用
import java.util.concurrent.locks.LockSupport; public class PlainTest { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { System.out.println("111222"); LockSupport.park(); System.out.println("222333"); try { Thread.sleep(3*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("333444"); }); thread.start(); Thread.sleep(1*1000); System.out.println("thread.getState(): " + thread.getState() + "\t1"); LockSupport.unpark(thread); Thread.sleep(1*1000); System.out.println("thread.getState(): " + thread.getState() + "\t2"); Thread.sleep(3*1000); System.out.println("thread.getState(): " + thread.getState() + "\t3"); } }
结果:
111222 thread.getState(): WAITING 1 222333 thread.getState(): TIMED_WAITING 2 333444 thread.getState(): TERMINATED 3
2. park 方法
park用于挂起当前线程。 当前线程进入等待或者超时等待状态。其恢复的条件是调用unpark、其它线程中断了线程、带参数的park 时间到达指定时间。
1. park 可以设置一个blocker 参数, 也可以不设置。设置之后可以获取到当前线程阻塞的信息。
设置的时候会通过unsafe(parkBlockerOffset 偏移量获取到thread 对象的parkBlocker 的偏移量), 然后设置到java.lang.Thread#parkBlocker。
java.lang.Thread#parkBlocker:
/** * The argument supplied to the current call to * java.util.concurrent.locks.LockSupport.park. * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker * Accessed using java.util.concurrent.locks.LockSupport.getBlocker */ volatile Object parkBlocker;
(1) 不设置:
import java.util.concurrent.locks.LockSupport; public class PlainTest { public static void main(String[] args) { LockSupport.park(); } }
jstack查看线程信息:
"main" #1 prio=5 os_prio=0 tid=0x00000224f146d000 nid=0x3524 waiting on condition [0x00000081e89fe000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) at PlainTest.main(PlainTest.java:6)
(2) 设置blocker
代码:
import java.util.concurrent.locks.LockSupport; public class PlainTest { public static void main(String[] args) { LockSupport.park(new Object()); } }
结果:
"main" #1 prio=5 os_prio=0 tid=0x000002402c6ad800 nid=0x4a14 waiting on condition [0x000000d0ed6ff000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076caeab80> (a java.lang.Object) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at PlainTest.main(PlainTest.java:6)
(3) 设置超时时间
import java.util.concurrent.locks.LockSupport; public class PlainTest { public static void main(String[] args) { LockSupport.parkNanos(Long.MAX_VALUE); } }
结果:
"main" #1 prio=5 os_prio=0 tid=0x000001e39c82c800 nid=0x3e70 waiting on condition [0x000000215d5ff000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338) at PlainTest.main(PlainTest.java:6)
3. unpark 需要设置一个线程进行解除阻塞
用于解除线程的阻塞。注意也可以先unpark, 在park。 只不过先unpark、后park, 调用park的时候相当于线程不会进行阻塞。多次unpark 和 一次unpark 的效果一样, 只能对一次park 生效。
下面研究其原理。
3. 原理
1. park 原理
park 调用的最终是: sun.misc.Unsafe#park, 是一个native 方法
public native void park(boolean var1, long var2);
其参数有两个, 第一个是是否是相对时间(isAbsolute), 第二个参数是时间。
对于第一个参数,LockSupport 使用的时候只有Ijava.util.concurrent.locks.LockSupport#parkUntil(java.lang.Object, long)传递的是true(代表是相对时间), 其他是false。
接下来查看其调用到C++相关方法。
1. \openjdk\hotspot\src\share\vm\prims\unsafe.cpp 内部的方法:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); EventThreadPark event; #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ if (event.should_commit()) { oop obj = thread->current_park_blocker(); event.set_klass((obj != NULL) ? obj->klass() : NULL); event.set_timeout(time); event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop(obj) : 0); event.commit(); } UNSAFE_END
核心是在thread->parker()->park(isAbsolute != 0, time); 这一行代码,调用线程内部的parker() 获取到parker 之后继续调用 park 方法。(每个线程对象都有一个parker对象)
parker 对象:openjdk\hotspot\src\share\vm\runtime\park.hpp
class Parker : public os::PlatformParker { private: volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
_counter 属性是起重要作用的属性。
其父类有互斥变量等属性:\openjdk\hotspot\src\os\linux\vm\os_linux.hpp
class PlatformParker : public CHeapObj{ protected: enum { REL_INDEX = 0, ABS_INDEX = 1 }; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [2] ; // one for relative times and one for abs. public: // TODO-FIXME: make dtor private ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr()); assert_status(status == 0, status, "cond_init rel"); status = pthread_cond_init (&_cond[ABS_INDEX], NULL); assert_status(status == 0, status, "cond_init abs"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _cur_index = -1; // mark as unused } }; #endif // OS_LINUX_VM_OS_LINUX_HPP
2. park 方法根据操作系统不同交给对应的实现:
比如: openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp
void Parker::park(bool isAbsolute, jlong time) { // Ideally we'd do something useful while spinning, such // as calling unpackTime(). // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. // 如果_counter 属性大于0, 代表有许可,直接返回 if (Atomic::xchg(0, &_counter) > 0) return; // Optional fast-exit: Check interrupt before trying to wait Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { return; } // First, demultiplex/decode time arguments timespec absTime; // 如果time 参数小于0, 或者是绝对时间且时间等于0, 直接返回 if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { // Warning: this code might be exposed to the old Solaris time // round-down bugs. Grep "roundingFix" for details. // 将时间换算后保存起来 unpackTime(&absTime, isAbsolute, time); } // Enter safepoint region // Beware of deadlocks such as 6317397. // The per-thread Parker:: _mutex is a classic leaf-lock. // In particular a thread must never block on the Threads_lock while // holding the Parker:: mutex. If safepoints are pending both the // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock. ThreadBlockInVM tbivm(jt); // Don't wait if cannot get lock since interference arises from // unblocking. Also. check interrupt before trying wait // 如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回 if (Thread::is_interrupted(thread, false) || os::Solaris::mutex_trylock(_mutex) != 0) { return; } int status ; // 走到这里代表有_counter 大于0, 则将其重置为0。 if (_counter > 0) { // no wait needed _counter = 0; // 对互斥变量解锁 status = os::Solaris::mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals(); thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() // Do this the hard way by blocking ... // See http://monaco.sfbay/detail.jsf?cr=5094058. // TODO-FIXME: for Solaris SPARC set fprs.FEF=0 prior to parking. // Only for SPARC >= V8PlusA #if defined(__sparc) && defined(COMPILER2) if (ClearFPUAtPark) { _mark_fpu_nosave() ; } #endif if (time == 0) { status = os::Solaris::cond_wait (_cond, _mutex) ; } else { status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime); } // Note that an untimed cond_wait() can sometimes return ETIME on older // versions of the Solaris. assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL); #endif _counter = 0 ; status = os::Solaris::mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } }
(1) ThreadBlockInVM tbivm(jt); 是修改线程状态为阻塞。 相当于创建一个ThreadBlockInVM 对象, 变量名为tbivm, 参数为jt
\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp
class ThreadBlockInVM : public ThreadStateTransition { public: ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) { // Once we are blocked vm expects stack to be walkable thread->frame_anchor()->make_walkable(thread); trans_and_fence(_thread_in_vm, _thread_blocked); } ~ThreadBlockInVM() { trans_and_fence(_thread_blocked, _thread_in_vm); // We don't need to clear_walkable because it will happen automagically when we return to java } };
然后调用到: D:\study\sourcecode\openjdk\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp 中的 transition_and_fence
// transition_and_fence must be used on any thread state transition // where there might not be a Java call stub on the stack, in // particular on Windows where the Structured Exception Handler is // set up in the call stub. os::write_memory_serialize_page() can // fault and we can't recover from it on Windows without a SEH in // place. static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) { assert(thread->thread_state() == from, "coming from wrong thread state"); assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states"); // Change to transition state (assumes total store ordering! -Urs) thread->set_thread_state((JavaThreadState)(from + 1)); // Make sure new state is seen by VM thread if (os::is_MP()) { if (UseMembar) { // Force a fence between the write above and read below OrderAccess::fence(); } else { // Must use this rather than serialization page in particular on Windows InterfaceSupport::serialize_memory(thread); } } if (SafepointSynchronize::do_call_back()) { SafepointSynchronize::block(thread); } thread->set_thread_state(to); CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();) }
定义的线程状态: openjdk\hotspot\src\share\vm\utilities\globalDefinitions.hpp
enum JavaThreadState { _thread_uninitialized = 0, // should never happen (missing initialization) _thread_new = 2, // just starting up, i.e., in process of being initialized _thread_new_trans = 3, // corresponding transition state (not used, included for completness) _thread_in_native = 4, // running in native code _thread_in_native_trans = 5, // corresponding transition state _thread_in_vm = 6, // running in VM _thread_in_vm_trans = 7, // corresponding transition state _thread_in_Java = 8, // running in Java or in stub code _thread_in_Java_trans = 9, // corresponding transition state (not used, included for completness) _thread_blocked = 10, // blocked in vm _thread_blocked_trans = 11, // corresponding transition state _thread_max_state = 12 // maximum thread state+1 - used for statistics allocation };
2. unpark 原理
\openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp 中 unpark 方法如下:
void Parker::unpark() { int s, status ; // 获取互斥锁 status = os::Solaris::mutex_lock (_mutex) ; assert (status == 0, "invariant") ; // s记录原来的_counter 的值 s = _counter; // _counter 设置为0 _counter = 1; // 释放互斥锁 status = os::Solaris::mutex_unlock (_mutex) ; assert (status == 0, "invariant") ; // 如果原来的_counter为0, 证明有线程调用park 在等待信号。 则调用下面方法通知线程解除阻塞。 则原来park 等待的线程会继续后面的代码 if (s < 1) { status = os::Solaris::cond_signal (_cond) ; assert (status == 0, "invariant") ; } }
unpark本身就是将_counter 设置为1,并通知条件阻塞的线程已经可以结束等待了. 如果多次连续调用unpark 方法,则s < 1 不成立, 也就不会走下面的方法。
cond_signal 调用 \openjdk\hotspot\src\os\solaris\vm\os_solaris.hpp 中 下面代码:
static int cond_signal(cond_t *cv) { return _cond_signal(cv); }
总结: 每个线程都有一个parker 对象,内部包含_counter 可以视作许可证。 每次park 的时候相当于等待该许可证(等待该变量改为1), 调用unpark 相当于将许可证变量改为1。