守护进程详解 | 消息队列 ipc机制| 互斥锁的概念 | 生产者消费者模型


目录
  • 守护进程
    • 什么是守护进程
      • 举例实现守护进程
  • 互斥锁
    • 什么是互斥锁?
    • 使用锁的注意事项
    • 代码实现锁的效果(模拟买票)
  • 消息队列
    • JoinableQueue
    • 代码解释消息队列
    • ipc机制
      • 代码实现解释ipc机制
    • 生产者消费者模型
      • 定义:
      • 代码实现

守护进程

什么是守护进程

守护进程:即守护着某个进程,一旦这个被守护的进程结束,那么守护进程也跟着结束。

举例实现守护进程

from multiprocessing import Process
import time

def test(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')

if __name__ == '__main__':
    p = Process(target=test, args = ('gary',))  # 创建子进程
    p.daemon = True   # 设置为守护进程(守护进程一定放在start语句上方)
    p.start()   # 开启子进程
    print('主进程is running')
    time.sleep(0.5)  # 这里为了看出守护进程的效果

比喻说明:主进程就相当于皇帝,皇帝在入葬时需要太监等人陪伴,太监等人就相当于陪葬的人,需要陪伴着皇帝入葬

互斥锁

什么是互斥锁?

问题 :并发情况下操作同一份数据,机器容易造成数据错乱
解决措施 :将并发变成串行 虽然效率降低了 但是提升了数据的安全。

锁就是可以实现并发变成串行的效果

使用锁的注意事项

在主进程中产生 交由子进程使用
1、一定要在需要的地方加锁 千万不要随便加锁
2、不要轻易使用锁,容易出现死锁现象

代码实现锁的效果(模拟买票)

import json
from multiprocessing import Process, Lock  # 导入锁模块
import time
import random


# 查票
def search(name):
    with open(r'piao.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
        print(f'{name}查看的余票')


# 买票
def buy(name):
    with open(r'piao.txt','r',encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num') 
    time.sleep(random.random())   # 模拟一个延迟
    # 判断是否有票
    if ticket_num > 0:
        # 将余票减一
        data_dict['tict_num'] -= 1
        # 重新写入数据库
        with open(r'data.txt', 'w', encoding='utf8') as f:
            json.dump(data_dict, f)
            print(f'{name}购买成功')
    else:
        print('不好意思没有票了 !!!')
    
    
def run(name,mutex):
    search(name)
    mutex.acquire()   # 抢锁  抢到锁的才能执行下面代码体
    buy(name)
    mutex.release()   # 释放锁 其他进程可抢锁
    
if __name__ == '__main__'
	mutex= Lock()
    for i in range(1,11):
        p = Process(target=run, args=(f'用户{i}',mutex))
        p.start()

消息队列

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。 消息在被处理和删除之前一直存储在队列上。 每条消息仅可被一位用户处理一次。 消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

主要作用:消息队列可以实现异步之间的通信

格式:  from multiprocessing import Queue模块 和 JoinableQueue模块

Queue()/JoinableQueue()  创建消息队列  括号内可以填写最大的等待数

q = Queue(5)
q.put()  # 存放数据 括号内填写需要存放的数据
q.get()  # 提取数据 括号内无需填写数据
q.full()  # 判断队列中中的数据是否满了
q.get_nowait()  # 没有数据立刻报错不会出现阻塞现象

img

JoinableQueue

可翻译:为可join的队列

该队列相比普通的Queue的区别在于该对列额外增加的了join函数

join函数的作用:

该函数为阻塞函数,会阻塞直到等待队列中所有数据都被处理完毕。


q = JoinableQueue()
q.put('gary') 
print(q.get())
q.join() # 阻塞 等待队列中所有数据都被处理完毕
print("over")



执行以上函数,将导致进程无法结束,注释掉**join**调用就正常,发现join的确有阻塞的效果,

但是队列中一共就一个数据,明明已经调用get取出了,为什么join依然阻塞呢?

这是因为get仅仅是取出数据,而join是等待数据处理完毕,也就是说:

**取出数据还不算完,你处理完以后必须告知队列处理完毕,通过task_done**


q = JoinableQueue()
q.put('gary') 

print(q.get())
q.task_done() # 数据处理完毕

q.join() # 阻塞 等待队列中所有数据都被处理完毕
print("over")
# 输出: 
gary
over

代码解释消息队列

from multiprocessing import Queue




q = Queue(5)   # 括号内可以填写最大等待数 

# 存放数据 
q.put(11)
q.put(22)
q.put(33)
print(q.full())   # False  判断队列是否存满
q.put(44)
q.put(55)
print(q.full())  # True表示存满了
# q.put(66)  # 这时程序会出现阻塞 因超出范围原地等待 知道有空缺位置
print(q.get())  # 取数据
print(q.get())  # 取数据
print(q.get())  # 取数据
print(q.get())  # 取数据
print(q.get())  # 取数据
print(q.get())  # 这时程序会初选阻塞 没有数据之后原地等待,直到有数据为止
print(q.get_nowait())  # 没有数据立刻报错不会出现阻塞现象
full和get_wait是不能用于多进程情况下的精确使用的。因为当前脚刚判断存满后后脚就有可能有一个进程取走了数据

队列的使用就可以打破进程间默认无法通信的情况

ipc机制

含义为**进程间通信**或者**跨进程通信**,是指两个进程之间进行数据交换的过程。

代码实现解释ipc机制

from multiprocessing import Queue, Process


def producer(q):
    q.put('子进程p放的数据')


def consumer(q):
    print('子进程c取:', q.get())


if __name__ == '__main__':
    q = Queue()  # 创建消息队列
    p = Process(target=producer, args=(q,))  # 创建子进程
    c = Process(target=consumer, args=(q,))  # 创建子进程
    p.start()  # 开启producer子进程
    c.start()  # 开启consumer子进程

可以看到多进程之间可以同时操作消息队列进行数据通信

生产者消费者模型

定义:

生产者:负责产生数据(相当于做包子的)
消费者:负责处理数据(相当于吃包子的)

####  该模型需要解决供需不平衡现象

代码实现

from multiprocessing import Queue, Process, JoinableQueue
import time
import random

def producer(name, food, q):
    for i in range(10):
        print(f'{name}做了{food}')
        q.put(food)
        time.sleep(random.random())
        
def consumer(name, q):
    while True:
        data = q.get()
        print(f'{name}吃了{data}')
        q.task_done()   # 队列处理完之后,需要告知主进程队列处理完毕的信息,
        
if __name__ == '__main__':
    q = JoinableQueue()   # 定义一个可join的消息队列q
    p1 = Process(target=producer,args=('康康','玛莎拉', q))
    p2 = Process(target=producer,args=('红红','油条', q))
    p3 = Process(target=producer,args=('西西','包子', q))
    c1 = Process(target=consumer,args=('小明',q))
    
    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True  # 守护消费者进程 不然的话会一直等待处理数据形成程序阻塞
    c1.start()
    
    p1.join()
    p2.join()
    p3.join()
    
    q.join()  # 等待队列中所有的数据被取干净
    
    print('主进程')