Kotlin Coroutine 原理详解(3)-- 协程的挂起与恢复
挂起与恢复的功能特征
- 以同步的方式执行异步代码
- 线程调度
挂起函数
Kotlin
使用 suspend
关键词标记一个函数为挂起函数;
GlobalScope.launch {
// 协程体
aaa()
bbb()
}
suspend fun aaa(){
delay(100)
bbb()
println("aaa")
}
suspend fun bbb() {
delay(100)
println("bbb")
}
很显然我在协程体里依次调用了 aaa
和 bbb
两个挂起函数,并且在 aaa
中调用了一次 bbb
。
下面是class
文件反编译的 java
源码,大体查看源码,协程使用状态机把层层嵌套的回调给拍平了。
在协程体里有一个大的状态机,挂起函数内部还有一个小的状态机,在递归地跑完小状态机后,再跑大状态机;
// launch 扩展函数被编译为静态函数;
// 被扩展的 `CoroutineScope` 作为第一个参数;
// 第二三参数分别是 `CoroutineContext` `CoroutineStart`类型;
// 第四个参数类型为 `Function2` 一个有两个参数一个返回值的函数类型,新增了两个方法;其中 `invokeSuspend`
// 函数编写了状态机的部分;`create` 函数创建了最初的 `Continuation`;实现 `invoke` 启动状态机
// 第五六两个参数不知道有什么用。
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
MainActivity var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var10000 = MainActivity.this;
this.label = 1;
// 这里可以看出此处匿名实现的`Function2`不仅仅实现了`Function2`,还实现了`Continuation`接口,这一点并没有在反编译代码中表现;
if (var10000.aaa(this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var10000 = MainActivity.this;
this.label = 2;
if (var10000.bbb(this) == var2) {
return var2;
} else {
return Unit.INSTANCE;
}
}
// 传入了`value` 参数,但是并没有用到;
// 传入了`completion`作为上层状态机;
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
// 此处的 `var3` 同样实现了 `Continuation` 接口;
Function2 var3 = new (completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return (()this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
// 函数签名被修改。
// 在传参的最后添加了一个`Continuation` 的参数
// 函数的返回值修改为`Object`,当该函数能够立即返回时则返回本来的数据类型,此处是 Unit;
// 当函数不能立即返回时,则返回`COROUTINE_SUSPENDED`,来标记当前函数稍后会返回;
@Nullable
public final Object aaa(@NotNull Continuation var1) {
Object $continuation;
label27: {
// 此处 `undefinedtype` 指的是下方所匿名实现的 `ContinuationImpl` 类型;
if (var1 instanceof ) {
$continuation = ()var1;
// 下面的操作是为了保证 `label` 的正确性
if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
(()$continuation).label -= Integer.MIN_VALUE;
break label27;
}
}
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return MainActivity.this.aaa(this);
}
};
}
label22: {
Object $result = (()$continuation).result;
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch((()$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
(()$continuation).L$0 = this;
(()$continuation).label = 1;
if (DelayKt.delay(100L, (Continuation)$continuation) == var6) {
return var6;
}
break;
case 1:
this = (MainActivity)(()$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
break label22;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
(()$continuation).L$0 = null;
(()$continuation).label = 2;
if (this.bbb((Continuation)$continuation) == var6) {
return var6;
}
}
String var2 = "aaa";
boolean var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
@Nullable
public final Object bbb(@NotNull Continuation var1) {
Object $continuation;
label20: {
if (var1 instanceof ) {
$continuation = ()var1;
if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
(()$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return MainActivity.this.bbb(this);
}
};
}
Object $result = (()$continuation).result;
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch((()$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
(()$continuation).label = 1;
if (DelayKt.delay(100L, (Continuation)$continuation) == var6) {
return var6;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String var2 = "bbb";
boolean var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
上面的代码基本说明了协程通过状态机运行的,但是仍然有两点问题:
- 状态机是如何开始如何结束的?
- 大状态机如何切换到小状态机,小状态机如何切回大状态机?
状态机工作原理
BaseContinuationImpl
是所有编译器自动生成的类的父类;
internal abstract class BaseContinuationImpl(
// 包裹当前状态机的上层状态机
public val completion: Continuation?
) : Continuation, CoroutineStackFrame, Serializable {
// 最终实现,不允许重写;
public final override fun resumeWith(result: Result) {
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result =
try {
val outcome = invokeSuspend(param)
// 当前状态机调用了挂起函数,不能立即返回,停止循环;
if (outcome === COROUTINE_SUSPENDED) return
// 当前状态机达到最终状态,拿到了结果;
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
// 当前状态机已结束,开始向上递归;
// 如果当前状态机是 BaseContinuationImpl 的子类,则改变 current 和 param 继续循环;
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// 达到顶层状态机,不是 BaseContinuationImpl 的子类,直接调用它的 resumeWith 函数;
completion.resumeWith(outcome)
return
}
}
}
}
// 包含状态机实现
protected abstract fun invokeSuspend(result: Result): Any?
......
}
线程调度
线程调度分为两个部分:
- 切到子状态机时,使用指定线程;
- 切回上层状态机时,变回原来的线程;
public suspend fun withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// suspendCoroutineUninterceptedOrReturn 方法由编译器实现,uCont 指的是当前协程实例;
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
//
internal fun (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
// 内部实现
public actual fun (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation
): Continuation {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2, Any?>).invoke(receiver, it)
}
}
}
// ContinuationImpl.kt
// 根据 ContinuationInterceptor 构建新的 Continuation
public fun intercepted(): Continuation =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// CoroutineDispatcher.kt
// 构建 DispatchedContinuation 对象
public final override fun interceptContinuation(continuation: Continuation): Continuation =
DispatchedContinuation(this, continuation)
// DispatchedContinuation.kt
// 扩展方法,根据类型不同特殊处理;
public fun Continuation.resumeCancellableWith(
result: Result,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
// DispatchedContinuation.kt
inline fun resumeCancellableWith(
result: Result,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
// dispatcher 是 CoroutineDispatcher 的子类,通过构造函数传入;
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
// CoroutineDispatcher.kt
// DispatchedContinuation 的父类 DispatchedTask 实现了 runnable 接口;
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// LimitingDispatcher.kt
// 这个类实现了线程池的相关功能
// DispatchedContinuation.kt
override fun resumeWith(result: Result) {
val context = continuation.context
val state = result.toState()
// 执行状态机,继续分发到线程池
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
// 状态机结束,调用上层状态机 DispatchedCoroutine 的 resumeWith 方法, 此时仍然在线程池中调用这一方法;
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
// AbstractCoroutine.kt
public final override fun resumeWith(result: Result) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)
// DispatchedCoroutine.kt
// 切回之前的线程
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}