在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value

2023-10-11 17:36:17 浏览数 (2)

Python 自带的多进程库 multiprocessing 可实现多进程。我想用这些短例子示范如何优雅地用多线程。中文网络上,有些人只是翻译了旧版的 Python 官网的多进程文档。而我这篇文章会额外讲一讲下方加粗部分的内容。

  • 创建进程 Process,fork 直接继承资源,所以初始化更快,spawn 只继承必要的资源,所以更省内存,「程序的入口」 if name == main
  • 进程池 Pool,Pool 只能接受一个参数,但有办法传入多个
  • 管道通信 Pipe,最基本的功能,运行速度快
  • 队列通信 Queue,有最常用的功能,运行速度稍慢
  • 共享内存 Manager Value,Python3.9 新特性 真正的共享内存 shared_memory

如下所示,中文网络上一些讲 Python 多进程的文章,很多重要的东西没讲(毕竟只是翻译了 Python 官网的多进程旧版文档)。上方的加粗部分他们没讲,但是这是做多进程总需要知道的内容。

  • 若你无法流畅阅读有专人更新的 Python 官网多进程英文文档 ,那么姑且可看写于 2019 不保证更新的南山南:一篇文章搞定 Python 多进程 (全)
  • 知我莫言:谈谈 python 的 GIL、多线程、多进程 ,点赞多但旧,写于 2016,你还不如看我下方写的「简述何为多线程 threading 与多进程 processing」

1.多线程与多进程的区别

多线程 threading: 一个人有与异性聊天和看剧两件事要做。单线程的她可以看完剧再去聊天,但这样子可能就没人陪她聊天了「哼,发消息不回」。我们把她看成一个 CPU 核心,为她开起多线程——先看一会剧,偶尔看看新消息,在两件事(线程)间来回切换。多线程:单个 CPU 核心可以同时做几件事,不至于卡在某一步傻等着。

用处:爬取网站信息(爬虫),等待多个用户输入

多进程 processing: 一个人有很多砖需要搬,他领取手套、推车各种物资(向系统申请了资源)然后开始搬砖。然而他身边有很多人,我们让这些人去帮他!(一核有难,八核围观)。于是他们做了分工,砖很快就搬完了。多进程让多个 CPU 核心可以一起做事,不至于只有一人干活而其他人傻站着。

用处:进行高性能计算。只有多进程方案设计合理,才能加速计算。

2. 全局锁与多进程

为何在 Python 里用多进程这么麻烦? 因为 Python 的线程是操作系统线程,因此要有 Python 全局解释器锁。一个 python 解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核 CPU 平台上,由于 GIL 的存在,所以禁止多线程的并行执行。——来自百度百科词条 全局解释器锁。发展历程:

  1. Python 全局锁。Python 3.2 的时候更新过 GIL。在我小时候,由于 Python GIL 的存在(全局解释器锁 Global Interpreter Lock) ,此时 Python 无法靠自己实现多进程
  2. 外部多进程通信。Python3.5。在 2015 年,要么用 Python 调用 C 语言(如 Numpy 此类用其他语言在底层实现多进程的第三方库),要么需要在外部代码(MPI 2015)
  3. 内置多进程通信。Python 3.6 才让 multiprocessing 逐渐发展成一个能用的 Python 内置多进程库,可以进行进程间的通信,以及有限的内存共享
  4. 共享内存。Python 3.8 在 2019 年增加了新特性 shared_memory

3.子进程 Process

多进程的主进程一定要写在程序入口 if name ==‘main’: 内部

代码语言:javascript复制
def function1(id):  # 这里是子进程
    print(f'id {id}')

def run__process():  # 这里是主进程
    from multiprocessing import Process
    process = [mp.Process(target=function1, args=(1,)),
               mp.Process(target=function1, args=(2,)), ]
    [p.start() for p in process]  # 开启了两个进程
    [p.join() for p in process]   # 等待两个进程依次结束

# run__process()  # 主线程不建议写在 if外部。由于这里的例子很简单,你强行这么做可能不会报错
if __name__ =='__main__':
    run__process()  # 正确做法:主线程只能写在 if内部

尽管在这个简单的例子里,把主进程 run__process() 写在程序入口 if 外部不会有报错。但是你最好还是按我要求去做。详细解释的内容过长,我写在→「Python 程序入口有重要功能(多线程)而非编程习惯」

上面的例子只是用 Process 开启了多进程,不涉及进程通信。当我准备把一个串行任务编排成多进程时,我还需要多进程通信。进程池 Pool 可以让主程序获得子进程的计算结果(不太灵活,适合简单任务),管道 Pipe 队列 Queue 等等 可以让进程之间进行通信(足够灵活)。共享值 Value 共享数组 Array 共享内容 shared_memory(Python 3.6 Python3.9 的新特性,还不太成熟)下面开讲。

Python 多进程可以选择两种创建进程的方式,spawn 与 fork。分支创建:fork 会直接复制一份自己给子进程运行,并把自己所有资源的 handle 都让子进程继承,因而创建速度很快,但更占用内存资源。分产创建:spawn 只会把必要的资源的 handle 交给子进程,因此创建速度稍慢。详细解释请看 Stack OverFlow multiprocessing fork vs spawn 。(分产 spawn 是我自己随便翻译的,有更好的翻译请推荐。我绝不把 handle 翻译成句柄)

代码语言:javascript复制
multiprocessing.set_start_method('spawn')  # default on WinOS or MacOS
multiprocessing.set_start_method('fork')   # default on Linux (UnixOS)

请注意:我说 分支 fork 在初始化创建多进程的时候比 分产 spawn 快,而不是说高性能计算会比较快。通常高性能计算需要让程序运行很久,因此为了节省内存以及进程安全,我建议选择 spawn。

4.进程池 Pool

几乎 Python 多进程代码都需要你明明白白地调用 Process。而进程池 Pool 会自动帮我们管理子进程。Python 的 Pool 不方便传入多个参数,我这里提供两个解决思路:

思路 1:函数 func2 需要传入多个参数,现在把它改成一个参数,无论你直接让 args 作为一个元组 tuple、词典 dict、类 class 都可以

思路 2:使用 function.partial Passing multiple parameters to pool.map() function in Python。这个不灵活的方法固定了其他参数,且需要导入 Python 的内置库,我不推荐

代码语言:javascript复制
import time

def func2(args):  # multiple parameters (arguments)
    # x, y = args
    x = args[0]  # write in this way, easier to locate errors
    y = args[1]  # write in this way, easier to locate errors

    time.sleep(1)  # pretend it is a time-consuming operation
    return x - y


def run__pool():  # main process
    from multiprocessing import Pool

    cpu_worker_num = 3
    process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]

    print(f'| inputs:  {process_args}')
    start_time = time.time()
    with Pool(cpu_worker_num) as p:
        outputs = p.map(func2, process_args)
    print(f'| outputs: {outputs}    TimeUsed: {time.time() - start_time:.1f}    n')

    '''Another way (I don't recommend)
    Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
    from functools import partial
    # from functools import partial
    # pool.map(partial(f, a, b), iterable)
    '''

if __name__ =='__main__':
    run__pool()

5.管道 Pipe

顾名思义,管道 Pipe 有两端,因而 main_conn, child_conn = Pipe() ,管道的两端可以放在主进程或子进程内,我在实验中没发现主管道口 main_conn 和子管道口 child_conn 的区别。两端可以同时放进去东西,放进去的对象都经过了深拷贝:用 conn.send() 在一端放入,用 conn.recv() 另一端取出,管道的两端可以同时给多个进程。conn 是 connect 的缩写。

代码语言:javascript复制
import time

def func_pipe1(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(f'{p_id}_send1')
    print(p_id, 'send1')

    time.sleep(0.1)
    conn.send(f'{p_id}_send2')
    print(p_id, 'send2')

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def func_pipe2(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(p_id)
    print(p_id, 'send')
    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def run__pipe():
    from multiprocessing import Process, Pipe

    conn1, conn2 = Pipe()

    process = [Process(target=func_pipe1, args=(conn1, 'I1')),
               Process(target=func_pipe2, args=(conn2, 'I2')),
               Process(target=func_pipe2, args=(conn2, 'I3')), ]

    [p.start() for p in process]
    print('| Main', 'send')
    conn1.send(None)
    print('| Main', conn2.recv())
    [p.join() for p in process]

if __name__ =='__main__':
    run__pipe()

如果追求运行更快,那么最好使用管道 Pipe 而非下面介绍的队列 Queue,详细请移步 Python pipes and queues performance ↓

So yes, pipes are faster than queues - but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 - most other tests I have done have been a bit up and down (as long as it is Python 3.4 - Python 3.2 seems to be a bit of a dog - especially for memory usage).

曾经用到 Python 多线程队列功能写过一个实际例子 ↓,若追求极致性能,还可以把里面的 Queue 改为 Pipe。

Pipe 还有 duplex 参数poll() 方法 需要了解。默认情况下 duplex==True,若不开启双向管道,那么传数据的方向只能 conn1 ← conn2 。conn2.poll()==True 意味着可以马上使用 conn2.recv() 拿到传过来的数据。conn2.poll(n) 会让它等待 n 秒钟再进行查询。

代码语言:javascript复制
from multiprocessing import Pipe

conn1, conn2 = Pipe(duplex=True)  # 开启双向管道,管道两端都能存取数据。默认开启
# 
conn1.send('A')
print(conn1.poll())  # 会print出 False,因为没有东西等待conn1去接收
print(conn2.poll())  # 会print出 True ,因为conn1 send 了个 'A' 等着conn2 去接收
print(conn2.recv(), conn2.poll(2))  # 会等待2秒钟再开始查询,然后print出 'A False'

尽管我下面的例子不会报错,但这是因为它过于简单,没有真的开多线程去跑,也没有写在程序入口的 if 内部。很多时候 Pipe 运行会快一点,但是它的功能太少了,得用 Queue。最明显的一个区别是:

代码语言:javascript复制
conn1, conn2 = multiprocessing.Pipe()  # 管道有两端,某一端放入的东西,只能在另一端拿到
queue = multiprocessing.Queue()        # 队列只有一个,放进去的东西可以在任何地方拿到。

6. 队列 Queue

可以 import queue 调用 Python 内置的队列,在多线程里也有队列 from multiprocessing import Queue。下面提及的都是多线程的队列。

队列 Queue 的功能与前面的管道 Pipe 非常相似:无论主进程或子进程,都能访问到队列,放进去的对象都经过了深拷贝。不同的是:管道 Pipe 只有两个断开,而队列 Queue 有基本的队列属性,更加灵活,详细请移步 Stack Overflow Multiprocessing - Pipe vs Queue。

代码语言:javascript复制
def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

除了上面提及的 Python 多线程,读取多个 (海康 大华) 网络摄像头的视频流 ,我自己写的开源的强化学习库:小雅 ElegantRL 也使用了 Queue 进行多 CPU 多 GPU 训练,为了提速,我已经把 Queue 改为 Pipe。

7. 共享内存 Manager

为了在 Python 里面实现多进程通信,上面提及的 Pipe Queue 把需要通信的信息从内存里深拷贝了一份给其他线程使用(需要分发的线程越多,其占用的内存越多)。而共享内存会由解释器负责维护一块共享内存(而不用深拷贝),这块内存每个进程都能读取到,读写的时候遵守管理(因此不要以为用了共享内存就一定变快)。

Manager 可以创建一块共享的内存区域,但是存入其中的数据需要按照特定的格式,Value 可以保存数值,Array 可以保存数组,如下。这里不推荐认为自己写代码能力弱的人尝试。下面这里例子来自 Python 官网的 Document。

代码语言:javascript复制
# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing array#multiprocessing.Array

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

我删掉了 Python 3.8 的 shared_momery 介绍,这部分有 Bug

下文来自 Stack Overflow,问题 Shared memory in multiprocessing 下 thuzhf 的回答 2021-01 :

For those interested in using Python3.8 's shared_memory module, it still has a bug which hasn’t been fixed and is affecting Python3.8/3.9/3.10 by now (2021-01-15). The bug is about resource tracker destroys shared memory segments when other processes should still have valid access. So take care if you use it in your code.

PyTorch 也有自带的多进程 torch.multiprocessing

How to share a list of tensors in PyTorch multiprocessing? rozyang 的回答 ,非常简单,核心代码如下:

代码语言:javascript复制
import torch.multiprocessing as mp
tensor.share_memory_()

正文已经结束,我把部分 multiprocessing 的代码都放在 github。希望大家能写出让自己满意的多线程。我设计高性能的多进程时,会遵守以下规则:

  • 尽可能少传一点数据
  • 尽可能减少主线程的负担
  • 尽可能不让某个进程傻等着
  • 尽可能减少进程间通信的频率

开源的深度强化学习 (DRL) 算法库 伯克利的 Ray-project Rllib 训练快,但太复杂,OpenAI 的 SpinningUp 简单,但不快。刚好我又懂一点多进程、Numpy、深度学习框架、深度强化学习这些双层优化算法,所以我觉得自己也写一个 DRL 库难度不大,于是开源了强化学习库:小雅 ElegantRL。让别人好好看看,DRL 库挺简单的一个东西弄那么复杂做什么?

尽管这个库会一直保持框架小巧、代码优雅来方便入门深度强化学习的人,但 ElegantRL 却把训练效率放在首位(正因如此,ElegantRL 与 SpinningUp 的定位不同),所以我需要用 Python 的多进程来加速 DRL 的训练。因而顺便写【在 Python 中优雅地用多进程】这篇东西。

0 人点赞