kotlin协程——>共享的可变状态与并发


共享的可变状态与并发

  协程可?多线程调度器(?如默认的 Dispatchers.Default)并发执?。这样就可以提出所有常?的并发 问题。主要的问题是同步访问共享的可变状态。协程领域对这个问题的?些解决?案类似于多线程领域 中的解决?案,但其它解决?案则是独???的。

问题

  我们启动?百个协程,它们都做?千次相同的操作。我们同时会测量它们的完成时间以便进?步的?较

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 启动的协程数量
    val k = 1000 // 每个协程重复执?同?动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作?域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

  我们从?个?常简单的动作开始:使?多线程的 Dispatchers.Default 来递增?个共享的可变变量

var counter = 0
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

  这段代码最后打印出什么结果?它不太可能打印出“Counter = 100000”,因为?百个协程在多个线程中 同时递增计数器但没有做并发处理。

volatile ?济于事

  有?种常?的误解:volatile 可以解决并发问题。让我们尝试?下:

@Volatile // 在 Kotlin 中 `volatile` 是?个注解
var counter = 0
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

  这段代码运?速度更慢了,但我们最后仍然没有得到“Counter = 100000”这个结果,因为 volatile 变量 保证可线性化(这是“原?”的技术术语)读取和写?变量,但在?量动作(在我们的?例中即“递增”操 作)发?时并不提供原?性。

线程安全的数据结构

  ?种对线程、协程都有效的常规解决?法,就是使?线程安全(也称为同步的、可线性化、原?)的数据结 构,它为需要在共享状态上执?的相应操作提供所有必需的同步处理。在简单的计数器场景中,我们可 以使?具有 incrementAndGet 原?操作的 AtomicInteger 类:

val counter = AtomicInteger()
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

  这是针对此类特定问题的最快解决?案。它适?于普通计数器、集合、队列和其他标准数据结构以及它 们的基本操作。然?,它并不容易被扩展来应对复杂状态、或?些没有现成的线程安全实现的复杂操作

以细粒度限制线程

  限制线程 是解决共享可变状态问题的?种?案:对特定共享状态的所有访问权都限制在单个线程中。它 通常应?于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应?主线程中。这在协程中很容易实 现,通过使??个单线程上下?:

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
// 将每次?增限制在单线程上下?中
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

  这段代码运??常缓慢,因为它进?了 细粒度 的线程限制。每个增量操作都得使? [withContext(counterContext)] 块从多线程 Dispatchers.Default 上下?切换到单线程上下?。

以粗粒度限制线程

  在实践中,线程限制是在?段代码中执?的,例如:状态更新类业务逻辑中?部分都是限于单线程中。下 ?的?例演?了这种情况,在单线程上下?中运?每个协程。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 将?切都限制在单线程上下?中
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

  这段代码运?更快?且打印出了正确的结果。

互斥

  该问题的互斥解决?案:使?永远不会同时执?的 关键代码块 来保护共享状态的所有修改。在阻塞的 世界中,你通常会为此?的使? synchronized 或者 ReentrantLock 。在协程中的替代品叫做 Mutex 。它具有 lock 和 unlock ?法,可以隔离关键的部分。关键的区别在于 Mutex.lock() 是?个 挂起函数,它不会阻塞线程。 还有 withLock 扩展函数,可以?便的替代常?的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式:

val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
// ?锁保护每次?增
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

  此?例中锁是细粒度的,因此会付出?些代价。但是对于某些必须定期修改共享状态的场景,它是?个 不错的选择,但是没有?然线程可以限制此状态。

Actors

  ?个 actor 是由协程、被限制并封装到该协程中的状态以及?个与其它协程通信的 通道 组合?成的? 个实体。?个简单的 actor 可以简单的写成?个函数,但是?个拥有复杂状态的 actor 更适合由类来表 ?。

  有?个 actor 协程构建器,它可以?便地将 actor 的邮箱通道组合到其作?域中(?来接收消息)、组合 发送 channel 与结果集对象,这样对 actor 的单个引?就可以作为其句柄持有。

  使? actor 的第?步是定义?个 actor 要处理的消息类。Kotlin 的密封类很适合这种场景。我们使? IncCounter 消息(?来递增计数器)和 GetCounter 消息(?来获取值)来定义 CounterMsg 密 封类。后者需要发送回复。CompletableDeferred 通信原语表?未来可知(可传达)的单个值,这?被? 于此?的。

// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred) : CounterMsg() // 携带回复的请求

  接下来我们定义?个函数,使? actor 协程构建器来启动?个 actor:

// 这个函数启动?个新的计数器 actor
fun CoroutineScope.counterActor() = actor {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

  main 函数代码很简单:

fun main() = runBlocking {
    val counter = counterActor() // 创建该 actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
// 发送?条消息以?来从?个 actor 中获取计数值
    val response = CompletableDeferred()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // 关闭该actor
}

  actor 本?执?时所处上下?(就正确性??)?关紧要。?个 actor 是?个协程,??个协程是按顺序 执?的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改??的私有 状态,但只能通过消息互相影响(避免任何锁定)。

  actor 在?负载下?锁更有效,因为在这种情况下它总是有?作要做,?且根本不需要切换到不同的上下?。

注意,actor 协程构建器是?个双重的 produce 协程构建器。?个 actor 与它接收消息的通道相关
联,??个 producer 与它发送元素的通道相关联。

相关