Day08 并发编程


并发编程

01并发的概念

C/S、B/S
网络开发
文件上传案例

今日重点:并发编程
协程 channel管道、并发案例

并发和并行

并发:在同一时间段,处理多个任务的能力
并行:在同一时刻,处理多个任务的能力

进程
线程
协程(重点)
早期计算机:一个程序占用所有资源

fmt.Scan
Listen.Accept
阻塞时,cpu是否在工作?cpu不在工作

示例1

做累加时,随着计算的数据越来越多,cpu占用时间越来越长

package main

import "fmt"

func main() {
   var ret=0
   for i:=0;i<=100;i++{
      ret+=i
   }
   fmt.Println(ret)
}

早期计算机问题

一个计算机无法同时执行多个程序
    1 两个程序无法同时运行。
    2 CPU浪费

抢占式分配资源

1标准:遇到io则切换
    如果程序1,一直没有遇到io时(一直占用CPU)。又变成单任务了
2 引入时间片:
    没有遇到io,到10ms,切换给其他程序用10ms

图示

同一个时间段内,多个程序在执行

放歌的时候,cpu切换,是否能感受的到?
CPU足够快,正常情况下是感受不到。
卡顿是 任务足够多的时候,cpu切换,直观感受

多核,出现了并行

计算密集型:长时间占着CPU

IO密集型:聊天阻塞的案例。阻塞的时候,不需要占用CPU

开发中大部分是IO密集型的任务

切换的标准:

1 遇到IO,立刻切换
2 达到时间片,切换

02 进程和线程的概念

进程概念

我们都知道计算机的核心是CPU,它承担了所有的计算任务

操作系统是计算机的管理者,它负责任务的调度、资源的分配和管理,统领整个计算机硬件;应用程序则是具有某种功能的程序,程序是运行于操作系统之上的。

进程是一种抽象的概念,从来没有统一的标准定义。进程一般由程序、数据集合进程控制块三部分组成。

程序用于描述进程要完成的功能,是控制进程执行的指令集;
数据集合是程序在执行时所需要的数据和工作区;
程序控制块(Program Control Block,简称PCB),包含进程的描述信息控制信息,是进程存在的唯一标志

进程特性

动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的;
并发性:任何进程都可以同其他进程一起并发执行;
独立性:进程是系统进行资源分配和调度的一个独立单位;
结构性:进程由程序、数据和进程控制块三部分组成。

产生线程和协程的 核心2点

1 空间占有率
2 时间切换的开销大
开发的app
    一个软件运行多个任务时。
    3个进程

切换的时间开销大

线程:真正执行任务的不再是进程了。开销小
    线程是执行单元
进程是:管理单元

1 占用资源小了
2 切换的时间开销小了

 

理解

第1种场景
三个人 开三个教室,执行完告诉下一个人。
开销大,占3个教室,告诉下一个 时间大

第2种场景:
在一个屋子,3个人用。
开销小了,交流方便了。

线程的概念

线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。

1问题: 1个进程可以没有线程吗?不可以
    1个进程最少要有1个线程,主线程
2问题: 线程有没有可能脱离进程独立存在?不可以
    需要进程管理
任务调度

任务执行的那一小段时间叫做时间片,任务正在执行时的状态运行状态,被暂停的线程任务状态叫做就绪状态,意为等待下一个属于它的时间片的到来。

进程和线程的区别

1 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
2 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
3 进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
4 调度和切换:线程上下文切换比进程上下文切换要快得多。

两个进程中,每个进程可以有多个线程。

线程的生命周期

进程有五种状态,创建、就绪、运行、阻塞(等待)、退出

###未整理###

03 协程的概念

基于线程之上

线程叫:微进程

协程叫:微线程

难点:在操作系统中,感受不到

1、线程空间开销还是大

2、线程时间占用大

线程和线程之前抢占,切换(状态保留:耗时间)

协程:用户级线程

1个线程对应一个任务(1个函数)

在用户级,a b c 三个函数,放在一个线程上。

协程调度器

串行执行

调度器在充当操作系统的角色,在协调执行

协程调度器(程序代码),从用户的级别上形成并发执行。对操作系统而言,一个线程在执行。此时,操作系统不知道切换

1调度器在协调,不用抢占

1开销小,在一个线程上,不用再开空间

进程 几十个

线程 上千个

协程 几十万个

真正执行 go a() 是协程调度器在切换,不是内核的线程在切换

联合使用 线程+协程。在用户级的线程(协程),再扩展。有资源就使用

6个协程(6个函数任务)  内核级的线程:2个线程

04 协程案例

协程的使用

示例1

package main

import (
   "fmt"
   "time"
)

func foo(){
   fmt.Println("foo开始")
   time.Sleep(time.Second*3)
   fmt.Println("foo结束")
}

func bar() {
   fmt.Println("bar开始")
   time.Sleep(time.Second*5)
   fmt.Println("bar结束")
}

func main() {
   start:=time.Now().Unix()
   foo()
   bar()
   end:=time.Now().Unix()
   fmt.Println(end-start)
}

使用go

没有启动之前,

启动1个协程

func main() {
   start:=time.Now().Unix()
   go foo()
   bar()
   end:=time.Now().Unix()
   fmt.Println(end-start)
}

打印结果

bar开始

foo开始

foo结束

bar结束

5

图解

启动新的协程,开始执行foo()代码,是调度器利用多核线程执行

是否开了2cpu执行?调度器决定,是否开多核。

数量少

开始 几乎同时出现

bar开始

foo开始

foo结束

bar结束

5

再开一个协程时,主线程不等子线程执行完,主线程仍在执行

主线程停止,子线程也就停止了

同步等待锁

Sync.WiteGroup

package main

import (
   "fmt"
   "sync"
   "time"
)

func foo(){
   defer wg.Done() //计数减一
   fmt.Println("foo开始")
   time.Sleep(time.Second*3)
   fmt.Println("foo结束")
}

func bar()  {
   defer wg.Done() //计数减一
   fmt.Println("bar开始")
   time.Sleep(time.Second*5)
   fmt.Println("bar结束")
}

var wg sync.WaitGroup

func main() {
   start:=time.Now().Unix()
   wg.Add(2)
   go foo() //启动一个协程(用户级线程)
   go bar()
   wg.Wait() //主线程等待 wg计数清零,继续运行
   end:=time.Now().Unix()
   fmt.Println(end-start)
}

05 聊天室案例并发版本

聊天代码问题,只能处理一个用户

server端的问题,陷入死循环中,加入go

package main

import (
   "fmt"
   "net"
   "strings"
)

func handleClient(conn net.Conn){
   for true {
      //(3)接受客户端的数据
      data:=make([]byte,1024)
      n,_:=conn.Read(data) //默认阻塞
      fmt.Println("n",n)
      fmt.Println("接收客户端信息",string(data))
      //客户端强制退出
      if n==0 {
         break
      }

      //(4)给客户端回复一个消息
      res:=strings.ToUpper(string(data[:n]))
      n2,_:=conn.Write([]byte(res))
      fmt.Println("n",n2)
   }
}

//服务端
func main() {
   //(1)确定网络三要素,建立服务
   listen,_:=net.Listen("tcp","127.0.0.1:8000")
   fmt.Println("listen",listen)
   for true {
      //(2)等待用户连接,没有连接的客户端保持阻塞
      fmt.Println("server is waiting...")
      conn,_:=listen.Accept()
      fmt.Println("conn",conn)

      go  handleClient(conn)
   }
}

图解:客户端连接,就开一个协程

图解:客户端发消息

server端监听着客户的连接

conn 负责各自的会话

思考?此时是并行和并发?具体并行和并发主要由调度器决定

下午 06 互斥锁

上午回顾: 进程 线程 协程 及案例应用

锁的概念

并发时,数据之间产生干扰

互斥锁

调用1次。add() 程序实现+1

package main

import "fmt"

var x=0

func add(){
   x++
}

func main() {
   for i:=0;i<1000;i++ {
      //调用1次add()程序实现+1
      add()
   }
   fmt.Println(x)
}

添加同步等待锁

100个执行完,打印

package main

import (
   "fmt"
   "sync"
)

var x=0

func add(){
   defer wg.Done()
   x++
}

var wg sync.WaitGroup
func main() {
   for i:=0;i<1000;i++ {
      //调用1次add()程序实现+1
      wg.Add(1)
      go add()
   }
   wg.Wait()
   fmt.Println(x)
}

for循环+1,for循坏外边一样

package main

import (
   "fmt"
   "sync"
)

var x=0

func add(){
   defer wg.Done()
   x++
}

var wg sync.WaitGroup
func main() {
   wg.Add(100)
   for i:=0;i<100;i++ {
      go add()
   }
   wg.Wait()
   fmt.Println(x)
}

执行1000次,结果会发生变化

数据不安全

package main

import (
   "fmt"
   "sync"
)

var x=0

func add(){
   defer wg.Done()
   x++
}

var wg sync.WaitGroup
func main() {
   wg.Add(1000)
   for i:=0;i<1000;i++ {
      go add()
   }
   wg.Wait()
   fmt.Println(x)
}

并发时:开辟1000个协程。都做x自加1操作。

cpu执行过程中

x=x+1 做两件事: 1个是x+1 2个是赋值给x

当还没完成赋值时,占用cpu时间片切走了。

当时间片再切回到线程上时,再复制x=4

增加互斥锁,解决

把涉及到数据的时候,做同步化操作

package main

import (
   "fmt"
   "sync"
)

var x=0

func add(){
   defer wg.Done()
   lock.Lock()
   x++
   lock.Unlock()
}

var wg sync.WaitGroup
var lock sync.Mutex
func main() {
   wg.Add(1000)
   for i:=0;i<1000;i++ {
      go add()
   }
   wg.Wait()
   fmt.Println(x)
}

下午07 channel的使用

作用

前面例子:协程和协程之前不通信。Golang并发的核心哲学是不要通过共享内存进行通信

Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

channel

不是通过共享内存进行通信,

channel是个 数据类型

解决协程和协程直接通信使用

声明

var 通道变量 chan 通道类型

引用类型(没有默认值)

很像切片,是个队列

func main() {

    //var ch chan int //引用类型,没有默认值
   var ch = make(chan int,3)
}

案例1

channel有先进先出原则

写入值

package main

import "fmt"

func main() {
   // var ch chan int //引用类型,没有默认值

   var ch = make(chan int,3)
   // 向管道写入值
   ch<-100
   ch<-200
   ch<-300
   
   fmt.Println(<-ch)
   fmt.Println(<-ch)
   fmt.Println(<-ch)
}

打印结果

100

200

300

发生值拷贝

package main

import "fmt"

func main() {
   //var ch chan int
   ch:=make(chan int,3)
   ch<-100
   ch<-200
   ch<-300
   x:=<-ch
   fmt.Println("x",x) //x 100
   fmt.Println(<-ch) //200
   fmt.Println(<-ch) //300
}

容量为3 ,写入第4个值,死锁

package main

import "fmt"

func main() {
   //var ch chan int
   ch:=make(chan int,3)
   ch<-100
   ch<-200
   ch<-300
   ch<-400
   x:=<-ch
   fmt.Println("x",x) //x 100
   fmt.Println(<-ch) //200
   fmt.Println(<-ch) //300
   //fmt.Println(<-ch)
}

 

容量为3多写一次发生死锁

fatal error: all goroutines are asleep - deadlock!

多取1次,也死锁

package main

import "fmt"

func main() {
   //var ch chan int
   ch:=make(chan int,3)
   ch<-100
   ch<-200
   ch<-300
   //ch<-400
   x:=<-ch
   fmt.Println("x",x) //x 100
   fmt.Println(<-ch) //200
   fmt.Println(<-ch) //300
   fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock!
}

需要同时有存值和取值。写入ch<-400时,需要同时有取值阻塞。

首先得有俩协程。

1 写满,再写入时,没有协程读取出来,就会报死锁。

2 读取时,没有协程写入,又没有可读数据,报死锁。

下午08 channel的应用

案例2

package main

import (
   "fmt"
   "reflect"
)

type Stu struct {
   Name string
   Age int
}

func main() {
   var ch2=make(chan interface{},3)
   ch2<-100
   ch2<-"hello"
   ch2<-Stu{Name:"rain",Age:22}
   fmt.Println(<-ch2)
   fmt.Println(<-ch2)

   fmt.Println(<-ch2)
}

 

打印结果

100

hello

{rain 22}

结构体时,注意

//结构体取出,需注意需要断言
s:=<-ch2
//fmt.Println(s.Name) //报错
fmt.Println(reflect.TypeOf(s)) //main.Stu
fmt.Println(s.(Stu).Name)  //rain

用的是空接口,编译器执行前不知道类型。

编译阶段查看。编译时没有取出来,不知道类型

执行阶段才知道类型

结构体时,取元素需断言下类型

package main

import "fmt"

type Stu struct {
   Name string
   Age int
}

func main() {
   var ch2=make(chan interface{},3)
   ch2<-100
   ch2<-"hello"
   ch2<-Stu{Name:"rain",Age:22}
   fmt.Println(<-ch2)
   fmt.Println(<-ch2)
   //fmt.Println(<-ch2)
   s:=<-ch2
   fmt.Println(s.(Stu).Name)
}

案例3

package main

import "fmt"

func main() {
   ch3 :=make(chan int,3)
   x:=10
   ch3<-x //值拷贝
   x=20
   fmt.Println(<-ch3)
}

案例4

package main

import "fmt"

func main() {
   ch3 :=make(chan int,3)
   x:=10
   ch3<-x //值拷贝
   x=20
   fmt.Println(<-ch3)

   ch4:=make(chan *int,3)
   y:=20
   ch4<-&y
   y=30
   p:=<-ch4
   fmt.Println(*p) //30
}

下午09 channel的存储结构

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters
   lock mutex
}

qcount 长度 len()

buf    环形队列指针

closed  可以关上

sendx

recvx 读相关

recvq

sendq 等待写消息的队列

管道满了 没有读的 dedlock

管道是空了,没有再往里写了 dedlock

lock mutex 互斥锁

3容量 dataqsiz3

0长度 qcount

图解

最开始环形队列全是0

channel引用类型, 引用了环形队列的指针

import "fmt"

func main() {
   var ch5 = make(chan int, 3)
   var ch6 = ch5
   ch5<-100
   ch5<-200
   ch6<-300
   fmt.Println(<-ch5)
   fmt.Println(<-ch6)
   fmt.Println(<-ch5)
}

打印结果

100

200

300

拷贝的地址

package main

import "fmt"

func foo(c chan int){
   c<-50
}

func main() {

   //引用类型
   var ch5 = make(chan int, 3)
   var ch6 = ch5
   ch5<-100
   ch5<-200
   fmt.Println(<-ch6)
   fmt.Println(<-ch5)

   var ch7=make(chan int,3) //makechan返回结构体地址
   foo(ch7)
   fmt.Println(":::",<-ch7)
}

打印结果

100

200

::: 50

下午10 channel的循环与close

channel的循环与close

package main

func main() {
   ch3:=make(chan int,10)
   ch3<-1
   ch3<-2
   ch3<-3
   close(ch3) //ch3只能读不能写
   ch3<-4
}
panic: send on closed channel

已经关了,就不能写入了,可以继续读

package main

import "fmt"

func main() {
   ch3:=make(chan int,10)
   ch3<-1
   ch3<-2
   ch3<-3
   close(ch3) //ch3只能读不能写
   //ch3<-4
   fmt.Println(<-ch3)
}

循环

package main

import "fmt"

func main() {
   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   fmt.Println(len(ch4),cap(ch4)) //5 10
}

注意len(ch4)会动态调整

package main

import "fmt"

func main() {
   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   fmt.Println(":::",len(ch4),cap(ch4)) //5 10
   for i:=0;i      fmt.Println(<-ch4)
   }
}

打印结果不全,没有4 5

::: 5 10

1

2

3

修改

package main

import "fmt"

func main() {
   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   fmt.Println(":::",len(ch4),cap(ch4)) //5 10
   ret:=len(ch4)
   for i:=0;i      fmt.Println(<-ch4)
   }
}

::: 5 10

1

2

3

4

5

range循环

存在deadlock

package main

import "fmt"

func main() {
   ch3:=make(chan int,10)
   ch3<-1
   ch3<-2
   ch3<-3
   close(ch3) //ch3只能读不能写
   //ch3<-4
   fmt.Println(<-ch3)

   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   for i:=range ch4{
      fmt.Println(i,len(ch4))
   }
}

打印结果

1 4

2 3

3 2

4 1

5 0

fatal error: all goroutines are asleep - deadlock!

添加一个写协程

package main

import (
   "fmt"
   "time"
)

func main() {
   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   go func(){
      time.Sleep(time.Second*10)
      ch4<-600
   }()
   for i:=range ch4{
      fmt.Println(i,len(ch4))
   }
}

如何不让报错

取完 close

package main

import (
   "fmt"
)

func main() {
   ch4:=make(chan int,10)
   ch4<-1
   ch4<-2
   ch4<-3
   ch4<-4
   ch4<-5
   close(ch4)
   for i:=range ch4{
      fmt.Println(i,len(ch4))
   }
}

打印结果

1 4

2 3

3 2

4 1

5 0

使用close

根据len(ch4)的长度判断,是否要close

package main

import (
"fmt"
)

func main() {
   // 循环channel
   ch4 := make(chan int, 10)
   ch4 <- 100
   ch4 <- 200
   ch4 <- 300
   ch4 <- 400
   ch4 <- 500

   // fmt.Println(len(ch4), cap(ch4))
   /*var ret = len(ch4)
   for i := 0; i < ret; i++ {
      fmt.Println("OK")
      fmt.Println(<-ch4)
   }*/

   /*go func() {
      time.Sleep(time.Second * 10)
      ch4 <- 600
   }()*/

   // close(ch4)
   for i := range ch4 {
      fmt.Println(i, len(ch4))
      if len(ch4) == 0 {
         break
      }
   }

}

下午11 无缓冲的channel

生产者消费者模型

1 结构

2 异步

耦合性降低:不要直接联系

顾客和厨师不要产生直接的联系,通过服务员

案例

两个协程

生产者

func producer(ch chan int){
   defer wg.Done()
   for i:=1;i<11;i++{
      ch<-i
      fmt.Println("插入值",i)
   }
}

消费者

func consumer(ch chan int){
   defer wg.Done()
   for i:=1;i<11;i++{
      time.Sleep(time.Second)
      fmt.Println("取出值",<-ch)
   }
}

主线程等待,

1场景 生产能力大于消费能力

2 场景 消费能力大于生产消费能力

没有出现死锁,因为插入和取出刚好

package main

import (
   "fmt"
   "sync"
   "time"
)

func producer(ch chan int){
   defer wg.Done()
   for i:=1;i<11;i++{
      ch<-i
      fmt.Println("插入值",i)
   }
}

func consumer(ch chan int){
   defer wg.Done()
   for i:=1;i<11;i++{
      time.Sleep(time.Second)
      fmt.Println("取出值",<-ch)
   }
}

var wg sync.WaitGroup
func main() {
   ch:=make(chan int,100)

   wg.Add(2)
   go producer(ch)
   go consumer(ch)
   wg.Wait()
}

打印结果

插入值 1

插入值 2

插入值 3

插入值 4

插入值 5

插入值 6

插入值 7

插入值 8

插入值 9

插入值 10

取出值 1

取出值 2

取出值 3

取出值 4

取出值 5

取出值 6

取出值 7

取出值 8

取出值 9

取出值 10

打印结果

无缓冲channel

案例

直接deadlock,没有协程接收

waitgroup

案例 修改为无缓冲的

下午12 单向通道

var 通道实例 chan<- 元素类型 //只能写入数据的通道
var 通道实例 <-chan 元素类型 //只能读取数据的通道

分别实验无缓冲区管道、缓冲区为5、 缓冲区为10的管道

package main

import (
   "fmt"
   "sync"
   "time"
)

func producer (ch chan int) {
   defer wg.Done()
   for i:=1;i<11;i++{
      ch<-i
      fmt.Println("插入值",i)
   }
}

func consumer(ch chan int) {
   wg.Done()
   for i:=1;i<11;i++{
      time.Sleep(time.Second)
      fmt.Println("取出值",<-ch)
   }
}

var wg sync.WaitGroup
func main() {
   //ch:=make(chan int)
   //ch:=make(chan int,5)
   ch:=make(chan int,10)
   wg.Add(2)
   go producer(ch)
   go consumer(ch)
   wg.Wait()
}

13 select语句

golang中的select语句格式如下

select {
case <-ch1:
   //如果从ch1信道成功接收数据,则执行该分支代码
case ch2<-1:
   //如果成功向ch2信道成功发送数据,则执行该分支代码
default:
   //如果上面都没有成功,则进入default分支处理流程
}

switch 值比对

select 面向channel的值操作

示例 select

所有管道都有值时,select 随机读取

package main

import "fmt"

func main() {
   size := 10

   ch1 := make(chan int, size+1)
   for i := 0; i < size; i++ {
      if i%2 == 0 {
         ch1 <- i
      }
   }

   ch2 := make(chan int, size+1)
   for i := 0; i < size; i++ {
      if i%2 == 1 {
         ch2 <- i
      }
   }

   //select监听
   select {
   case v1 := <-ch1:
      fmt.Println("v1", v1)
   case v2 := <-ch2:
      fmt.Println("v2", v2)
   default:
      fmt.Println("所有chan真正该发生阻塞,等待数据!")
   }
}

for循环

package main

import (
   "fmt"
   "time"
)

func main() {
   size := 10

   ch1 := make(chan int, size+1)
   for i := 0; i < size; i++ {
      if i%2 == 0 {
         ch1 <- i
      }
   }

   ch2 := make(chan int, size+1)
   for i := 0; i < size; i++ {
      if i%2 == 1 {
         ch2 <- i
      }
   }

   //select监听
   flag:=true
   for flag {
      time.Sleep(time.Second)
      select {
      case v1 := <-ch1:
         fmt.Println("v1", v1)
      case v2 := <-ch2:
         fmt.Println("v2", v2)
      default:
         fmt.Println("所有chan真正发生阻塞,等待数据!")
         flag=false
      }
   }
}

优点:io多路复用

epol

select 模型优点

监听多个文件描述符,多个管道

高效利用cpu

超时使用 time.After

package main

import (
   "fmt"
   "time"
)

func main() {
   ch:=make(chan int)
   go func(c chan int) {
      //修改时间后,再查看执行结果
      time.Sleep(time.Second*3) //当把3改为1时,停1s后select监听到执行此语句
      ch<-1
   }(ch)

   select{
   case v:=<-ch:
      fmt.Println(v)
   case <-time.After(2*time.Second): //等待2s
      fmt.Println("no case ok")
   }
}

14 小案例

思路

1 net模块 建立三次握手

2 服务端要把消息 1分为3,发给三个客户端

聊天室案例

客户端

dial 连接服务端

协程1 写 监听终端输入行,一旦有数据输入,将数据发送给服务端

协程2 读 监听自己的conn.Read ,一旦有数据,服务端回消息了,将内容打印到终端控制台

服务端

1)建立服务,确定ip地址和端口

2)获取conn:= listen.Accept() 客户端套接对象

保存 (客户端套接对象)到map对象中 ip为键net.Conn为值

保存该connmap对象中var onlineClients=make(map[string]net.Conn)

3)协程:创建一个广播的chan,一旦有数据写入,将该数据发送给所有的客户端。

4)针对每一个客户端创建一个监听数据写入(conn.Read)

消息内容

然后再json序列化

type Msg struct{

user string

content string

}

代码部分

01 拿ip和端口

服务端

package main

import (
   "fmt"
   "net"
)

var onlineClints = make(map[string]net.Conn)

func main() {
   fmt.Println("聊天室服务端启动了...")
   //创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7000")
   if err != nil {
      fmt.Println(err)
      return
   }
}

客户端

package main

import (
   "fmt"
   "net"
)

func main() {
   //1.连接服务端
   conn,err:=net.Dial("tcp","127.0.0.1:7000")
   if err != nil {
      fmt.Println(err)
      return
   }
   defer conn.Close()
}

02服务端

package main

import (
   "fmt"
   "net"
)

var onlineClints = make(map[string]net.Conn)

func main() {
   fmt.Println("聊天室服务端启动了...")
   //创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7000")
   if err != nil {
      fmt.Println(err)
      return
   }

   conn,_:=listener.Accept()
   ip:=conn.RemoteAddr().String()
   onlineClints[ip]=conn
}

启动广播协程

注意 chan放的消息是结构体,不是字符串

package main

import (
   "fmt"
   "net"
)

type Msg struct {
   publisher string
   Content string
}

var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)

启动协程

    conn,_:=listener.Accept()
   ip:=conn.RemoteAddr().String()
   onlineClints[ip]=conn
   //启动一个广播的协程
   go broadcaseChan()
}

监听chan

发送消息,怎么发?

循环

package main

import (
   "encoding/json"
   "fmt"
   "net"
)

type Msg struct {
   publisher string
   Content string
}

var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)

func broadcase(){
   for true {
      msg:=<-broadcaseChan
      //将该消息发送给所有的客户端
      for _,conn:=range onlineClints {
         //将消息进行序列化,再转换成字节传播发送
         msgBytes,_:=json.Marshal(msg)
         conn.Write(msgBytes)
      }
   }
}

func main() {
   fmt.Println("聊天室服务端启动了...")
   //创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7000")
   if err != nil {
      fmt.Println(err)
      return
   }

   conn,_:=listener.Accept()
   ip:=conn.RemoteAddr().String()
   onlineClints[ip]=conn
   //启动一个广播的协程
   go broadcase()
}

04 监听

package main

import (
   "encoding/json"
   "fmt"
   "net"
)

type Msg struct {
   publisher string
   Content string
}

var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)

func broadcase(){
   for true {
      msg:=<-broadcaseChan
      //将该消息发送给所有的客户端
      for _,conn:=range onlineClints {
         //将消息进行序列化,再转换成字节传播发送
         msgBytes,_:=json.Marshal(msg)
         conn.Write(msgBytes)
      }
   }
}

func main() {
   fmt.Println("聊天室服务端启动了...")
   //创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7000")
   if err != nil {
      fmt.Println(err)
      return //结束主协程
   }


   //启动一个广播的协程
   go broadcase()
   //接收一个客户端
   conn,_:=listener.Accept()
   ip:=conn.RemoteAddr().String()
   onlineClints[ip]=conn
   //给conn创建一个监听协程:监听conn.Read()
   connRead()
}

修改变更==

消息怎么来 还怎么返回

package main

import "net"

var onlineClientM=make(map[string]net.Conn)

var broadcastChan = make(chan []byte)

//广播站功能
func broadcast(){
   for true {
      msg:=<-broadcastChan
      //将该消息发送给所有的客户端
      for _,conn:=range onlineClientM{
         conn.Write(msg)
      }
   }
}

func connRead(conn net.Conn){
   for true {
      data:=make([]byte,1024)
      n,_:=conn.Read(data)
      if n==0{
         break
      }
      //将该字节数据写入到广播站
      broadcastChan<-data[:n]
   }
}


func main() {
   fmt.Println("server is waiting...")
   listener,err:=net.Listen("tcp","127.0.0.1:7777")
   if err != nil {
      fmt.Println("net.listen err",err)
      return //结束主协程
   }
   //启动一个广播的协程
   go broadcast()
   //接收一个客户端
   conn,_:=listener.Accept()
   ip:=conn.RemoteAddr().String()
   onlineClientM[ip]=conn
   //给conn创建一个监听协程,监听conn.Read()
   go connRead(conn)
}

增加for循环

for循环内  1注册2开协程

func main() {
   fmt.Println("server is waiting...")
   // 创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7777")
   if err != nil {
      fmt.Println("net.listen err",err)
      return //结束主协程
   }
   //启动一个广播协程
   go broadcast()
   //接收一个客户端
   for true {
      conn,_:=listener.Accept()
      ip:=conn.RemoteAddr().String()
      onlineClientM[ip]=conn
      //给conn创建一个监听协程,监听conn.Read()
      go connRead(conn)
   }
}

客户端

package main

import (
   "bufio"
   "fmt"
   "net"
   "os"
)

func read(conn net.Conn) {
   for true{
      res:=make([]byte,1024)
      n,_:=conn.Read(res)
      fmt.Print(string(res[:n]))
   }
}

func write(conn net.Conn){
   for true {
      fmt.Println(">>>")
      reader:=bufio.NewReader(os.Stdin)  //从标准输入生成读对象
      content,_:=reader.ReadBytes('\n') //读到换行
      //发送数据
      conn.Write(content)
   }
}

func main() {
   //1 连接服务器
   conn,_:=net.Dial("tcp","127.0.0.1:7777")
   fmt.Println("conn",conn)
   for true {
      go write(conn)
      read(conn)
   }
}

完成后改进,新增发消息人

package main

import (
   "fmt"
   "net"
)

var onlineClientM=make(map[string]net.Conn)

var broadcastChan = make(chan []byte)

//广播站功能
func broadcast(){
   for true {
      msg:=<-broadcastChan
      //将该消息发送给所有的客户端
      for _,conn:=range onlineClientM{
         conn.Write(msg)
      }
   }
}

func connRead(conn net.Conn){
   for true {
      data:=make([]byte,1024)
      n,_:=conn.Read(data)
      if n==0{
         break
      }
      //将该字节数据写入到广播站
      newData:=conn.RemoteAddr().String()+" "+string(data[:n])
      broadcastChan<-[]byte(newData)
   }
}

func main() {
   fmt.Println("server is waiting...")
   // 创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7777")
   if err != nil {
      fmt.Println("net.listen err",err)
      return //结束主协程
   }
   //启动一个广播协程
   go broadcast()
   //接收一个客户端
   for true {
      conn,_:=listener.Accept()
      fmt.Println("conn",conn)
      ip:=conn.RemoteAddr().String()
      onlineClientM[ip]=conn
      //给conn创建一个监听协程,监听conn.Read()
      go connRead(conn)
   }
}

做退出处理

服务端最终代码

package main

import (
   "fmt"
   "net"
)

var onlineClientM=make(map[string]net.Conn)

var broadcastChan = make(chan []byte)

//广播站功能
func broadcast(){
   for true {
      msg:=<-broadcastChan
      //将该消息发送给所有的客户端
      for _,conn:=range onlineClientM{
         conn.Write(msg)
      }
   }
}

func connRead(conn net.Conn){
   for true {
      data:=make([]byte,1024)
      n,_:=conn.Read(data)
      if n==0{
         delete(onlineClientM,conn.RemoteAddr().String())
         data=[]byte(conn.RemoteAddr().String()+"已下线!")
         broadcastChan<-data
         return
      }
      //将该字节数据写入到广播站
      newData:=conn.RemoteAddr().String()+" "+string(data[:n])
      broadcastChan<-[]byte(newData)
   }
}

func main() {
   fmt.Println("server is waiting...")
   // 创建一个监听器
   listener,err:=net.Listen("tcp","127.0.0.1:7777")
   if err != nil {
      fmt.Println("net.listen err",err)
      return //结束主协程
   }
   //启动一个广播协程
   go broadcast()
   //接收一个客户端
   for true {
      conn,_:=listener.Accept()
      fmt.Println("conn",conn)
      ip:=conn.RemoteAddr().String()
      onlineClientM[ip]=conn
      //给conn创建一个监听协程,监听conn.Read()
      go connRead(conn)
   }
}

相关