python 笔记 02—线程、进程、协程


线程:

启动/连接/关闭 线程基本操作:

 1 from threading import thread
 2 
 3 def countTimer(n, thread_name):
 4     print("{}:".format(thread_name), n)
 5     n-=1
 6     time.sleep(2)
 7    
 8 t1 = Thread(target=countTimer,args(5,))  //实例化线程
 9 t2 = Thread(target=countTimer,args(5,))
10 
11 t1.start()   // 启动线程
12 t2.start()  // 线程结束后会自动销毁

主线程:程序主分支
子线程:从主线程衍生,由threading.Thread生成

(运行过程中,主线程会等待子线程执行结束以后才结束)

检查线程数量:length = len(threading.enumerate())
enumerate()函数应用场景:

1 names = (11,22,33)
2 for a,b in enumerate(names):
3     print(a, b)

执行结果:
0,11
1,22
2,33

用继承的方式开启多线程:

 1 import threading
 2 import time
 3 
 4 
 5 class MyThread(threading.Thread):
 6     // 必须重写run方法,run方法直接被start调用
 7     def run(self):
 8         for i in range(3):
 9             time.sleep(1)
10             msg = "I`m " + self.name + '@' + str(i)
11             print(msg)
12 
13 
14 def main():
15     for i in range(5):
16         t = MyThread()
17         t.start()
18 
19 
20 if __name__ == "__main__":
21     main()

普通全局变量是否要加global:

 1 num = 100
 2 nums=[11,22]
 3 def test():
 4     global num
 5     num+=100
 6 def test2():
 7     nums.append(33)
 8 print(num)
 9 print(nums)
10 test()
11 print(num)
12 print(nums)
// 结论:全局变量不一定要加global
// 函数中对全局变量修改的时候,到底是否需要使用global:要看是否对全局变量的指向进行修改
// 如果修改了执行,即让全局变量指向了一个新的地方,那么必须使用global

多线程共享全局变量 会导致资源竞争的问题,先看一份代码:

 1 g_num = 0
 2 
 3 def test(num):
 4     global g_num
 5     for i in range(num):
 6         g_num += 1
 7     print("---Test1 g_num=%d-----" % g_num)
 8     
 9 def test2(num):
10     global g_num
11     for i in range(num):
12         g_num += 1
13     print("---Test2 g_num=%d-----" % g_num)
14     
15 def main():
16     t1 = threading.Thread(target=test1, args=(1000000,))
17     t2 = threading.Thread(target=test2, args=(1000000,))
18     
19     t1.start()
20     t2.start()
21     
22     time.sleep(2)
23     
24     print("----Main Thread g_num = %d----" % g_num
25     
26 if __name__ == "__main__"):
27     main()

输出结果:
---Test1 g_num=1224430-----
---Test2 g_num=1363383-----
----Main Thread g_num=1363383----
预期应该是 2000000

原因分析:
线程1,2处理过程 :1、获取g_num的值 2、把获取的值+1 3、把第2步的结果存到g_num中
以上步骤中1,2,3步骤由于CPU的多线程的机制被拆分成 线程1执行12→线程2后执行12→线程1执行3→线程2执行3
这种线程执行机制导致 g_num 的数值存取被不断覆盖,次数越多出现的可能性、频率越高

解决方案:使用互斥锁

 1 // 把必须要有原子性的代码部分加互斥锁
 2 g_num = 0
 3 
 4 def test(num):
 5     global g_num
 6     for i in range(num):
 7     # 上锁,如果之前没有被上锁,那么上锁成功,否则阻塞此处,等待其他地方释放锁
 8     # 被加锁的代码行数/功能点越少越好(尽量避免有for等循环的存在)
 9         mutex.acquire()
10         g_num += 1
11         # 解锁
12         mutrix.release()
13     print("---Test1 g_num=%d-----" % g_num)
14     
15 def test2(num):
16     global g_num
17     for i in range(num):
18         mutex.acquire()
19         g_num += 1
20         mutrix.release()
21     print("---Test2 g_num=%d-----" % g_num)
22 
23 // 创建一个互斥锁 默认没有上锁
24 mutex = threading.Lock()
25 
26 def main():
27     t1 = threading.Thread(target=test1, args=(1000000,))
28     t2 = threading.Thread(target=test2, args=(1000000,))
29     
30     t1.start()
31     t2.start()
32     
33     time.sleep(2)
34     
35     print("----Main Thread g_num = %d----" % g_num
36     
37 if __name__ == "__main__"):
38     main()

互斥锁的弊端,会导致死锁:

引自:
何谓死锁:死锁是由于两个或以上的线程互相持有对方需要的资源,导致这些线程处于等待状态,无法执行
产生死锁的四个必要条件:
1.互斥性:线程对资源的占有是排他性的,一个资源只能被一个线程占有,直到释放。
2.请求和保持条件:一个线程对请求被占有资源发生阻塞时,对已经获得的资源不释放。
3.不剥夺:一个线程在释放资源之前,其他的线程无法剥夺占用。
4.循环等待:发生死锁时,线程进入死循环,永久阻塞。
产生死锁的原因:
1.竞争不可抢占性资源
p1已经打开F1,想去打开F2,p2已经打开F2,想去打开F1,但是F1和F2都是不可抢占的,这是发生死锁。
2.竞争可消耗资源引起死锁
进程间通信,如果顺序不当,会产生死锁,比如p1发消息m1给p2,p1接收p3的消息m3,p2接收p1的m1,发m2给p3,p3,以此类推,如果进程之间是先发信息的那么可以完成通信,但是如果是先接收信息就会产生死锁。
3.进程推进顺序不当
进程在运行过程中,请求和释放资源的顺序不当,也同样会导致产生进程死锁
避免死锁的方法:
1.破坏“请求和保持”条件
想办法,让进程不要那么贪心,自己已经有了资源就不要去竞争那些不可抢占的资源。比如,让进程在申请资源时,一次性申请所有需要用到的资源,不要一次一次来申请,当申请的资源有一些没空,那就让线程等待。不过这个方法比较浪费资源,进程可能经常处于饥饿状态。还有一种方法是,要求进程在申请资源前,要释放自己拥有的资源。
2.破坏“不可抢占”条件
允许进程进行抢占,方法一:如果去抢资源,被拒绝,就释放自己的资源。方法二:操作系统允许抢,只要你优先级大,可以抢到。
3.破坏“循环等待”条件
将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出
死锁的检测:
1.每个进程、每个资源制定唯一编号
2.设定一张资源分配表,记录各进程与占用资源之间的关系
3.设置一张进程等待表,记录各进程与要申请资源之间的关系
死锁的解除:
1.抢占资源,从一个或多个进程中抢占足够数量的资源,分配给死锁进程,以解除死锁状态。
2.终止(或撤销)进程,终止(或撤销)系统中的一个或多个死锁进程,直至打破循环环路,使系统从死锁状态解脱出来.(设置超时时限)

多线程实例-UDP多线程聊天器:

 1 import socket
 2 import threading
 3 
 4 
 5 def send_msg(udp_sockect):
 6     # 发送
 7     while True:
 8         send_data = input("请输入要发送的内容:")
 9         udp_sockect.sendto(send_data.encode("gbk"), ("192.168.31.129", 8080))
10 
11 
12 def recv_msg(udp_sockect):
13     # 接收
14     while True:
15         recv_data = udp_sockect.recvfrom(1024)
16         print("{}.{}".format(str(recv_data[1]), recv_data[0].decode("gbk")))
17 
18 
19 def main():
20     # 创建套接字
21     udp_sockect = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
22     # 绑定端口
23     udp_sockect.bind(("", 7799))
24     recv_thread = threading.Thread(target=recv_msg, args=(udp_sockect,))
25     send_thread = threading.Thread(target=send_msg, args=(udp_sockect,))
26     recv_thread.start()
27     send_thread.start()
28 
29 
30 if __name__ == "__main__":
31     main()

 

进程:

进程:计算机最小的资源分配单位,即一个单位运行中的程序

创建进程:

 1 import time
 2 import multiprocessing
 3 
 4 def test1():
 5     while True:
 6         print("1------")
 7 
 8 def test2():
 9     while True:
10         print("2------")
11 
12 def main():
13     p1= multiprocessing.Process(target=test1)
14     p2= multiprocessing.Process(target=test2)
15     p1.start()
16     p2.start()
17 
18 if __name__=="__main__":
19     main()

队列常用方法:

q= multiprocessing.Queue(3)   # 初始化一个Queue对象,最多可接收三条put信息,可以put任何类型的数据,不传3表示最大
q.put("3") q.put(4) print(q.full()) # False q.put([11,22,33]) print(q.full()) # True q.empty() # 判断是否为空 q.qsize() # 输出队列的大小 q.get() # '3'

 队列在多进程间的应用场景:

 1 import multiprocessing
 2 
 3 
 4 def download_data(q):
 5     # 下载数据
 6     data = [11, 22, 33, 44]
 7     for temp in data:
 8         q.put(temp)
 9     print("完成下载!")
10 
11 
12 def analysis_data(q):
13     # 数据处理
14     waitting_analysis_data = list()
15     while True:
16         if q.empty():
17             break
18         data = q.get()
19         waitting_analysis_data.append(data)
20 
21     print(waitting_analysis_data)
22 
23 
24 def main():
25     # 创建一个队列
26     q = multiprocessing.Queue()
27     p1 = multiprocessing.Process(target=download_data, args=(q,))
28     p2 = multiprocessing.Process(target=analysis_data, args=(q,))
29     p1.start()
30     p2.start()
31 
32 
33 if __name__ == "__main__":
34     main()

注:普通queue队列只适用同一个程序/同机器内部的多进程, 如果需要多程序多服务间的队列通信,参见redis的使用

进程池(ProcessPoolExecutor):

作用:限制进程上限,节省资源,减少程序创建、销毁进程的时间,提高程序运行效率

 1 from multiprocessing import Pool
 2 import os, time, random
 3 
 4 
 5 def worker(msg):
 6     t_start = time.time()
 7     print("%s开始执行,进程号为%d" % (msg, os.getpid()))
 8     time.sleep(random.random()*2)
 9     t_stop = time.time()
10     print(msg, "执行完毕, 耗时%0.2f" % (t_stop-t_start))
11 
12 po =  Pool(3)
13 
14 for i in range(0,10):
15     po.apply_async(worker, (i,))
16 
17 print("开始:")
18 po.close()
19 po.join()   # 作用:等待po中所有子进程全部关闭后,主进程才会关闭 注意必须放在close之后
20 print("结束")

 进程池+队列等应用:

 1 import os
 2 import multiprocessing
 3 import time
 4 
 5 from utils.config import Config
 6 
 7 
 8 def copy_file(q, file_name, old_file_path, new_file_path):
 9     old_f = open(old_file_path + file_name, "rb")
10     content = old_f.read()
11     old_f.close()
12     new_f = open(new_file_path + file_name, "wb")
13     new_f.write(content)
14     new_f.close()
15     q.put(file_name)
16 
17 
18 def main():
19     old_file_path = Config().project_path + "../" + "test_demo/"
20     new_file_path = Config().project_path + "../" + "test_demo_附件/"
21     try:
22         os.makedirs(new_file_path)
23     except Exception as e:
24         pass
25     file_names = os.listdir(old_file_path)
26     po = multiprocessing.Pool(5)
27     q = multiprocessing.Manager().Queue()
28     all_file_num = 0
29     for file_name in file_names:
30         sub_path = old_file_path + file_name
31         if os.path.isfile(sub_path):
32             all_file_num += 1
33             po.apply_async(copy_file, args=(q, file_name, old_file_path, new_file_path))
34     po.close()
35     copy_times = 0
36     while True:
37         # 获取队列已经存进的复制完的文件名
38         file_name = q.get()
39         # 打印拷贝进度
40         copy_times += 1
41         time.sleep(0.5)
42         # 打印拷贝进度在一行显示
43         print("\r拷贝进度为:%.2f %%" % (copy_times * 100 / all_file_num), end="")
44         if copy_times >= all_file_num:
45             break
46     print()
47 
48 
49 if __name__ == "__main__":
50     main()
1 class Config(object):
2     """
3     全局配置文件
4     """
5 
6     def __init__(self):
7         # 获取当前工程文件路径
8         self.project_path = BASE_DIR.split("\\utils")[0] + "\\"

 

协程:

检查某个数据是否能迭代:

from collections import Interable
isinstance([11,22,33], Iterable)  # 是否可以迭代,返回True False

 迭代器的意义:存储生成数据的方式,而不是生成的数据,占用最小的空间:

 举例:

# 生成一个斐波那契数列,常规做法:用一个列表储存,然后输出
num = list()

a= 0
b= 1
while i<10:
    num.append(a)
    a,b=b,a+b

for num in nums:
    print(num)
class Fibonacci(object):
    def __init__(self, n):
        self.a = 0
        self.b = 1
        self.current_num = 0
        self.goal_num = n

    def __iter__(self):
        return self

    def __next__(self):
        if self.current_num < self.goal_num:
            self.a, self.b = self.b, self.a + self.b
            self.current_num += 1
            return self.a
        else:
            raise StopIteration


if __name__ == "__main__":
    for i in Fibonacci(10):
        print(i)

 生成器:

利用迭代器,我们可以在每次迭代获取数据(根据__next__())时按照特定的规律进行生成。但是我们在实现一个迭代器时,关于当前迭代到的状态需要我们自己记录,进而才能根据当前状态生成下一个数据。为了起到记录当前状态的作用,并配合__next()__函数进行迭代使用,我们可以采用更简便的语法,即使用迭代器。

 Python 中,使用了 yield 的函数被称为生成器(generator),跟普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个特殊的迭代器。

在调用生成器运行的过程中,每次遇到 yield 时函数会暂停并保存当前所有的运行信息,返回 yield 的值, 并在下一次执行 next() 方法时从当前位置继续运行。

调用一个生成器函数,返回的是一个迭代器对象,并不执行函数。

def create_num(all_num):
    a, b = 0, 1
    current_num = 0
    while current_num < all_num:
        yield a
        a, b = b, a + b
        current_num += 1
    return "ok..."


obj = create_num(10)

while True:
    try:
        ret = next(obj)
        print(ret)
    except StopIteration as ex:
        print(ex.value)
        break

另一种调用生成器的方法(带参数传递),注意思考几点

1)  传递的参数等同于谁的值,在生成器里如何传递  ——传递的参数等同于 yield a整体的结果

2)这样传递的作用或意义是什么  ——用来重新定义生成器里某项需要重置的值

3)传递进去以后下一步等同于__next()__作用的值怎么赋值  —— 继续按下一步(next)yield a的值赋值

4)send()传递的值的调用是否可以放在第一次调用处(第一个next处),如果一定要放第一个位置,传值要有什么限制?  ——不能放一个位置,如果一定要放可以传值 None

def create_num(all_num):
    a, b = 0, 1
    current_num = 0
    while current_num < all_num:
        ret = yield a
        print("生成器内部:", ret)
        a, b = b, a + b
        current_num += 1


obj = create_num(10)

ret = next(obj)
print(ret)

ret = obj.send("hahaha")
print(ret)

结果:

生成器内部: hahaha

协程:

实现协程的最原始的包:greenlet

使用greenlet进行二次封装,优化后的使用更多的是gevent

from gevent import monkey
import gevent
import urllib.request

monkey.patch_all()


def downloader(img_name, img_url):
    req = urllib.request.urlopen(img_url)
    img_content = req.read()
    print(gevent.getcurrent())  # 打印当前协程
    with open(img_name, "wb") as f:
        f.write(img_content)


def main():
    gevent.joinall([
        gevent.spawn(downloader, "1.jpg", "http://..."),
        gevent.spawn(downloader, "2.jpg", "http://...")])


if __name__ == "__main__":
    main()

进程、线程、协程对比:

1、进程是服务器资源分配的单位,进程切换需要耗费很多资源,三者中相对最多,对应效率就最低

2、线程是操作系统调度的单位,耗费资源一般,效率一般(三者中排中间,这是在不考虑GIL的情况下)

3、协程切换任务资源最小,效率最高

4、多进程、多协程根据CPU核数不一样,可能是并行,但是协程是在一个线程中,所以是并发

5、三者效率关系:协程>线程>进程,依赖关系:协程(依赖)线程(依赖)进程

应用场景:

计算密集型:进程

IO密集型:线程、协程

GIL(全局解释器锁):

 定义:GIL全称Global Interpreter Lock

详细解释见:

https://www.cnblogs.com/SuKiWX/p/8804974.html

 结合GIL的特性,有个问题:单线程与多线程比,谁性能更快?

其实,其内部有比较复杂的讨论:

1、答案是可以提高性能,减少时间的,比如:用多线程爬取网络数据,由于收发请求,抓取时间的接收过程可能会有延迟,多线程利于这些中间的延迟响应时间,进行多个线程的合理处理(例如:A线程等待数据返回(假设返回时间是T)的同时,提前执行B线程的发送前的所有操作,线程C,D直到N个,那这N个线程需要等待的N*T时间,就回几何式减少,最终减少成:T+(第N个线程发出请求-第一个线程发出请求)的时间)

2、但是,正如上述链接里的例子,某个线程多个并行执行比单独执行时慢了45%,这表示有GIL全局锁的存在,多线程极大降低了效率,GIL的存在导致多线程无法很好的利用多核CPU的并发处理能力。

3、可以用multiprocess替代Thread,multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。但是,它的引入会增加程序实现时线程间数据通讯和同步的困难

4、如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现(Jpython),但是这会导致,无法利用社区众多C语言模块有用的特性

5、可以用其他语言(java c c++)实现线程功能,然后利用python胶水语言的特性,直接调用

总结一下:

1、GIL其实是功能和性能之间权衡后的产物,它有其存在的合理性,也有较难改变的客观因素

2、如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现(Jpython)

3、GIL在较长一段时间内将会继续存在,但是会不断对其进行改进