Python Multiprocessing, Multithreading, and Coroutines


  • When I first studied this material I knew how to use it without understanding why it worked. Reviewing the old material brings new insight, and these notes will let me look back later at how I learned it.
    Study notes on Python 3 multithreading, multiprocessing, and coroutines.
OS: Windows 10
Runtime: Python 3.8.3

Multiprocessing

  • Processes do not share data with each other; each process works on its own copy (a short sketch follows).
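  • A minimal sketch (not from the original notes; the worker function and the data list are made up for illustration) of what "no shared data" means in practice: the child process gets its own copy of the list, so appending to it in the child is invisible to the parent.
# -*- coding: UTF-8 -*-
from multiprocessing import Process

data = []

def worker():
    data.append('child')               # modifies the child's own copy of the list
    print('in the child :', data)      # ['child']

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()
    print('in the parent:', data)      # [] -- the parent's list is unchanged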

What Is a Process

  • Explanation on Zhihu

  • Talking about processes inevitably involves programs. A program is just a collection of instructions (a pile of code) and cannot run on its own; only when it is loaded into memory and the system allocates resources for it can it run, and that running instance of a program is what we call a process.

Multiprocessing Basics

Method      Description
start       start the process
join        wait for the child process to finish
is_alive    check whether the process is still running (see the sketch after the first example)

Direct Invocation

# -*- coding: UTF-8 -*-
from multiprocessing import Process
from time import time
import os

def say(name):
    print('Parent PID:', os.getppid())       # ID of the parent process
    print('Child PID:', os.getpid())         # ID of the current (child) process
    print("hello %s" % name)

def main():
    print('Current PID:', os.getpid())
    print("\n")
    p = Process(target=say,args=('forever404',))
    p.start()
    p.join()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output. Because the process is started from main(), say()'s parent is the main process, so the parent PID printed by say() matches the PID printed in main(). (On Linux, where processes are started by fork, the child's __name__ stays __main__.)
Current PID: 14728


Parent PID: 14728
Child PID: 21828
hello forever404
0.1047201156616211
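  • The basics table above also lists is_alive, which the examples do not otherwise use. A minimal sketch (the work function is just a hypothetical placeholder): is_alive returns True while the child is still running and False once join has seen it finish.
# -*- coding: UTF-8 -*-
from multiprocessing import Process
from time import sleep

def work():
    sleep(1)                  # pretend to do something for a second

if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    print(p.is_alive())       # True  -- the child is still running
    p.join()
    print(p.is_alive())       # False -- the child has exited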

Inheritance-Style Invocation

# -*- coding: UTF-8 -*-
from multiprocessing import Process
from time import time
import os

class Myprocess(Process):
    # The parent class defines __init__, so when we override it we must call the parent's __init__ ourselves
    def __init__(self, args):
        # initialize the base Process class
        Process.__init__(self)
        # store the argument
        self.args = args

    # override the run method, which is executed when start() is called
    def run(self):
        print("Parent PID:{}".format(os.getppid()))
        print("Child PID:{}".format(os.getpid()))
        print("hello {}".format(self.args))

def main():
    print("当前进程id:{}".format(os.getpid()))
    print("\n")
    p = Myprocess('forever404')
    p.start()
    p.join()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
Current PID:15388


Parent PID:15388
Child PID:19752
hello forever404
0.11170077323913574

Process Locks

  • To keep shared data safe, the concept of a lock is introduced: only the process that has acquired the lock may enter the critical section.
# -*- coding: UTF-8 -*-
from multiprocessing import Process,Lock
from time import time,sleep

def say(number,locks):
    locks.acquire()
    try:
        print("hello world {}".format(number))
        sleep(1)
    finally:
        locks.release()

def main():
    lock = Lock()
    process = []
    for numbers in range(1, 11):
        p = Process(target=say, args=(numbers, lock))
        process.append(p)
    for m in process:
        m.start()
        m.join()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
hello world 10
10.90053391456604
  • There are generally two ways to join the processes. The first is the one above; the second is to take the m.join() out of the start loop (line 21 of the listing above) and put the joins in a separate loop:
for n in process:
    n.join()
  • The output is now out of order. This version starts all the processes first; because of the lock only one process can print at a time, but which process acquires the lock next is decided by the OS scheduler, not by start order. The trailing join loop simply waits for every child to finish before the main process continues. (The lock guarantees mutual exclusion, not ordering; in the first version the order is fixed only because each process is joined immediately after it is started.)
hello world 7
hello world 3
hello world 2
hello world 1
hello world 6
hello world 8
hello world 4
hello world 9
hello world 5
hello world 10
10.209748268127441

Inter-Process Communication

  • Since processes cannot share data directly, inter-process communication is especially important. We will focus on queues here; besides Queue, multiprocessing also offers Pipe (a short Pipe sketch follows).
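  • Pipe is not covered further in these notes, so here is a minimal sketch of it (the child function and the message are made up for illustration). Pipe() returns two connected endpoints; whatever one end sends, the other end can recv.
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Pipe

def child(conn):
    conn.send('hello from the child')    # send a picklable object through the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()     # two connected Connection objects
    p = Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())            # 'hello from the child'
    p.join()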

Queues

  • Queues come in four flavors: FIFO (first in, first out), LIFO (last in, first out), PriorityQueue (the lower the priority value, the earlier the item comes out), and deque (double-ended queue). Here we only discuss the FIFO queue; a short sketch of the other types appears at the end of this subsection.
Method      Description
put         put an item into the queue
get         take an item out of the queue
qsize       approximate size of the queue
empty       whether the queue is empty (returns a bool)
full        whether the queue is full (returns a bool)
task_done   mark a previously fetched item as processed (queue.Queue / JoinableQueue, used together with join)
join        block until every item has been fetched and processed (queue.Queue / JoinableQueue)
# -*- coding: UTF-8 -*-
from multiprocessing import Process,Queue
from time import time,sleep

def write(values,queue):
    for value in values:
        queue.put(value)
        print("[-] write successful:{}".format(value))
        sleep(2)

def read(queue):
    while True:
        if not queue.empty():
            value = queue.get()
            print("[+] read successful:{}".format(value))
            sleep(1)
        else:
            sleep(3)
            if queue.empty():
                break

def main():
    queue = Queue()
    colors = ['red','blue','black','green','yellow','orange']
    write_process = Process(target=write,args=(colors,queue))
    read_process = Process(target=read, args=(queue,))
    write_process.start()
    read_process.start()
    write_process.join()
    read_process.join()
    print("End of mission")

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
[-] write successful:red
[+] read successful:red
[-] write successful:blue
[-] write successful:black
[+] read successful:blue
[+] read successful:black
[-] write successful:green
[+] read successful:green
[-] write successful:yellow
[-] write successful:orange
[+] read successful:yellow
[+] read successful:orange
End of mission
15.110584020614624
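  • As promised above, a short sketch of the four queue flavors. Note that these come from the standard queue and collections modules (multiprocessing.Queue itself is FIFO only); the values used are arbitrary.
# -*- coding: UTF-8 -*-
from queue import Queue, LifoQueue, PriorityQueue
from collections import deque

fifo = Queue()
for n in (1, 2, 3):
    fifo.put(n)
print([fifo.get() for _ in range(3)])        # [1, 2, 3]  first in, first out

lifo = LifoQueue()
for n in (1, 2, 3):
    lifo.put(n)
print([lifo.get() for _ in range(3)])        # [3, 2, 1]  last in, first out

prio = PriorityQueue()
for item in ((2, 'blue'), (1, 'red'), (3, 'black')):
    prio.put(item)
print([prio.get() for _ in range(3)])        # the lowest priority number comes out first

dq = deque([1, 2, 3])
dq.appendleft(0)                             # a deque can grow and shrink at both ends
dq.append(4)
print(dq)                                    # deque([0, 1, 2, 3, 4])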

Process Pools

  • Creating processes by hand gets tedious; this is where multiprocessing's Pool comes in. When initializing a Pool you can specify the maximum number of worker processes; if the pool is full, a new task waits until a worker in the pool becomes free.
  • Basic methods (a short usage sketch follows the table):
Method        Description
apply_async   submit a task without blocking (tasks run concurrently)
apply         submit a task and block until it finishes (serial execution)
close         close the pool; no new tasks are accepted
terminate     stop the pool immediately, whether or not tasks have finished
join          block the main process until the workers exit (must be called after close or terminate)
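  • A minimal sketch of the methods in the table (the square function is a made-up example): apply blocks until the result is ready, apply_async returns an AsyncResult immediately, and close/join shut the pool down cleanly.
# -*- coding: UTF-8 -*-
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(3)
    print(pool.apply(square, (4,)))          # blocking call, prints 16 directly
    result = pool.apply_async(square, (5,))  # non-blocking, returns an AsyncResult
    print(result.get())                      # get() waits for the worker and prints 25
    pool.close()                             # no new tasks may be submitted
    pool.join()                              # wait for all worker processes to exit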

Communication Between Pool Processes

# -*- coding: UTF-8 -*-
from multiprocessing import Pool,Manager
from time import time,sleep

def write(queue,colors):
    for color in colors:
        print("[-] write successful:{}".format(color))
        queue.put(color)
        sleep(2)

def read(queue):
    while True:
        if not queue.empty():
            value = queue.get()
            print("[+] read successful:{}".format(value))
            sleep(1)
        else:
            sleep(3)
            if queue.empty():
                break

def main():
    colors = ['red', 'blue', 'black', 'green', 'yellow', 'orange']
    queue = Manager().Queue()
    pool_number = 5
    pool = Pool(pool_number)
    pool.apply_async(write,(queue,colors))
    pool.apply_async(read,(queue,))
    pool.close()
    pool.join()
    print("End of mission")

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
[-] write successful:red
[+] read successful:red
[-] write successful:blue
[-] write successful:black
[+] read successful:blue
[+] read successful:black
[-] write successful:green
[+] read successful:green
[-] write successful:yellow
[-] write successful:orange
[+] read successful:yellow
[+] read successful:orange
End of mission
15.106640338897705

Multithreading

  • A thread is the smallest unit of execution that the operating system can schedule. A process can contain many threads, each of which can work on a different task; running those tasks at (apparently) the same time is thread concurrency.

Global Interpreter Lock

  • Any discussion of Python threads involves the GIL (Global Interpreter Lock). Think of it as a permit: only the thread holding the permit may execute Python bytecode, and since there is only one permit, the other threads must wait until it is released. The GIL is a detail of the CPython interpreter rather than of the Python language itself; implementations such as Jython and IronPython have no GIL (see the sketch below).
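  • A minimal sketch of the GIL's effect (assumptions: CPython, a made-up CPU-bound count function, and timings that will vary by machine). Because only one thread can execute bytecode at a time, running the work in two threads is no faster than running it twice in a row.
# -*- coding: UTF-8 -*-
import threading
from time import time

def count(n):
    while n > 0:
        n -= 1

N = 10_000_000

start = time()
count(N)
count(N)
print('sequential :', time() - start)

start = time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads:', time() - start)        # about the same (often a bit slower) under CPython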

Multithreading Basics

Method      Description
setDaemon   mark the thread as a daemon, so it is killed when the main thread exits (same as setting the daemon attribute)
join        wait for the child thread to finish
start       start the child thread
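  • The daemon flag from the table deserves a quick sketch (the background function is a made-up placeholder): because the thread is marked as a daemon, the program exits as soon as the main thread finishes, and the sleeping thread never gets to print.
# -*- coding: UTF-8 -*-
import threading
from time import sleep

def background():
    sleep(5)                          # still sleeping when the main thread ends
    print('this line is never printed')

if __name__ == '__main__':
    t = threading.Thread(target=background)
    t.daemon = True                   # same effect as t.setDaemon(True)
    t.start()
    print('main thread done')         # the whole program exits here, killing the daemon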

Direct Invocation

# -*- coding: UTF-8 -*-
from time import time
import threading

def say(name):
    print("hello {}".format(name))

def main():
    t1 = threading.Thread(target=say,args=('forever404',))
    t2 = threading.Thread(target=say,args=('Azrael',))

    print("{}启动".format(t1.getName()))
    t1.start()
    print("{}启动".format(t2.getName()))
    t2.start()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
Thread-1 starting
hello forever404
Thread-2 starting
hello Azrael
0.001013040542602539

Thread Locks

# -*- coding: UTF-8 -*-
import threading
from time import sleep,time

def say(number,lock):
    lock.acquire()
    print("hello world {}".format(number))
    sleep(1)
    lock.release()

def main():
    lock = threading.Lock()
    thread_list = []
    for num in range(1,11):
        t = threading.Thread(target=say,args=(num,lock))
        thread_list.append(t)
    for i in thread_list:
        i.start()
    for j in thread_list:
        j.join()

if __name__ == '__main__':
    start = time()
    print(threading.current_thread().name)
    main()
    print(time()-start)
  • Output
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
hello world 10
10.84912371635437

Inheritance-Style Invocation

# -*- coding: UTF-8 -*-
from time import time
import threading

class Mythread(threading.Thread):
    # override __init__ from the parent class
    def __init__(self, args):
        # initialize the base Thread class
        threading.Thread.__init__(self)
        # store the argument
        self.args = args

    def run(self):
        print("hello {}".format(self.args))

def main():
    t1 = Mythread('forever404')
    t2 = Mythread('Azrael')

    print("{}启动".format(t1.getName()))
    t1.start()
    print("{}启动".format(t2.getName()))
    t2.start()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output
Thread-1 starting
hello forever404
Thread-2 starting
hello Azrael
0.0009713172912597656

Thread Concurrency

# -*- coding: UTF-8 -*-
import threading
from time import sleep,time

def say(number):
    print("hello world {}".format(number))
    sleep(1)

def main():
    thread_list = []
    for i in range(1,11):
        t = threading.Thread(target=say,args=(i,))
        thread_list.append(t)
    for j in thread_list:
        j.start()
    for m in thread_list:
        m.join()

if __name__ == '__main__':
    start = time()
    main()
    print(time()-start)
  • Output. If the threads did not run concurrently, the whole run would take about 10 s, but it finishes in roughly 1 s.
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
hello world 10
1.0039658546447754

Coroutines

  • Also called microthreads, coroutines are another way to achieve multitasking, with an execution unit even smaller than a thread. A coroutine carries its own CPU context, so when one coroutine is about to block on a slow I/O operation we can switch to another one at just the right moment and keep doing useful work instead of waiting, which saves time overall. Python has two common coroutine modules: greenlet (switching is controlled manually) and gevent (switching happens automatically, which is why blocking calls must first be monkey patched). A short greenlet sketch follows; gevent is covered in the next subsection.
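  • These notes only demonstrate gevent below, so here is a minimal greenlet sketch (assumes the third-party greenlet package is installed; the task_a/task_b functions are made up). Each switch call hands control to the other coroutine explicitly, which is the "manual control" mentioned above.
# -*- coding: UTF-8 -*-
from greenlet import greenlet

def task_a():
    print('A step 1')
    gr_b.switch()          # hand control to task_b
    print('A step 2')

def task_b():
    print('B step 1')
    gr_a.switch()          # hand control back to task_a

gr_a = greenlet(task_a)
gr_b = greenlet(task_b)
gr_a.switch()              # start task_a; prints A step 1, B step 1, A step 2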

gevent

Method      Description
monkey      patch blocking standard-library calls so gevent can switch away from them automatically
spawn       create a new coroutine (greenlet) object and schedule it to run
joinall     wait for a list of coroutine objects to finish
from gevent import monkey,spawn,joinall
monkey.patch_all()
from time import time
import requests

url = ['https://baidu.com/','http://forever404.cn','https://taobao.com']

def common(url):
    start = time()
    for value in url:
        response = requests.get(value)
        print(value,'-'*10,str(len(response.content)))
    print(time()-start)

def get_content(url):
    response = requests.get(url)
    return url,len(response.content)

def coroutime(url):
    start = time()
    contents = []
    for value in url:
        g = spawn(get_content,value)
        contents.append(g)
    joinall(contents)
    for content in contents:
        print(content.value[0], content.value[1])
    print(time()-start)

if __name__ == '__main__':
    coroutime(url)
    print('*'*30)
    common(url)
  • Output
https://baidu.com/ 2381
http://forever404.cn 35789
https://taobao.com 148003
0.5716776847839355
******************************
https://baidu.com/ ---------- 2381
http://forever404.cn ---------- 35789
https://taobao.com ---------- 148003
1.0452063083648682
  • Let's look at a clearer demonstration of the execution order:
from gevent import monkey
monkey.patch_all()
from gevent import spawn,joinall
from time import time,sleep

def one():
    print("hello")
    sleep(3)
    print("world")

def two():
    print("hello forever404")

def common():
    one()
    two()

def coroutime():
    joinall([
        spawn(one,),
        spawn(two,),
    ])

if __name__ == '__main__':
    common()
    print('*'*30)
    coroutime()
  • Output. You can see that gevent automatically switched away from one() while it was sleeping:
hello
world
hello forever404
******************************
hello
hello forever404
world

Summary

  • All in all, I learned a lot, but one question remains. If you open pages A, B, and C in a browser and are currently on page A, then visit B, then C, and finally come back to A, page A is still in the state you left it in; it does not reload just because you returned. That should be the Session kept between the client and the server. msf seems to use something similar: you can drop back with background and later pick a connection again with sessions -i. But how can this idea be applied to SSH? Pointers from the experts would be appreciated.