6、Channel(通道)


简单收发channel

func main()  {
    chanDemo()
}

func chanDemo() {
    c := make(chan int)
    go func() {
        for {
            n := <- c // 接收 channel 的数据
            fmt.Println(n)
        }
    }()

    // 发送 channel 数据
    c <- 1
    c <- 2

    time.Sleep(time.Millisecond)
}

channel 批量收发数据

func main()  {
    chanDemo()
}

func chanDemo() {
    // 定义 channel 数组
    var channels [10]chan int

    // 批量收数据
    for i := 0; i < 10; i++ {
        channels[i] = make(chan int)
        go worker(i,channels[i])
    }

    // 批量发数据
    for i := 0; i < 10; i++ {
        channels[i] <- i + 'a'
    }

    time.Sleep(time.Millisecond)
}

func worker (i int, c chan int){
    for {
        n := <- c // 接收 channel 的数据
        fmt.Printf("接收来自 %d 通道,数据%v\n",i,n)
    }
}

channel 通道类型

// 双向通道
var a chan int

// 仅发送类型
var b chan<- int

//仅接收类型
var c <-chan int

channel 的缓冲区

func main()  {
    bufferedChan()
}

func bufferedChan() {
    c := make(chan int,3) // 给通道设定缓冲区
    go worker(0,c)
    c <- 11
    c <- 22
    c <- 33
    close(c) // 关闭通道

    time.Sleep(time.Millisecond)
}

func worker (i int, c chan int){
    // 若从通道收不到数据就退出
    //for {
    //    n,ok := <- c
    //    if !ok {
    //        break
    //    }
    //    fmt.Printf("接收来自 %d 通道,数据%v\n",i,n)
    //}

    // 同理,若通道有数据就打印
    for n := range c {
        fmt.Printf("接收来自 %d 通道,数据%v\n",i,n)
    }
}

channel 等待所有 goroutine 结束

func main()  {
    chanDemo()
}

type workStruct struct {
    in chan int
    done chan bool
}

func chanDemo() {
    // 定义 channel 数组
    var channels [10]workStruct
    for i := 0; i < 10; i++ {
        channels[i] = workStruct{
            in : make(chan int),
            done: make(chan bool),
        }
    }

    // 批量收数据
    for i,w := range channels{
        go worker(i,w)
    }

    // 批量发数据
    for i,w := range channels{
        w.in <- 'a' + i
    }

    // 当接收完 channels 里面的 done ,表示 channel 执行完毕
    for _,w := range channels{
        <-w.done
        close(w.in)
        close(w.done)
    }

    fmt.Printf("执行后续操作")
}

func worker (i int, c workStruct){
    // 同理,若通道有数据就打印
    for n := range c.in {
        fmt.Printf("接收来自 %d 通道,数据%v\n",i,n)
        c.done <- true
    }
}

 

WaitGroup 等待所有 goroutine 结束

func main()  {
    chanDemo()
}

type workStruct struct {
    in chan int
    wg *sync.WaitGroup
}

func chanDemo() {
    var wg sync.WaitGroup

    // 定义 channel 数组
    var channels [10]workStruct
    for i := 0; i < 10; i++ {
        channels[i] = workStruct{
            in : make(chan int),
            wg: &wg,
        }
    }

    wg.Add(10)

    // 批量收数据
    for i,w := range channels{
        go worker(i,w)
    }

    // 批量发数据
    for i,w := range channels{
        w.in <- 'a' + i
    }

    wg.Wait()

    fmt.Printf("执行后续操作")
}

func worker (i int, c workStruct){
    // 若通道有数据就打印
    for n := range c.in {
        fmt.Printf("接收来自 %d 通道,数据%v\n",i,n)
        c.wg.Done()
    }
}

select 接收或发送某个 channel 的值

func main() {
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("c1里面来了数据", n)
        case n := <-c2:
            fmt.Println("c2里面来了数据", n)
        }
    }
}

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

传统同步机制

WaitGroup

Cond

Mutex

type atomicInt struct {
    value int
    lock sync.Mutex
}

func (a *atomicInt) increment() {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return int(a.value)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

相关