kotlin协程——>异步流


异步流

  挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?这正是 Kotlin 流(Flow)的 ?武之地。

表示多个值

  在 Kotlin 中可以使?集合来表?多个值。?如说,我们可以拥有?个函数 foo() ,它返回?个包含三 个数字的 List,然后使? forEach 打印它们:

fun foo(): List = listOf(1, 2, 3)
fun main() {
    foo().forEach { value -> println(value) }
}

  这段代码输出如下:

1
2
3

  序列:如果使??些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么我们可以使? Sequence 来表?数字:

fun foo(): Sequence = sequence { // 序列构建器
    for (i in 1..3) {
        Thread.sleep(100) // 假装我们正在计算
        yield(i) // 产?下?个值
    }
}
fun main() {
    foo().forEach { value -> println(value) }
}

  这段代码输出相同的数字,但在打印每个数字之前等待 100 毫秒。

  挂起函数:然?,计算过程阻塞运?该代码的主线程。当这些值由异步代码计算时,我们可以使? suspend 修饰 符标记函数 foo ,这样它就可以在不阻塞的情况下执?其?作并将结果作为列表返回:

suspend fun foo(): List {
    delay(1000) // 假装我们在这?做了?些异步的事情
    return listOf(1, 2, 3)
}
fun main() = runBlocking {
    foo().forEach { value -> println(value) }
}

  这段代码将会在等待?秒之后打印数字。

  流:使? List 结果类型,意味着我们只能?次返回所有值。为了表?异步计算的值流(stream),我们可以使 ? Flow 类型(正如同步计算值会使? Sequence 类型):

   foo(): Flow = flow { // 流构建器
    for (i in 1..3) {
        delay(100) // 假装我们在这?做了?些有?的事情
        emit(i) // 发送下?个值
    }
    main() = runBlocking {
  // 启动并发的协程以验证主线程并未阻塞
    launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
     }
// 收集这个流
     foo().collect { value -> println(value) }

  这段代码在不阻塞主线程的情况下每等待 100 毫秒打印?个数字。在主线程中运??个单独的协程每 100 毫秒打印?次“I'm not blocked”已经经过了验证。

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

  注意使? Flow 的代码与先前?例的下述区别:

名为 flow 的 Flow 类型构建器函数。
flow { ... } 构建块中的代码可以挂起。
函数 foo() 不再标有 suspend 修饰符。
流使? emit 函数 发射 值。
流使? collect 函数 收集 值。

流是冷的

  Flow 是?种类似于序列的冷流 — 这段 flow 构建器中的代码直到流被收集的时候才运?。这在以下的 ?例中?常明显:

foo(): Flow = flow {
println("Flow started")
for (i in 1..3) {
    delay(100)
    emit(i)
}

main() = runBlocking {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }

  打印如下:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

  这是返回?个流的 foo() 函数没有标记 suspend 修饰符的主要原因。通过它??,foo() 会尽快 返回且不会进?任何等待。该流在每次收集的时候启动,这就是为什么当我们再次调? collect 时我 们会看到“Flow started”。

流取消

  流采?与协程同样的协作取消。然?,流的基础设施未引?其他取消点。取消完全透明。像往常?样,流 的收集可以在当流在?个可取消的挂起函数(例如 delay)中挂起的时候取消,否则不能取消。 下?的?例展?了当 withTimeoutOrNull 块中代码在运?的时候流是如何在超时的情况下取消并停 ?执?其代码的:

foo(): Flow = flow {
for (i in 1..3) {
    delay(100)
    println("Emitting $i")
    emit(i)
}
main() = runBlocking {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
    foo().collect { value -> println(value) }
}
println("Done")

  注意,在 foo() 函数中流仅发射两个数字,产?以下输出:

Emitting 1
1
Emitting 2
2
Done

流的构建起

  先前?例中的 flow { ... } 构建器是最基础的?个。还有其他构建器使流的声明更简单:

flowOf 构建器定义了?个发射固定值集的流。
使? .asFlow() 扩展函数,可以将各种集合与序列转换为流。

  因此,从流中打印从 1 到 3 的数字的?例可以写成:

// 将?个整数区间转化为流
(1..3).asFlow().collect { value -> println(value) }

过渡流操作符

  可以使?操作符转换流,就像使?集合与序列?样。过渡操作符应?于上游流,并返回下游流。这些操 作符也是冷操作符,就像流?样。这类操作符本?不是挂起函数。它运?的速度很快,返回新的转换流的 定义。

  基础的操作符拥有相似的名字,?如 map 与 filter。流与序列的主要区别在于这些操作符中的代码可以 调?挂起函数。

  举例来说,?个请求中的流可以使? map 操作符映射出结果,即使执??个?时间的请求操作也可以 使?挂起函数来实现:

end fun performRequest(request: Int): String {
delay(1000) // 模仿?时间运?的异步?作
return "response $request"

main() = runBlocking {
(1..3).asFlow() // ?个请求流
      .map { request -> performRequest(request) }
      .collect { response -> println(response) }    

  它产?以下三?,每??每秒出现?次:

response 1
response 2
response 3

  转换操作符:在流转换操作符中,最通?的?种称为 transform。它可以?来模仿简单的转换,例如 map 与 filter,以 及实施更复杂的转换。使? transform 操作符,我们可以 发射 任意值任意次。

  ?如说,使? transform 我们可以在执??时间运?的异步请求之前发射?个字符串并跟踪这个响应:

(1..3).asFlow() // ?个请求流
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
    .collect { response -> println(response) }    

  这段代码的输出如下

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

  限长操作符:限?过渡操作符(例如 take)在流触及相应限制的时候会将它的执?取消。协程中的取消操作总是通过 抛出异常来执?,这样所有的资源管理函数(如 try {...} finally {...} 块)会在取消的情况下 正常运?:

fun numbers(): Flow = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}
fun main() = runBlocking {
    numbers()
            .take(2) // 只获取前两个
            .collect { value -> println(value) }
}

  这段代码的输出清楚地表明,numbers() 函数中对 flow {...} 函数体的执?在发射出第?个数 字后停?:

1
2
Finally in numbers

末端流操作符

  末端操作符是在流上?于启动流收集的挂起函数。collect 是最基础的末端操作符,但是还有另外?些 更?便使?的末端操作符:

转化为各种集合,例如 toList 与 toSet。
获取第?个(first)值与确保流发射单个(single)值的操作符。
使? reduce 与 fold 将流规约到单个值。

  举例来说:

val sum = (1..5).asFlow()
        .map { it * it } // 数字 1 ? 5 的平?
        .reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)

  打印单个数字:

55

流是连续的

  流的每次单独收集都是按顺序执?的,除?进?特殊操作的操作符使?多个流。该收集过程直接在协程 中运?,该协程调?末端操作符。默认情况下不启动新协程。从上游到下游每个过渡操作符都会处理每 个发射出的值然后再交给末端操作符。 请参?以下?例,该?例过滤偶数并将其映射到字符串:

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0
    }
    .map {
        println("Map $it")
        "string $it"
    }.collect {
        println("Collect $it")
    }

  执行:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

流上下文:

  流的收集总是在调?协程的上下?中发?。例如,如果有?个流 foo ,然后以下代码在它的编写者指定 的上下?中运?,??论流 foo 的实现细节如何:

withContext(context) {
    foo.collect { value ->
        println(value) // 运?在指定上下?中
    }
}

  流的该属性称为上下文保存:所以默认的,flow { ... } 构建器中的代码运?在相应流的收集器提供的上下?中。举例来说,考虑 打印线程的 foo 的实现,它被调?并发射三个数字

fun foo(): Flow = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}
fun main() = runBlocking {
    foo().collect { value -> log("Collected $value") }
}

  运?这段代码:

[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

  由于 foo().collect 是在主线程调?的,则 foo 的流主体也是在主线程调?的。这是快速运?或 异步代码的理想默认形式,它不关?执?的上下?并且不会阻塞调?者。

    withContext发出错误:然?,?时间运?的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下?中执?,并且更新 UI 的 代码也许需要在 Dispatchers.Main 中执?。通常,withContext ?于在 Kotlin 协程中改变代码的上下 ?,但是 flow {...} 构建器中的代码必须遵循上下?保存属性,并且不允许从其他上下?中发射 (emit)。

  尝试运?下?的代码:

fun foo(): Flow = flow {
// 在流构建器中更改消耗 CPU 代码的上下?的错误?式
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // 假装我们以消耗 CPU 的?式进?计算
            emit(i) // 发射下?个值
        }
    }
}
fun main() = runBlocking {
    foo().collect { value -> println(value) }
}

  这段代码产?如下的异常:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    Flow was collected in [CoroutineId(1),
"coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
    but emission happened in [CoroutineId(1),
"coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
    Please refer to 'flow' documentation or use 'flowOn' instead
at ...

  flowOn 操作符:例外的是 flowOn 函数,该函数?于更改流发射的上下?。以下?例展?了更改流上下?的正确?法, 该?例还通过打印相应线程的名字以展?它们的?作?式:

fun foo(): Flow = flow {
    for (i in 1..3) {
        Thread.sleep(100) // 假装我们以消耗 CPU 的?式进?计算
        log("Emitting $i")
        emit(i) // 发射下?个值
    }
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下?的正确?式
fun main() = runBlocking {
    foo().collect { value ->
        log("Collected $value")
    }
}

  注意,当收集发?在主线程中,flow { ... } 是如何在后台线程中?作的: 这?要观察的另?件事是 flowOn 操作符已改变流的默认顺序性。现在收集发?在?个协程中 (“coroutine#1”)?发射发?在运?于另?个线程中与收集协程并发运?的另?个协程 (“coroutine#2”)中。当上游流必须改变其上下?中的 CoroutineDispatcher 的时候,flowOn 操作符 创建了另?个协程。

缓冲:

  从收集流所花费的时间来看,将流的不同部分运?在不同的协程中将会很有帮助,特别是当涉及到?时 间运?的异步操作时。例如,考虑?种情况,foo() 流的发射很慢,它每花费 100 毫秒才产??个元素;?收集器也?常慢,需要花费 300 毫秒来处理元素。让我们看看从该流收集三个数字要花费多?时间:

fun foo(): Flow = flow {
    for (i in 1..3) {
        delay(100) // 假装我们异步等待了 100 毫秒
        emit(i) // 发射下?个值
    }
}
fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // 假装我们花费 300 毫秒来处理它
            println(value)
        }
    }
    println("Collected in $time ms")
}

  它会产?这样的结果,整个收集过程?约需要 1200 毫秒(3 个数字,每个花费 400 毫秒):

1
2
3
Collected in 1220 ms

  我们可以在流上使? buffer 操作符来并发运? foo() 中发射元素的代码以及收集的代码,?不是顺序运?它们:

val time = measureTimeMillis {
    foo()
            .buffer() // 缓冲发射项,?需等待
            .collect { value ->
                delay(300) // 假装我们花费 300 毫秒来处理它
                println(value)
            }
}
println("Collected in $time ms")

  它产?了相同的数字,只是更快了,由于我们?效地创建了处理流?线,仅仅需要等待第?个数字产? 的 100 毫秒以及处理每个数字各需花费的 300 毫秒。这种?式?约花费了 1000 毫秒来运?:

1
2
3
Collected in 1071 ms
注意,当必须更改 CoroutineDispatcher 时,flowOn 操作符使?了相同的缓冲机制,但是我们在 这?显式地请求缓冲?不改变执?上下?。

  合并:当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,?是只处理最新的那个。在本? 例中,当收集器处理它们太慢的时候,conflate 操作符可以?于跳过中间值。构建前?的?例:

val time = measureTimeMillis {
      foo()
            .conflate() // 合并发射项,不对每个值进?处理
            .collect { value ->
                delay(300) // 假装我们花费 300 毫秒来处理它
                println(value)
            }
}
println("Collected in $time ms")

  我们看到,虽然第?个数字仍在处理中,但第?个和第三个数字已经产?,因此第?个是 conflated ,只 有最新的(第三个)被交付给收集器:

1
3
Collected in 758 ms

  处理最新值:当发射器和收集器都很慢的时候,合并是加快处理速度的?种?式。它通过删除发射值来实现。另?种 ?式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。有?组与 xxx 操作符执?相同基本 逻辑的 xxxLatest 操作符,但是在新值产?的时候取消执?其块中的代码。让我们在先前的?例中尝 试更换 conflate 为 collectLatest:

val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // 取消并重新发射最后?个值
                println("Collecting $value")
                delay(300) // 假装我们花费 300 毫秒来处理它
                println("Done $value")
            }
}
println("Collected in $time ms")

  由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射?次,我们看到该代码块对每 个值运?,但是只收集最后?个值:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

  由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射?次,我们看到该代码块对每 个值运?,但是只收集最后?个值:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

组合多个流,组合多个流有很多种方式

  Zip:就像 Kotlin 标准库中的 Sequence.zip 扩展函数?样,流拥有?个 zip 操作符?于组合两个流中的相关值:

val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
    .collect { println(it) } // 收集并打印

  ?例打印如下:

1 -> one
2 -> two
3 -> three

  Combine:当流表??个变量或操作的最新值时(请参阅相关?节 conflation),可能需要执?计算,这依赖于相应 流的最新值,并且每当上游流产?值的时候都需要重新计算。这种相应的操作符家族称为 combine。 例如,先前?例中的数字如果每 300 毫秒更新?次,但字符串每 400 毫秒更新?次,然后使? zip 操作 符合并它们,但仍会产?相同的结果,尽管每 400 毫秒打印?次结果:

val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射?次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使?“zip”组合单个字符串
    .collect { value -> // 收集并打印
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

  然?,当在这?使? combine 操作符来替换 zip:

val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射?次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使?“combine”组合单个字符串
    .collect { value -> // 收集并打印
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

  我们得到了完全不同的输出,其中,nums 或 strs 流中的每次发射都会打印??:

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

展平流:流表?异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另?个值序列的请求。?如 说,我们可以拥有下?这样?个返回间隔 500 毫秒的两个字符串流的函数:

fun requestFlow(i: Int): Flow = flow {
    emit("$i: First")
    delay(500) // 等待 500 毫秒
    emit("$i: Second")
}

  现在,如果我们有?个包含三个整数的流,并为每个整数调? requestFlow ,如下所?:

(1..3).asFlow().map { requestFlow(it) }

  然后我们得到了?个包含流的流( Flow ),需要将其进?展平为单个流以进?下? 步处理。集合与序列都拥有 flatten 与 flatMap 操作符来做这件事。然?,由于流具有异步的性质,因此 需要不同的展平模式,为此,存在?系列的流展平操作符。

  flatMapConcat:连接模式由 flatMapConcat 与 flattenConcat 操作符实现。它们是相应序列操作符最相近的类似物。它 们在等待内部流完成之前开始收集下?个值,如下?的?例所?:

val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字
    .flatMapConcat { requestFlow(it) }
    .collect { value -> // 收集并打印
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

  在输出中可以清楚地看到 flatMapConcat 的顺序性质:

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

  flatMapMerge:另?种展平模式是并发收集所有传?的流,并将它们的值合并到?个单独的流,以便尽快的发射值。它 由 flatMapMerge 与 flattenMerge 操作符实现。他们都接收可选的?于限制并发收集的流的个数的 concurrency 参数(默认情况下,它等于 DEFAULT_CONCURRENCY)。

val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字
    .flatMapMerge { requestFlow(it) }
    .collect { value -> // 收集并打印
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

  flatMapMerge 的并发性质很明显:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
注意,flatMapMerge 会顺序调?代码块(本?例中的 { requestFlow(it) }),但是并发收集结 果流,相当于执?顺序是?先执? map { requestFlow(it) }
然后在其返回结果上调? flattenMerge。

  flatMapLatest:与 collectLatest 操作符类似(在"处理最新值" ?节中已经讨论过),也有相对应的“最新”展平模式,在 发出新流后?即取消先前流的收集。这由 flatMapLatest 操作符来实现。

val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射?个数字
    .flatMapLatest { requestFlow(it) }
    .collect { value -> // 收集并打印
     println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

  该?例的输出很好的展?了 flatMapLatest 的?作?式:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
注意,flatMapLatest 在?个新值到来时取消了块中的所有代码 (本?例中的 { requestFlow(it) })。这在该特定?例中不会有什么区别,由于调? requestFlow ??的速度是很快的,
不会发?挂起,所以不会被取消。然?,如果我们要在块中调?诸如 delay 之类的挂 起函数,这将会被表现出来。

流异常:当运算符中的发射器或代码抛出异常时,流收集可以带有异常的完成。有?种处理异常的?法。

  收集器 try 与 catch:收集者可以使? Kotlin 的 try/catch 块来处理异常:

fun foo(): Flow = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // 发射下?个值
    }
}
fun main() = runBlocking {
    try {
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

  这段代码成功的在末端操作符 collect 中捕获了异常,并且,如我们所?,在这之后不再发出任何值:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

  一切都已捕获:前?的?例实际上捕获了在发射器或任何过渡或末端操作符中发?的任何异常。例如,让我们修改代码 以便将发出的值映射为字符串,但是相应的代码会产??个异常:

fun foo(): Flow =
        flow {
            for (i in 1..3) {
                println("Emitting $i")
                emit(i) // 发射下?个值
            }
        }.map { value ->
            check(value <= 1) { "Crashed on $value" }
            "string $value"
        }
fun main() = runBlocking {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

  仍然会捕获该异常并停?收集:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

异常透明性:发射器的代码如何封装其异常处理?为?

  流必须对异常透明,即在 flow { ... } 构建器内部的 try/catch 块中发射值是违反异常透明性 的。这样可以保证收集器抛出的?个异常能被像先前?例中那样的 try/catch 块捕获。

  发射器可以使? catch 操作符来保留此异常的透明性并允许封装它的异常处理。catch 操作符的代码块 可以分析异常并根据捕获到的异常以不同的?式对其做出反应:

可以使? throw 重新抛出异常。
可以使? catch 代码块中的 emit 将异常转换为值发射出去。
可以将异常忽略,或??志打印,或使??些其他代码处理它

  例如,让我们在捕获异常的时候发射?本:

foo()
    .catch { e -> emit("Caught $e") } // 发射?个异常
    .collect { value -> println(value) }

  即使我们不再在代码的外层使? try/catch,?例的输出也是相同的。

  透明捕获:catch 过渡操作符遵循异常透明性,仅捕获上游异常( catch 操作符上游的异常,但是它下?的不是)。 如果 collect { ... } 块(位于 catch 之下)抛出?个异常,那么异常会逃逸:

fun foo(): Flow = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}
fun main() = runBlocking {
    foo()
         .catch { e -> println("Caught $e") } // 不会捕获下游异常
         .collect { value ->
             check(value <= 1) { "Collected $value" }
             println(value)
          }
}

  尽管有 catch 操作符,但不会打印“Caught …”消息:

  声明式捕获:我们可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 collect 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须由调??参的 collect() 来触发:

foo()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }
        println(value)
    }
    .catch { e -> println("Caught $e") }
    .collect()

  现在我们可以看到已经打印了“Caught …”消息,并且我们可以在没有显式使? try/catch 块的情况 下捕获所有异常:

流完成:

  当流收集完成时(普通情况或异常情况),它可能需要执??个动作。你可能已经注意到,它可以通过两 种?式完成:命令式或声明式。

  命令式finally块:除了 try / catch 之外,收集器还能使? finally 块在 collect 完成时执??个动作

fun foo(): Flow = (1..3).asFlow()
fun main() = runBlocking {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

  这段代码打印出 foo() 流产?的三个数字,后?跟?个“Done”字符串:

1
2
3
Done

  声明式处理:对于声明式,流拥有 onCompletion 过渡操作符,它在流完全收集时调?。可以使? onCompletion 操作符重写前?的?例,并产?相同的输出:

foo()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

  onCompletion 的主要优点是其 lambda 表达式的可空参数 Throwable 可以?于确定流收集是正常 完成还是有异常发?。在下?的?例中 foo() 流在发射数字 1 之后抛出了?个异常:

fun foo(): Flow = flow {
    emit(1)
    throw RuntimeException()
}
fun main() = runBlocking {
    foo()
         .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
               .catch { cause -> println("Caught exception") }
               .collect { value -> println(value) }
}

  如你所期望的,它打印了:

1
Flow completed exceptionally
Caught exception

  onCompletion 操作符与 catch 不同,它不处理异常。我们可以看到前?的?例代码,异常仍然流向下 游。它将被提供给后?的 onCompletion 操作符,并可以由 catch 操作符处理。

  成功完成:与 catch 操作符的另?个不同点是 onCompletion 能观察到所有异常并且仅在上游流成功完成(没有 取消或失败)的情况下接收?个 null 异常。

fun foo(): Flow = (1..3).asFlow()
fun main() = runBlocking {
    foo()
            .onCompletion { cause -> println("Flow completed with $cause") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }
                println(value)
            }
}

  我们可以看到完成时 cause 不为空,因为流由于下游异常?中?:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

命令式还是声明式

  现在我们知道如何收集流,并以命令式与声明式的?式处理其完成及异常情况。这?有?个很?然的问 题是,哪种?式应该是?选的?为什么?作为?个库,我们不主张采?任何特定的?式,并且相信这两种 选择都是有效的,应该根据??的喜好与代码?格进?选择。

启动流

  使?流表?来??些源的异步事件是很简单的。在这个案例中,我们需要?个类似 addEventListener 的函数,该函数注册?段响应的代码处理即将到来的事件,并继续进?进?步的 处理。onEach 操作符可以担任该??。然?,onEach 是?个过渡操作符。我们也需要?个末端操作符 来收集流。否则仅调? onEach 是?效的。 如果我们在 onEach 之后使? collect 末端操作符,那么后?的代码会?直等待直?流被收集

// 模仿事件流
fun events(): Flow = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking {
    events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- 等待流收集
    println("Done")
}

  你可以看到它的输出:

Event: 1
Event: 2
Event: 3
Done

  launchIn 末端操作符可以在这?派上?场。使? launchIn 替换 collect 我们可以在单独的协程 中启动流的收集,这样就可以?即继续进?步执?代码:

fun main() = runBlocking {
    events()
            .onEach { event -> println("Event: $event") }
            .launchIn(this) // <--- 在单独的协程中执?流
    println("Done")
}

  它打印了:

Done
Event: 1
Event: 2
Event: 3

  launchIn 必要的参数 CoroutineScope 指定了?哪?个协程来启动流的收集。在先前的?例中这个 作?域来? runBlocking 协程构建器,在这个流运?的时候,runBlocking 作?域等待它的?协程执? 完毕并防? main 函数返回并终?此?例。 在实际的应?中,作?域来?于?个寿命有限的实体。在该实体的寿命终?后,相应的作?域就会被取 消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope) ?作?式就像 addEventListener ?样。?且,这不需要相应的 removeEventListener 函数,因为取消与结构 化并发可以达成这个?的。 注意,launchIn 也会返回?个 Job,可以在不取消整个作?域的情况下仅取消相应的流收集或对其进 ? join。

流(Flow)与响应式流(Reactive Streams)

  对于熟悉响应式流(Reactive Streams)或诸如 RxJava 与 Project Reactor 这样的响应式框架的?来 说,Flow 的设计也许看起来会?常熟悉。 确实,其设计灵感来源于响应式流以及其各种实现。但是 Flow 的主要?标是拥有尽可能简单的设计, 对 Kotlin 以及挂起友好且遵从结构化并发。没有响应式的先驱及他们?量的?作,就不可能实现这?? 标。你可以阅读 Reactive Streams and Kotlin Flows 这篇?章来了解完成 Flow 的故事。 虽然有所不同,但从概念上讲,Flow 依然是响应式流,并且可以将它转换为响应式(规范及符合 TCK)的 发布者(Publisher),反之亦然。这些开箱即?的转换器可以在 kotlinx.coroutines 提供的相关响 应式模块( kotlinx-coroutines-reactive ?于 Reactive Streams,kotlinx-coroutinesreactor ?于 Project Reactor,以及 kotlinx-coroutines-rx2 / kotlinx-coroutinesrx3 ?于 RxJava2/RxJava3)中找到。集成模块包含 Flow 与其他实现之间的转换,与 Reactor 的 Context 集成以及与?系列响应式实体配合使?的挂起友好的使??式。

相关