kotlin协程——>协程上下文与调度器
协程上下?与调度器
协程总是运?在?些以 CoroutineContext 类型为代表的上下?中,它们被定义在了 Kotlin 的标准库 ?。 协程上下?是各种不同元素的集合。其中主元素是协程中的 Job,我们在前?的?档中?过它以及它的 调度器,?本?将对它进?介绍。
调度器与线程
协程上下?包含?个 协程调度器(参? CoroutineDispatcher)它确定了哪些线程或与线程相对应的 协程执?。协程调度器可以将协程限制在?个特定的线程执?,或将它分派到?个线程池,亦或是让它 不受限地运?。 所有的协程构建器诸如 launch 和 async 接收?个可选的 CoroutineContext 参数,它可以被?来显式 的为?个新协程或其它上下?元素指定?个调度器。
尝试下?的?例:
launch { // 运?在?协程的上下?中,即 runBlocking 主协程 println("main runBlocking : I'm working in thread ${Thread.currentThread().name}") } launch(Dispatchers.Unconfined) { // 不受限的——将?作在主线程中 println("Unconfined : I'm working in thread ${Thread.currentThread().name}") } launch(Dispatchers.Default) { // 将会获取默认调度器 println("Default : I'm working in thread ${Thread.currentThread().name}") } launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得?个新的线程 println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}") }
它执?后得到了如下输出(也许顺序会有所不同):
Unconfined : I'm working in thread main Default : I'm working in thread DefaultDispatcher-worker-1 newSingleThreadContext: I'm working in thread MyOwnThread main runBlocking : I'm working in thread main
当调? launch { …… } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下?(以及调度 器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下?。 Dispatchers.Unconfined 是?个特殊的调度器且似乎也运?在 main 线程中,但实际上,它是?种不 同的机制,这会在后?中讲到。 当协程在 GlobalScope 中启动时,使?的是由 Dispatchers.Default 代表的默认调度器。默认调度器使 ?共享的后台线程池。所以 launch(Dispatchers.Default) { …… } 与 GlobalScope.launch { …… } 使?相同的调度器。 newSingleThreadContext 为协程的运?启动了?个线程。?个专?的线程是?种?常昂贵的资源。 在真实的应?程序中两者都必须被释放,当不再需要的时候,使? close 函数,或存储在?个顶层变量 中使它在整个应?程序中被重?。
?受限调度器 vs 受限调度器
Dispatchers.Unconfined 协程调度器在调?它的线程启动了?个协程,但它仅仅只是运?到第?个挂 起点。挂起后,它恢复线程中的协程,?这完全由被调?的挂起函数来决定。?受限的调度器?常适?于 执?不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。 另???,该调度器默认继承了外部的 CoroutineScope。runBlocking 协程的默认调度器,特别是,当 它被限制在了调?者线程时,继承?它将会有效地限制协程在该线程运?并且具有可预测的 FIFO 调 度。
launch(Dispatchers.Unconfined) { // ?受限的——将和主线程?起?作 println("Unconfined : I'm working in thread ${Thread.currentThread().name}") delay(500) println("Unconfined : After delay in thread ${Thread.currentThread().name}") } launch { // ?协程的上下?,主 runBlocking 协程 println("main runBlocking: I'm working in thread ${Thread.currentThread().name}") delay(1000) println("main runBlocking: After delay in thread ${Thread.currentThread().name}") }
执?后的输出:
Unconfined : I'm working in thread main main runBlocking: I'm working in thread main Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor main runBlocking: After delay in thread main
所以,该协程的上下?继承? runBlocking {...} 协程并在 main 线程中运?,当 delay 函数调 ?的时候,?受限的那个协程在默认的执?者线程中恢复执?。
?受限的调度器是?种?级机制,可以在某些极端情况下提供帮助?不需要调度协程以便稍后执?或产?不希望的副作?,因为某些操作必须?即在协程中执?。?受限调度器不应该在通常的
代码中使?。
调试协程与线程
协程可以在?个线程上挂起并在其它线程上恢复。甚??个单线程的调度器也是难以弄清楚协程在何 时何地正在做什么事情。使?通常调试应?程序的?法是让线程在每?个?志?件的?志声明中打印 线程的名字。这种特性在?志框架中是普遍受?持的。但是在使?协程时,单独的线程名称不会给出很 多协程上下?信息,所以 kotlinx.coroutines 包含了调试?具来让它更简单。 使? -Dkotlinx.coroutines.debug JVM 参数运?下?的代码:
val a = async { log("I'm computing a piece of the answer") 6 } val b = async { log("I'm computing another piece of the answer") 7 } log("The answer is ${a.await() * b.await()}")
这?有三个协程,包括 runBlocking 内的主协程 (#1),以及计算延期的值的另外两个协程 a (#2) 和 b (#3)。它们都在 runBlocking 上下?中执?并且被限制在了主线程内。这段代码的输出如下:
[main @coroutine#2] I'm computing a piece of the answer [main @coroutine#3] I'm computing another piece of the answer [main @coroutine#1] The answer is 42
这个 log 函数在?括号种打印了线程的名字,并且你可以看到它是 main 线程,并且附带了当前正在 其上执?的协程的标识符。这个标识符在调试模式开启时,将连续分配给所有创建的协程。
当 JVM 以 -ea 参数配置运?时,调试模式也会开启。你可以在 DEBUG_PROPERTY_NAME 属性 的?档中阅读有关调试?具的更多信息。
在不同线程间跳转
使? -Dkotlinx.coroutines.debug JVM 参数运?下?的代码
newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { log("Started in ctx1") withContext(ctx2) { log("Working in ctx2") } log("Back to ctx1") } } }
它演?了?些新技术。其中?个使? runBlocking 来显式指定了?个上下?,并且另?个使? withContext 函数来改变协程的上下?,?仍然驻留在相同的协程中,正如可以在下?的输出中所?到的:
[Ctx1 @coroutine#1] Started in ctx1 [Ctx2 @coroutine#1] Working in ctx2 [Ctx1 @coroutine#1] Back to ctx1
注意,在这个例?中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候,它使? 了 Kotlin 标准库中的 use 函数来释放该线程。
上下?中的作业
协程的 Job 是上下?的?部分,并且可以使? coroutineContext [Job] 表达式在上下?中检索 它:
println("My job is ${coroutineContext[Job]}")
在调试模式下,它将输出如下这些信息:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的?种?便的快捷?式。
?协程
当?个协程被其它协程在 CoroutineScope 中启动的时候,它将通过 CoroutineScope.coroutineContext 来承袭上下?,并且这个新协程的 Job 将会成为?协程作业的?作业。当?个?协程被取消的时候,所有它的?协程也会被递归的取消。 然?,当使? GlobalScope 来启动?个协程时,则新协程的作业没有?作业。因此它与这个启动的作?域?关且独?运作。
// 启动?个协程来处理某种传?请求(request) val request = launch { // 孵化了两个?作业, 其中?个通过 GlobalScope 启动 GlobalScope.launch { println("job1: I run in GlobalScope and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } // 另?个则承袭了?协程的上下? launch { delay(100) println("job2: I am a child of the request coroutine") delay(1000) println("job2: I will not execute this line if my parent request is cancelled") } } delay(500) request.cancel() // 取消请求(request)的执? delay(1000) // 延迟?秒钟来看看发?了什么 println("main: Who has survived request cancellation?")
这段代码的输出如下:
job1: I run in GlobalScope and execute independently! job2: I am a child of the request coroutine job1: I am not affected by cancellation of the request main: Who has survived request cancellation?
父协程的职责
?个?协程总是等待所有的?协程执?结束。?协程并不显式的跟踪所有?协程的启动,并且不必使? Job.join 在最后的时候等待它们:
// 启动?个协程来处理某种传?请求(request) val request = launch { repeat(3) { i -> // 启动少量的?作业 launch { delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间 println("Coroutine $i is done") } } println("request: I'm done and I don't explicitly join my children that are still active") } request.join() // 等待请求的完成,包括其所有?协程 println("Now processing of the request is complete")
结果如下所?:
request: I'm done and I don't explicitly join my children that are still active Coroutine 0 is done Coroutine 1 is done Coroutine 2 is done Now processing of the request is complete
命名协程以用于调试
当协程经常打印?志并且你只需要关联来?同?个协程的?志记录时,则?动分配的 id 是?常好的。 然?,当?个协程与特定请求的处理相关联时或做?些特定的后台任务,最好将其明确命名以?于调试 ?的。CoroutineName 上下?元素与线程名具有相同的?的。当调试模式开启时,它被包含在正在执 ?此协程的线程名中。
log("Started main coroutine") // 运?两个后台值计算 val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") 252 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") 6 } log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
程序执?使?了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所?:
[main @main#1] Started main coroutine [main @v1coroutine#2] Computing v1 [main @v2coroutine#3] Computing v2 [main @main#1] The answer for v1 / v2 = 42
结合上下文中的元素
有时我们需要在协程上下?中定义多个元素。我们可以使? + 操作符来实现。?如说,我们可以显式指 定?个调度器来启动协程并且同时显式指定?个命名:
launch(Dispatchers.Default + CoroutineName("test")) { println("I'm working in thread ${Thread.currentThread().name}") }
这段代码使?了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所?:
I'm working in thread DefaultDispatcher-worker-1 @test#2
协程作用域
让我们将关于上下?,?协程以及作业的知识综合在?起。假设我们的应?程序拥有?个具有?命周期 的对象,但这个对象并不是?个协程。举例来说,我们编写了?个 Android 应?程序并在 Android 的 activity 上下?中启动了?组协程来使?异步操作拉取并更新数据以及执?动画等等。所有这些协程必 须在这个 activity 销毁的时候取消以避免内存泄漏。当然,我们也可以?动操作上下?与作业,以结合 activity 的?命周期与它的协程,但是 kotlinx.coroutines 提供了?个封装:CoroutineScope 的 抽象。你应该已经熟悉了协程作?域,因为所有的协程构建器都声明为在它之上的扩展。 我们通过创建?个 CoroutineScope 实例来管理协程的?命周期,并使它与 activit 的?命周期相关 联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() ??函数。前者创建 了?个通?作?域,?后者为使? Dispatchers.Main 作为默认调度器的 UI 应?程序 创建作?域:
class Activity { private val mainScope = MainScope() fun destroy() { mainScope.cancel() } // 继续运?…… // 在 Activity 类中 fun doSomething() { // 在?例中启动了 10 个协程,且每个都?作了不同的时? repeat(10) { i -> mainScope.launch { delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间 println("Coroutine $i is done") } } } } // Activity 类结束
在 main 函数中我们创建 activity,调?测试函数 doSomething ,并且在 500 毫秒后销毁这个 activity。这取消了从 doSomething 启动的所有协程。我们可以观察到这些是由于在销毁之后,即使 我们再等?会?,activity 也不再打印消息。
val activity = Activity() activity.doSomething() // 运?测试函数 println("Launched coroutines") delay(500L) // 延迟半秒钟 println("Destroying activity!") activity.destroy() // 取消所有的协程 delay(1000) // 为了在视觉上确认它们没有?作
这个?例的输出如下所?:
Launched coroutines Coroutine 0 is done Coroutine 1 is done Destroying activity!
你可以看到,只有前两个协程打印了消息,?另?个协程在 Activity.destroy() 中单次调?了 job.cancel() 。
线程局部数据
有时,能够将?些线程局部数据传递到协程与协程之间是很?便的。然?,由于它们不受任何特定线程 的约束,如果?动完成,可能会导致出现样板代码。
ThreadLocal,asContextElement 扩展函数在这?会充当救兵。它创建了额外的上下?元素,且保 留给定 ThreadLocal 的值,并在每次协程切换其上下?时恢复它。 它很容易在下?的代码中演?:
threadLocal.set("main") println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) { println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") yield() println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") } job.join() println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
在这个例?中我们使? Dispatchers.Default 在后台线程池中启动了?个新的协程,所以它?作在线程 池中的不同线程中,但它仍然具有线程局部变量的值,我们指定使? threadLocal.asContextElement(value = "launch") ,?论协程执?在什么线程中都是没有问题的。因此,其输出如(调试)所?:
Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main' Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch' After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch' Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
这很容易忘记去设置相应的上下?元素。如果运?协程的线程不同,在协程中访问的线程局部变量则可 能会产?意外的值。为了避免这种情况,建议使? ensurePresent ?法并且在不正确的使?时快速失 败。 ThreadLocal 具有?流的?持,可以与任何 kotlinx.coroutines 提供的原语?起使?。但它有 ?个关键限制,即:当?个线程局部变量变化时,则这个新值不会传播给协程调?者(因为上下?元素? 法追踪所有 ThreadLocal 对象访问),并且下次挂起时更新的值将丢失。使? withContext 在协程 中更新线程局部变量,详? asContextElement。 另外,?个值可以存储在?个可变的域中,例如 class Counter(var i: Int) ,是的,反过来,可以 存储在线程局部的变量中。然?,在这个案例中你完全有责任来进?同步可能的对这个可变的域进?的 并发的修改。 对于?级的使?,例如,那些在内部使?线程局部传递数据的?于与?志记录 MDC 集成,以及事务上下 ?或任何其它库,请参?需要实现的 ThreadContextElement 接?的?档。