Day08 并发编程
并发编程
01并发的概念
C/S、B/S 网络开发 文件上传案例 今日重点:并发编程 协程 channel管道、并发案例
并发和并行
并发:在同一时间段,处理多个任务的能力 并行:在同一时刻,处理多个任务的能力 进程 线程 协程(重点) 早期计算机:一个程序占用所有资源
fmt.Scan Listen.Accept 阻塞时,cpu是否在工作?cpu不在工作
示例1
做累加时,随着计算的数据越来越多,cpu占用时间越来越长
package main import "fmt" func main() { var ret=0 for i:=0;i<=100;i++{ ret+=i } fmt.Println(ret) }
早期计算机问题
一个计算机无法同时执行多个程序 1 两个程序无法同时运行。 2 CPU浪费 抢占式分配资源 1标准:遇到io则切换 如果程序1,一直没有遇到io时(一直占用CPU)。又变成单任务了 2 引入时间片: 没有遇到io,到10ms,切换给其他程序用10ms
图示
同一个时间段内,多个程序在执行
放歌的时候,cpu切换,是否能感受的到? CPU足够快,正常情况下是感受不到。 卡顿是 任务足够多的时候,cpu切换,直观感受
多核,出现了并行
计算密集型:长时间占着CPU
IO密集型:聊天阻塞的案例。阻塞的时候,不需要占用CPU
开发中大部分是IO密集型的任务
切换的标准:
1 遇到IO,立刻切换 2 达到时间片,切换
02 进程和线程的概念
进程概念
我们都知道计算机的核心是CPU,它承担了所有的计算任务;
操作系统是计算机的管理者,它负责任务的调度、资源的分配和管理,统领整个计算机硬件;应用程序则是具有某种功能的程序,程序是运行于操作系统之上的。
进程是一种抽象的概念,从来没有统一的标准定义。进程一般由程序、数据集合和进程控制块三部分组成。
程序用于描述进程要完成的功能,是控制进程执行的指令集;
数据集合是程序在执行时所需要的数据和工作区;
程序控制块(Program Control Block,简称PCB),包含进程的描述信息和控制信息,是进程存在的唯一标志。
进程特性
动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的; 并发性:任何进程都可以同其他进程一起并发执行; 独立性:进程是系统进行资源分配和调度的一个独立单位; 结构性:进程由程序、数据和进程控制块三部分组成。
产生线程和协程的 核心2点
1 空间占有率 2 时间切换的开销大
开发的app 一个软件运行多个任务时。 3个进程
切换的时间开销大 线程:真正执行任务的不再是进程了。开销小 线程是执行单元 进程是:管理单元 1 占用资源小了 2 切换的时间开销小了
理解
第1种场景 三个人 开三个教室,执行完告诉下一个人。 开销大,占3个教室,告诉下一个 时间大 第2种场景: 在一个屋子,3个人用。 开销小了,交流方便了。
线程的概念
线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。
1问题: 1个进程可以没有线程吗?不可以 1个进程最少要有1个线程,主线程 2问题: 线程有没有可能脱离进程独立存在?不可以 需要进程管理 任务调度
任务执行的那一小段时间叫做时间片,任务正在执行时的状态叫运行状态,被暂停的线程任务状态叫做就绪状态,意为等待下一个属于它的时间片的到来。
进程和线程的区别
1 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位; 2 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线; 3 进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见; 4 调度和切换:线程上下文切换比进程上下文切换要快得多。
两个进程中,每个进程可以有多个线程。
线程的生命周期
进程有五种状态,创建、就绪、运行、阻塞(等待)、退出
###未整理###
03 协程的概念
基于线程之上
线程叫:微进程
协程叫:微线程
难点:在操作系统中,感受不到
1、线程空间开销还是大
2、线程时间占用大
线程和线程之前抢占,切换(状态保留:耗时间)
协程:用户级线程
1个线程对应一个任务(1个函数)
在用户级,a b c 三个函数,放在一个线程上。
协程调度器
串行执行
调度器在充当操作系统的角色,在协调执行
协程调度器(程序代码),从用户的级别上形成并发执行。对操作系统而言,一个线程在执行。此时,操作系统不知道切换
1调度器在协调,不用抢占
1开销小,在一个线程上,不用再开空间
进程 几十个
线程 上千个
协程 几十万个
真正执行 go a() 是协程调度器在切换,不是内核的线程在切换
联合使用 线程+协程。在用户级的线程(协程),再扩展。有资源就使用
6个协程(6个函数任务) 内核级的线程:2个线程
04 协程案例
协程的使用
示例1
package main
import (
"fmt"
"time"
)
func foo(){
fmt.Println("foo开始")
time.Sleep(time.Second*3)
fmt.Println("foo结束")
}
func bar() {
fmt.Println("bar开始")
time.Sleep(time.Second*5)
fmt.Println("bar结束")
}
func main() {
start:=time.Now().Unix()
foo()
bar()
end:=time.Now().Unix()
fmt.Println(end-start)
}
使用go
没有启动之前,
启动1个协程
func main() {
start:=time.Now().Unix()
go foo()
bar()
end:=time.Now().Unix()
fmt.Println(end-start)
}
打印结果
bar开始
foo开始
foo结束
bar结束
5
图解
启动新的协程,开始执行foo()代码,是调度器利用多核线程执行
是否开了2cpu执行?调度器决定,是否开多核。
数量少
开始 几乎同时出现
bar开始
foo开始
foo结束
bar结束
5
再开一个协程时,主线程不等子线程执行完,主线程仍在执行
主线程停止,子线程也就停止了
锁 同步等待锁
Sync.WiteGroup
package main
import (
"fmt"
"sync"
"time"
)
func foo(){
defer wg.Done() //计数减一
fmt.Println("foo开始")
time.Sleep(time.Second*3)
fmt.Println("foo结束")
}
func bar() {
defer wg.Done() //计数减一
fmt.Println("bar开始")
time.Sleep(time.Second*5)
fmt.Println("bar结束")
}
var wg sync.WaitGroup
func main() {
start:=time.Now().Unix()
wg.Add(2)
go foo() //启动一个协程(用户级线程)
go bar()
wg.Wait() //主线程等待 wg计数清零,继续运行
end:=time.Now().Unix()
fmt.Println(end-start)
}
05 聊天室案例并发版本
聊天代码问题,只能处理一个用户
server端的问题,陷入死循环中,加入go
package main
import (
"fmt"
"net"
"strings"
)
func handleClient(conn net.Conn){
for true {
//(3)接受客户端的数据
data:=make([]byte,1024)
n,_:=conn.Read(data) //默认阻塞
fmt.Println("n",n)
fmt.Println("接收客户端信息",string(data))
//客户端强制退出
if n==0 {
break
}
//(4)给客户端回复一个消息
res:=strings.ToUpper(string(data[:n]))
n2,_:=conn.Write([]byte(res))
fmt.Println("n",n2)
}
}
//服务端
func main() {
//(1)确定网络三要素,建立服务
listen,_:=net.Listen("tcp","127.0.0.1:8000")
fmt.Println("listen",listen)
for true {
//(2)等待用户连接,没有连接的客户端保持阻塞
fmt.Println("server is waiting...")
conn,_:=listen.Accept()
fmt.Println("conn",conn)
go handleClient(conn)
}
}
图解:客户端连接,就开一个协程
图解:客户端发消息
server端监听着客户的连接
conn 负责各自的会话
思考?此时是并行和并发?具体并行和并发主要由调度器决定
下午 06 互斥锁
上午回顾: 进程 线程 协程 及案例应用
锁的概念
并发时,数据之间产生干扰
互斥锁
调用1次。add() 程序实现+1
package main
import "fmt"
var x=0
func add(){
x++
}
func main() {
for i:=0;i<1000;i++ {
//调用1次add()程序实现+1
add()
}
fmt.Println(x)
}
添加同步等待锁
100个执行完,打印
package main
import (
"fmt"
"sync"
)
var x=0
func add(){
defer wg.Done()
x++
}
var wg sync.WaitGroup
func main() {
for i:=0;i<1000;i++ {
//调用1次add()程序实现+1
wg.Add(1)
go add()
}
wg.Wait()
fmt.Println(x)
}
在for循环+1,在for循坏外边一样
package main
import (
"fmt"
"sync"
)
var x=0
func add(){
defer wg.Done()
x++
}
var wg sync.WaitGroup
func main() {
wg.Add(100)
for i:=0;i<100;i++ {
go add()
}
wg.Wait()
fmt.Println(x)
}
执行1000次,结果会发生变化
数据不安全
package main
import (
"fmt"
"sync"
)
var x=0
func add(){
defer wg.Done()
x++
}
var wg sync.WaitGroup
func main() {
wg.Add(1000)
for i:=0;i<1000;i++ {
go add()
}
wg.Wait()
fmt.Println(x)
}
并发时:开辟1000个协程。都做x自加1操作。
cpu执行过程中
x=x+1 做两件事: 第1个是x+1 第2个是赋值给x
当还没完成赋值时,占用cpu时间片切走了。
当时间片再切回到线程上时,再复制x=4
增加互斥锁,解决
把涉及到数据的时候,做同步化操作
package main
import (
"fmt"
"sync"
)
var x=0
func add(){
defer wg.Done()
lock.Lock()
x++
lock.Unlock()
}
var wg sync.WaitGroup
var lock sync.Mutex
func main() {
wg.Add(1000)
for i:=0;i<1000;i++ {
go add()
}
wg.Wait()
fmt.Println(x)
}
下午07 channel的使用
作用
前面例子:协程和协程之前不通信。Golang并发的核心哲学是不要通过共享内存进行通信
Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。
channel
不是通过共享内存进行通信,
channel是个 数据类型
解决协程和协程直接通信使用
声明
var 通道变量 chan 通道类型
引用类型(没有默认值)
很像切片,是个队列
func main() {
//var ch chan int //引用类型,没有默认值
var ch = make(chan int,3)
}
案例1
channel有先进先出原则
写入值
package main
import "fmt"
func main() {
// var ch chan int //引用类型,没有默认值
var ch = make(chan int,3)
// 向管道写入值
ch<-100
ch<-200
ch<-300
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
打印结果
100
200
300
发生值拷贝
package main
import "fmt"
func main() {
//var ch chan int
ch:=make(chan int,3)
ch<-100
ch<-200
ch<-300
x:=<-ch
fmt.Println("x",x) //x 100
fmt.Println(<-ch) //200
fmt.Println(<-ch) //300
}
容量为3 ,写入第4个值,死锁
package main
import "fmt"
func main() {
//var ch chan int
ch:=make(chan int,3)
ch<-100
ch<-200
ch<-300
ch<-400
x:=<-ch
fmt.Println("x",x) //x 100
fmt.Println(<-ch) //200
fmt.Println(<-ch) //300
//fmt.Println(<-ch)
}
容量为3多写一次发生死锁
fatal error: all goroutines are asleep - deadlock!
多取1次,也死锁
package main
import "fmt"
func main() {
//var ch chan int
ch:=make(chan int,3)
ch<-100
ch<-200
ch<-300
//ch<-400
x:=<-ch
fmt.Println("x",x) //x 100
fmt.Println(<-ch) //200
fmt.Println(<-ch) //300
fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock!
}
需要同时有存值和取值。写入ch<-400时,需要同时有取值阻塞。
首先得有俩协程。
1 写满,再写入时,没有协程读取出来,就会报死锁。
2 读取时,没有协程写入,又没有可读数据,报死锁。
下午08 channel的应用
案例2
package main
import (
"fmt"
"reflect"
)
type Stu struct {
Name string
Age int
}
func main() {
var ch2=make(chan interface{},3)
ch2<-100
ch2<-"hello"
ch2<-Stu{Name:"rain",Age:22}
fmt.Println(<-ch2)
fmt.Println(<-ch2)
fmt.Println(<-ch2)
}
打印结果
100
hello
{rain 22}
结构体时,注意
//结构体取出,需注意需要断言
s:=<-ch2
//fmt.Println(s.Name) //报错
fmt.Println(reflect.TypeOf(s)) //main.Stu
fmt.Println(s.(Stu).Name) //rain
用的是空接口,编译器执行前不知道类型。
编译阶段查看。编译时没有取出来,不知道类型
执行阶段才知道类型
结构体时,取元素需断言下类型
package main
import "fmt"
type Stu struct {
Name string
Age int
}
func main() {
var ch2=make(chan interface{},3)
ch2<-100
ch2<-"hello"
ch2<-Stu{Name:"rain",Age:22}
fmt.Println(<-ch2)
fmt.Println(<-ch2)
//fmt.Println(<-ch2)
s:=<-ch2
fmt.Println(s.(Stu).Name)
}
案例3
package main
import "fmt"
func main() {
ch3 :=make(chan int,3)
x:=10
ch3<-x //值拷贝
x=20
fmt.Println(<-ch3)
}
案例4
package main
import "fmt"
func main() {
ch3 :=make(chan int,3)
x:=10
ch3<-x //值拷贝
x=20
fmt.Println(<-ch3)
ch4:=make(chan *int,3)
y:=20
ch4<-&y
y=30
p:=<-ch4
fmt.Println(*p) //30
}
下午09 channel的存储结构
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
qcount 长度 len()
buf 环形队列指针
closed 可以关上
sendx 写
recvx 读相关
recvq
sendq 等待写消息的队列
管道满了 没有读的 dedlock
管道是空了,没有再往里写了 dedlock
lock mutex 互斥锁
3容量 dataqsiz3
0长度 qcount
图解
最开始环形队列全是0
channel引用类型, 引用了环形队列的指针
import "fmt"
func main() {
var ch5 = make(chan int, 3)
var ch6 = ch5
ch5<-100
ch5<-200
ch6<-300
fmt.Println(<-ch5)
fmt.Println(<-ch6)
fmt.Println(<-ch5)
}
打印结果
100
200
300
拷贝的地址
package main
import "fmt"
func foo(c chan int){
c<-50
}
func main() {
//引用类型
var ch5 = make(chan int, 3)
var ch6 = ch5
ch5<-100
ch5<-200
fmt.Println(<-ch6)
fmt.Println(<-ch5)
var ch7=make(chan int,3) //makechan返回结构体地址
foo(ch7)
fmt.Println(":::",<-ch7)
}
打印结果
100
200
::: 50
下午10 channel的循环与close
channel的循环与close
package main
func main() {
ch3:=make(chan int,10)
ch3<-1
ch3<-2
ch3<-3
close(ch3) //ch3只能读不能写
ch3<-4
}
panic: send on closed channel
已经关了,就不能写入了,可以继续读
package main
import "fmt"
func main() {
ch3:=make(chan int,10)
ch3<-1
ch3<-2
ch3<-3
close(ch3) //ch3只能读不能写
//ch3<-4
fmt.Println(<-ch3)
}
循环
package main
import "fmt"
func main() {
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
fmt.Println(len(ch4),cap(ch4)) //5 10
}
注意len(ch4)会动态调整
package main
import "fmt"
func main() {
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
fmt.Println(":::",len(ch4),cap(ch4)) //5 10
for i:=0;i
}
}
打印结果不全,没有4 5
::: 5 10
1
2
3
修改
package main
import "fmt"
func main() {
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
fmt.Println(":::",len(ch4),cap(ch4)) //5 10
ret:=len(ch4)
for i:=0;i
}
}
::: 5 10
1
2
3
4
5
range循环
存在deadlock
package main
import "fmt"
func main() {
ch3:=make(chan int,10)
ch3<-1
ch3<-2
ch3<-3
close(ch3) //ch3只能读不能写
//ch3<-4
fmt.Println(<-ch3)
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
for i:=range ch4{
fmt.Println(i,len(ch4))
}
}
打印结果
1 4
2 3
3 2
4 1
5 0
fatal error: all goroutines are asleep - deadlock!
添加一个写协程
package main
import (
"fmt"
"time"
)
func main() {
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
go func(){
time.Sleep(time.Second*10)
ch4<-600
}()
for i:=range ch4{
fmt.Println(i,len(ch4))
}
}
如何不让报错
取完 close
package main
import (
"fmt"
)
func main() {
ch4:=make(chan int,10)
ch4<-1
ch4<-2
ch4<-3
ch4<-4
ch4<-5
close(ch4)
for i:=range ch4{
fmt.Println(i,len(ch4))
}
}
打印结果
1 4
2 3
3 2
4 1
5 0
使用close
根据len(ch4)的长度判断,是否要close
package main
import (
"fmt"
)
func main() {
// 循环channel
ch4 := make(chan int, 10)
ch4 <- 100
ch4 <- 200
ch4 <- 300
ch4 <- 400
ch4 <- 500
// fmt.Println(len(ch4), cap(ch4))
/*var ret = len(ch4)
for i := 0; i < ret; i++ {
fmt.Println("OK")
fmt.Println(<-ch4)
}*/
/*go func() {
time.Sleep(time.Second * 10)
ch4 <- 600
}()*/
// close(ch4)
for i := range ch4 {
fmt.Println(i, len(ch4))
if len(ch4) == 0 {
break
}
}
}
下午11 无缓冲的channel
生产者消费者模型
1 结构
2 异步
耦合性降低:不要直接联系
顾客和厨师不要产生直接的联系,通过服务员
案例
两个协程
生产者
func producer(ch chan int){
defer wg.Done()
for i:=1;i<11;i++{
ch<-i
fmt.Println("插入值",i)
}
}
消费者
func consumer(ch chan int){
defer wg.Done()
for i:=1;i<11;i++{
time.Sleep(time.Second)
fmt.Println("取出值",<-ch)
}
}
主线程等待,
1场景 生产能力大于消费能力
2 场景 消费能力大于生产消费能力
没有出现死锁,因为插入和取出刚好
package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int){
defer wg.Done()
for i:=1;i<11;i++{
ch<-i
fmt.Println("插入值",i)
}
}
func consumer(ch chan int){
defer wg.Done()
for i:=1;i<11;i++{
time.Sleep(time.Second)
fmt.Println("取出值",<-ch)
}
}
var wg sync.WaitGroup
func main() {
ch:=make(chan int,100)
wg.Add(2)
go producer(ch)
go consumer(ch)
wg.Wait()
}
打印结果
插入值 1
插入值 2
插入值 3
插入值 4
插入值 5
插入值 6
插入值 7
插入值 8
插入值 9
插入值 10
取出值 1
取出值 2
取出值 3
取出值 4
取出值 5
取出值 6
取出值 7
取出值 8
取出值 9
取出值 10
打印结果
无缓冲channel
案例
直接deadlock,没有协程接收
waitgroup
案例 修改为无缓冲的
下午12 单向通道
var 通道实例 chan<- 元素类型 //只能写入数据的通道
var 通道实例 <-chan 元素类型 //只能读取数据的通道
分别实验无缓冲区管道、缓冲区为5、 缓冲区为10的管道
package main
import (
"fmt"
"sync"
"time"
)
func producer (ch chan int) {
defer wg.Done()
for i:=1;i<11;i++{
ch<-i
fmt.Println("插入值",i)
}
}
func consumer(ch chan int) {
wg.Done()
for i:=1;i<11;i++{
time.Sleep(time.Second)
fmt.Println("取出值",<-ch)
}
}
var wg sync.WaitGroup
func main() {
//ch:=make(chan int)
//ch:=make(chan int,5)
ch:=make(chan int,10)
wg.Add(2)
go producer(ch)
go consumer(ch)
wg.Wait()
}
13 select语句
golang中的select语句格式如下
select {
case <-ch1:
//如果从ch1信道成功接收数据,则执行该分支代码
case ch2<-1:
//如果成功向ch2信道成功发送数据,则执行该分支代码
default:
//如果上面都没有成功,则进入default分支处理流程
}
switch 值比对
select 面向channel的值操作
示例 select
所有管道都有值时,select 随机读取
package main
import "fmt"
func main() {
size := 10
ch1 := make(chan int, size+1)
for i := 0; i < size; i++ {
if i%2 == 0 {
ch1 <- i
}
}
ch2 := make(chan int, size+1)
for i := 0; i < size; i++ {
if i%2 == 1 {
ch2 <- i
}
}
//select监听
select {
case v1 := <-ch1:
fmt.Println("v1", v1)
case v2 := <-ch2:
fmt.Println("v2", v2)
default:
fmt.Println("所有chan真正该发生阻塞,等待数据!")
}
}
加for循环
package main
import (
"fmt"
"time"
)
func main() {
size := 10
ch1 := make(chan int, size+1)
for i := 0; i < size; i++ {
if i%2 == 0 {
ch1 <- i
}
}
ch2 := make(chan int, size+1)
for i := 0; i < size; i++ {
if i%2 == 1 {
ch2 <- i
}
}
//select监听
flag:=true
for flag {
time.Sleep(time.Second)
select {
case v1 := <-ch1:
fmt.Println("v1", v1)
case v2 := <-ch2:
fmt.Println("v2", v2)
default:
fmt.Println("所有chan真正发生阻塞,等待数据!")
flag=false
}
}
}
优点:io多路复用
epol
select 模型优点
监听多个文件描述符,多个管道
高效利用cpu
超时使用 time.After
package main
import (
"fmt"
"time"
)
func main() {
ch:=make(chan int)
go func(c chan int) {
//修改时间后,再查看执行结果
time.Sleep(time.Second*3) //当把3改为1时,停1s后select监听到执行此语句
ch<-1
}(ch)
select{
case v:=<-ch:
fmt.Println(v)
case <-time.After(2*time.Second): //等待2s
fmt.Println("no case ok")
}
}
14 小案例
思路
1 net模块 建立三次握手
2 服务端要把消息 1分为3,发给三个客户端
聊天室案例
客户端
dial 连接服务端
协程1 写 监听终端输入行,一旦有数据输入,将数据发送给服务端
协程2 读 监听自己的conn.Read ,一旦有数据,服务端回消息了,将内容打印到终端控制台
服务端
(1)建立服务,确定ip地址和端口
(2)获取conn:= listen.Accept() 客户端套接对象
保存 (客户端套接对象)到map对象中 ip为键net.Conn为值
保存该conn到map对象中var onlineClients=make(map[string]net.Conn)
(3)协程:创建一个广播的chan,一旦有数据写入,将该数据发送给所有的客户端。
(4)针对每一个客户端创建一个监听数据写入(conn.Read)
消息内容
然后再json序列化
type Msg struct{
user string
content string
}
代码部分
01 拿ip和端口
服务端
package main
import (
"fmt"
"net"
)
var onlineClints = make(map[string]net.Conn)
func main() {
fmt.Println("聊天室服务端启动了...")
//创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7000")
if err != nil {
fmt.Println(err)
return
}
}
客户端
package main
import (
"fmt"
"net"
)
func main() {
//1.连接服务端
conn,err:=net.Dial("tcp","127.0.0.1:7000")
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
}
02服务端
package main
import (
"fmt"
"net"
)
var onlineClints = make(map[string]net.Conn)
func main() {
fmt.Println("聊天室服务端启动了...")
//创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7000")
if err != nil {
fmt.Println(err)
return
}
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClints[ip]=conn
}
启动广播协程
注意 chan放的消息是结构体,不是字符串
package main
import (
"fmt"
"net"
)
type Msg struct {
publisher string
Content string
}
var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)
启动协程
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClints[ip]=conn
//启动一个广播的协程
go broadcaseChan()
}
监听chan
发送消息,怎么发?
循环
package main
import (
"encoding/json"
"fmt"
"net"
)
type Msg struct {
publisher string
Content string
}
var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)
func broadcase(){
for true {
msg:=<-broadcaseChan
//将该消息发送给所有的客户端
for _,conn:=range onlineClints {
//将消息进行序列化,再转换成字节传播发送
msgBytes,_:=json.Marshal(msg)
conn.Write(msgBytes)
}
}
}
func main() {
fmt.Println("聊天室服务端启动了...")
//创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7000")
if err != nil {
fmt.Println(err)
return
}
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClints[ip]=conn
//启动一个广播的协程
go broadcase()
}
04 监听
package main
import (
"encoding/json"
"fmt"
"net"
)
type Msg struct {
publisher string
Content string
}
var onlineClints = make(map[string]net.Conn)
var broadcaseChan=make(chan Msg)
func broadcase(){
for true {
msg:=<-broadcaseChan
//将该消息发送给所有的客户端
for _,conn:=range onlineClints {
//将消息进行序列化,再转换成字节传播发送
msgBytes,_:=json.Marshal(msg)
conn.Write(msgBytes)
}
}
}
func main() {
fmt.Println("聊天室服务端启动了...")
//创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7000")
if err != nil {
fmt.Println(err)
return //结束主协程
}
//启动一个广播的协程
go broadcase()
//接收一个客户端
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClints[ip]=conn
//给conn创建一个监听协程:监听conn.Read()
connRead()
}
修改变更==
消息怎么来 还怎么返回
package main
import "net"
var onlineClientM=make(map[string]net.Conn)
var broadcastChan = make(chan []byte)
//广播站功能
func broadcast(){
for true {
msg:=<-broadcastChan
//将该消息发送给所有的客户端
for _,conn:=range onlineClientM{
conn.Write(msg)
}
}
}
func connRead(conn net.Conn){
for true {
data:=make([]byte,1024)
n,_:=conn.Read(data)
if n==0{
break
}
//将该字节数据写入到广播站
broadcastChan<-data[:n]
}
}
func main() {
fmt.Println("server is waiting...")
listener,err:=net.Listen("tcp","127.0.0.1:7777")
if err != nil {
fmt.Println("net.listen err",err)
return //结束主协程
}
//启动一个广播的协程
go broadcast()
//接收一个客户端
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClientM[ip]=conn
//给conn创建一个监听协程,监听conn.Read()
go connRead(conn)
}
增加for循环
for循环内 1注册2开协程
func main() {
fmt.Println("server is waiting...")
// 创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7777")
if err != nil {
fmt.Println("net.listen err",err)
return //结束主协程
}
//启动一个广播协程
go broadcast()
//接收一个客户端
for true {
conn,_:=listener.Accept()
ip:=conn.RemoteAddr().String()
onlineClientM[ip]=conn
//给conn创建一个监听协程,监听conn.Read()
go connRead(conn)
}
}
客户端
package main
import (
"bufio"
"fmt"
"net"
"os"
)
func read(conn net.Conn) {
for true{
res:=make([]byte,1024)
n,_:=conn.Read(res)
fmt.Print(string(res[:n]))
}
}
func write(conn net.Conn){
for true {
fmt.Println(">>>")
reader:=bufio.NewReader(os.Stdin) //从标准输入生成读对象
content,_:=reader.ReadBytes('\n') //读到换行
//发送数据
conn.Write(content)
}
}
func main() {
//1 连接服务器
conn,_:=net.Dial("tcp","127.0.0.1:7777")
fmt.Println("conn",conn)
for true {
go write(conn)
read(conn)
}
}
完成后改进,新增发消息人
package main
import (
"fmt"
"net"
)
var onlineClientM=make(map[string]net.Conn)
var broadcastChan = make(chan []byte)
//广播站功能
func broadcast(){
for true {
msg:=<-broadcastChan
//将该消息发送给所有的客户端
for _,conn:=range onlineClientM{
conn.Write(msg)
}
}
}
func connRead(conn net.Conn){
for true {
data:=make([]byte,1024)
n,_:=conn.Read(data)
if n==0{
break
}
//将该字节数据写入到广播站
newData:=conn.RemoteAddr().String()+" "+string(data[:n])
broadcastChan<-[]byte(newData)
}
}
func main() {
fmt.Println("server is waiting...")
// 创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7777")
if err != nil {
fmt.Println("net.listen err",err)
return //结束主协程
}
//启动一个广播协程
go broadcast()
//接收一个客户端
for true {
conn,_:=listener.Accept()
fmt.Println("conn",conn)
ip:=conn.RemoteAddr().String()
onlineClientM[ip]=conn
//给conn创建一个监听协程,监听conn.Read()
go connRead(conn)
}
}
做退出处理
服务端最终代码
package main
import (
"fmt"
"net"
)
var onlineClientM=make(map[string]net.Conn)
var broadcastChan = make(chan []byte)
//广播站功能
func broadcast(){
for true {
msg:=<-broadcastChan
//将该消息发送给所有的客户端
for _,conn:=range onlineClientM{
conn.Write(msg)
}
}
}
func connRead(conn net.Conn){
for true {
data:=make([]byte,1024)
n,_:=conn.Read(data)
if n==0{
delete(onlineClientM,conn.RemoteAddr().String())
data=[]byte(conn.RemoteAddr().String()+"已下线!")
broadcastChan<-data
return
}
//将该字节数据写入到广播站
newData:=conn.RemoteAddr().String()+" "+string(data[:n])
broadcastChan<-[]byte(newData)
}
}
func main() {
fmt.Println("server is waiting...")
// 创建一个监听器
listener,err:=net.Listen("tcp","127.0.0.1:7777")
if err != nil {
fmt.Println("net.listen err",err)
return //结束主协程
}
//启动一个广播协程
go broadcast()
//接收一个客户端
for true {
conn,_:=listener.Accept()
fmt.Println("conn",conn)
ip:=conn.RemoteAddr().String()
onlineClientM[ip]=conn
//给conn创建一个监听协程,监听conn.Read()
go connRead(conn)
}
}