kotlin协程——>协程上下文与调度器


协程上下?与调度器

  协程总是运?在?些以 CoroutineContext 类型为代表的上下?中,它们被定义在了 Kotlin 的标准库 ?。 协程上下?是各种不同元素的集合。其中主元素是协程中的 Job,我们在前?的?档中?过它以及它的 调度器,?本?将对它进?介绍。

调度器与线程

  协程上下?包含?个 协程调度器(参? CoroutineDispatcher)它确定了哪些线程或与线程相对应的 协程执?。协程调度器可以将协程限制在?个特定的线程执?,或将它分派到?个线程池,亦或是让它 不受限地运?。 所有的协程构建器诸如 launch 和 async 接收?个可选的 CoroutineContext 参数,它可以被?来显式 的为?个新协程或其它上下?元素指定?个调度器。

  尝试下?的?例:

launch { // 运?在?协程的上下?中,即 runBlocking 主协程
    println("main runBlocking : I'm working in thread
            ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——将?作在主线程中
    println("Unconfined : I'm working in thread
            ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将会获取默认调度器
    println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得?个新的线程
    println("newSingleThreadContext: I'm working in thread
            ${Thread.currentThread().name}")
}

  它执?后得到了如下输出(也许顺序会有所不同):

Unconfined : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main

  当调? launch { …… } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下?(以及调度 器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下?。 Dispatchers.Unconfined 是?个特殊的调度器且似乎也运?在 main 线程中,但实际上,它是?种不 同的机制,这会在后?中讲到。 当协程在 GlobalScope 中启动时,使?的是由 Dispatchers.Default 代表的默认调度器。默认调度器使 ?共享的后台线程池。所以 launch(Dispatchers.Default) { …… } 与 GlobalScope.launch { …… } 使?相同的调度器。 newSingleThreadContext 为协程的运?启动了?个线程。?个专?的线程是?种?常昂贵的资源。 在真实的应?程序中两者都必须被释放,当不再需要的时候,使? close 函数,或存储在?个顶层变量 中使它在整个应?程序中被重?。

?受限调度器 vs 受限调度器

  Dispatchers.Unconfined 协程调度器在调?它的线程启动了?个协程,但它仅仅只是运?到第?个挂 起点。挂起后,它恢复线程中的协程,?这完全由被调?的挂起函数来决定。?受限的调度器?常适?于 执?不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。 另???,该调度器默认继承了外部的 CoroutineScope。runBlocking 协程的默认调度器,特别是,当 它被限制在了调?者线程时,继承?它将会有效地限制协程在该线程运?并且具有可预测的 FIFO 调 度。

launch(Dispatchers.Unconfined) { // ?受限的——将和主线程?起?作
    println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
    delay(500)
    println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
launch { // ?协程的上下?,主 runBlocking 协程
    println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
    delay(1000)
    println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}

  执?后的输出:

Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

  所以,该协程的上下?继承? runBlocking {...} 协程并在 main 线程中运?,当 delay 函数调 ?的时候,?受限的那个协程在默认的执?者线程中恢复执?。

?受限的调度器是?种?级机制,可以在某些极端情况下提供帮助?不需要调度协程以便稍后执?或产?不希望的副作?,因为某些操作必须?即在协程中执?。?受限调度器不应该在通常的
代码中使?。

调试协程与线程

  协程可以在?个线程上挂起并在其它线程上恢复。甚??个单线程的调度器也是难以弄清楚协程在何 时何地正在做什么事情。使?通常调试应?程序的?法是让线程在每?个?志?件的?志声明中打印 线程的名字。这种特性在?志框架中是普遍受?持的。但是在使?协程时,单独的线程名称不会给出很 多协程上下?信息,所以 kotlinx.coroutines 包含了调试?具来让它更简单。 使? -Dkotlinx.coroutines.debug JVM 参数运?下?的代码:

val a = async {
    log("I'm computing a piece of the answer")
    6
}
val b = async {
    log("I'm computing another piece of the answer")
    7
}
log("The answer is ${a.await() * b.await()}")

  这?有三个协程,包括 runBlocking 内的主协程 (#1),以及计算延期的值的另外两个协程 a (#2) 和 b (#3)。它们都在 runBlocking 上下?中执?并且被限制在了主线程内。这段代码的输出如下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

  这个 log 函数在?括号种打印了线程的名字,并且你可以看到它是 main 线程,并且附带了当前正在 其上执?的协程的标识符。这个标识符在调试模式开启时,将连续分配给所有创建的协程。

  当 JVM 以 -ea 参数配置运?时,调试模式也会开启。你可以在 DEBUG_PROPERTY_NAME 属性 的?档中阅读有关调试?具的更多信息。

在不同线程间跳转

  使? -Dkotlinx.coroutines.debug JVM 参数运?下?的代码

newSingleThreadContext("Ctx1").use { ctx1 ->
    newSingleThreadContext("Ctx2").use { ctx2 ->
        runBlocking(ctx1) {
            log("Started in ctx1")
            withContext(ctx2) {
                log("Working in ctx2")
            }
            log("Back to ctx1")
        }
    }
}

  它演?了?些新技术。其中?个使? runBlocking 来显式指定了?个上下?,并且另?个使? withContext 函数来改变协程的上下?,?仍然驻留在相同的协程中,正如可以在下?的输出中所?到的:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

  注意,在这个例?中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候,它使? 了 Kotlin 标准库中的 use 函数来释放该线程。

上下?中的作业

  协程的 Job 是上下?的?部分,并且可以使? coroutineContext [Job] 表达式在上下?中检索 它:

println("My job is ${coroutineContext[Job]}")

  在调试模式下,它将输出如下这些信息:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334 

  请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的?种?便的快捷?式。

?协程

  当?个协程被其它协程在 CoroutineScope 中启动的时候,它将通过 CoroutineScope.coroutineContext 来承袭上下?,并且这个新协程的 Job 将会成为?协程作业的?作业。当?个?协程被取消的时候,所有它的?协程也会被递归的取消。 然?,当使? GlobalScope 来启动?个协程时,则新协程的作业没有?作业。因此它与这个启动的作?域?关且独?运作。

// 启动?个协程来处理某种传?请求(request)
val request = launch {
// 孵化了两个?作业, 其中?个通过 GlobalScope 启动
    GlobalScope.launch {
        println("job1: I run in GlobalScope and execute independently!")
        delay(1000)
        println("job1: I am not affected by cancellation of the request")
    }
// 另?个则承袭了?协程的上下?
    launch {
        delay(100)
        println("job2: I am a child of the request coroutine")
        delay(1000)
        println("job2: I will not execute this line if my parent request is cancelled")
    }
}
delay(500)
request.cancel() // 取消请求(request)的执?
delay(1000) // 延迟?秒钟来看看发?了什么
println("main: Who has survived request cancellation?")

  这段代码的输出如下:

job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

父协程的职责

  ?个?协程总是等待所有的?协程执?结束。?协程并不显式的跟踪所有?协程的启动,并且不必使? Job.join 在最后的时候等待它们:

// 启动?个协程来处理某种传?请求(request)
val request = launch {
    repeat(3) { i -> // 启动少量的?作业
        launch {
            delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
            println("Coroutine $i is done")
        }
    }
    println("request: I'm done and I don't explicitly join my children that are still
            active")
}
request.join() // 等待请求的完成,包括其所有?协程
println("Now processing of the request is complete")

  结果如下所?:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

命名协程以用于调试

  当协程经常打印?志并且你只需要关联来?同?个协程的?志记录时,则?动分配的 id 是?常好的。 然?,当?个协程与特定请求的处理相关联时或做?些特定的后台任务,最好将其明确命名以?于调试 ?的。CoroutineName 上下?元素与线程名具有相同的?的。当调试模式开启时,它被包含在正在执 ?此协程的线程名中。

log("Started main coroutine")
// 运?两个后台值计算
val v1 = async(CoroutineName("v1coroutine")) {
    delay(500)
    log("Computing v1")
    252
}
val v2 = async(CoroutineName("v2coroutine")) {
    delay(1000)
    log("Computing v2")
    6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")

  程序执?使?了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所?:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

结合上下文中的元素

  有时我们需要在协程上下?中定义多个元素。我们可以使? + 操作符来实现。?如说,我们可以显式指 定?个调度器来启动协程并且同时显式指定?个命名:

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

  这段代码使?了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所?:

I'm working in thread DefaultDispatcher-worker-1 @test#2

协程作用域

  让我们将关于上下?,?协程以及作业的知识综合在?起。假设我们的应?程序拥有?个具有?命周期 的对象,但这个对象并不是?个协程。举例来说,我们编写了?个 Android 应?程序并在 Android 的 activity 上下?中启动了?组协程来使?异步操作拉取并更新数据以及执?动画等等。所有这些协程必 须在这个 activity 销毁的时候取消以避免内存泄漏。当然,我们也可以?动操作上下?与作业,以结合 activity 的?命周期与它的协程,但是 kotlinx.coroutines 提供了?个封装:CoroutineScope 的 抽象。你应该已经熟悉了协程作?域,因为所有的协程构建器都声明为在它之上的扩展。 我们通过创建?个 CoroutineScope 实例来管理协程的?命周期,并使它与 activit 的?命周期相关 联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() ??函数。前者创建 了?个通?作?域,?后者为使? Dispatchers.Main 作为默认调度器的 UI 应?程序 创建作?域:

class Activity {
    private val mainScope = MainScope()
    fun destroy() {
        mainScope.cancel()
    }
    // 继续运?……
    // 在 Activity 类中
    fun doSomething() {
    // 在?例中启动了 10 个协程,且每个都?作了不同的时?
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

  在 main 函数中我们创建 activity,调?测试函数 doSomething ,并且在 500 毫秒后销毁这个 activity。这取消了从 doSomething 启动的所有协程。我们可以观察到这些是由于在销毁之后,即使 我们再等?会?,activity 也不再打印消息。

val activity = Activity()
activity.doSomething() // 运?测试函数
println("Launched coroutines")
delay(500L) // 延迟半秒钟
println("Destroying activity!")
activity.destroy() // 取消所有的协程
delay(1000) // 为了在视觉上确认它们没有?作

  这个?例的输出如下所?:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

  你可以看到,只有前两个协程打印了消息,?另?个协程在 Activity.destroy() 中单次调?了 job.cancel() 。

线程局部数据

  有时,能够将?些线程局部数据传递到协程与协程之间是很?便的。然?,由于它们不受任何特定线程 的约束,如果?动完成,可能会导致出现样板代码。

  ThreadLocal,asContextElement 扩展函数在这?会充当救兵。它创建了额外的上下?元素,且保 留给定 ThreadLocal 的值,并在每次协程切换其上下?时恢复它。               它很容易在下?的代码中演?:

threadLocal.set("main")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
    println("Launch start, current thread: ${Thread.currentThread()}, thread local value:
            '${threadLocal.get()}'")
            yield()
            println("After yield, current thread: ${Thread.currentThread()}, thread local value:
            '${threadLocal.get()}'")
}
job.join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")

  在这个例?中我们使? Dispatchers.Default 在后台线程池中启动了?个新的协程,所以它?作在线程 池中的不同线程中,但它仍然具有线程局部变量的值,我们指定使? threadLocal.asContextElement(value = "launch") ,?论协程执?在什么线程中都是没有问题的。因此,其输出如(调试)所?:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main],
thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main],
thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

  这很容易忘记去设置相应的上下?元素。如果运?协程的线程不同,在协程中访问的线程局部变量则可 能会产?意外的值。为了避免这种情况,建议使? ensurePresent ?法并且在不正确的使?时快速失 败。 ThreadLocal 具有?流的?持,可以与任何 kotlinx.coroutines 提供的原语?起使?。但它有 ?个关键限制,即:当?个线程局部变量变化时,则这个新值不会传播给协程调?者(因为上下?元素? 法追踪所有 ThreadLocal 对象访问),并且下次挂起时更新的值将丢失。使? withContext 在协程 中更新线程局部变量,详? asContextElement。 另外,?个值可以存储在?个可变的域中,例如 class Counter(var i: Int) ,是的,反过来,可以 存储在线程局部的变量中。然?,在这个案例中你完全有责任来进?同步可能的对这个可变的域进?的 并发的修改。 对于?级的使?,例如,那些在内部使?线程局部传递数据的?于与?志记录 MDC 集成,以及事务上下 ?或任何其它库,请参?需要实现的 ThreadContextElement 接?的?档。

相关