python学习day37笔记


进程之间的数据是隔离的

在一整个进程内运行时数据是不隔离的

def task():
    global n
    n = 10
    print(f'这是子进程中的{n}')

if __name__ == '__main__':
    n = 100
    task()
    print(f'这是主进程中的{n}')
    
# 这是子进程中的10
# 这是主进程中的10


在不同的进程内运行的数据是隔离的

from multiprocessing import Process
def task():
    global n
    n = 10
    print(f'这是子进程中的{n}')

if __name__ == '__main__':
    n = 100
    p = Process(target=task,)
    p.start()
    print(f'这是主进程中的{n}')
    
# 这是主进程中的100
# 这是子进程中的10

Queue队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列(Queue)和管道(Pipe),这两种方式都是使用消息传递的

队列的底层是由管道和锁定的方式实现的

队列:先进的先出

from multiprocessing import Process
if __name__ == '__main__':
    q = Queue(3)
    # 括号内可以放值,不放表示可以无限存
    # 放的值表示队列内可以放几个东西

Queue类内函数

put

put(存放的数据,block=True,timeout=None)
q.put('ly is dsb')  # 此时q内存入了值'ly is dsb'
block=True  # 如果存放的值大于队列设定最大接收的值,则会直接报错
timeout=1  # 在你设定的时间内,如果存放的值大于队列设定最大接收的值,则会直接报错
q.put_nowait('ly is dsb')  # 如果值放不进去则直接报错



如果存放的值大于队列设定最大接收的值,则会阻塞直到队列内的值被拿走后存放进队列

get

get(block=True, timeout=None)
q.get()  # 取出q内的一个值
q.get_nowait  # 如果值取不到则报错

写一次拿一个值,如果待取的大于队列内存的,程序会阻塞到待取的取到值为止

遵守先进先出原则

qsize

q.qsize()  # 返回q队列内有几个值

empty

q.empty()  # 判断q队列内是否还能继续存值,返回布尔值

full

q.full()  # 判断q队列内值是否已满,返回布尔值

了解

在使用Queue中的查询类的函数时结果可能是不可靠的,在运行过程中进行查询的结果往往不准确

解决进程间数据隔离问题

from multiprocessing import Queue,Process
import os,time

def task(queue):
    print('这个进程id:%s 开始放数据了'%os.getpid())
    time.sleep(1)
    queue.put('ly is dsb')
    print('这个进程id:%s 数据放完了'%os.getpid())

if __name__ == '__main__':
    q = Queue(5)
    p = Process(target=task,args=(q,))
    p.start()
    print('主进程')
    print('主进程中取值%s'%q.get())
    
# 主进程
# 这个进程id:16456 开始放数据了
# 这个进程id:16456 数据放完了
# 主进程中取值ly is dsb

多进程放入数据到Queue

from multiprocessing import Queue, Process
import os, time

def get_task(queue):
    print("%s:%s" % (os.getpid(), queue.get()))

def put_task(queue):
    queue.put("%s开始写数据了" % os.getpid())

if __name__ == '__main__':
    q = Queue(3)

    p = Process(target=put_task, args=(q,))
    p.start()

    p1 = Process(target=put_task, args=(q,))
    p1.start()

    p2 = Process(target=get_task, args=(q,))
    p2.start()

    p3 = Process(target=get_task, args=(q,))
    p3.start()
    
# 8600:4316开始写数据了
# 5876:6644开始写数据了

生产者与消费者模型代码

producer -> 生产者
consumer -> 消费者

版本一

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
        print(data)
        time.sleep(0.5)
        queue.put('第%s个包子'%i)

def consumer(queue):
    while True:
        res = queue.get()
        data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
        print(data)

if __name__ == '__main__':
    q = Queue(10)
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    
程序会一直卡在消费者函数内,因为里面的while True是个死循环,而get会一直阻塞直到取到值为止

改进方案

给消费者函数内的while循环加上限制
在生产者函数生产值的结尾加上一个特定的值
如果消费者get到了这个特定的值则跳出循环

版本二

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
        print(data)
        time.sleep(0.5)
        queue.put('第%s个包子'%i)
    queue.put(None)  # 在传值结束后结尾再传入None

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break  # 如果传入值为None跳出循环
        data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
        print(data)

if __name__ == '__main__':
    q = Queue(10)
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()

改进方案

传值结束后传入None这个操作在子进程
将其放入主进程进行操作

版本三

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
        print(data)
        time.sleep(0.5)
        queue.put('第%s个包子'%i)

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
        print(data)

if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer,args=(q,))
    p1.start()
    p2 = Process(target=consumer,args=(q,))
    p2.start()
    p1.join()  # join 让子进程执行完毕再执行主进程
    q.put(None)

改进方案

制造多生产者多消费者相对应的模型

版本四

生产者多消费者少

from multiprocessing import Process,Queue
import os,time
def producer(queue,food):
    for i in range(10):
        data = '这个进程id:%s,生产了第%s个%s'%(os.getpid(),i,food)
        print(data)
        time.sleep(0.5)
        queue.put('第%s个%s'%(i,food))

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
        print(data)
        time.sleep(0.5)

if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer,args=(q,'屎'))
    p2 = Process(target=producer,args=(q,'大便'))
    p3 = Process(target=producer,args=(q,'shit'))
    p1.start()
    p2.start()
    p3.start()
    p6 = Process(target=consumer,args=(q,))
    p6.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)  # 依据消费者多少来决定放几个None

生产者少消费者多

from multiprocessing import Process,Queue
import os,time
def producer(queue,food):
    for i in range(10):
        data = '这个进程id:%s,生产了第%s个%s'%(os.getpid(),i,food)
        print(data)
        time.sleep(0.5)
        queue.put('第%s个%s'%(i,food))

def consumer(queue,name):
    while True:
        try:
            res = queue.get(timeout=5)
            if not res:break
            data = '%s消费了%s'%(name,res)
            print(data)
            time.sleep(0.5)
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer,args=(q,'草'))
    p2 = Process(target=producer,args=(q,'土豆'))
    p3 = Process(target=producer,args=(q,'鸭'))
    p1.start()
    p2.start()
    p3.start()
    p6 = Process(target=consumer,args=(q,'猴哥'))
    p7 = Process(target=consumer,args=(q,'鸡哥'))
    p8 = Process(target=consumer,args=(q,'egon'))
    p9 = Process(target=consumer,args=(q,'李洋'))
    p6.start()
    p7.start()
    p8.start()
    p9.start()

扩展

Queue是个简单版本,后面最终会变成
	httpsqs
    rabbiemq
    kafka

相关