kotlin协程——>通道


通道:延期的值提供了?种便捷的?法使单个值在多个协程之间进?相互传输。通道提供了?种在流中传输 值的?法。

通道基础:

  ?个 Channel 是?个和 BlockingQueue ?常相似的概念。其中?个不同是它代替了阻塞的 put 操 作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive。

val channel = Channel()
launch {
// 这?可能是消耗?量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平?并发送
    for (x in 1..5) channel.send(x * x)
}
// 这?我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")

  这段代码的输出如下:

1
4
9
16
25
Done!

关闭与迭代通道

  和队列不同,?个通道可以通过被关闭来表明没有更多的元素将会进?通道。在接收者中可以定期的使 ? for 循环来从通道中接收元素。 从概念上来说,?个 close 操作就像向通道发送了?个特殊的关闭指令。这个迭代停?就说明关闭指令 已经被接收了。所以这?保证所有先前发送出去的元素都在通道关闭前被接收到。

val channel = Channel()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // 我们结束发送
}
// 这?我们使? `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")

构建通道?产者

  协程?成?系列元素的模式很常?。这是 ?产者?消费者 模式的?部分,并且经常能在并发的代码中 看到它。你可以将?产者抽象成?个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的 常识相违悖。 这?有?个名为 produce 的便捷的协程构建器,可以很容易的在?产者端正确?作,并且我们使?扩 展函数 consumeEach 在消费者端替代 for 循环:

val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")

管道:管道是?种?个协程在流中开始?产可能?穷多个元素的模式:

fun CoroutineScope.produceNumbers() = produce {
    var x = 1
    while (true) send(x++) // 在流中开始从 1 ?产?穷多个整数
}

  并且另?个或多个协程开始消费这些流,做?些操作,并?产了?些额外的结果。在下?的例?中,对这 些数字仅仅做了平?操作:

fun CoroutineScope.square(numbers: ReceiveChannel): ReceiveChannel = produce {
    for (x in numbers) send(x * x)
}

  主要的代码启动并连接了整个管道:

val numbers = produceNumbers() // 从 1 开始?成整数
val squares = square(numbers) // 整数求平?
repeat(5) {
    println(squares.receive()) // 输出前五个
}
println("Done!") // ?此已完成
coroutineContext.cancelChildren() // 取消?协程

  所有创建了协程的函数被定义在了 CoroutineScope 的扩展上,所以我们可以依靠结构化并发来 确保没有常驻在我们的应?程序中的全局协程。

使用管道的素数:让我们来展??个极端的例??在协程中使??个管道来?成素数。我们开启了?个数字的?限序列。

fun CoroutineScope.numbersFrom(start: Int) = produce {
    var x = start
    while (true) send(x++) // 开启了?个?限的整数流
}

  在下?的管道阶段中过滤了来源于流中的数字,删除了所有可以被给定素数整除的数字。

fun CoroutineScope.filter(numbers: ReceiveChannel, prime: Int) = produce {
    for (x in numbers) if (x % prime != 0) send(x)
}

  现在我们开启了?个从 2 开始的数字流管道,从当前的通道中取?个素数,并为每?个我们发现的素数 启动?个流?线阶段:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ……

  下?的例?打印了前?个素数,在主线程的上下?中运?整个管道。直到所有的协程在该主协程 runBlocking 的作?域中被启动完成。我们不必使??个显式的列表来保存所有被我们已经启动的协 程。我们使? cancelChildren 扩展函数在我们打印了前?个素数以后来取消所有的?协程。

var cur = numbersFrom(2)
repeat(10) {
    val prime = cur.receive()
    println(prime)
    cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的?协程来让主协程结束

  这段代码的输出如下:

2
3
5
7
11
13
17
19
23
29

  注意,你可以在标准库中使? iterator 协程构建器来构建?个相似的管道。使? iterator 替换 produce 、yield 替换 send 、next 替换 receive 、Iterator 替换 ReceiveChannel 来摆 脱协程作?域,你将不再需要 runBlocking 。然?,如上所?,如果你在 Dispatchers.Default 上下? 中运?它,使?通道的管道的好处在于它可以充分利?多核? CPU。

  不过,这是?种?常不切实际的寻找素数的?法。在实践中,管道调?了另外的?些挂起中的调?(就像 异步调?远程服务)并且这些管道不能内置使? sequence / iterator ,因为它们不被允许随意的挂 起,不像 produce 是完全异步的。

扇出:

  多个协程也许会接收相同的管道,在它们之间进?分布式?作。让我们启动?个定期产?整数的?产者 协程(每秒?个数字):

fun CoroutineScope.produceNumbers() = produce {
    var x = 1 // 从 1 开始
    while (true) {
        send(x++) // 产?下?个数字
        delay(100) // 等待 0.1 秒
    }
}

  现在让我们启动五个处理器协程并让它们?作将近?秒。看看发?了什么:

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消协程?产者从?将它们全部杀死

  该输出将类似于如下所?,尽管接收每个特定整数的处理器 id 可能会不同:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

扇入

  多个协程可以发送到同?个通道。?如说,让我们创建?个字符串的通道,和?个在这个通道中以指定 的延迟反复发送?个指定字符串的挂起函数:

suspend fun sendString(channel: SendChannel, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

  现在,我们启动了?个发送字符串的协程,让我们看看会发?什么(在?例中,我们在主线程的上下?中 作为主协程的?协程来启动它们):

val channel = Channel()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六个
    println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有?协程来让主协程结束

  输出如下:

foo
foo
BAR!
foo
foo
BAR!

带缓冲的通道

  到?前为?展?的通道都是没有缓冲区的。?缓冲的通道在发送者和接收者相遇时传输元素(也称“对 接”)。如果发送先被调?,则它将被挂起直到接收被调?,如果接收先被调?,它将被挂起直到发送被调 ?。 Channel() ??函数与 produce 建造器通过?个可选的参数 capacity 来指定 缓冲区?? 。缓冲允 许发送者在被挂起前发送多个元素,就像 BlockingQueue 有指定的容量?样,当缓冲区被占满的时 候将会引起阻塞。

val channel = Channel(4) // 启动带缓冲的通道
val sender = launch { // 启动发送者协程
    repeat(10) {
        println("Sending $it") // 在每?个元素发送前打印它们
        channel.send(it) // 将在缓冲区被占满时挂起
    }
}
// 没有接收到东西……只是等待……
delay(1000)
sender.cancel() // 取消发送者协程

  使?缓冲通道并给 capacity 参数传? 四 它将打印“sending”五 次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

  前四个元素被加?到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。

通道是公平的

  发送和接收操作是 公平的 并且尊重调?它们的多个协程。它们遵守先进先出原则,可以看到第?个协 程调? receive 并得到了元素。在下?的例?中两个协程“乒”和“乓”都从共享的“桌?”通道接收到 这个“球”元素

data class Ball(var hits: Int)
fun main() = runBlocking {
    val table = Channel() // ?个共享的 table(桌?)
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // 乒乓球
    delay(1000) // 延迟 1 秒钟
    coroutineContext.cancelChildren() // 游戏结束,取消它们
}
suspend fun player(name: String, table: Channel) {
    for (ball in table) { // 在循环中接收球
        ball.hits++
        println("$name $ball")
        delay(300) // 等待?段时间
        table.send(ball) // 将球发送回去
    }
}

  “乒”协程?先被启动,所以它?先接收到了球。甚?虽然“乒”协程在将球发送会桌?以后?即开始接 收,但是球还是被“乓”协程接收了,因为它?直在等待着接收球:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

  注意,有时候通道执?时由于执?者的性质?看起来不那么公平。点击这个提案来查看更多细节。

计数器通道:

  计时器通道是?种特别的会合通道,每次经过特定的延迟都会从该通道进?消费并产? Unit 。虽然它 看起来似乎没?,它被?来构建分段来创建复杂的基于时间的 produce 管道和进?窗?化操作以及其 它时间相关的处理。可以在 select 中使?计时器通道来进?“打勾”操作。 使????法 ticker 来创建这些通道。为了表明不需要其它元素,请使? ReceiveChannel.cancel ? 法。 现在让我们看看它是如何在实践中?作的:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay
    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent
    elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")
// 模拟?量消费延迟
    println("Consumer pauses for 150ms")
    delay(150)
// 下?个元素?即可?
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay:
            $nextElement")
// 请注意,`receive` 调?之间的暂停被考虑在内,下?个元素的到达速度更快
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
    tickerChannel.cancel() // 表明不再需要更多的元素
}

  它的打印如下:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

  请注意,ticker 知道可能的消费者暂停,并且默认情况下会调整下?个?成的元素如果发?暂停则延迟, 试图保持固定的?成元素率。 给可选的 mode 参数传? TickerMode.FIXED_DELAY 可以保持固定元素之间的延迟。

相关