03 | python并发网络通信模型(多进程、多线程的网络传输模型以及简单的ftp模型)
并发网络通信模型
常见网络模型
- 循环服务器模型 :循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。
优点:实现简单,占用资源少
缺点:无法同时处理多个客户端请求
适用情况:处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。
- 多进程/线程网络并发模型:每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程。
优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
缺点: 资源消耗较大
适用情况:客户端同时连接量较少,需要处理行为较复杂情况。
-
IO并发模型:利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求。
优点 : 资源消耗少,能同时高效处理多个IO行为
缺点 : 只能处理并发产生的IO事件,无法处理cpu计算适用情况:HTTP请求,网络传输等都是IO行为。
基于fork的多进程网络并发模型
实现步骤
- 创建监听套接字
- 等待接收客户端请求
- 客户端连接创建新的进程处理客户端请求
- 原进程继续等待其他客户端连接
- 如果客户端退出,则销毁对应的进程
?? 基于fork
"""
基于fork的多进程并发
重点代码
创建监听套接字
等待接收客户端请求
客户端连接创建新的进程处理客户端请求
原进程继续等待其他客户端连接
如果客户端退出,则销毁对应的进程
"""
from socket import *
import os
import signal # 处理僵尸进程
ADDR = ('0.0.0.0',8888)
# 客户端处理函数,循环收发消息
def handle(c):
while True:
data = c.recv(1024).decode()
if not data:
break
print(data)
c.send(b'OK')
# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 设置套接字的端口重用,端口可以立即被重用
s.bind(ADDR)
s.listen(5)
# 处理僵尸进程
signal.signal(signal.SIGCHLD,signal.SIG_IGN)
print("Listen the port 8888....")
while True:
# 循环等待客户端连接
try:
c,addr = s.accept()
print("Connect from",addr)
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 创建新的进程
pid = os.fork()
if pid == 0:
s.close() # 子进程关掉 s
handle(c) # 具体的处理请求函数
os._exit(0) # 子进程处理请求后销毁
else:
c.close() # 父进程不需要和客户端通信
"""
tcp客户端流程
重点代码
"""
from socket import *
from threading import Thread
def test():
# 创建tcp套接字
sockfd = socket() # 默认参数-->tcp套接字
# 连接服务端程序
server_addr = ('127.0.0.1', 8888)
sockfd.connect(server_addr)
# 发送接收消息
while True:
data = input("Msg:")
# data为空退出循环
if not data:
break
sockfd.send(data.encode()) # 发送字节串
data = sockfd.recv(1024)
print("Server:", data.decode())
# 关闭套接字
sockfd.close()
test()
?? 基于process的服务端
"""
基于fork的多进程并发
重点代码
创建监听套接字
等待接收客户端请求
客户端连接创建新的进程处理客户端请求
原进程继续等待其他客户端连接
如果客户端退出,则销毁对应的进程
"""
from socket import *
from multiprocessing import Process
import os
import signal # 处理僵尸进程
ADDR = ('0.0.0.0', 8888)
# 客户端处理函数,循环收发消息
def handle(c):
while True:
data = c.recv(1024).decode()
if not data:
break
print(data)
c.send(b'OK')
# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 设置套接字的端口重用,端口可以立即被重用
s.bind(ADDR)
s.listen(5)
# 处理僵尸进程
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
print("Listen the port 8888....")
while True:
# 循环等待客户端连接
try:
c, addr = s.accept()
print("Connect from", addr)
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 创建新的进程
p = Process(target=handle, args=(c,))
p.daemon = True # 父进程结束则所有服务终止
p.start() # 不要有 join 否则和循环没有区别,这样会产生僵尸,僵尸已经让上面signal 信号处理了
基于threading的多线程网络并发
代码实现: day10/thread_server.py
实现步骤
- 创建监听套接字
- 循环接收客户端连接请求
- 当有新的客户端连接创建线程处理客户端请求
- 主线程继续等待其他客户端连接
- 当客户端退出,则对应分支线程退出
??基于threading多线程的网络并发
"""
thread_server.py 基于Thread线程并发
重点代码
创建监听套接字
循环接收客户端连接请求
当有新的客户端连接创建线程处理客户端请求
主线程继续等待其他客户端连接
当客户端退出,则对应分支线程退出
"""
from socket import *
from threading import Thread
import os
ADDR = ('0.0.0.0',8888)
# 客户端处理函数,循环收发消息
def handle(c):
while True:
data = c.recv(1024).decode()
if not data:
break
print(data)
c.send(b'OK')
# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(5)
print("Listen the port 8888....")
while True:
# 循环等待客户端连接
try:
c,addr = s.accept()
print("Connect from",addr)
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 创建新的线程处理请求
client = Thread(target=handle,args=(c,))
client.setDaemon(True)
client.start()
ftp 文件服务器
代码实现: day11/ftp
-
功能
【1】 分为服务端和客户端,要求可以有多个客户端同时操作。
【2】 客户端可以查看服务器文件库中有什么文件。
【3】 客户端可以从文件库中下载文件到本地。
【4】 客户端可以上传一个本地文件到文件库。
【5】 使用print在客户端打印命令输入提示,引导操作 -
技术点确定
- 并发 : 多线程并发
- 数据传输 : tcp传输
-
结构设计
将基本功能封装为类
-
功能模块
- 搭建网络通信
- 查看文件列表
- 下载文件
- 上传文件
- 客户端退出
-
协议 (协议类型 参数)
查看文件列表 : 'L'
退出 : 'Q'
下载 : 'G filename'
上传 : "P filename"
??服务器代码
"""
ftp 文件服务器 ,服务端
env: python 3.6
多进程多线程并发 socket
"""
from socket import *
from threading import Thread
import os,sys
import time
# 全局变量
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/wxk/FTP/" # 文件库路径
# 功能类 (线程类)
# 查文档, 下载,上传
class FTPServer(Thread):
def __init__(self,connfd):
super().__init__()
self.connfd = connfd
# 处理文件列表
def do_list(self):
# 获取文件列表
files = os.listdir(FTP)
if not files:
self.connfd.send("文件库为空".encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1) # 防止 tcp 消息粘连
# 拼接文件
filelist = ''
for file in files:
filelist += file + '\n'
self.connfd.send(filelist.encode())
def do_get(self,filename):
try:
f = open(FTP+filename,'rb')
except Exception:
# 文件不存在
self.connfd.send('文件不存在'.encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1)
# 发送文件
while True:
data = f.read(1024)
if not data:
time.sleep(0.1)
self.connfd.send(b'##')
break
self.connfd.send(data)
def do_put(self,filename):
if os.path.exists(FTP+filename):
self.connfd.send("文件已存在".encode())
return
else:
self.connfd.send(b'OK')
# 接收文件
f = open(FTP + filename,'wb')
while True:
data = self.connfd.recv(1024)
if data == b'##':
break
f.write(data)
f.close()
# 循环接受来自客户端的请求
def run(self):
while True:
request=self.connfd.recv(1024).decode()
if not request or request == 'Q':
return # 线程退出
elif request == 'L':
self.do_list()
elif request[0] == 'G':
filename = request.split(' ')[-1]
self.do_get(filename)
elif request[0] == 'P':
filename = request.split(' ')[-1]
self.do_put(filename)
# 启动函数
def main():
# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(ADDR)
s.listen(5)
print("Listen the port 8080....")
while True:
# 循环等待客户端连接
try:
c, addr = s.accept()
print("Connect from", addr)
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 创建新的线程处理请求
client = FTPServer(c)
client.setDaemon(True)
client.start() # 运行 run 函数
main()
??客户端代码
"""
ftp 文件服务, 客户端
"""
import time
from socket import *
import sys
# 服务器地址
ADDR = ('127.0.0.1',8080)
# 文件处理类
class FTPClient:
# 所有函数都使用sockfd,所以把它变为属性变量
def __init__(self,sockfd):
self.sockfd = sockfd
def do_list(self):
self.sockfd.send(b'L') # 发送请求
# 等待回复 (服务端能否满足请求)
data = self.sockfd.recv(128).decode()
if data == 'OK':
# 一次性接收所有文件
data = self.sockfd.recv(4096)
print(data.decode())
else:
print(data)
def do_quit(self):
self.sockfd.send(b'Q') # 退出请求
self.sockfd.close()
sys.exit("谢谢使用")
def do_get(self,filename):
# 发送请求
self.sockfd.send(('G '+filename).encode())
# 等待回复
data = self.sockfd.recv(128).decode()
if data == 'OK':
f = open(filename,'wb')
# 循环接收内容,写入文件
while True:
data = self.sockfd.recv(1024)
if data == b'##': # 以 ## 作为发送完成的标志
break
f.write(data)
f.close()
else:
print(data)
def do_put(self,filename):
try:
f = open(filename,'rb')
except Exception as e:
print("该文件不存在")
return
# 发送请求
filename = filename.split('/')[-1]
self.sockfd.send(('P '+filename).encode())
# 等待反馈
data = self.sockfd.recv(128).decode()
if data == 'OK':
while True:
data = f.read(1024)
if not data:
time.sleep(0.1)
self.sockfd.send(b'##')
break
self.sockfd.send(data)
f.close()
else:
print(data)
# 启动函数
def main():
sockfd = socket()
try:
sockfd.connect(ADDR)
except Exception as e:
print(e)
return
ftp = FTPClient(sockfd) # 实例化对象,用于调用功能
# 循环发送请求给服务器
while True:
print("""\n
=========Command============
**** list ****
**** get file ****
**** put file ****
**** quit ****
============================
""")
cmd = input("输入命令:")
if cmd.strip() == 'list':
ftp.do_list()
elif cmd.strip() == 'quit':
ftp.do_quit()
elif cmd[:3] == 'get':
filename = cmd.split(' ')[-1]
ftp.do_get(filename)
elif cmd[:3] == 'put':
filename = cmd.split(' ')[-1]
ftp.do_put(filename)
else:
print("请输入正确命令")
main()