concurrent.futures --- 启动并行任务 — Python 3.7.13 文档
- concurrent.futures 模块提供异步执行可调用对象高层接口
- 异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口,都是Executor的子类
- class
concurrent.futures.**Executor
** - ThreadPoolExecutor 线程池
```python
代码语言:txt复制import concurrent.futures
代码语言:txt复制import urllib.request
代码语言:txt复制URLS = ['http://www.foxnews.com/',
代码语言:txt复制 'http://www.cnn.com/',
代码语言:txt复制 'http://europe.wsj.com/',
代码语言:txt复制 'http://www.bbc.co.uk/',
代码语言:txt复制 'http://some-made-up-domain.com/']
代码语言:txt复制# Retrieve a single page and report the URL and contents
代码语言:txt复制def load_url(url, timeout):
代码语言:txt复制 with urllib.request.urlopen(url, timeout=timeout) as conn:
代码语言:txt复制 return conn.read()
代码语言:txt复制# We can use a with statement to ensure threads are cleaned up promptly
代码语言:txt复制with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
代码语言:txt复制 # Start the load operations and mark each future with its URL
代码语言:txt复制 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
代码语言:txt复制 for future in concurrent.futures.as_completed(future_to_url):
代码语言:txt复制 url = future_to_url[future]
代码语言:txt复制 try:
代码语言:txt复制 data = future.result()
代码语言:txt复制 except Exception as exc:
代码语言:txt复制 print('%r generated an exception: %s' % (url, exc))
代码语言:txt复制 else:
代码语言:txt复制 print('%r page is %d bytes' % (url, len(data)))
代码语言:txt复制```
- ProcessPoolExecutor 进程池
- 使用进程池来实现异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味着只可以处理和返回可序列化的对象。
__main__
模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中- 从可调用对象中调用 Executor 或 Future 的方法提交给 ProcessPoolExecutor 会导致死锁。
- class
concurrent.futures.**ProcessPoolExecutor**
(max_workers=None, mp_context=None, initializer=None, initargs=())- 异步执行调用的 Executor 子类使用一个最多有 max_workers 个进程的进程池。 如果 max_workers 为
None
或未给出,它将默认为机器的处理器个数。 如果 max_workers 小于等于0
,则将引发 ValueError。 在 Windows 上,max_workers 必须小于等于61
,否则将引发 ValueError。 如果 max_workers 为None
,则所选择的默认最多为61
,即使存在更多处理器。 - mp_context 可以是一个多进程上下文或是 None。 它将被用来启动工作者。 如果 mp_context 为
None
或未给出,将使用默认的多进程上下文。 - initializer 是在每个工作者进程开始处调用的一个可选可调用对象。 initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个
BrokenProcessPool
。 - 在 3.3 版更改: 如果其中一个工作进程被突然终止,
BrokenProcessPool
就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。 - 在 3.7 版更改: 添加 mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法 。加入 initializer 和initargs 参数。
- 异步执行调用的 Executor 子类使用一个最多有 max_workers 个进程的进程池。 如果 max_workers 为
```python
代码语言:txt复制import concurrent.futures
代码语言:txt复制import math
代码语言:txt复制PRIMES = [
代码语言:txt复制 112272535095293,
代码语言:txt复制 112582705942171,
代码语言:txt复制 112272535095293,
代码语言:txt复制 115280095190773,
代码语言:txt复制 115797848077099,
代码语言:txt复制 1099726899285419]
代码语言:txt复制def is_prime(n):
代码语言:txt复制 if n % 2 == 0:
代码语言:txt复制 return False
代码语言:txt复制 sqrt_n = int(math.floor(math.sqrt(n)))
代码语言:txt复制 for i in range(3, sqrt_n 1, 2):
代码语言:txt复制 if n % i == 0:
代码语言:txt复制 return False
代码语言:txt复制 return True
代码语言:txt复制def main():
代码语言:txt复制 with concurrent.futures.ProcessPoolExecutor() as executor:
代码语言:txt复制 for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
代码语言:txt复制 print('%d is prime: %s' % (number, prime))
代码语言:txt复制if __name__ == '__main__':
代码语言:txt复制```
- Future 对象
- Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。