引言
Python 的 Asyncio 模块在处理 I/O 密集型任务时表现出色,并且在最近的 Python 版本迭代中获得了诸多增强。不过,由于处理异步任务的途径多样,选择在特定情境下最合适的方法可能会让人感到迷惑。在这篇文章[1]中,我会先从任务对象的基本概念讲起,接着探讨各种处理异步任务的方法,并分析它们各自的优势和劣势。
等待多个任务
现在,让我们来看看有趣的事情 - 等待多个任务!等待任务集合主要有三种方式;每种方法都有其优点和缺点,并且在不同的情况下会有所帮助。
asyncio.wait
我们的第一个选项类似于 wait_for 函数,但它是为一组任务或更为基础的 Future 对象设计的,这些对象可以是列表、元组或集合等形式。
代码语言:javascript复制asyncio.wait(collection_of_tasks, *, timeout=None, return_when=ALL_COMPLETED)
此函数返回一个由两个集合组成的元组:第一个集合包含已完成的任务,第二个集合则包含尚未完成的任务。如果在超时期限或 return_when 参数指定的条件满足之前任务已完成,它们将被归入已完成的任务集合;未完成的任务则被放入第二个集合,这个集合通常被称作 pending,或者如果你不打算使用这些任务,也可以简单地用下划线 _ 来表示。然而,与 'asyncio.wait' 函数不同的是,在超时发生时,未完成的任务不会被自动取消。
return_when 参数允许你指定 asyncio.wait 函数在以下三种情况之一发生时返回:
- FIRST_COMPLETED 当第一个任务完成或被取消时返回结果。
- FIRST_EXCEPTION 当任一任务引发异常,或所有任务都已完成时返回结果。
- ALL_COMPLETED 是默认选项,它将在所有 futures 完成或被取消时返回结果。
让我们通过一个实际例子来演示这个过程:
代码语言:javascript复制import asyncio
import random
async def job():
await asyncio.sleep(random.randint(1, 5))
async def main():
tasks = [
asyncio.create_task(job(), name=index)
for index in range(1, 5)
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f’The first task completed was {done.pop().get_name()}’)
asyncio.run(main())
Output:
代码语言:javascript复制The first task completed was 4
asyncio.gather
现在,让我们深入了解 asyncio.gather 函数,特别是带有参数 return_exceptions=False 的用法。
与 wait_for 函数仅接受任务或Futuer对象的集合不同,gather 函数可以接受任意数量的任务、Futuer对象,甚至是协程对象,作为一系列位置参数传递给它。传入 gather 的协程对象会自动转换为任务对象,以便它们能够在事件循环中执行。所有任务完成后,gather 会将所有通过 Task.result() 方法获得的返回值,作为一个列表返回。gather 一个非常贴心的特性是,返回的列表会按照任务传入的顺序排列。
gather 的另一个优点是,它是这三个函数中唯一能够优雅地处理并返回异常的。如果设置了 return_exceptions 参数为 True,那么在任务原本应该返回结果的位置,列表将包含由任务引发的异常。
下面,让我们通过一个实例来具体了解这一机制是如何运作的。
代码语言:javascript复制import asyncio
import random
async def job(id):
print(f’Starting job {id}’)
await asyncio.sleep(random.randint(1, 3))
print(f’Finished job {id}’)
return id
async def main():
# create a list of worker tasks
coros = [job(i) for i in range(4)]
# gather the results of all worker tasks
results = await asyncio.gather(*coros)
# print the results
print(f’Results: {results}’)
asyncio.run(main())
我们首先定义了一个包含多个协程对象的列表,接着通过 * 操作符将这些协程对象作为位置参数展开,供 gather 函数处理。当我们对 gather 函数返回的对象进行等待(即调用 await),它就会开始执行这些任务,并一直运行直至所有任务完成。值得注意的是,尽管由于 await asyncio.sleep(random.randint(1,3)) 的调用导致任务以随机顺序完成,但 gather 返回的结果列表依然保持了我们最初传入任务的顺序。
代码语言:javascript复制Starting job 0
Starting job 1
Starting job 2
Starting job 3
Finished job 3
Finished job 0
Finished job 1
Finished job 2
Results: [0, 1, 2, 3]
在下一个示例中,我将两个协程直接放入 Gather 中,并将 return_exceptions 设置为 True,这会在同一结果列表中优雅地返回异常:
代码语言:javascript复制import asyncio
async def task1():
raise ValueError()
async def task2():
raise KeyError()
async def main():
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
print(results) # Will print [ValueError(), KeyError()]
asyncio.run(main())
asyncio.gather 的最后一个功能是,就像使用 Task.cancel() 取消单个任务一样,gather 返回的对象(然后等待)有自己的 cancel() 方法,该方法将循环遍历所有它正在管理的任务并取消所有这些任务。
asyncio.as_completed
这个函数与前面提到的两个有所不同;它不是一次性提供所有结果的集合或列表,而是提供了一个可迭代的对象,这样你可以在每个结果生成时即时处理它们。这个函数可以处理所有类型的可等待对象,包括协程、任务和未来对象。与其他许多方法类似,它也包含一个用于设置超时的关键字参数,如果到了设定的时间任务还没有完成,就会抛出 TimeoutError 异常。
代码语言:javascript复制asyncio.as_completed(aws, *, timeout=None)
以下是 as_completed 工作原理的示例:
代码语言:javascript复制import asyncio
async def my_task(id):
return f’I am number {id}’
async def main():
tasks = [my_task(id) for id in range(5)]
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
asyncio.run(main())
我们可以看到输出是随机的,因为它只是打印出先完成的内容:
代码语言:javascript复制I am number 3
I am number 0
I am number 4
I am number 2
I am number 1
等待一组任务
在 Python 3.11 中,Asyncio 引入了一个新特性——asyncio.TaskGroup,它以上下文管理器的形式简化了对一组任务的管理。这个特性的一个关键优势在于,如果任务组中的某个任务遇到错误,其他所有任务都会自动取消,这有助于在异步编程中实现更加健壮的错误处理机制。
设想这样一个情形:你有两段代码,每段都负责调用不同的 API 接口。当这两个 API 接口的响应都收集齐后,你打算将这些数据统一存储到数据库中。但如果其中一个 API 调用失败,你希望整个数据插入操作都不要执行。这种情况下,使用 TaskGroup 就非常合适,因为它可以确保两个协程要么都完成,要么在其中一个失败时立即取消另一个。
你可以通过调用 tg.create_task() 方法来向任务组中添加任务。如果任务组中的任何一个任务失败,组内其他所有任务都将被取消。随后,异常会以 ExceptionGroup 或 BaseExceptionGroup 的形式传递到包含任务组的协程中。
以下是一个展示如何使用任务组的示例:
代码语言:javascript复制import asyncio
async def do_something():
return 1
async def do_something_else():
return 2
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(do_something())
task2 = tg.create_task(do_something_else())
print(f’Everything done: {task1.result()}, {task2.result()}’)
asyncio.run(main())
Output:
代码语言:javascript复制Everything done: 1, 2
总结
我们已经探讨了多种处理可等待对象(awaitables)的方法,现在来回顾一下:
await
是最基本的等待操作,你可以将它放在任何可等待对象前面来执行其内部的代码。但await
不支持直接同时处理多个任务。asyncio.wait_for
与await
类似,用于处理单个可等待对象,但它允许设置超时,适用于长时间运行的任务。asyncio.wait
接受一组任务或未来对象,并允许设置超时。你可以根据需求选择返回的时机,例如所有任务完成、第一个任务完成或遇到第一个异常。asyncio.gather
接受多个可等待对象作为位置参数,并返回一个列表,列表中的顺序与传入的参数顺序相同。它还能处理那些抛出异常的任务。asyncio.as_completed
提供了一个可迭代的方式,允许你逐个处理完成的任务,而不是一次性处理所有任务。它同样支持超时参数。asyncio.TaskGroup
是 Python 3.11 新增的特性,它让你可以管理一组任务,并根据是否有任务抛出异常来决定是否全部或一个也不返回结果。
Reference
[1]
Source: https://jacobpadilla.com/articles/handling-asyncio-tasks