Python中的并发编程(7)协程

2024-01-02 16:06:03 浏览数 (2)

异步编程

Python3.4后新增了asyncio模块,支持异步编程。异步是在一个线程中通过任务切换的方式让多个任务”同时“进展。异步不涉及线程/进程切换,减少了线程/进程创建、上下文切换的开销,更轻量级。 asyncio的核心是事件循环,不断监听/执行队列中的任务。

事件循环

由于asyncio是在一个线程中通过任务切换的方式执行多任务,所以这些任务需要是非阻塞的。如果某个任务是阻塞的,比如常规的sleep函数、数值计算等,那么这个任务会占据线程,让其它任务没有机会执行。

async和await

在函数定义的def关键字之前加上async,就可以定义一个协程

代码语言:javascript复制
async def async_hello(): 
    print("hello, world!") 
    await asyncio.sleep(1) # 异步的睡眠任务。如果用常规的time.sleep()会阻塞程序。
    print("1秒钟过去了...") 

async 关键字定义的函数很特殊。调用时,它们不会执行内部代码,而是返回一个协程对象(coroutine object)。

代码语言:javascript复制
In [2]: async_hello()
Out[2]: <coroutine object async_hello at 0x0000012904713CC8>

await在异步任务启动之后,暂停当前 async 函数的执行,把执行权交给其他任务。等到异步任务结束,再把执行权交回 async 函数,继续往下执行。在上面这个async_hello()的例子中,当执行到await asyncio.sleep(1)时,会启动任务asyncio.sleep(1),并交出执行权,让其他任务执行。1秒后,任务asyncio.sleep(1)完成了,会继续执行async_hello()的下一行print("1秒钟过去了...")

事件循环中安排其执行之前,协程对象不会执行任何操作。下面我们来执行这个协程。

代码语言:javascript复制
import asyncio
async def async_hello(): 
    print("hello, world!") 
    await asyncio.sleep(1)
    print("1秒钟过去了...") 
    

    
# 1.获取事件循环
loop = asyncio.get_event_loop()
# 2.执行协程
loop.run_until_complete(async_hello())
# 3.关闭事件循环
loop.close()

# 上面三步等价于:
asyncio.run(async_hello()) # python3.7新增asyncio.run()执行协程

执行多个任务/协程

如果您有多个任务或协程等待,可以使用 asyncio.gather() 将它们聚合到一个对象中。

代码语言:javascript复制
import asyncio 
import random
async def print_number(number):
    await asyncio.sleep(random.random())
    print(number) 
    return number

async def main():
    results = await asyncio.gather(*[ 
            print_number(number) 
                for number in range(10) 
        ]) 
    print("results=", results)
    
asyncio.run(main())

运行结果:

代码语言:javascript复制
6 8 9 5 0 7 3 4 1 2
results= [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

asyncio.gather() 用于收集多个协程以并发执行它们。结果是一个对象,表示运行所有提供的协程的future结果。

异步编程的实例

网络IO是一个合适用异步编程处理的任务,可惜requests库没有提供异步请求的方法,不过aiohttp提供了异步 HTTP方法 。

代码语言:javascript复制
import asyncio
import time
import aiohttp

async def get_rates(session: aiohttp.ClientSession, base: str):
    async with session.get(
        f"https://api.vatcomply.com/rates?base={base}"
    ) as response:
        rates = (await response.json())['rates']
        rates[base] = 1.
        return base, rates

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def present_result(base, rates):
    rates_line = ", ".join(
    [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

async def main():
    async with aiohttp.ClientSession() as session:
        for result in await asyncio.gather(*[
           get_rates(session, base) for base in BASES]):
            present_result(*result)

if __name__ == "__main__":
    started = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))
代码语言:javascript复制
1 USD =     1.0 USD,   0.916 EUR,    3.98 PLN,    10.4 NOK,    22.5 CZK
1 EUR =    1.09 USD,     1.0 EUR,    4.34 PLN,    11.3 NOK,    24.5 CZK
1 PLN =   0.251 USD,    0.23 EUR,     1.0 PLN,    2.61 NOK,    5.65 CZK
1 NOK =  0.0962 USD,  0.0881 EUR,   0.383 PLN,     1.0 NOK,    2.16 CZK
1 CZK =  0.0445 USD,  0.0407 EUR,   0.177 PLN,   0.462 NOK,     1.0 CZK

time elapsed: 1.05s

0 人点赞