kotlin协程——>异步流
异步流
挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(Flow)的 ?武之地。
表示多个值
在 Kotlin 中可以使?集合来表?多个值。?如说,我们可以拥有?个函数 foo() ,它返回?个包含三 个数字的 List,然后使? forEach 打印它们:
fun foo(): List= listOf(1, 2, 3) fun main() { foo().forEach { value -> println(value) } }
这段代码输出如下:
1 2 3
序列:如果使??些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么我们可以使? Sequence 来表?数字:
fun foo(): Sequence= sequence { // 序列构建器 for (i in 1..3) { Thread.sleep(100) // 假装我们正在计算 yield(i) // 产?下?个值 } } fun main() { foo().forEach { value -> println(value) } }
这段代码输出相同的数字,但在打印每个数字之前等待 100 毫秒。
挂起函数:然?,计算过程阻塞运?该代码的主线程。当这些值由异步代码计算时,我们可以使? suspend 修饰 符标记函数 foo ,这样它就可以在不阻塞的情况下执?其?作并将结果作为列表返回:
suspend fun foo(): List{ delay(1000) // 假装我们在这?做了?些异步的事情 return listOf(1, 2, 3) } fun main() = runBlocking { foo().forEach { value -> println(value) } }
这段代码将会在等待?秒之后打印数字。
流:使? List 结果类型,意味着我们只能?次返回所有值。为了表?异步计算的值流(stream),我们可以使 ? Flow 类型(正如同步计算值会使? Sequence 类型):
foo(): Flow= flow { // 流构建器 for (i in 1..3) { delay(100) // 假装我们在这?做了?些有?的事情 emit(i) // 发送下?个值 } main() = runBlocking { // 启动并发的协程以验证主线程并未阻塞 launch { for (k in 1..3) { println("I'm not blocked $k") delay(100) } } // 收集这个流 foo().collect { value -> println(value) }
这段代码在不阻塞主线程的情况下每等待 100 毫秒打印?个数字。在主线程中运??个单独的协程每 100 毫秒打印?次“I'm not blocked”已经经过了验证。
I'm not blocked 1 1 I'm not blocked 2 2 I'm not blocked 3 3
注意使? Flow 的代码与先前?例的下述区别:
名为 flow 的 Flow 类型构建器函数。 flow { ... } 构建块中的代码可以挂起。 函数 foo() 不再标有 suspend 修饰符。 流使? emit 函数 发射 值。 流使? collect 函数 收集 值。
流是冷的
Flow 是?种类似于序列的冷流 — 这段 flow 构建器中的代码直到流被收集的时候才运?。这在以下的 ?例中?常明显:
foo(): Flow= flow { println("Flow started") for (i in 1..3) { delay(100) emit(i) } main() = runBlocking { println("Calling foo...") val flow = foo() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) }
打印如下:
Calling foo... Calling collect... Flow started 1 2 3 Calling collect again... Flow started 1 2 3
这是返回?个流的 foo() 函数没有标记 suspend 修饰符的主要原因。通过它??,foo() 会尽快 返回且不会进?任何等待。该流在每次收集的时候启动,这就是为什么当我们再次调? collect 时我 们会看到“Flow started”。
流取消
流采?与协程同样的协作取消。然?,流的基础设施未引?其他取消点。取消完全透明。像往常?样,流 的收集可以在当流在?个可取消的挂起函数(例如 delay)中挂起的时候取消,否则不能取消。 下?的?例展?了当 withTimeoutOrNull 块中代码在运?的时候流是如何在超时的情况下取消并停 ?执?其代码的:
foo(): Flow= flow { for (i in 1..3) { delay(100) println("Emitting $i") emit(i) } main() = runBlocking { withTimeoutOrNull(250) { // 在 250 毫秒后超时 foo().collect { value -> println(value) } } println("Done")
注意,在 foo() 函数中流仅发射两个数字,产?以下输出:
Emitting 1 1 Emitting 2 2 Done
流的构建起
先前?例中的 flow { ... } 构建器是最基础的?个。还有其他构建器使流的声明更简单:
flowOf 构建器定义了?个发射固定值集的流。 使? .asFlow() 扩展函数,可以将各种集合与序列转换为流。
因此,从流中打印从 1 到 3 的数字的?例可以写成:
// 将?个整数区间转化为流 (1..3).asFlow().collect { value -> println(value) }
过渡流操作符
可以使?操作符转换流,就像使?集合与序列?样。过渡操作符应?于上游流,并返回下游流。这些操 作符也是冷操作符,就像流?样。这类操作符本?不是挂起函数。它运?的速度很快,返回新的转换流的 定义。
基础的操作符拥有相似的名字,?如 map 与 filter。流与序列的主要区别在于这些操作符中的代码可以 调?挂起函数。
举例来说,?个请求中的流可以使? map 操作符映射出结果,即使执??个?时间的请求操作也可以 使?挂起函数来实现:
end fun performRequest(request: Int): String { delay(1000) // 模仿?时间运?的异步?作 return "response $request" main() = runBlocking{ (1..3).asFlow() // ?个请求流 .map { request -> performRequest(request) } .collect { response -> println(response) }
它产?以下三?,每??每秒出现?次:
response 1 response 2 response 3
转换操作符:在流转换操作符中,最通?的?种称为 transform。它可以?来模仿简单的转换,例如 map 与 filter,以 及实施更复杂的转换。使? transform 操作符,我们可以 发射 任意值任意次。
?如说,使? transform 我们可以在执??时间运?的异步请求之前发射?个字符串并跟踪这个响应:
(1..3).asFlow() // ?个请求流 .transform { request -> emit("Making request $request") emit(performRequest(request)) } .collect { response -> println(response) }
这段代码的输出如下
Making request 1 response 1 Making request 2 response 2 Making request 3 response 3
限长操作符:限?过渡操作符(例如 take)在流触及相应限制的时候会将它的执?取消。协程中的取消操作总是通过 抛出异常来执?,这样所有的资源管理函数(如 try {...} finally {...} 块)会在取消的情况下 正常运?:
fun numbers(): Flow= flow { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally in numbers") } } fun main() = runBlocking { numbers() .take(2) // 只获取前两个 .collect { value -> println(value) } }
这段代码的输出清楚地表明,numbers() 函数中对 flow {...} 函数体的执?在发射出第?个数 字后停?:
1 2 Finally in numbers
末端流操作符
末端操作符是在流上?于启动流收集的挂起函数。collect 是最基础的末端操作符,但是还有另外?些 更?便使?的末端操作符:
转化为各种集合,例如 toList 与 toSet。 获取第?个(first)值与确保流发射单个(single)值的操作符。 使? reduce 与 fold 将流规约到单个值。
举例来说:
val sum = (1..5).asFlow() .map { it * it } // 数字 1 ? 5 的平? .reduce { a, b -> a + b } // 求和(末端操作符) println(sum)
打印单个数字:
55
流是连续的
流的每次单独收集都是按顺序执?的,除?进?特殊操作的操作符使?多个流。该收集过程直接在协程 中运?,该协程调?末端操作符。默认情况下不启动新协程。从上游到下游每个过渡操作符都会处理每 个发射出的值然后再交给末端操作符。 请参?以下?例,该?例过滤偶数并将其映射到字符串:
(1..5).asFlow() .filter { println("Filter $it") it % 2 == 0 } .map { println("Map $it") "string $it" }.collect { println("Collect $it") }
执行:
Filter 1 Filter 2 Map 2 Collect string 2 Filter 3 Filter 4 Map 4 Collect string 4 Filter 5
流上下文:
流的收集总是在调?协程的上下?中发?。例如,如果有?个流 foo ,然后以下代码在它的编写者指定 的上下?中运?,??论流 foo 的实现细节如何:
withContext(context) { foo.collect { value -> println(value) // 运?在指定上下?中 } }
流的该属性称为上下文保存:所以默认的,flow { ... } 构建器中的代码运?在相应流的收集器提供的上下?中。举例来说,考虑 打印线程的 foo 的实现,它被调?并发射三个数字
fun foo(): Flow= flow { log("Started foo flow") for (i in 1..3) { emit(i) } } fun main() = runBlocking { foo().collect { value -> log("Collected $value") } }
运?这段代码:
[main @coroutine#1] Started foo flow [main @coroutine#1] Collected 1 [main @coroutine#1] Collected 2 [main @coroutine#1] Collected 3
由于 foo().collect 是在主线程调?的,则 foo 的流主体也是在主线程调?的。这是快速运?或 异步代码的理想默认形式,它不关?执?的上下?并且不会阻塞调?者。
withContext发出错误:然?,?时间运?的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下?中执?,并且更新 UI 的 代码也许需要在 Dispatchers.Main 中执?。通常,withContext ?于在 Kotlin 协程中改变代码的上下 ?,但是 flow {...} 构建器中的代码必须遵循上下?保存属性,并且不允许从其他上下?中发射 (emit)。
尝试运?下?的代码:
fun foo(): Flow= flow { // 在流构建器中更改消耗 CPU 代码的上下?的错误?式 kotlinx.coroutines.withContext(Dispatchers.Default) { for (i in 1..3) { Thread.sleep(100) // 假装我们以消耗 CPU 的?式进?计算 emit(i) // 发射下?个值 } } } fun main() = runBlocking { foo().collect { value -> println(value) } }
这段代码产?如下的异常:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead at ...
flowOn 操作符:例外的是 flowOn 函数,该函数?于更改流发射的上下?。以下?例展?了更改流上下?的正确?法, 该?例还通过打印相应线程的名字以展?它们的?作?式:
fun foo(): Flow= flow { for (i in 1..3) { Thread.sleep(100) // 假装我们以消耗 CPU 的?式进?计算 log("Emitting $i") emit(i) // 发射下?个值 } }.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下?的正确?式 fun main() = runBlocking { foo().collect { value -> log("Collected $value") } }
注意,当收集发?在主线程中,flow { ... } 是如何在后台线程中?作的: 这?要观察的另?件事是 flowOn 操作符已改变流的默认顺序性。现在收集发?在?个协程中 (“coroutine#1”)?发射发?在运?于另?个线程中与收集协程并发运?的另?个协程 (“coroutine#2”)中。当上游流必须改变其上下?中的 CoroutineDispatcher 的时候,flowOn 操作符 创建了另?个协程。
缓冲:
从收集流所花费的时间来看,将流的不同部分运?在不同的协程中将会很有帮助,特别是当涉及到?时 间运?的异步操作时。例如,考虑?种情况,foo() 流的发射很慢,它每花费 100 毫秒才产??个元素;?收集器也?常慢,需要花费 300 毫秒来处理元素。让我们看看从该流收集三个数字要花费多?时间:
fun foo(): Flow= flow { for (i in 1..3) { delay(100) // 假装我们异步等待了 100 毫秒 emit(i) // 发射下?个值 } } fun main() = runBlocking { val time = measureTimeMillis { foo().collect { value -> delay(300) // 假装我们花费 300 毫秒来处理它 println(value) } } println("Collected in $time ms") }
它会产?这样的结果,整个收集过程?约需要 1200 毫秒(3 个数字,每个花费 400 毫秒):
1 2 3 Collected in 1220 ms
我们可以在流上使? buffer 操作符来并发运? foo() 中发射元素的代码以及收集的代码,?不是顺序运?它们:
val time = measureTimeMillis { foo() .buffer() // 缓冲发射项,?需等待 .collect { value -> delay(300) // 假装我们花费 300 毫秒来处理它 println(value) } } println("Collected in $time ms")
它产?了相同的数字,只是更快了,由于我们?效地创建了处理流?线,仅仅需要等待第?个数字产? 的 100 毫秒以及处理每个数字各需花费的 300 毫秒。这种?式?约花费了 1000 毫秒来运?:
1 2 3 Collected in 1071 ms
注意,当必须更改 CoroutineDispatcher 时,flowOn 操作符使?了相同的缓冲机制,但是我们在 这?显式地请求缓冲?不改变执?上下?。
合并:当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,?是只处理最新的那个。在本? 例中,当收集器处理它们太慢的时候,conflate 操作符可以?于跳过中间值。构建前?的?例:
val time = measureTimeMillis { foo() .conflate() // 合并发射项,不对每个值进?处理 .collect { value -> delay(300) // 假装我们花费 300 毫秒来处理它 println(value) } } println("Collected in $time ms")
我们看到,虽然第?个数字仍在处理中,但第?个和第三个数字已经产?,因此第?个是 conflated ,只 有最新的(第三个)被交付给收集器:
1 3 Collected in 758 ms
处理最新值:当发射器和收集器都很慢的时候,合并是加快处理速度的?种?式。它通过删除发射值来实现。另?种 ?式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。有?组与 xxx 操作符执?相同基本 逻辑的 xxxLatest 操作符,但是在新值产?的时候取消执?其块中的代码。让我们在先前的?例中尝 试更换 conflate 为 collectLatest:
val time = measureTimeMillis { foo() .collectLatest { value -> // 取消并重新发射最后?个值 println("Collecting $value") delay(300) // 假装我们花费 300 毫秒来处理它 println("Done $value") } } println("Collected in $time ms")
由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射?次,我们看到该代码块对每 个值运?,但是只收集最后?个值:
Collecting 1 Collecting 2 Collecting 3 Done 3 Collected in 741 ms
由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射?次,我们看到该代码块对每 个值运?,但是只收集最后?个值:
Collecting 1 Collecting 2 Collecting 3 Done 3 Collected in 741 ms
组合多个流,组合多个流有很多种方式
Zip:就像 Kotlin 标准库中的 Sequence.zip 扩展函数?样,流拥有?个 zip 操作符?于组合两个流中的相关值:
val nums = (1..3).asFlow() // 数字 1..3 val strs = flowOf("one", "two", "three") // 字符串 nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串 .collect { println(it) } // 收集并打印
?例打印如下:
1 -> one 2 -> two 3 -> three
Combine:当流表??个变量或操作的最新值时(请参阅相关?节 conflation),可能需要执?计算,这依赖于相应 流的最新值,并且每当上游流产?值的时候都需要重新计算。这种相应的操作符家族称为 combine。 例如,先前?例中的数字如果每 300 毫秒更新?次,但字符串每 400 毫秒更新?次,然后使? zip 操作 符合并它们,但仍会产?相同的结果,尽管每 400 毫秒打印?次结果:
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射?次字符串 val startTime = System.currentTimeMillis() // 记录开始的时间 nums.zip(strs) { a, b -> "$a -> $b" } // 使?“zip”组合单个字符串 .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
然?,当在这?使? combine 操作符来替换 zip:
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射?次字符串 val startTime = System.currentTimeMillis() // 记录开始的时间 nums.combine(strs) { a, b -> "$a -> $b" } // 使?“combine”组合单个字符串 .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
我们得到了完全不同的输出,其中,nums 或 strs 流中的每次发射都会打印??:
1 -> one at 452 ms from start 2 -> one at 651 ms from start 2 -> two at 854 ms from start 3 -> two at 952 ms from start 3 -> three at 1256 ms from start
展平流:流表?异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另?个值序列的请求。?如 说,我们可以拥有下?这样?个返回间隔 500 毫秒的两个字符串流的函数:
fun requestFlow(i: Int): Flow= flow { emit("$i: First") delay(500) // 等待 500 毫秒 emit("$i: Second") }
现在,如果我们有?个包含三个整数的流,并为每个整数调? requestFlow ,如下所?:
(1..3).asFlow().map { requestFlow(it) }
然后我们得到了?个包含流的流( Flow
flatMapConcat:连接模式由 flatMapConcat 与 flattenConcat 操作符实现。它们是相应序列操作符最相近的类似物。它 们在等待内部流完成之前开始收集下?个值,如下?的?例所?:
val startTime = System.currentTimeMillis() // 记录开始时间 (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字 .flatMapConcat { requestFlow(it) } .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
在输出中可以清楚地看到 flatMapConcat 的顺序性质:
1: First at 121 ms from start 1: Second at 622 ms from start 2: First at 727 ms from start 2: Second at 1227 ms from start 3: First at 1328 ms from start 3: Second at 1829 ms from start
flatMapMerge:另?种展平模式是并发收集所有传?的流,并将它们的值合并到?个单独的流,以便尽快的发射值。它 由 flatMapMerge 与 flattenMerge 操作符实现。他们都接收可选的?于限制并发收集的流的个数的 concurrency 参数(默认情况下,它等于 DEFAULT_CONCURRENCY)。
val startTime = System.currentTimeMillis() // 记录开始时间 (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字 .flatMapMerge { requestFlow(it) } .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
flatMapMerge 的并发性质很明显:
1: First at 136 ms from start 2: First at 231 ms from start 3: First at 333 ms from start 1: Second at 639 ms from start 2: Second at 732 ms from start 3: Second at 833 ms from start
注意,flatMapMerge 会顺序调?代码块(本?例中的 { requestFlow(it) }),但是并发收集结 果流,相当于执?顺序是?先执? map { requestFlow(it) }
然后在其返回结果上调? flattenMerge。
flatMapLatest:与 collectLatest 操作符类似(在"处理最新值" ?节中已经讨论过),也有相对应的“最新”展平模式,在 发出新流后?即取消先前流的收集。这由 flatMapLatest 操作符来实现。
val startTime = System.currentTimeMillis() // 记录开始时间 (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字 .flatMapLatest { requestFlow(it) } .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
该?例的输出很好的展?了 flatMapLatest 的?作?式:
1: First at 142 ms from start 2: First at 322 ms from start 3: First at 425 ms from start 3: Second at 931 ms from start
注意,flatMapLatest 在?个新值到来时取消了块中的所有代码 (本?例中的 { requestFlow(it) })。这在该特定?例中不会有什么区别,由于调? requestFlow ??的速度是很快的,
不会发?挂起,所以不会被取消。然?,如果我们要在块中调?诸如 delay 之类的挂 起函数,这将会被表现出来。
流异常:当运算符中的发射器或代码抛出异常时,流收集可以带有异常的完成。有?种处理异常的?法。
收集器 try 与 catch:收集者可以使? Kotlin 的 try/catch 块来处理异常:
fun foo(): Flow= flow { for (i in 1..3) { println("Emitting $i") emit(i) // 发射下?个值 } } fun main() = runBlocking { try { foo().collect { value -> println(value) check(value <= 1) { "Collected $value" } } } catch (e: Throwable) { println("Caught $e") } }
这段代码成功的在末端操作符 collect 中捕获了异常,并且,如我们所?,在这之后不再发出任何值:
Emitting 1 1 Emitting 2 2 Caught java.lang.IllegalStateException: Collected 2
一切都已捕获:前?的?例实际上捕获了在发射器或任何过渡或末端操作符中发?的任何异常。例如,让我们修改代码 以便将发出的值映射为字符串,但是相应的代码会产??个异常:
fun foo(): Flow= flow { for (i in 1..3) { println("Emitting $i") emit(i) // 发射下?个值 } }.map { value -> check(value <= 1) { "Crashed on $value" } "string $value" } fun main() = runBlocking { try { foo().collect { value -> println(value) } } catch (e: Throwable) { println("Caught $e") } }
仍然会捕获该异常并停?收集:
Emitting 1 string 1 Emitting 2 Caught java.lang.IllegalStateException: Crashed on 2
异常透明性:发射器的代码如何封装其异常处理?为?
流必须对异常透明,即在 flow { ... } 构建器内部的 try/catch 块中发射值是违反异常透明性 的。这样可以保证收集器抛出的?个异常能被像先前?例中那样的 try/catch 块捕获。
发射器可以使? catch 操作符来保留此异常的透明性并允许封装它的异常处理。catch 操作符的代码块 可以分析异常并根据捕获到的异常以不同的?式对其做出反应:
可以使? throw 重新抛出异常。 可以使? catch 代码块中的 emit 将异常转换为值发射出去。 可以将异常忽略,或??志打印,或使??些其他代码处理它
例如,让我们在捕获异常的时候发射?本:
foo() .catch { e -> emit("Caught $e") } // 发射?个异常 .collect { value -> println(value) }
即使我们不再在代码的外层使? try/catch,?例的输出也是相同的。
透明捕获:catch 过渡操作符遵循异常透明性,仅捕获上游异常( catch 操作符上游的异常,但是它下?的不是)。 如果 collect { ... } 块(位于 catch 之下)抛出?个异常,那么异常会逃逸:
fun foo(): Flow= flow { for (i in 1..3) { println("Emitting $i") emit(i) } } fun main() = runBlocking { foo() .catch { e -> println("Caught $e") } // 不会捕获下游异常 .collect { value -> check(value <= 1) { "Collected $value" } println(value) } }
尽管有 catch 操作符,但不会打印“Caught …”消息:
声明式捕获:我们可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 collect 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须由调??参的 collect() 来触发:
foo() .onEach { value -> check(value <= 1) { "Collected $value" } println(value) } .catch { e -> println("Caught $e") } .collect()
现在我们可以看到已经打印了“Caught …”消息,并且我们可以在没有显式使? try/catch 块的情况 下捕获所有异常:
流完成:
当流收集完成时(普通情况或异常情况),它可能需要执??个动作。你可能已经注意到,它可以通过两 种?式完成:命令式或声明式。
命令式finally块:除了 try / catch 之外,收集器还能使? finally 块在 collect 完成时执??个动作
fun foo(): Flow= (1..3).asFlow() fun main() = runBlocking { try { foo().collect { value -> println(value) } } finally { println("Done") } }
这段代码打印出 foo() 流产?的三个数字,后?跟?个“Done”字符串:
1 2 3 Done
声明式处理:对于声明式,流拥有 onCompletion 过渡操作符,它在流完全收集时调?。可以使? onCompletion 操作符重写前?的?例,并产?相同的输出:
foo() .onCompletion { println("Done") } .collect { value -> println(value) }
onCompletion 的主要优点是其 lambda 表达式的可空参数 Throwable 可以?于确定流收集是正常 完成还是有异常发?。在下?的?例中 foo() 流在发射数字 1 之后抛出了?个异常:
fun foo(): Flow= flow { emit(1) throw RuntimeException() } fun main() = runBlocking { foo() .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") } .catch { cause -> println("Caught exception") } .collect { value -> println(value) } }
如你所期望的,它打印了:
1 Flow completed exceptionally Caught exception
onCompletion 操作符与 catch 不同,它不处理异常。我们可以看到前?的?例代码,异常仍然流向下 游。它将被提供给后?的 onCompletion 操作符,并可以由 catch 操作符处理。
成功完成:与 catch 操作符的另?个不同点是 onCompletion 能观察到所有异常并且仅在上游流成功完成(没有 取消或失败)的情况下接收?个 null 异常。
fun foo(): Flow= (1..3).asFlow() fun main() = runBlocking { foo() .onCompletion { cause -> println("Flow completed with $cause") } .collect { value -> check(value <= 1) { "Collected $value" } println(value) } }
我们可以看到完成时 cause 不为空,因为流由于下游异常?中?:
1 Flow completed with java.lang.IllegalStateException: Collected 2 Exception in thread "main" java.lang.IllegalStateException: Collected 2
命令式还是声明式
现在我们知道如何收集流,并以命令式与声明式的?式处理其完成及异常情况。这?有?个很?然的问 题是,哪种?式应该是?选的?为什么?作为?个库,我们不主张采?任何特定的?式,并且相信这两种 选择都是有效的,应该根据??的喜好与代码?格进?选择。
启动流
使?流表?来??些源的异步事件是很简单的。在这个案例中,我们需要?个类似 addEventListener 的函数,该函数注册?段响应的代码处理即将到来的事件,并继续进?进?步的 处理。onEach 操作符可以担任该??。然?,onEach 是?个过渡操作符。我们也需要?个末端操作符 来收集流。否则仅调? onEach 是?效的。 如果我们在 onEach 之后使? collect 末端操作符,那么后?的代码会?直等待直?流被收集
// 模仿事件流 fun events(): Flow= (1..3).asFlow().onEach { delay(100) } fun main() = runBlocking { events() .onEach { event -> println("Event: $event") } .collect() // <--- 等待流收集 println("Done") }
你可以看到它的输出:
Event: 1 Event: 2 Event: 3 Done
launchIn 末端操作符可以在这?派上?场。使? launchIn 替换 collect 我们可以在单独的协程 中启动流的收集,这样就可以?即继续进?步执?代码:
fun main() = runBlocking{ events() .onEach { event -> println("Event: $event") } .launchIn(this) // <--- 在单独的协程中执?流 println("Done") }
它打印了:
Done Event: 1 Event: 2 Event: 3
launchIn 必要的参数 CoroutineScope 指定了?哪?个协程来启动流的收集。在先前的?例中这个 作?域来? runBlocking 协程构建器,在这个流运?的时候,runBlocking 作?域等待它的?协程执? 完毕并防? main 函数返回并终?此?例。 在实际的应?中,作?域来?于?个寿命有限的实体。在该实体的寿命终?后,相应的作?域就会被取 消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope) ?作?式就像 addEventListener ?样。?且,这不需要相应的 removeEventListener 函数,因为取消与结构 化并发可以达成这个?的。 注意,launchIn 也会返回?个 Job,可以在不取消整个作?域的情况下仅取消相应的流收集或对其进 ? join。
流(Flow)与响应式流(Reactive Streams)
对于熟悉响应式流(Reactive Streams)或诸如 RxJava 与 Project Reactor 这样的响应式框架的?来 说,Flow 的设计也许看起来会?常熟悉。 确实,其设计灵感来源于响应式流以及其各种实现。但是 Flow 的主要?标是拥有尽可能简单的设计, 对 Kotlin 以及挂起友好且遵从结构化并发。没有响应式的先驱及他们?量的?作,就不可能实现这?? 标。你可以阅读 Reactive Streams and Kotlin Flows 这篇?章来了解完成 Flow 的故事。 虽然有所不同,但从概念上讲,Flow 依然是响应式流,并且可以将它转换为响应式(规范及符合 TCK)的 发布者(Publisher),反之亦然。这些开箱即?的转换器可以在 kotlinx.coroutines 提供的相关响 应式模块( kotlinx-coroutines-reactive ?于 Reactive Streams,kotlinx-coroutinesreactor ?于 Project Reactor,以及 kotlinx-coroutines-rx2 / kotlinx-coroutinesrx3 ?于 RxJava2/RxJava3)中找到。集成模块包含 Flow 与其他实现之间的转换,与 Reactor 的 Context 集成以及与?系列响应式实体配合使?的挂起友好的使??式。