golang中channel的Q&A
channel Q&A
什么是CSP?
不要通过共享内存来实现通信,而要通过通信实现共享内存
CSP通常被认为go在并发上成功的关键因素
go一开始就把CSP的思想融入到语言的核心里,所以并发编程称为go的一个独特的优势
大多数编程语言的并发模式是基于线程和内存同步访问控制实现的,Go的并发编程模型通过goroutine和channel来代替
goroutine和线程类似,channel和mutex(内存同步访问控制)类似
goroutine解放了程序员,让我们更贴近业务去思考问题,而不用考虑各种像线程库、线程开销、线程调度这些繁琐的底层问题了,
goroutine天生替你解决好了
channel则天生可以和其它channel组成,我们可以把收集各种子系统结果的channel输入到同一个channel
channel还可以和select,cancel,timeout组合,而mutex没有这些功能
go的并发原则非常优秀,目标就是简单,尽量使用channel,把goroutine当做免费的资源,随便用
channel和select和timeout组合案例:
// channel 和 select timeout组合
chan1 := make(chan int, 1)
//out := make(chan int, 1)
//timer := time.NewTimer(time.Second * 3)
go func() {
time.Sleep(time.Second * 3)
chan1 <- 88
}()
select {
case v := <-chan1:
fmt.Println("chan1接收到数据", v)
//case <-timer.C:
case <-time.After(time.Second * 2):
fmt.Println("超时了")
}
// channel 和 select timeout组合
chan1 := make(chan int, 1)
out := make(chan int, 1)
timer := time.NewTimer(time.Second * 2)
go func() {
go func() {
time.Sleep(time.Second * 3)
chan1 <- 88
}()
select {
case v := <-chan1:
fmt.Println("chan1接收到数据", v)
//case <-timer.C:
case <-timer.C:
fmt.Println("超时了")
}
out <- 88
}()
<-out
channel底层的数据结构是什么
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
关于字段的含义都写在注释里了,再来重点说几个字段:
buf 指向底层循环数组,只有缓冲型的 channel 才有。
sendx,recvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:
lock 用来保证每个读 channel 或写 channel 的操作都是原子的。
- 创建
func makechan(t *chantype, size int64) *hchan
从函数原型来看,创建的channel是一个指针,所以我们在函数之间直接传递channel,而不需要传递channel的指针
channel发送和接收元素的本质是什么?
* 就是说channel的发送和接收本质上都是"值拷贝",无论是从sender goroutine的栈道chan buf
还是从chan buf到receiver goroutine,或者直接从sender goroutine到receiver goroutine
案例:
package main
import (
"fmt"
"time"
)
type user struct {
name string
age int8
}
var u = user{name: "马亚南", age: 29}
var g = &u
func printUser(c <-chan *user) {
time.Sleep(time.Second * 2)
fmt.Println("printUser输出的是:", <-c)
}
func modifyUser(u *user) {
fmt.Println("modifyUser输出的是:", u)
u.name = "香香"
}
func main() {
// 非常重要的一点:goroutine发送到channel,从channel接收到goroutine
// 还是goroutine发送到另一个goroutine,全部都是"值拷贝"
ch := make(chan *user, 5)
ch <- g // g发送到channel,是值拷贝,拷贝的是g中的内容,也就是u的指针
g = &user{name: "彤宝贝", age: 5}
go printUser(ch)
go modifyUser(g)
time.Sleep(time.Second * 3)
fmt.Println(g)
}
/* 输出结果
modifyUser输出的是: &[彤宝贝 5]
printUser输出的是 &[马亚南 29]
&[香香 5]
*/
// 结论: goroutine发送到channel,channel读取到goroutine全部是值传递
从channel接收数据的过程是怎样的
* goroutine是用户态的协程,由go runtime进行管理,相比线程由os进行管理
goroutine更轻量,所以我们可以轻松创建数万goroutine
一个内核线程可以管理多个goroutine,当其中的一个goroutine阻塞时,内核线程可以调度其它的goroutine来运行,
内核线程本身不会阻塞,这就是我们常说的M:N模型
channel发送接收报错的场景
错误1:当向无缓冲通道且无接收者发送数据时,会报panic错误
错误2:当从无发送者(无论有无缓冲)通道接收数据时,都会报panic错误
零值3:当从已关闭的(无论有无缓冲)通道接收数据时,都会返回通道对应类型的零值
4: 当向有缓冲且通道未满发送数据时,无论有无接收者,都不会报错
5. 当向有缓冲通道且通道已满,而且无接收者是,会报panic错误
channel在什么情况下会引起资源泄漏
* channel可能会引发goroutine泄漏
泄漏的原因是,当goroutine操作channel时,处于发送或接收阻塞状态,而channel处于满或空的状态,
一直得不到改变,同时垃圾回收器也不会回收此类资源,进而导致goroutine一直处于等待队列中,不见天日
另外在程序运行中,对于一个channel,如果没有任何goroutine对其引用了,gc就会对其回收,不会引起内存泄漏
channel有哪些应用
* channel和goroutine的结合是go并发编程的大杀器
* 而channel和select、cancel、timer的结合能实现各种各样的功能
* 1. 停止信号:关闭channel或向channel发送一个元素,使接收方通过channel获得信息后做相应的操作
* 2. 任务定时:
(1)超时控制
func main() {
c := make(chan int)
out := make(chan bool)
go func() {
go func() {
time.Sleep(time.Second)
c <- 2
}()
select {
case <-time.After(time.Second * 1):
fmt.Println("超时了")
case v := <-c:
fmt.Println("c中读取出的数据:", v)
}
out <- true
}()
<-out
}
(2)定期执行某个任务
func main() {
timer := time.NewTicker(time.Second)
for {
select {
case <-timer.C:
fmt.Println("执行了") // 每隔1秒执行一次
}
}
}
3. 解耦生产方和消费方
服务启动时,启动n个worker,作为工作协程池,这些协程工作在一个for无限循环里, 从某个channel消费工作任务并执行
案例:
func main() {
tasksChan := make(chan int, 100) // 任务队列
go workerTask(tasksChan) // 开启处理任务的协程池
// 发起任务
for i := 0; i < 10; i++ {
tasksChan <- i
}
select {
case <-time.After(time.Second * 3):
}
}
func workerTask(tasksChan chan int) {
// 开启5个协程去处理任务队列中的数据
GOS := 5
for i := 0; i < GOS; i++ {
// 局部变量在堆栈上存储,也是变量逃逸的一种场景(解决方法:使用闭包)
go func(i int) {
for {
value := <-tasksChan
fmt.Printf("finish task: %d by worker %d\n", value, i)
time.Sleep(time.Second)
}
}(i)
}
}
输出结果:
finish task: 1 by worker 4
finish task: 2 by worker 1
finish task: 3 by worker 2
finish task: 4 by worker 3
finish task: 0 by worker 0
finish task: 5 by worker 0
finish task: 6 by worker 1
finish task: 7 by worker 3
finish task: 8 by worker 2
finish task: 9 by worker 4
4. 控制并发数
有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,
因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
案例:
var limit = make(chan int, 3)
func main() {
// 通过channel控制最大并发数量
tasks := [...]int{11, 22, 33, 44, 55, 66, 77, 88, 99, 100}
for i, v := range tasks {
// 为每一个任务开启一个goroutine
go func(i, v int) {
// 通过channel控制goroutine最大并发数量
limit <- -1
fmt.Println(i, v)
time.Sleep(time.Second)
<-limit
}(i, v)
}
time.Sleep(time.Second * 4)
}
从一个关闭的channel中仍能读出数据吗?
从一个有缓冲的channel里读取数据,当channel关闭后,依然能读出有效值,
只有当ok返回的值为false时,读取出的值是channel中默认类型的零值
案例:
func main() {
ch := make(chan int, 3)
ch <- 55
close(ch)
v, ok := <-ch
if ok {
fmt.Println("first read value: ", v)
}
v = <-ch
fmt.Println(v) // 0
v, ok = <-ch // 从通道中读取数据,既可以直接接收值, 也可以接收值和接收状态
if !ok {
fmt.Println("second read value:", v)
}
}
关于channel的happened-before有哪些
1. 第 n 个 send 一定 happened before 第 n 个 receive finished,无论是缓冲型还是非缓冲型的 channel。
2. 对于容量为 m 的缓冲型 channel,第 n 个 receive 一定 happened before 第 n+m 个 send finished。
3. 对于非缓冲型的 channel,第 n 个 receive 一定 happened before 第 n 个 send finished。
4. channel close 一定 happened before receiver 得到通知。
解释:
1. send不一定发生在recv之前,因为也可以先receive,然后goroutine被挂起,之后被sender唤醒,
无论如何,send一定发生在receive finished之前
2. 缓冲型channel,容量为m,第n+m个send发送之后的情形:
如果第n个receive还没发生,这时候channel填满了,send就会被阻塞,
当第n个receive接收完成后,sender goroutine会被唤醒,之后在继续发送过程
也就是说第n个receive happened-before 第n+m个send finished
3. 第n个send如果被阻塞了,sender goroutine被挂起,第n个receive到来时,先与第n个send finished
如果第n个send未被阻塞,说明第n个receive已经在那等着了,它不仅happened-before send finished
而且还happened-before send
4. 先设置closed = 1,在唤醒等待的receiver, 并将零值拷贝给receiver
案例:
// 顺序一致性的内存模型,这是并发编程的基础
var ch = make(chan bool)
var msg string
func AGoroutine() {
time.Sleep(time.Second)
msg = "哈哈"
<-ch
}
func main() {
go AGoroutine()
ch <- true
fmt.Println(msg) // 一定打印哈哈,为什么?
// 因为 对于无缓冲型通道,第n个receive一定发生在第n个send finished之前
}
// 这里依赖的就是前面讲的happened-before 第一条,send一定发生在receive finished之前,
// 即 ch <- true 在 <-ch 之前
关闭一个channel的过程是怎么样的?
1. 关闭一个nil channel会报错panic
2. close逻辑比较简单,对于一个channel,recvq和sendq分别保存了阻塞的发送者和接受者
关闭channel后,对于等接收者而言,会收到一个相应类型的零值,对于发送者,会直接panic,
所以,在不了解channel还有没有接收者的时候,不能贸然关闭channel
3. close函数先上一把大锁,然后把所有挂载channel上的receiver和sender连成一个sudog链表
在解锁,最后在将所有的sudog全部唤醒
4. 唤醒之后,该干嘛干嘛,sender继续向channel发送就会报panic错误,
receiver继续接收,如果channel为空了,返回对应类型的零值
向channel发送数据的过程是怎样的
1. 如果检测到channel已经关闭,直接panic
2. 如果能从等待接收队列recvq里出队一个sudog(代表一个goroutine),说明此时channel是空的,
没有元素,所以才会有等待接收者,这时会调用send函数将元素从发送者的栈拷贝到接受者的栈中去
3. 向一个非缓冲型的channel发送数据,从一个无元素的channel(非缓冲型或缓冲型但空)接收数据,
都会导致一个goroutine直接操作另一个goroutine的栈
如何优雅的关闭channel
关于channel的使用有几点不方便的地方
1. 在不改变channel自身状况的情况下,无法获知一个channel是否关闭
2. 关闭一个closed channel就会导致panic, 如果关闭channel的一方不知道channel是否处于关闭状态时就去贸然关闭,是件很危险的事情
3. 向一个closed channel发送数据就会导致panic, 如果想channel发送数据的一方不知道是否处于关闭状态时就去贸然发送数据,是件很危险的事情
4. 有一条广泛流传的关闭channel的原则:
不要从一个receiver侧关闭channel,也不要在有多个sender时关闭channel(本质:不要关闭或者发送数据到以关闭的channel)
5. 有两个不那么优雅的关闭channel的方法
* 使用defer-recover机制,大胆放心的向channel发送数据或关闭channel,报错了有defer-recover在兜底
func main() {
ch := make(chan int, 10)
go goroutineA(ch)
go goroutineB(ch)
time.Sleep(time.Second * 4)
}
func goroutineA(ch chan int) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
ch <- 10
time.Sleep(time.Second)
close(ch)
}
func goroutineB(ch chan int) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
ch <- 20
time.Sleep(time.Second * 2)
close(ch)
}
* 使用sync.Once来保证只关闭一次
案例:
var (
once *sync.Once = &sync.Once{}
)
func main() {
ch := make(chan int, 10)
go goroutineA(ch)
go goroutineB(ch)
time.Sleep(time.Second * 4)
}
func goroutineA(ch chan int) {
ch <- 10
time.Sleep(time.Second)
close(ch) // 错误
//once.Do(func() {
// close(ch)
//})
}
func goroutineB(ch chan int) {
ch <- 20
time.Sleep(time.Second * 2)
close(ch) // 错误
//once.Do(func() {
// close(ch)
//})
}
// 上面两处错误的原因:关闭已经关闭的channel就会报panic
6. 如何优雅的关闭channel(四种情况)
1. 如何只有一个sender而且只有一个receiver,那么直接在sender中关闭就好了
2. 如果只有一个sender而且有多个receiver,也是直接在sender中关闭就好了
3. 如果有多个sender, 只有一个receiver的情况下,解决方案就是增加一个传递关闭信号的channel
receiver通过关闭channel指令,sender监听到关闭信号后,停止发送数据
func main() {
rand.Seed(time.Now().UnixNano()) // 真随机
const Max = 100000
const NumSenders = 1000
var wg sync.WaitGroup
wg.Add(NumSenders + 1)
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// senders
for i := 0; i < NumSenders; i++{
go func() {
defer wg.Done()
for {
select {
case <-stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// 开启一个receiver goroutine
go func() {
defer wg.Done()
for value := range dataCh{
if value == Max - 1{
fmt.Println("send close signal to senders", value)
close(stopCh)
// 此处必须得return,因为如果不return的话, 由于dataCh并没有关闭,for循环会一直等待,进入死锁
// 但是有一点,return之后,dataCh中由于是100的容量,所以最后的100条数据不会读取出来
return
}
fmt.Println(value)
}
wg.Done()
}()
wg.Wait()
//select{
//case <-time.After(time.Second * 5): // 1小时超时控制
//}
}
上面程序中dataCh并没有显示的关闭通道,因为在go语言中,对于一个channel,
如果没有任何goroutine引用它,不管该channel有没有关闭,都会被GC回收
所以在这种情况下,所谓的优雅的关闭channel就是不关闭channel,让GC代劳
4. 切记两点:
1. 不要从接收方关闭channel
2. 如果channel有多个发送方,也不要关闭channel(可以通过另外一个信号通道告诉发送方停止发送)
5. 本质:不要关闭已经关闭的channel,不要向已关闭的channel发送数据
6. 如果有多个sender和多个receiver的时候跟第三种情况就又不太一样了,如果由receiver关闭stopCh的话
由于有多个receiver,就会包panic,此时我们需要一个中间人,多个receiver都向该中间人发送关闭通道的指令
中间人接收到第一个请求后就会直接关闭,这里将 toStop 声明成了一个 缓冲型的 channel。
假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。
因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,
那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。
func main() {
rand.Seed(time.Now().UnixNano()) // 真随机
const Max = 100000
const NumSenders = 1000
const NumReceivers = 10
var wg sync.WaitGroup
wg.Add(NumSenders + NumReceivers)
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
toStop := make(chan string, 1)
var stoppedBy string
// 中间人监听toStop,一旦有数据关闭stopCh
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++{
go func(id string) {
defer wg.Done()
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
fmt.Println("value=====================================:", value)
return
}
select {
case <-stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++{
go func(id string) {
defer wg.Done()
for{
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max - 1{
select {
case toStop <- "receiver#" + id:
default:
}
fmt.Println("value==============================:", value)
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
wg.Wait()
//select{
//case <-time.After(time.Second * 5): // 1小时超时控制
//}
}
操作channel的情况总结
总结一下操作channel的结果
操作 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读<- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞
总结一下:发生panic有三种情况:
1. 向一个已经关闭的channel发送数据
2. 重复关闭channel
3. 关闭一个nil channel
阻塞的两种情况:
1. 向一个nil channel发送数据
2. 从一个nil channel读取数据