python 笔记 02—线程、进程、协程
线程:
启动/连接/关闭 线程基本操作:
1 from threading import thread 2 3 def countTimer(n, thread_name): 4 print("{}:".format(thread_name), n) 5 n-=1 6 time.sleep(2) 7 8 t1 = Thread(target=countTimer,args(5,)) //实例化线程 9 t2 = Thread(target=countTimer,args(5,)) 10 11 t1.start() // 启动线程 12 t2.start() // 线程结束后会自动销毁
主线程:程序主分支
子线程:从主线程衍生,由threading.Thread生成
(运行过程中,主线程会等待子线程执行结束以后才结束)
检查线程数量:length = len(threading.enumerate())
enumerate()函数应用场景:
1 names = (11,22,33) 2 for a,b in enumerate(names): 3 print(a, b)
执行结果:
0,11
1,22
2,33
用继承的方式开启多线程:
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 // 必须重写run方法,run方法直接被start调用 7 def run(self): 8 for i in range(3): 9 time.sleep(1) 10 msg = "I`m " + self.name + '@' + str(i) 11 print(msg) 12 13 14 def main(): 15 for i in range(5): 16 t = MyThread() 17 t.start() 18 19 20 if __name__ == "__main__": 21 main()
普通全局变量是否要加global:
1 num = 100 2 nums=[11,22] 3 def test(): 4 global num 5 num+=100 6 def test2(): 7 nums.append(33) 8 print(num) 9 print(nums) 10 test() 11 print(num) 12 print(nums)
// 结论:全局变量不一定要加global
// 函数中对全局变量修改的时候,到底是否需要使用global:要看是否对全局变量的指向进行修改
// 如果修改了执行,即让全局变量指向了一个新的地方,那么必须使用global
多线程共享全局变量 会导致资源竞争的问题,先看一份代码:
1 g_num = 0 2 3 def test(num): 4 global g_num 5 for i in range(num): 6 g_num += 1 7 print("---Test1 g_num=%d-----" % g_num) 8 9 def test2(num): 10 global g_num 11 for i in range(num): 12 g_num += 1 13 print("---Test2 g_num=%d-----" % g_num) 14 15 def main(): 16 t1 = threading.Thread(target=test1, args=(1000000,)) 17 t2 = threading.Thread(target=test2, args=(1000000,)) 18 19 t1.start() 20 t2.start() 21 22 time.sleep(2) 23 24 print("----Main Thread g_num = %d----" % g_num 25 26 if __name__ == "__main__"): 27 main()
输出结果:
---Test1 g_num=1224430-----
---Test2 g_num=1363383-----
----Main Thread g_num=1363383----
预期应该是 2000000
原因分析:
线程1,2处理过程 :1、获取g_num的值 2、把获取的值+1 3、把第2步的结果存到g_num中
以上步骤中1,2,3步骤由于CPU的多线程的机制被拆分成 线程1执行12→线程2后执行12→线程1执行3→线程2执行3
这种线程执行机制导致 g_num 的数值存取被不断覆盖,次数越多出现的可能性、频率越高
解决方案:使用互斥锁
1 // 把必须要有原子性的代码部分加互斥锁 2 g_num = 0 3 4 def test(num): 5 global g_num 6 for i in range(num): 7 # 上锁,如果之前没有被上锁,那么上锁成功,否则阻塞此处,等待其他地方释放锁 8 # 被加锁的代码行数/功能点越少越好(尽量避免有for等循环的存在) 9 mutex.acquire() 10 g_num += 1 11 # 解锁 12 mutrix.release() 13 print("---Test1 g_num=%d-----" % g_num) 14 15 def test2(num): 16 global g_num 17 for i in range(num): 18 mutex.acquire() 19 g_num += 1 20 mutrix.release() 21 print("---Test2 g_num=%d-----" % g_num) 22 23 // 创建一个互斥锁 默认没有上锁 24 mutex = threading.Lock() 25 26 def main(): 27 t1 = threading.Thread(target=test1, args=(1000000,)) 28 t2 = threading.Thread(target=test2, args=(1000000,)) 29 30 t1.start() 31 t2.start() 32 33 time.sleep(2) 34 35 print("----Main Thread g_num = %d----" % g_num 36 37 if __name__ == "__main__"): 38 main()
互斥锁的弊端,会导致死锁:
引自:
何谓死锁:死锁是由于两个或以上的线程互相持有对方需要的资源,导致这些线程处于等待状态,无法执行
产生死锁的四个必要条件:
1.互斥性:线程对资源的占有是排他性的,一个资源只能被一个线程占有,直到释放。
2.请求和保持条件:一个线程对请求被占有资源发生阻塞时,对已经获得的资源不释放。
3.不剥夺:一个线程在释放资源之前,其他的线程无法剥夺占用。
4.循环等待:发生死锁时,线程进入死循环,永久阻塞。
产生死锁的原因:
1.竞争不可抢占性资源
p1已经打开F1,想去打开F2,p2已经打开F2,想去打开F1,但是F1和F2都是不可抢占的,这是发生死锁。
2.竞争可消耗资源引起死锁
进程间通信,如果顺序不当,会产生死锁,比如p1发消息m1给p2,p1接收p3的消息m3,p2接收p1的m1,发m2给p3,p3,以此类推,如果进程之间是先发信息的那么可以完成通信,但是如果是先接收信息就会产生死锁。
3.进程推进顺序不当
进程在运行过程中,请求和释放资源的顺序不当,也同样会导致产生进程死锁
避免死锁的方法:
1.破坏“请求和保持”条件
想办法,让进程不要那么贪心,自己已经有了资源就不要去竞争那些不可抢占的资源。比如,让进程在申请资源时,一次性申请所有需要用到的资源,不要一次一次来申请,当申请的资源有一些没空,那就让线程等待。不过这个方法比较浪费资源,进程可能经常处于饥饿状态。还有一种方法是,要求进程在申请资源前,要释放自己拥有的资源。
2.破坏“不可抢占”条件
允许进程进行抢占,方法一:如果去抢资源,被拒绝,就释放自己的资源。方法二:操作系统允许抢,只要你优先级大,可以抢到。
3.破坏“循环等待”条件
将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出
死锁的检测:
1.每个进程、每个资源制定唯一编号
2.设定一张资源分配表,记录各进程与占用资源之间的关系
3.设置一张进程等待表,记录各进程与要申请资源之间的关系
死锁的解除:
1.抢占资源,从一个或多个进程中抢占足够数量的资源,分配给死锁进程,以解除死锁状态。
2.终止(或撤销)进程,终止(或撤销)系统中的一个或多个死锁进程,直至打破循环环路,使系统从死锁状态解脱出来.(设置超时时限)
多线程实例-UDP多线程聊天器:
1 import socket 2 import threading 3 4 5 def send_msg(udp_sockect): 6 # 发送 7 while True: 8 send_data = input("请输入要发送的内容:") 9 udp_sockect.sendto(send_data.encode("gbk"), ("192.168.31.129", 8080)) 10 11 12 def recv_msg(udp_sockect): 13 # 接收 14 while True: 15 recv_data = udp_sockect.recvfrom(1024) 16 print("{}.{}".format(str(recv_data[1]), recv_data[0].decode("gbk"))) 17 18 19 def main(): 20 # 创建套接字 21 udp_sockect = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 22 # 绑定端口 23 udp_sockect.bind(("", 7799)) 24 recv_thread = threading.Thread(target=recv_msg, args=(udp_sockect,)) 25 send_thread = threading.Thread(target=send_msg, args=(udp_sockect,)) 26 recv_thread.start() 27 send_thread.start() 28 29 30 if __name__ == "__main__": 31 main()
进程:
进程:计算机最小的资源分配单位,即一个单位运行中的程序
创建进程:
1 import time 2 import multiprocessing 3 4 def test1(): 5 while True: 6 print("1------") 7 8 def test2(): 9 while True: 10 print("2------") 11 12 def main(): 13 p1= multiprocessing.Process(target=test1) 14 p2= multiprocessing.Process(target=test2) 15 p1.start() 16 p2.start() 17 18 if __name__=="__main__": 19 main()
队列常用方法:
q= multiprocessing.Queue(3) # 初始化一个Queue对象,最多可接收三条put信息,可以put任何类型的数据,不传3表示最大
q.put("3") q.put(4) print(q.full()) # False q.put([11,22,33]) print(q.full()) # True q.empty() # 判断是否为空 q.qsize() # 输出队列的大小 q.get() # '3'
队列在多进程间的应用场景:
1 import multiprocessing 2 3 4 def download_data(q): 5 # 下载数据 6 data = [11, 22, 33, 44] 7 for temp in data: 8 q.put(temp) 9 print("完成下载!") 10 11 12 def analysis_data(q): 13 # 数据处理 14 waitting_analysis_data = list() 15 while True: 16 if q.empty(): 17 break 18 data = q.get() 19 waitting_analysis_data.append(data) 20 21 print(waitting_analysis_data) 22 23 24 def main(): 25 # 创建一个队列 26 q = multiprocessing.Queue() 27 p1 = multiprocessing.Process(target=download_data, args=(q,)) 28 p2 = multiprocessing.Process(target=analysis_data, args=(q,)) 29 p1.start() 30 p2.start() 31 32 33 if __name__ == "__main__": 34 main()
注:普通queue队列只适用同一个程序/同机器内部的多进程, 如果需要多程序多服务间的队列通信,参见redis的使用
进程池(ProcessPoolExecutor):
作用:限制进程上限,节省资源,减少程序创建、销毁进程的时间,提高程序运行效率
1 from multiprocessing import Pool 2 import os, time, random 3 4 5 def worker(msg): 6 t_start = time.time() 7 print("%s开始执行,进程号为%d" % (msg, os.getpid())) 8 time.sleep(random.random()*2) 9 t_stop = time.time() 10 print(msg, "执行完毕, 耗时%0.2f" % (t_stop-t_start)) 11 12 po = Pool(3) 13 14 for i in range(0,10): 15 po.apply_async(worker, (i,)) 16 17 print("开始:") 18 po.close() 19 po.join() # 作用:等待po中所有子进程全部关闭后,主进程才会关闭 注意必须放在close之后 20 print("结束")
进程池+队列等应用:
1 import os 2 import multiprocessing 3 import time 4 5 from utils.config import Config 6 7 8 def copy_file(q, file_name, old_file_path, new_file_path): 9 old_f = open(old_file_path + file_name, "rb") 10 content = old_f.read() 11 old_f.close() 12 new_f = open(new_file_path + file_name, "wb") 13 new_f.write(content) 14 new_f.close() 15 q.put(file_name) 16 17 18 def main(): 19 old_file_path = Config().project_path + "../" + "test_demo/" 20 new_file_path = Config().project_path + "../" + "test_demo_附件/" 21 try: 22 os.makedirs(new_file_path) 23 except Exception as e: 24 pass 25 file_names = os.listdir(old_file_path) 26 po = multiprocessing.Pool(5) 27 q = multiprocessing.Manager().Queue() 28 all_file_num = 0 29 for file_name in file_names: 30 sub_path = old_file_path + file_name 31 if os.path.isfile(sub_path): 32 all_file_num += 1 33 po.apply_async(copy_file, args=(q, file_name, old_file_path, new_file_path)) 34 po.close() 35 copy_times = 0 36 while True: 37 # 获取队列已经存进的复制完的文件名 38 file_name = q.get() 39 # 打印拷贝进度 40 copy_times += 1 41 time.sleep(0.5) 42 # 打印拷贝进度在一行显示 43 print("\r拷贝进度为:%.2f %%" % (copy_times * 100 / all_file_num), end="") 44 if copy_times >= all_file_num: 45 break 46 print() 47 48 49 if __name__ == "__main__": 50 main()
1 class Config(object): 2 """ 3 全局配置文件 4 """ 5 6 def __init__(self): 7 # 获取当前工程文件路径 8 self.project_path = BASE_DIR.split("\\utils")[0] + "\\"
协程:
检查某个数据是否能迭代:
from collections import Interable isinstance([11,22,33], Iterable) # 是否可以迭代,返回True False
迭代器的意义:存储生成数据的方式,而不是生成的数据,占用最小的空间:
举例:
# 生成一个斐波那契数列,常规做法:用一个列表储存,然后输出 num = list() a= 0 b= 1 while i<10: num.append(a) a,b=b,a+b for num in nums: print(num)
class Fibonacci(object): def __init__(self, n): self.a = 0 self.b = 1 self.current_num = 0 self.goal_num = n def __iter__(self): return self def __next__(self): if self.current_num < self.goal_num: self.a, self.b = self.b, self.a + self.b self.current_num += 1 return self.a else: raise StopIteration if __name__ == "__main__": for i in Fibonacci(10): print(i)
生成器:
利用迭代器,我们可以在每次迭代获取数据(根据__next__())时按照特定的规律进行生成。但是我们在实现一个迭代器时,关于当前迭代到的状态需要我们自己记录,进而才能根据当前状态生成下一个数据。为了起到记录当前状态的作用,并配合__next()__函数进行迭代使用,我们可以采用更简便的语法,即使用迭代器。
Python 中,使用了 yield 的函数被称为生成器(generator),跟普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个特殊的迭代器。
在调用生成器运行的过程中,每次遇到 yield 时函数会暂停并保存当前所有的运行信息,返回 yield 的值, 并在下一次执行 next() 方法时从当前位置继续运行。
调用一个生成器函数,返回的是一个迭代器对象,并不执行函数。
def create_num(all_num): a, b = 0, 1 current_num = 0 while current_num < all_num: yield a a, b = b, a + b current_num += 1 return "ok..." obj = create_num(10) while True: try: ret = next(obj) print(ret) except StopIteration as ex: print(ex.value) break
另一种调用生成器的方法(带参数传递),注意思考几点
1) 传递的参数等同于谁的值,在生成器里如何传递 ——传递的参数等同于 yield a整体的结果
2)这样传递的作用或意义是什么 ——用来重新定义生成器里某项需要重置的值
3)传递进去以后下一步等同于__next()__作用的值怎么赋值 —— 继续按下一步(next)yield a的值赋值
4)send()传递的值的调用是否可以放在第一次调用处(第一个next处),如果一定要放第一个位置,传值要有什么限制? ——不能放一个位置,如果一定要放可以传值 None
def create_num(all_num): a, b = 0, 1 current_num = 0 while current_num < all_num: ret = yield a print("生成器内部:", ret) a, b = b, a + b current_num += 1 obj = create_num(10) ret = next(obj) print(ret) ret = obj.send("hahaha") print(ret)
结果:
生成器内部: hahaha
协程:
实现协程的最原始的包:greenlet
使用greenlet进行二次封装,优化后的使用更多的是gevent
from gevent import monkey import gevent import urllib.request monkey.patch_all() def downloader(img_name, img_url): req = urllib.request.urlopen(img_url) img_content = req.read() print(gevent.getcurrent()) # 打印当前协程 with open(img_name, "wb") as f: f.write(img_content) def main(): gevent.joinall([ gevent.spawn(downloader, "1.jpg", "http://..."), gevent.spawn(downloader, "2.jpg", "http://...")]) if __name__ == "__main__": main()
进程、线程、协程对比:
1、进程是服务器资源分配的单位,进程切换需要耗费很多资源,三者中相对最多,对应效率就最低
2、线程是操作系统调度的单位,耗费资源一般,效率一般(三者中排中间,这是在不考虑GIL的情况下)
3、协程切换任务资源最小,效率最高
4、多进程、多协程根据CPU核数不一样,可能是并行,但是协程是在一个线程中,所以是并发
5、三者效率关系:协程>线程>进程,依赖关系:协程(依赖)线程(依赖)进程
应用场景:
计算密集型:进程
IO密集型:线程、协程
GIL(全局解释器锁):
定义:GIL全称Global Interpreter Lock
详细解释见:
https://www.cnblogs.com/SuKiWX/p/8804974.html
结合GIL的特性,有个问题:单线程与多线程比,谁性能更快?
其实,其内部有比较复杂的讨论:
1、答案是可以提高性能,减少时间的,比如:用多线程爬取网络数据,由于收发请求,抓取时间的接收过程可能会有延迟,多线程利于这些中间的延迟响应时间,进行多个线程的合理处理(例如:A线程等待数据返回(假设返回时间是T)的同时,提前执行B线程的发送前的所有操作,线程C,D直到N个,那这N个线程需要等待的N*T时间,就回几何式减少,最终减少成:T+(第N个线程发出请求-第一个线程发出请求)的时间)
2、但是,正如上述链接里的例子,某个线程多个并行执行比单独执行时慢了45%,这表示有GIL全局锁的存在,多线程极大降低了效率,GIL的存在导致多线程无法很好的利用多核CPU的并发处理能力。
3、可以用multiprocess替代Thread,multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。但是,它的引入会增加程序实现时线程间数据通讯和同步的困难
4、如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现(Jpython),但是这会导致,无法利用社区众多C语言模块有用的特性
5、可以用其他语言(java c c++)实现线程功能,然后利用python胶水语言的特性,直接调用
总结一下:
1、GIL其实是功能和性能之间权衡后的产物,它有其存在的合理性,也有较难改变的客观因素
2、如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现(Jpython)
3、GIL在较长一段时间内将会继续存在,但是会不断对其进行改进