Asyncio 的简单使用


本文链接:

一、单个的任务

import asyncio
import requests


async def get_baidu():
    """Fetch the Baidu home page without blocking the event loop.

    ``requests.get`` is a blocking call. Calling it directly inside a
    coroutine freezes the event loop for the whole HTTP round trip, so it
    is handed to the loop's default thread-pool executor and awaited.

    Returns:
        The response object returned by ``requests.get``.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, requests.get, 'https://www.baidu.com/'
    )

if __name__ == '__main__':
    # Drive the single coroutine to completion and show the HTTP status.
    response = asyncio.run(get_baidu())
    print(response.status_code)

二、异步执行多个任务

import asyncio
import requests


async def get_baidu():
    """Fetch the Baidu home page and print its HTTP status code.

    ``requests.get`` is blocking, so it runs in the loop's default
    thread-pool executor. Awaiting it yields control back to the event
    loop, which is what lets several of these coroutines overlap.
    """
    loop = asyncio.get_running_loop()
    r = await loop.run_in_executor(
        None, requests.get, 'https://www.baidu.com/'
    )
    print(r.status_code)


async def get_baidu_ten_times():
    """Run ten ``get_baidu()`` requests concurrently and wait for all.

    ``asyncio.gather`` wraps each coroutine in a task and keeps it
    referenced until it finishes. Fire-and-forget ``create_task`` calls
    would be cancelled by ``asyncio.run`` as soon as this coroutine
    returned. An exception raised by any request propagates to the caller.
    """
    await asyncio.gather(*(get_baidu() for _ in range(10)))


if __name__ == '__main__':
    # Build the top-level coroutine, then let asyncio.run drive it.
    main_coro = get_baidu_ten_times()
    asyncio.run(main_coro)

三、异步执行多个任务并获取返回值

import asyncio
import requests


async def get_baidu(results: list):
    """Fetch the Baidu home page and append the response to *results*.

    ``requests.get`` is blocking, so it runs in the loop's default
    thread-pool executor instead of stalling the event loop.

    Args:
        results: Shared list that collects the response objects; it is
            mutated in place so the caller can read the outcome.
    """
    loop = asyncio.get_running_loop()
    r = await loop.run_in_executor(
        None, requests.get, 'https://www.baidu.com/'
    )
    # The coroutine resumes on the event-loop thread, so the shared list is
    # only ever mutated from one thread.
    results.append(r)


def get_baidu_ten_times():
    """Run ten ``get_baidu`` coroutines and return what they collected.

    Synchronous entry point: it starts its own event loop through
    ``asyncio.run``, so it must not be called from inside a running loop.
    This replaces the ``get_event_loop()`` / ``asyncio.wait(coroutines)``
    pattern; passing bare coroutines to ``asyncio.wait`` is deprecated
    since Python 3.8 and raises ``TypeError`` on 3.11+.

    Returns:
        The list that each ``get_baidu`` call appended its response to.
    """
    results = []

    async def _run_all():
        # gather() schedules all ten coroutines and waits for every one;
        # an exception from any of them propagates to the caller.
        await asyncio.gather(*(get_baidu(results) for _ in range(10)))

    asyncio.run(_run_all())
    return results


if __name__ == '__main__':
    # Collect the status code of every response gathered by the helper.
    status_codes = [resp.status_code for resp in get_baidu_ten_times()]
    print(status_codes)

注:这里利用了 list、dict 等可变对象"按引用传递"的特性——每个协程都向同一个 results 列表追加结果,因此调用方在所有任务结束后即可从该列表中取得全部返回值。


完。