线程池,进程池

2023-05-04 22:43:19 浏览数 (2)

concurrent.futures --- 启动并行任务 — Python 3.7.13 文档

  • concurrent.futures 模块提供异步执行可调用对象高层接口
  • 异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口,都是Executor的子类
  • class concurrent.futures.**Executor**
  • ThreadPoolExecutor 线程池
代码语言:txt复制
```python
代码语言:txt复制
import concurrent.futures
代码语言:txt复制
import urllib.request
代码语言:txt复制
URLS = ['http://www.foxnews.com/',
代码语言:txt复制
        'http://www.cnn.com/',
代码语言:txt复制
        'http://europe.wsj.com/',
代码语言:txt复制
        'http://www.bbc.co.uk/',
代码语言:txt复制
        'http://some-made-up-domain.com/']
代码语言:txt复制
# Retrieve a single page and report the URL and contents
代码语言:txt复制
def load_url(url, timeout):
代码语言:txt复制
    with urllib.request.urlopen(url, timeout=timeout) as conn:
代码语言:txt复制
        return conn.read()
代码语言:txt复制
# We can use a with statement to ensure threads are cleaned up promptly
代码语言:txt复制
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
代码语言:txt复制
    # Start the load operations and mark each future with its URL
代码语言:txt复制
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
代码语言:txt复制
    for future in concurrent.futures.as_completed(future_to_url):
代码语言:txt复制
        url = future_to_url[future]
代码语言:txt复制
        try:
代码语言:txt复制
            data = future.result()
代码语言:txt复制
        except Exception as exc:
代码语言:txt复制
            print('%r generated an exception: %s' % (url, exc))
代码语言:txt复制
        else:
代码语言:txt复制
            print('%r page is %d bytes' % (url, len(data)))
代码语言:txt复制
```
  • ProcessPoolExecutor 进程池
    • 使用进程池来实现异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味着只可以处理和返回可序列化的对象。
    • __main__ 模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor 不可以工作在交互式解释器中
    • 从可调用对象中调用 Executor 或 Future 的方法提交给 ProcessPoolExecutor 会导致死锁。
    • class concurrent.futures.**ProcessPoolExecutor**(max_workers=Nonemp_context=Noneinitializer=Noneinitargs=())
      • 异步执行调用的 Executor 子类使用一个最多有 max_workers 个进程的进程池。 如果 max_workers 为 None 或未给出,它将默认为机器的处理器个数。 如果 max_workers 小于等于 0,则将引发 ValueError。 在 Windows 上,max_workers 必须小于等于 61,否则将引发 ValueError。 如果 max_workers 为 None,则所选择的默认最多为 61,即使存在更多处理器。
      • mp_context 可以是一个多进程上下文或是 None。 它将被用来启动工作者。 如果 mp_context 为 None 或未给出,将使用默认的多进程上下文。
      • initializer 是在每个工作者进程开始处调用的一个可选可调用对象。 initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenProcessPool
      • 在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool 就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。
      • 在 3.7 版更改: 添加 mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法 。加入 initializer 和initargs 参数。
代码语言:txt复制
```python
代码语言:txt复制
import concurrent.futures
代码语言:txt复制
import math
代码语言:txt复制
PRIMES = [
代码语言:txt复制
    112272535095293,
代码语言:txt复制
    112582705942171,
代码语言:txt复制
    112272535095293,
代码语言:txt复制
    115280095190773,
代码语言:txt复制
    115797848077099,
代码语言:txt复制
    1099726899285419]
代码语言:txt复制
def is_prime(n):
代码语言:txt复制
    if n % 2 == 0:
代码语言:txt复制
        return False
代码语言:txt复制
    sqrt_n = int(math.floor(math.sqrt(n)))
代码语言:txt复制
    for i in range(3, sqrt_n   1, 2):
代码语言:txt复制
        if n % i == 0:
代码语言:txt复制
            return False
代码语言:txt复制
    return True
代码语言:txt复制
def main():
代码语言:txt复制
    with concurrent.futures.ProcessPoolExecutor() as executor:
代码语言:txt复制
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
代码语言:txt复制
            print('%d is prime: %s' % (number, prime))
代码语言:txt复制
if __name__ == '__main__':
代码语言:txt复制
```
  • Future 对象
    • Future 类将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建。

0 人点赞