进程之间的数据是隔离的
在一整个进程内运行时数据是不隔离的
def task():
global n
n = 10
print(f'这是子进程中的{n}')
if __name__ == '__main__':
n = 100
task()
print(f'这是主进程中的{n}')
# 这是子进程中的10
# 这是主进程中的10
在不同的进程内运行的数据是隔离的
from multiprocessing import Process
def task():
global n
n = 10
print(f'这是子进程中的{n}')
if __name__ == '__main__':
n = 100
p = Process(target=task,)
p.start()
print(f'这是主进程中的{n}')
# 这是主进程中的100
# 这是子进程中的10
Queue队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列(Queue)和管道(Pipe),这两种方式都是使用消息传递的
队列的底层是由管道和锁定的方式实现的
队列:先进的先出
from multiprocessing import Process
if __name__ == '__main__':
q = Queue(3)
# 括号内可以放值,不放表示可以无限存
# 放的值表示队列内可以放几个东西
Queue类内函数
put
put(存放的数据,block=True,timeout=None)
q.put('ly is dsb') # 此时q内存入了值'ly is dsb'
block=True # 如果存放的值大于队列设定最大接收的值,则会直接报错
timeout=1 # 在你设定的时间内,如果存放的值大于队列设定最大接收的值,则会直接报错
q.put_nowait('ly is dsb') # 如果值放不进去则直接报错
如果存放的值大于队列设定最大接收的值,则会阻塞直到队列内的值被拿走后存放进队列
get
get(block=True, timeout=None)
q.get() # 取出q内的一个值
q.get_nowait # 如果值取不到则报错
写一次拿一个值,如果待取的大于队列内存的,程序会阻塞到待取的取到值为止
遵守先进先出原则
qsize
q.qsize() # 返回q队列内有几个值
empty
q.empty() # 判断q队列内是否还能继续存值,返回布尔值
full
q.full() # 判断q队列内值是否已满,返回布尔值
了解
在使用Queue中的查询类的函数时结果可能是不可靠的,在运行过程中进行查询的结果往往不准确
解决进程间数据隔离问题
from multiprocessing import Queue,Process
import os,time
def task(queue):
print('这个进程id:%s 开始放数据了'%os.getpid())
time.sleep(1)
queue.put('ly is dsb')
print('这个进程id:%s 数据放完了'%os.getpid())
if __name__ == '__main__':
q = Queue(5)
p = Process(target=task,args=(q,))
p.start()
print('主进程')
print('主进程中取值%s'%q.get())
# 主进程
# 这个进程id:16456 开始放数据了
# 这个进程id:16456 数据放完了
# 主进程中取值ly is dsb
多进程放入数据到Queue
from multiprocessing import Queue, Process
import os, time
def get_task(queue):
print("%s:%s" % (os.getpid(), queue.get()))
def put_task(queue):
queue.put("%s开始写数据了" % os.getpid())
if __name__ == '__main__':
q = Queue(3)
p = Process(target=put_task, args=(q,))
p.start()
p1 = Process(target=put_task, args=(q,))
p1.start()
p2 = Process(target=get_task, args=(q,))
p2.start()
p3 = Process(target=get_task, args=(q,))
p3.start()
# 8600:4316开始写数据了
# 5876:6644开始写数据了
生产者与消费者模型代码
producer -> 生产者
consumer -> 消费者
版本一
from multiprocessing import Process,Queue
import os,time
def producer(queue):
for i in range(10):
data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
print(data)
time.sleep(0.5)
queue.put('第%s个包子'%i)
def consumer(queue):
while True:
res = queue.get()
data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
print(data)
if __name__ == '__main__':
q = Queue(10)
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
程序会一直卡在消费者函数内,因为里面的while True是个死循环,而get会一直阻塞直到取到值为止
改进方案
给消费者函数内的while循环加上限制
在生产者函数生产值的结尾加上一个特定的值
如果消费者get到了这个特定的值则跳出循环
版本二
from multiprocessing import Process,Queue
import os,time
def producer(queue):
for i in range(10):
data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
print(data)
time.sleep(0.5)
queue.put('第%s个包子'%i)
queue.put(None) # 在传值结束后结尾再传入None
def consumer(queue):
while True:
res = queue.get()
if not res:break # 如果传入值为None跳出循环
data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
print(data)
if __name__ == '__main__':
q = Queue(10)
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
改进方案
传值结束后传入None这个操作在子进程
将其放入主进程进行操作
版本三
from multiprocessing import Process,Queue
import os,time
def producer(queue):
for i in range(10):
data = '这个进程id:%s,蒸了%s个包子'%(os.getpid(),i)
print(data)
time.sleep(0.5)
queue.put('第%s个包子'%i)
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer,args=(q,))
p1.start()
p2 = Process(target=consumer,args=(q,))
p2.start()
p1.join() # join 让子进程执行完毕再执行主进程
q.put(None)
改进方案
制造多生产者多消费者相对应的模型
版本四
生产者多消费者少
from multiprocessing import Process,Queue
import os,time
def producer(queue,food):
for i in range(10):
data = '这个进程id:%s,生产了第%s个%s'%(os.getpid(),i,food)
print(data)
time.sleep(0.5)
queue.put('第%s个%s'%(i,food))
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = '这个进程id:%s,消费了%s'%(os.getpid(),res)
print(data)
time.sleep(0.5)
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer,args=(q,'屎'))
p2 = Process(target=producer,args=(q,'大便'))
p3 = Process(target=producer,args=(q,'shit'))
p1.start()
p2.start()
p3.start()
p6 = Process(target=consumer,args=(q,))
p6.start()
p1.join()
p2.join()
p3.join()
q.put(None) # 依据消费者多少来决定放几个None
生产者少消费者多
from multiprocessing import Process,Queue
import os,time
def producer(queue,food):
for i in range(10):
data = '这个进程id:%s,生产了第%s个%s'%(os.getpid(),i,food)
print(data)
time.sleep(0.5)
queue.put('第%s个%s'%(i,food))
def consumer(queue,name):
while True:
try:
res = queue.get(timeout=5)
if not res:break
data = '%s消费了%s'%(name,res)
print(data)
time.sleep(0.5)
except Exception as e:
print(e)
break
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer,args=(q,'草'))
p2 = Process(target=producer,args=(q,'土豆'))
p3 = Process(target=producer,args=(q,'鸭'))
p1.start()
p2.start()
p3.start()
p6 = Process(target=consumer,args=(q,'猴哥'))
p7 = Process(target=consumer,args=(q,'鸡哥'))
p8 = Process(target=consumer,args=(q,'egon'))
p9 = Process(target=consumer,args=(q,'李洋'))
p6.start()
p7.start()
p8.start()
p9.start()
扩展
Queue是个简单版本,后面最终会变成
httpsqs
rabbiemq
kafka