python 线程
消息队列
# 队列的概念
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
队列:先进先出(使用频率很高)
堆栈:先进后出(特定常见下用)
'''Queue([maxsize])
创建共享的进程队列
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现'''
# 以后我们会直接使用别人封装好的消息队列 实现各种数据传输
from multiprocessing import Queue
消息队列的方法
#q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
#q.get_nowait()
同q.get(False)方法。
#q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
#q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
#q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
#q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)
"""
上述方法不能在并发的场景下精准使用
之所以介绍队列是因为它可以支持进程间数据通信
"""
IPC机制(进程间通信)
"""
1.主进程与子进程数据交互
2.两个子进程数据交互
本质:不同内存空间中的进程数据交互
"""
from multiprocessing import Process, Queue
def producer(q):
# print('子进程producer从队列中取值>>>:', q.get())
q.put('子进程producer往队列中添加值')
def consumer(q):
print('子进程consumer从队列中取值>>>:', q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=producer, args=(q, ))
p1 = Process(target=consumer, args=(q,))
p.start()
p1.start()
# q.put(123) # 主进程往队列中存放数据123
print('主进程')
生产者消费者模型
生产者消费者模型的概念
# 概念
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
# 为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式
# 什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者消费者模型示例
# 生产者
负责生产/制作数据
# 消费者
负责消费/处理数据
负责消费/处理数据
# JoinableQueue([maxsize])
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
"""
比如在爬虫领域中
会先通过代码爬取网页数据(爬取网页的代码就可以称之为是生产者)
之后针对网页数据做筛选处理(处理网页的代码就可以称之为消费者)
如果使用进程来演示
除了有至少两个进程之外 还需要一个媒介(消息队列)
以后遇到该模型需要考虑的问题其实就是供需平衡的问题
生产力与消费力要均衡
"""
from multiprocessing import Process, Queue, JoinableQueue
import time
def producer(name, food, q):
for i in range(5):
data = f'{name}生产了{food}{i}'
print(data)
time.sleep(2)
q.put(data)
def consumer(name,q):
while True:
food = q.get()
time.sleep(1)
print(f'{name}吃了{food}')
q.task_done() # 每次去完数据必须给队列一个反馈
if __name__ == '__main__':
q = Queue()
# q = JoinableQueue()
p1 = Process(target=producer, args=('厨师1','番茄炒蛋',q,))
p2 = Process(target=producer, args=('厨师2','萝卜炒蛋',q,))
c1 = Process(target=consumer, args=('顾客1', q))
c2 = Process(target=consumer, args=('顾客2', q))
# c1.daemon = True
# c2.daemon = True
p1.start()
p2.start()
c1.start()
c2.start()
# 生产者生产完所有数据之后 往队列中添加结束的信号
p1.join()
p2.join()
# q.join()
"""队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了"""
q.join() # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
"""执行完上述的join方法表示消费者也已经消费完数据了"""

线程
线程理论
# 什么是线程
进程:资源单位
线程:执行单位
进程相当于车间(一个个空间),线程相当于车间里面的流水线(真正干活的)
'''一个进程中至少有一个线程'''
"""
进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源)
线程真正被CPU执行,线程需要的资源跟所在进程的要
"""
# 为什么要有线程
开设线程的消耗远远小于进程
开进程
1.申请内存空间
2.拷贝代码
开线程
一个进程内可以开设多个线程 无需申请内存空间、拷贝代码
一个进程内的多个线程数据是共享的
开设线程的两种方式
多线程的使用方式一:直接使用
from threading import Thread
import time
def dask(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=dask, args=('jason',))
t.start()
print("主线程")
线程的第二种使用方式:继承式调用
# 继承式调用
from threading import Thread
import time
class MyThreading(Thread):
def __init__(self, name):
super(MyThreading, self).__init__()
self.name = name
# 线程要运行的代码
def run(self):
print("我是线程%s" % self.name)
time.sleep(2)
print("线程%s运行结束" % self.name)
t1 = MyThreading(1)
t2 = MyThreading(2)
start_time = time.time()
t1.start()
t2.start()
end_time = time.time()
print("两个线程一共的运行时间为:", end_time-start_time)
print("主线程结束")
线程实现TCP服务端的并发
import socket
from threading import Thread
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen()
# 将通信代码封装成一个函数
def talk(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
while True:
sock, addr = server.accept()
# 每类一个客户端就创建一个线程做数据交互
t = Thread(target=talk, args=(sock,))
t.start()
线程join方法
from threading import Thread
import time
def dask(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=dask, args=('jason',))
t.start()
t.join() # 主线程代码等待子线程代码运行完毕之后再往下执行
print("主线程")
"""
主线程为什么要等着子线程结束才会结束整个进程
因为主线程结束也就标志着整个进程的结束 要确保子线程运行过程中所需的各项资源
"""
同一个进程内的多个线程数据共享
# 代码示例:
from threading import Thread
money = 10000000000
def task():
global money
money = 1
'子线程中修改全局变量'
t = Thread(target=task)
t.start()
t.join()
print(money) # 1
线程对象属性和方法
# 验证一个进程下的多个线程是否真的处于一个进程
from threading import Thread
import os
def dask():
print('子线程的进程号:', os.getpid())
t = Thread(target=dask)
t.start()
print('主线程的进程号:', os.getpid())
''' 子线程的进程号: 8564
主线程的进程号: 8564'''
# 统计进程下活跃的线程数
active_count()
'注意主线程也算'
# 获取线程的名字
1.current_thread().name
MainThread 主线程
Thread-1、Thread-2 子线程
2.self.name
守护线程
'主进程执行结束,由该主进程创建的子进程必须跟着结束'
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t1 = Thread(target=task, args=('jason',))
t2 = Thread(target=task, args=('kevin',))
t1.daemon = True
'一定要在p.start()前设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行'
t1.start()
t2.start()
print('主线程')
GIL全局解释器锁
1.回顾
python解释器的类别有很多
Cpython Jpython Ppython
垃圾回收机制
引用计数、标记清除、分代回收
GIL只存在于CPython解释器中,不是python的特征
GIL是一把互斥锁用于阻止同一个进程下的多个线程同时执行
原因是因为CPython解释器中的垃圾回收机制不是线程安全的
反向验证GIL的存在 如果不存在会产生垃圾回收机制与正常线程之间数据错乱
GIL是加在CPython解释器上面的互斥锁
同一个进程下的多个线程要想执行必须先抢GIL锁 所以同一个进程下多个线程肯定不能同时运行 即无法利用多核优势
#问题1:
同一个进程下的多个线程不能同时执行即不能利用多核优势
很多不懂python的程序员会喷python是垃圾 速度太慢 有多核都不能用
解答:虽然用一个进程下的多个线程不能利用多核优势 但是还可以开设多进程!!!
#问题2:
python的多线程就是垃圾!!!
解答:要结合实际情况
如果多个任务都是IO密集型的 那么多线程更有优势(消耗的资源更少)
多道技术:切换+保存状态
如果多个任务都是计算密集型 那么多线程确实没有优势 但是可以用多进程
CPU越多越好
以后用python就可以多进程下面开设多线程从而达到效率最大化
"""
1.所有的解释型语言都无法做到同一个进程下多个线程利用多核优势
2.GIL在实际编程中其实不用考虑
