Python--Python并行编程实战(第2版) multiprocessing
python 并行编程
multiprocessing 进程
multiprocessing.Process 类
init(group=None, target=None, name=None, args=(), kwargs={})
- 参数
- group: 进程组,必须为None,为将来的实现保留的
- target: 目标函数
- name: 进程名
- args: 目标函数的参数元组
- kwargs: 目标函数的关键字参数字典
- daemon: 是否为后台进程
- 在windows系统上的结果是:不管是否设置为True,主进程推出子进程都会退出。目前处理方式就是在必须完整执行的进程调用join()方法,其他可以随主进程退出而退出的子进程就不调用join()方法。
start()
启动进程,最终会自动调用run方法
join()
等待进程结束
run()
业务逻辑,定义Process子类时,可覆盖此方法。在默认的实现中会调用target指定的目标函数
terminal()
终止进程,一般在调用terminal之后再调用下join方法,让子进程有足够时间来释放资源。
is_alive()
判断进程是否还活着
exitcode
- ==0: 没有产生任何错误
-
0: 进程有一个错误,并退出这个代码
- < 0: 进程由一个 -1 * exitcode 信号杀死, 调用termimal终止进程,exitcode一般是-15
示例1
import datetime
import multiprocessing
import time
def print_(content):
print(f'{datetime.datetime.now()} {content}')
def run(index):
print_(f'process {index} running ...')
for i in range(index):
print_(f'process {index} produce item {i}')
print_(f'process {index} end.')
def main():
processes = []
for i in range(6):
processes.append(multiprocessing.Process(target=run, args=(i,)))
# if i == 1:
# processes[i].daemon = True
# else:
# processes[i].daemon = False
for process in processes:
process.start()
for process in processes:
process.join()
print_('main end.')
def run_or_daemon():
name = multiprocessing.current_process().name
print_(f'process {name} start ...')
if name == 'background_process':
for i in range(0, 5):
print_(f'process {name} ---> {i}')
time.sleep(3)
else:
for i in range(5, 10):
print_(f'process {name} ---> {i}')
time.sleep(1)
print_(f'process {name} exist.')
def main_daemon():
background_proc = multiprocessing.Process(target=run_or_daemon, name='background_process')
no_background_proc = multiprocessing.Process(target=run_or_daemon, name='no_background_process')
# 在windows系统上的结果是:不管是否设置为True,主进程推出子进程都会退出。
# 目前处理方式就是在必须完整执行的进程调用join()方法,其他可以随主进程退出而退出的子进程就不调用join()方法。
# background_proc.daemon = True
# no_background_proc.daemon = False
no_background_proc.start()
background_proc.start()
# background_proc.join()
no_background_proc.join()
print_('main_daemon end.')
if __name__ == '__main__':
# 如果target函数与创建process的语句在同一个脚本里,一定要在这里启动
# main()
main_daemon()
# main()
# 直接在脚本调用会有问题
# 因为子进程会导入target函数的脚本,
# 这样此处的main在每次被子进程导入都会被执行,最终无限嵌套。
# 避免方式:
# 一种方式是在 if __name__ == '__main__':中调用 main()
# 另一种就是把target函数放到其他脚本文件中
multiprocessing.Queue
使用队列进行进程间交换数据,Queue是建立在Pipe之上的,如果只有两个对象需要通信,优先使用Pipe。
empty()
判断队列是否为空
put()
往队列中增加元素
get()
从队列中获取元素
qsize()
获取队列当前元素数量
示例1
import datetime
import multiprocessing
import random
import time
def print_(content):
print(f'{datetime.datetime.now()} {content}')
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
for i in range(20):
time.sleep(random.randint(1, 3))
self.queue.put(i)
print_(f'Producer {self.name} produce item {i}')
print_(f'Producer {self.name} now queue size is {self.queue.qsize()}')
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
print_(f'Consumer {self.name} getting...')
if self.queue.empty():
print_(f'Consumer {self.name} the queue is empty')
time.sleep(3)
# break
else:
item = self.queue.get()
print_(f'Consumer {self.name} consume item {item}')
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
queue = multiprocessing.Queue()
p_1 = Producer(queue)
c_1 = Consumer(queue)
c_2 = Consumer(queue)
c_3 = Consumer(queue)
p_1.start()
c_1.start()
c_2.start()
c_3.start()
p_1.join()
c_1.join()
c_2.join()
c_3.join()
multiprocessing.Pipe
管道
pipe_a, pipe_b = Pipe(duplex=True)
- duplex如果设置为False,就是单向的,pipe_a只能接收数据,pipe_b只能发送数据
- duplex如果设置为True,则是双向的,即pipe_a、pipe_b都能发生数据和接收数据
send()
发送数据
recv()
接收数据
close()
关闭
- 关于关闭要注意下面这个问题,以下说明来自: https://blog.csdn.net/auspark/article/details/105681826
- 管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。
- 假设有一个管道 分为A,B端,A端send数据,B端recv数据(其实是双工,只是这样不乱)
- 最后想明白了,B端recv报错是在确定了管道的另一端不会有数据传来后才报错,否则就一直阻塞,直到有数据从管道另一端传来。一个进程的A端close了只是这个进程的A端不能send数据了,但是别的进程如果有A端,别的进程就能发送数据,只有当所有进程中的A端都关闭了才能算真正的close了。
示例1
import multiprocessing
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
print('create_items end.')
def create_items_2(output_pipe):
for item in range(10):
output_pipe.send(item)
output_pipe.close()
print('create_items_2 end.')
def multiply_items(pipe_1, pipe_2):
close, input_pipe = pipe_1
close.close()
output_pipe, _ = pipe_2
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
print('multiply_items end.')
output_pipe.close()
def multiply_items_2(input_pipe, output_pipe):
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
print('multiply_items_2 end.')
output_pipe.close()
def main():
pipe_1 = multiprocessing.Pipe(True)
proc_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
proc_1.start()
pipe_2 = multiprocessing.Pipe(True)
proc_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
proc_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print('main end.')
def main_2():
pipe_in, pipe_out = multiprocessing.Pipe(True)
pipe_in_2, pipe_out2 = multiprocessing.Pipe(True)
proc_1 = multiprocessing.Process(target=create_items_2, args=(pipe_out,))
proc_2 = multiprocessing.Process(target=multiply_items_2, args=(pipe_in, pipe_out2))
proc_1.start()
proc_2.start()
'''
这里需要关闭
以下说明来自: https://blog.csdn.net/auspark/article/details/105681826
管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。
假设有一个管道 分为A,B端,A端send数据,B端recv数据(其实是双工,只是这样不乱)
最后想明白了,B端recv报错是在确定了管道的另一端不会有数据传来后才报错,否则就一直阻塞,直到有数据从管道另一端传来。一个进程的A端close了只是这个进程的A端不能send数据了,但是别的进程如果有A端,别的进程就能发送数据,只有当所有进程中的A端都关闭了才能算真正的close了。
'''
pipe_out.close()
pipe_in.close()
pipe_out2.close()
try:
while True:
print(pipe_in_2.recv())
except EOFError:
print('main_2 end.')
if __name__ == '__main__':
# main()
main_2()
进程同步
类似线程,进程也有以下同步原语:
- Lock
- RLock
- Semaphore
- Condition
- Event
- Barrier
multiprocessing.Pool
进程池
map()
内置函数map()的并行版本,这个方法会阻塞,直到结果就绪,它将可迭代的数据划分为多个块,作为单独的任务提交到进程池。
-
参数
- func: 目标函数
- iterable: 需要传给目标函数的参数集
-
并行的在输入元素上应用相同的操作,这种场景称为:数据并行性(data parallelism)
map_async()
map的异步版本,提供一个canllback回调函数参数
示例1
import multiprocessing
def square(data):
return data * data
def main():
data_list = list(range(10))
print(f'data_list: {data_list}')
pool = multiprocessing.Pool(processes=4)
result_list = pool.map(square, data_list)
pool.close()
pool.join()
print(f'result_list: {result_list}')
if __name__ == '__main__':
main()