03 | python并发网络通信模型(多进程、多线程的网络传输模型以及简单的ftp模型)


并发网络通信模型

常见网络模型

  1. 循环服务器模型 :循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。

优点:实现简单,占用资源少
缺点:无法同时处理多个客户端请求

适用情况:处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。

  1. 多进程/线程网络并发模型:每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程。

优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
缺点: 资源消耗较大

适用情况:客户端同时连接量较少,需要处理行为较复杂情况。

  1. IO并发模型:利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求。

    优点 : 资源消耗少,能同时高效处理多个IO行为
    缺点 : 只能处理并发产生的IO事件,无法处理cpu计算

    适用情况:HTTP请求,网络传输等都是IO行为。

基于fork的多进程网络并发模型

实现步骤

  1. 创建监听套接字
  2. 等待接收客户端请求
  3. 客户端连接创建新的进程处理客户端请求
  4. 原进程继续等待其他客户端连接
  5. 如果客户端退出,则销毁对应的进程

?? 基于fork

"""
基于fork的多进程并发
重点代码

创建监听套接字
等待接收客户端请求
客户端连接创建新的进程处理客户端请求
原进程继续等待其他客户端连接
如果客户端退出,则销毁对应的进程
"""

from socket import *
import os
import signal # 处理僵尸进程

ADDR = ('0.0.0.0',8888)

# 客户端处理函数,循环收发消息
def handle(c):
    while True:
        data = c.recv(1024).decode()
        if not data:
            break
        print(data)
        c.send(b'OK')

# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 设置套接字的端口重用,端口可以立即被重用
s.bind(ADDR)
s.listen(5)

# 处理僵尸进程
signal.signal(signal.SIGCHLD,signal.SIG_IGN)
print("Listen the port 8888....")

while True:
    # 循环等待客户端连接
    try:
        c,addr = s.accept()
        print("Connect from",addr)
    except KeyboardInterrupt:
        os._exit(0)
    except Exception as e:
        print(e)
        continue

    # 创建新的进程
    pid = os.fork()
    if pid == 0:
        s.close() # 子进程关掉 s
        handle(c) # 具体的处理请求函数
        os._exit(0) # 子进程处理请求后销毁
    else:
        c.close() # 父进程不需要和客户端通信
"""
tcp客户端流程
重点代码
"""

from socket import *
from threading import Thread


def test():
    # 创建tcp套接字
    sockfd = socket()  # 默认参数-->tcp套接字

    # 连接服务端程序
    server_addr = ('127.0.0.1', 8888)
    sockfd.connect(server_addr)

    # 发送接收消息
    while True:
        data = input("Msg:")
        # data为空退出循环
        if not data:
            break
        sockfd.send(data.encode())  # 发送字节串
        data = sockfd.recv(1024)
        print("Server:", data.decode())

    # 关闭套接字
    sockfd.close()


test()

?? 基于process的服务端

"""
基于fork的多进程并发
重点代码

创建监听套接字
等待接收客户端请求
客户端连接创建新的进程处理客户端请求
原进程继续等待其他客户端连接
如果客户端退出,则销毁对应的进程
"""

from socket import *
from multiprocessing import Process
import os
import signal  # 处理僵尸进程

ADDR = ('0.0.0.0', 8888)

# 客户端处理函数,循环收发消息


def handle(c):
    while True:
        data = c.recv(1024).decode()
        if not data:
            break
        print(data)
        c.send(b'OK')


# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # 设置套接字的端口重用,端口可以立即被重用
s.bind(ADDR)
s.listen(5)

# 处理僵尸进程
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
print("Listen the port 8888....")

while True:
    # 循环等待客户端连接
    try:
        c, addr = s.accept()
        print("Connect from", addr)
    except KeyboardInterrupt:
        os._exit(0)
    except Exception as e:
        print(e)
        continue

    # 创建新的进程
    p = Process(target=handle, args=(c,))
    p.daemon = True  # 父进程结束则所有服务终止
    p.start()       # 不要有 join 否则和循环没有区别,这样会产生僵尸,僵尸已经让上面signal 信号处理了

基于threading的多线程网络并发

代码实现: day10/thread_server.py

实现步骤

  1. 创建监听套接字
  2. 循环接收客户端连接请求
  3. 当有新的客户端连接创建线程处理客户端请求
  4. 主线程继续等待其他客户端连接
  5. 当客户端退出,则对应分支线程退出

??基于threading多线程的网络并发

"""
thread_server.py 基于Thread线程并发
重点代码

创建监听套接字
循环接收客户端连接请求
当有新的客户端连接创建线程处理客户端请求
主线程继续等待其他客户端连接
当客户端退出,则对应分支线程退出
"""
from socket import *
from threading import Thread
import os

ADDR = ('0.0.0.0',8888)

# 客户端处理函数,循环收发消息
def handle(c):
    while True:
        data = c.recv(1024).decode()
        if not data:
            break
        print(data)
        c.send(b'OK')

# 创建监听套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(5)

print("Listen the port 8888....")

while True:
    # 循环等待客户端连接
    try:
        c,addr = s.accept()
        print("Connect from",addr)
    except KeyboardInterrupt:
        os._exit(0)
    except Exception as e:
        print(e)
        continue

    # 创建新的线程处理请求
    client = Thread(target=handle,args=(c,))
    client.setDaemon(True)
    client.start()

ftp 文件服务器

代码实现: day11/ftp

  1. 功能
    【1】 分为服务端和客户端,要求可以有多个客户端同时操作。
    【2】 客户端可以查看服务器文件库中有什么文件。
    【3】 客户端可以从文件库中下载文件到本地。
    【4】 客户端可以上传一个本地文件到文件库。
    【5】 使用print在客户端打印命令输入提示,引导操作

  2. 技术点确定

  • 并发 : 多线程并发
  • 数据传输 : tcp传输
  1. 结构设计

    将基本功能封装为类

  2. 功能模块

    • 搭建网络通信
    • 查看文件列表
    • 下载文件
    • 上传文件
    • 客户端退出
  3. 协议 (协议类型 参数)

    查看文件列表 : 'L'
    退出 : 'Q'
    下载 : 'G filename'
    上传 : "P filename"

??服务器代码

"""
ftp 文件服务器 ,服务端
env: python 3.6
多进程多线程并发 socket
"""

from socket import *
from threading import Thread
import os,sys
import time

# 全局变量
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/wxk/FTP/" # 文件库路径

# 功能类 (线程类)
# 查文档, 下载,上传
class FTPServer(Thread):
    def __init__(self,connfd):
        super().__init__()
        self.connfd = connfd

    # 处理文件列表
    def do_list(self):
        # 获取文件列表
        files = os.listdir(FTP)
        if not files:
            self.connfd.send("文件库为空".encode())
            return
        else:
            self.connfd.send(b'OK')
            time.sleep(0.1)    # 防止 tcp 消息粘连
        # 拼接文件
        filelist = ''
        for file in files:
            filelist += file + '\n'
        self.connfd.send(filelist.encode())

    def do_get(self,filename):
        try:
            f = open(FTP+filename,'rb')
        except Exception:
            # 文件不存在
            self.connfd.send('文件不存在'.encode())
            return
        else:
            self.connfd.send(b'OK')
            time.sleep(0.1)

        # 发送文件
        while True:
            data = f.read(1024)
            if not data:
                time.sleep(0.1)
                self.connfd.send(b'##')
                break
            self.connfd.send(data)

    def do_put(self,filename):
        if os.path.exists(FTP+filename):
            self.connfd.send("文件已存在".encode())
            return
        else:
            self.connfd.send(b'OK')
        # 接收文件
        f = open(FTP + filename,'wb')
        while True:
            data = self.connfd.recv(1024)
            if data == b'##':
                break
            f.write(data)
        f.close()

    # 循环接受来自客户端的请求
    def run(self):
        while True:
            request=self.connfd.recv(1024).decode()
            if not request or request == 'Q':
                return # 线程退出
            elif request == 'L':
                self.do_list()
            elif request[0] == 'G':
                filename = request.split(' ')[-1]
                self.do_get(filename)
            elif request[0] == 'P':
                filename = request.split(' ')[-1]
                self.do_put(filename)

# 启动函数
def main():
    # 创建监听套接字
    s = socket()
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind(ADDR)
    s.listen(5)

    print("Listen the port 8080....")

    while True:
        # 循环等待客户端连接
        try:
            c, addr = s.accept()
            print("Connect from", addr)
        except KeyboardInterrupt:
            os._exit(0)
        except Exception as e:
            print(e)
            continue

        # 创建新的线程处理请求
        client = FTPServer(c)
        client.setDaemon(True)
        client.start()  # 运行 run 函数

main()

??客户端代码

"""
ftp 文件服务, 客户端
"""
import time
from socket import *
import sys

# 服务器地址
ADDR = ('127.0.0.1',8080)

# 文件处理类
class FTPClient:
    # 所有函数都使用sockfd,所以把它变为属性变量
    def __init__(self,sockfd):
        self.sockfd = sockfd

    def do_list(self):
        self.sockfd.send(b'L') # 发送请求
        # 等待回复 (服务端能否满足请求)
        data = self.sockfd.recv(128).decode()
        if data == 'OK':
            # 一次性接收所有文件
            data = self.sockfd.recv(4096)
            print(data.decode())
        else:
            print(data)

    def do_quit(self):
        self.sockfd.send(b'Q') # 退出请求
        self.sockfd.close()
        sys.exit("谢谢使用")

    def do_get(self,filename):
        # 发送请求
        self.sockfd.send(('G '+filename).encode())
        # 等待回复
        data = self.sockfd.recv(128).decode()
        if data == 'OK':
            f = open(filename,'wb')
            # 循环接收内容,写入文件
            while True:
                data = self.sockfd.recv(1024)
                if data == b'##': # 以 ## 作为发送完成的标志
                    break
                f.write(data)
            f.close()
        else:
            print(data)

    def do_put(self,filename):
        try:
            f = open(filename,'rb')
        except Exception as e:
            print("该文件不存在")
            return
        # 发送请求
        filename = filename.split('/')[-1]
        self.sockfd.send(('P '+filename).encode())
        # 等待反馈
        data = self.sockfd.recv(128).decode()
        if data == 'OK':
            while True:
                data = f.read(1024)
                if not data:
                    time.sleep(0.1)
                    self.sockfd.send(b'##')
                    break
                self.sockfd.send(data)
            f.close()
        else:
            print(data)

# 启动函数
def main():
    sockfd = socket()
    try:
        sockfd.connect(ADDR)
    except Exception as e:
        print(e)
        return

    ftp = FTPClient(sockfd) # 实例化对象,用于调用功能
    # 循环发送请求给服务器
    while True:
        print("""\n
          =========Command============
          ****       list        ****
          ****    get   file     ****
          ****    put   file     ****
          ****       quit        ****
          ============================
        """)
        cmd = input("输入命令:")
        if cmd.strip() == 'list':
            ftp.do_list()
        elif cmd.strip() == 'quit':
            ftp.do_quit()
        elif cmd[:3] == 'get':
            filename = cmd.split(' ')[-1]
            ftp.do_get(filename)
        elif cmd[:3] == 'put':
            filename = cmd.split(' ')[-1]
            ftp.do_put(filename)
        else:
            print("请输入正确命令")

main()