kotlin协程——>通道
通道:延期的值提供了?种便捷的?法使单个值在多个协程之间进?相互传输。通道提供了?种在流中传输 值的?法。
通道基础:
?个 Channel 是?个和 BlockingQueue ?常相似的概念。其中?个不同是它代替了阻塞的 put 操 作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive。
val channel = Channel() launch { // 这?可能是消耗?量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平?并发送 for (x in 1..5) channel.send(x * x) } // 这?我们打印了 5 次被接收的整数: repeat(5) { println(channel.receive()) } println("Done!")
这段代码的输出如下:
1 4 9 16 25 Done!
关闭与迭代通道
和队列不同,?个通道可以通过被关闭来表明没有更多的元素将会进?通道。在接收者中可以定期的使 ? for 循环来从通道中接收元素。 从概念上来说,?个 close 操作就像向通道发送了?个特殊的关闭指令。这个迭代停?就说明关闭指令 已经被接收了。所以这?保证所有先前发送出去的元素都在通道关闭前被接收到。
val channel = Channel() launch { for (x in 1..5) channel.send(x * x) channel.close() // 我们结束发送 } // 这?我们使? `for` 循环来打印所有被接收到的元素(直到通道被关闭) for (y in channel) println(y) println("Done!")
构建通道?产者
协程?成?系列元素的模式很常?。这是 ?产者?消费者 模式的?部分,并且经常能在并发的代码中 看到它。你可以将?产者抽象成?个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的 常识相违悖。 这?有?个名为 produce 的便捷的协程构建器,可以很容易的在?产者端正确?作,并且我们使?扩 展函数 consumeEach 在消费者端替代 for 循环:
val squares = produceSquares() squares.consumeEach { println(it) } println("Done!")
管道:管道是?种?个协程在流中开始?产可能?穷多个元素的模式:
fun CoroutineScope.produceNumbers() = produce{ var x = 1 while (true) send(x++) // 在流中开始从 1 ?产?穷多个整数 }
并且另?个或多个协程开始消费这些流,做?些操作,并?产了?些额外的结果。在下?的例?中,对这 些数字仅仅做了平?操作:
fun CoroutineScope.square(numbers: ReceiveChannel): ReceiveChannel = produce { for (x in numbers) send(x * x) }
主要的代码启动并连接了整个管道:
val numbers = produceNumbers() // 从 1 开始?成整数 val squares = square(numbers) // 整数求平? repeat(5) { println(squares.receive()) // 输出前五个 } println("Done!") // ?此已完成 coroutineContext.cancelChildren() // 取消?协程
所有创建了协程的函数被定义在了 CoroutineScope 的扩展上,所以我们可以依靠结构化并发来 确保没有常驻在我们的应?程序中的全局协程。
使用管道的素数:让我们来展??个极端的例??在协程中使??个管道来?成素数。我们开启了?个数字的?限序列。
fun CoroutineScope.numbersFrom(start: Int) = produce{ var x = start while (true) send(x++) // 开启了?个?限的整数流 }
在下?的管道阶段中过滤了来源于流中的数字,删除了所有可以被给定素数整除的数字。
fun CoroutineScope.filter(numbers: ReceiveChannel, prime: Int) = produce { for (x in numbers) if (x % prime != 0) send(x) }
现在我们开启了?个从 2 开始的数字流管道,从当前的通道中取?个素数,并为每?个我们发现的素数 启动?个流?线阶段:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ……
下?的例?打印了前?个素数,在主线程的上下?中运?整个管道。直到所有的协程在该主协程 runBlocking 的作?域中被启动完成。我们不必使??个显式的列表来保存所有被我们已经启动的协 程。我们使? cancelChildren 扩展函数在我们打印了前?个素数以后来取消所有的?协程。
var cur = numbersFrom(2) repeat(10) { val prime = cur.receive() println(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() // 取消所有的?协程来让主协程结束
这段代码的输出如下:
2 3 5 7 11 13 17 19 23 29
注意,你可以在标准库中使? iterator 协程构建器来构建?个相似的管道。使? iterator 替换 produce 、yield 替换 send 、next 替换 receive 、Iterator 替换 ReceiveChannel 来摆 脱协程作?域,你将不再需要 runBlocking 。然?,如上所?,如果你在 Dispatchers.Default 上下? 中运?它,使?通道的管道的好处在于它可以充分利?多核? CPU。
不过,这是?种?常不切实际的寻找素数的?法。在实践中,管道调?了另外的?些挂起中的调?(就像 异步调?远程服务)并且这些管道不能内置使? sequence / iterator ,因为它们不被允许随意的挂 起,不像 produce 是完全异步的。
扇出:
多个协程也许会接收相同的管道,在它们之间进?分布式?作。让我们启动?个定期产?整数的?产者 协程(每秒?个数字):
fun CoroutineScope.produceNumbers() = produce{ var x = 1 // 从 1 开始 while (true) { send(x++) // 产?下?个数字 delay(100) // 等待 0.1 秒 } }
现在让我们启动五个处理器协程并让它们?作将近?秒。看看发?了什么:
val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // 取消协程?产者从?将它们全部杀死
该输出将类似于如下所?,尽管接收每个特定整数的处理器 id 可能会不同:
Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10
扇入
多个协程可以发送到同?个通道。?如说,让我们创建?个字符串的通道,和?个在这个通道中以指定 的延迟反复发送?个指定字符串的挂起函数:
suspend fun sendString(channel: SendChannel, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }
现在,我们启动了?个发送字符串的协程,让我们看看会发?什么(在?例中,我们在主线程的上下?中 作为主协程的?协程来启动它们):
val channel = Channel() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // 接收前六个 println(channel.receive()) } coroutineContext.cancelChildren() // 取消所有?协程来让主协程结束
输出如下:
foo foo BAR! foo foo BAR!
带缓冲的通道
到?前为?展?的通道都是没有缓冲区的。?缓冲的通道在发送者和接收者相遇时传输元素(也称“对 接”)。如果发送先被调?,则它将被挂起直到接收被调?,如果接收先被调?,它将被挂起直到发送被调 ?。 Channel() ??函数与 produce 建造器通过?个可选的参数 capacity 来指定 缓冲区?? 。缓冲允 许发送者在被挂起前发送多个元素,就像 BlockingQueue 有指定的容量?样,当缓冲区被占满的时 候将会引起阻塞。
val channel = Channel(4) // 启动带缓冲的通道 val sender = launch { // 启动发送者协程 repeat(10) { println("Sending $it") // 在每?个元素发送前打印它们 channel.send(it) // 将在缓冲区被占满时挂起 } } // 没有接收到东西……只是等待…… delay(1000) sender.cancel() // 取消发送者协程
使?缓冲通道并给 capacity 参数传? 四 它将打印“sending”五 次:
Sending 0 Sending 1 Sending 2 Sending 3 Sending 4
前四个元素被加?到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。
通道是公平的
发送和接收操作是 公平的 并且尊重调?它们的多个协程。它们遵守先进先出原则,可以看到第?个协 程调? receive 并得到了元素。在下?的例?中两个协程“乒”和“乓”都从共享的“桌?”通道接收到 这个“球”元素
data class Ball(var hits: Int) fun main() = runBlocking { val table = Channel() // ?个共享的 table(桌?) launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // 乒乓球 delay(1000) // 延迟 1 秒钟 coroutineContext.cancelChildren() // 游戏结束,取消它们 } suspend fun player(name: String, table: Channel ) { for (ball in table) { // 在循环中接收球 ball.hits++ println("$name $ball") delay(300) // 等待?段时间 table.send(ball) // 将球发送回去 } }
“乒”协程?先被启动,所以它?先接收到了球。甚?虽然“乒”协程在将球发送会桌?以后?即开始接 收,但是球还是被“乓”协程接收了,因为它?直在等待着接收球:
ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4)
注意,有时候通道执?时由于执?者的性质?看起来不那么公平。点击这个提案来查看更多细节。
计数器通道:
计时器通道是?种特别的会合通道,每次经过特定的延迟都会从该通道进?消费并产? Unit 。虽然它 看起来似乎没?,它被?来构建分段来创建复杂的基于时间的 produce 管道和进?窗?化操作以及其 它时间相关的处理。可以在 select 中使?计时器通道来进?“打勾”操作。 使????法 ticker 来创建这些通道。为了表明不需要其它元素,请使? ReceiveChannel.cancel ? 法。 现在让我们看看它是如何在实践中?作的:
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking{ val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // no initial delay nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // 模拟?量消费延迟 println("Consumer pauses for 150ms") delay(150) // 下?个元素?即可? nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // 请注意,`receive` 调?之间的暂停被考虑在内,下?个元素的到达速度更快 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // 表明不再需要更多的元素 }
它的打印如下:
Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
请注意,ticker 知道可能的消费者暂停,并且默认情况下会调整下?个?成的元素如果发?暂停则延迟, 试图保持固定的?成元素率。 给可选的 mode 参数传? TickerMode.FIXED_DELAY 可以保持固定元素之间的延迟。