注:不少学过点编程语言的人,都会抱怨 Python 语言的程序执行速度慢,因此对学习和使用此语言嗤之以鼻。暂且不论程序的执行速度是否是开发者追求的唯一目标(有意对此进行争论的,请参阅人民邮电出版社出版的《编程的原则》一书),单就提升 Python 计算速度而言,并行计算是一个重要的选项。本文即为这方面的入门资料。
Python 非常适合训练机器学习模型、进行数值计算、以及快速开发验证性的模型等。使用 Python ,所需要的辅助工具和也依赖项都很少。在执行这些任务时,你还希望尽可能多地使用底层硬件,以便获得更高的速度。Python 代码的并行化可以实现这一目标。但是,使用标准的 CPython 则无法充分使用底层硬件的计算能力,因为全局解释器锁(GIL)会阻止多个线程同时运行字节码。
本文汇总了一些用 Python 代码实现并行计算的常见方法,包括:
- 基于进程的并行计算
- 使用专用库实现并行计算
- IPython 中的并行计算
- 用第三方库 Ray 实现并行计算
对于每种实现并行计算的技术,本文都列出了一些优点和缺点,并展示了代码示例,以帮助你了解其使用情况。
并行化 Python 代码
有几种常见的方法可以让 Python 代码实现并行运行——可以说成“并行化”。例如启动多个应用程序实例或启动某个脚本来并行执行程序。若不需要在并行的进程之间交换数据时,这种方法非常有用。否则,在进程之间共享数据会在聚合数据时显著降低运算性能。
在同一个进程中启动多个线程可以更有效地在作业之间共享数据。在这种情况下,基于线程的并行化可以将一些工作转移到后台。然而,CPython 实现的全局解释器锁(GIL)阻止了字节码在多个线程中同时运行。
下面示例中的函数模拟了复杂计算(旨在模拟激活函数)。
代码语言:javascript复制import math
import numpy as np
from timebudget import timebudget
iterations_count = round(1e7)
def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]
为了更直观地计算时间,将函数 complex_operation()
执行多次。将输入的数据划分为几个子集,然后对这些子集并行计算。
下面调用函数 complex_operation()
的代码中,将其多次执行( input
的区间是 0~10
),并使用 timebudget
包来度量执行时间( pip install timebudget
)
@timebudget
def run_complex_operations(operation, input):
for i in input:
operation(i)
input = range(10)
run_complex_operations(complex_operation, input)
执行上述程序,输出结果如下:
代码语言:javascript复制Complex operation. Input index: 0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 7
Complex operation. Input index: 8
Complex operation. Input index: 9
run_complex_operations took 34.495sec
如你所见,在本文中使用的笔记本电脑上执行这段代码大约花了 34.5 秒。这是没有采用任何并行化技术的执行结果,下面就让我们看看如何用并行化方式优化。
基于进程的并行计算
第一种方法是基于进程的并行。使用这种方法,可以同时(即“并发”)启动多个进程,这样,它们就可以并发地执行计算。
从 Python 3开始,标准库中已经有了实现多进程的模块 multiprocessing ,用它可以非常便捷地实现多进程进程并发。multiprocessing 模块中的 Pool 类,能自动将输入划分为若干个子集,并将这些子集分配给多个进程。
在前述代码中,使用 Pool 启动 10 个进程,完整代码如下:
代码语言:javascript复制import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool
iterations_count = round(1e7)
def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]
@timebudget
def run_complex_operations(operation, input, pool):
pool.map(operation, input)
# for i in input:
# operation(i)
# input = range(10)
# run_complex_operations(complex_operation, input)
processes_count = 10
if __name__ == '__main__':
processes_pool = Pool(processes_count)
run_complex_operations(complex_operation, range(10), processes_pool)
每个进程同时执行 complex_operations()
函数,因此,从理论上讲,这些代码可以将总的执行时间减少 10 倍。然而,试试并非如此。以下是译者的执行结果(在翻译本文的时候,译者将所有代码重新执行,在文中显示的是译者的执行结果):
Complex operation. Input index: 0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 7
Complex operation. Input index: 8
Complex operation. Input index: 9
run_complex_operations took 10.645sec
与之前的运行结果比较,并没有将执行时间缩短 10 倍,其原因有多方面,首先要考察的是本地计算机中 CPU 的数量,它决定了最大进程数。
代码语言:javascript复制>>> import os
>>> print('Number of CPUs in the system: {}'.format(os.cpu_count()))
Number of CPUs in the system: 8
用 os
模块中的 os.cpu_count()
函数能得到本地计算机中 CPU 的数量。
另外一个导致上述程序没有如预想那样大幅度降低运算时间的原因,跟程序汇总的计算量较小也有关系。这是因为进程之间必须通过进程间通信机制实现通信,这些计算开销,对于比较小的计算任务而言,并行计算通常比 Python 编写的普通程序所执行的串行计算更慢。
总结基于进程的并行计算的优劣:
优点 | 劣势 |
---|---|
应用简单 | 性能不如 Ray (关于 Ray ,见后续) |
摆脱了 GIL 限制 | 因共享数据而降低性能 |
对结果的聚合需要手动实现 |
利用专用库
NumPy 等专用于计算的库可以在许多计算上不受 GIL 的限制,于是就能用进程和其他技术实现并行计算。下面就介绍将 NumPy 用于并行计算的方式。
为了比较使用 Numpy 与否在计算中的差异,需要编写如下函数。
代码语言:javascript复制def complex_operation_numpy(input_index):
print(f"Complex operation (numpy). Input index: {input_index:2d}")
data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)
函数中使用 NumPy 的 np.exp()
和 np.sinh()
两个函数对输入数据执行计算。然后,使用进程池执行 complex_operation()
和 complex_operation_numpy()
函数各十次,以比较它们的性能。
processes_count = 10
input = range(10)
if __name__ == '__main__':
processes_pool = Pool(processes_count)
print(‘Without NumPy’)
run_complex_operations(complex_operation, input, processes_pool)
print(‘NumPy’)
run_complex_operations(complex_operation_numpy, input, processes_pool)
以下为执行结果:
代码语言:javascript复制Without Numpy
Complex operation. Input index: 0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 8
Complex operation. Input index: 7
Complex operation. Input index: 9
run_complex_operations took 11.874sec
Numpy
Complex operation (numpy). Input index: 1
Complex operation (numpy). Input index: 2
Complex operation (numpy). Input index: 3
Complex operation (numpy). Input index: 4
Complex operation (numpy). Input index: 0
Complex operation (numpy). Input index: 5
Complex operation (numpy). Input index: 6
Complex operation (numpy). Input index: 7
Complex operation (numpy). Input index: 8
Complex operation (numpy). Input index: 9
run_complex_operations took 845.87ms
NumPy 使性能得到了大幅度提升,846ms vs 12s 。之所 NumPy 能更快,其原因是其中的大多数处理都是向量化的。向量化实际上使底层代码可以“并行化”,因为该操作可以一次计算多个数组元素,而不是一次遍历一个数组元素。
NumPy 的优点 | NumPy 的劣势 |
---|---|
简单易用 | 对结果的聚合需要手动实现 |
多数 NumPy 计算不受 GIL 限制,但不是全部 | 有限的数值计算 |
支持向量化 | 自定义算法比较麻烦 |
###使用 IPython 的并行计算包
IPython 是数据科学研究者使用的一个工具,能够实现交互式操作,后来被更名为 Jupyter (参阅《跟老齐学 Python:数据分析》)。除了这些之外,它还提供了一个用于并行计算的包“IPython Parallel”,安装方法如下:
代码语言:javascript复制pip install ipyparallel
的官方网站:https://ipyparallel.readthedocs.io/en/latest/ 。
IPython Parallel 有很多优点,其中最令人神往的可能是它允许以交互的方式开发、执行和监视并行应用程序。
一种使用 IPython Parallel 的方式是参考官方文档中的样式,在 Jupyter 中直接调用。
下面演示的是另外一种方式。首先准备好代码,如下所示(文件名称 parallelprocess.py
)
import math
import numpy as np
from timebudget import timebudget
import ipyparallel as ipp
iterations_count = round(1e7)
def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]
def complex_operation_numpy(input_index):
print("Complex operation (numpy). Input index: {:2d}".format(input_index))
data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)
@timebudget
def run_complex_operations(operation, input, pool):
pool.map(operation, input)
client_ids = ipp.Client()
pool = client_ids[:]
input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input, pool)
print('NumPy')
run_complex_operations(complex_operation_numpy, input, pool)
然后打开一个终端,输入如下 ipcluster 命令(是在命令行状态):
代码语言:javascript复制% ipcluster start -n 10
2021-09-17 13:21:24.805 [IPClusterStart] Starting ipcluster with [daemonize=False]
2021-09-17 13:21:25.898 [IPClusterStart] Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
2021-09-17 13:21:56.945 [IPClusterStart] Engines appear to have started successfully
出现上述 Engines appear to have started successfully
提示之后,再打开一个终端,执行前述程序文件,如下所示:
% python parallelprocess.py
Without NumPy
run_complex_operations took 8.18ms
NumPy
run_complex_operations took 6.76ms
上述结果显示,对于使用和不使用 NumPy 两种情况下下,均用 IPython Parallel 进行并行处理,运算速度远远快于前述两种条件下的执行结果。
IPython 的有点 | IPython 的劣势 |
---|---|
支持并行和分布计算 | 适用于较短的作业内容 |
能用于 Jupyter notebook | 如果要执行过程的输出,需要额外的配置 |
配置简单 |
Ray
Ray 是一款实现并行和分布计算的第三方库,它具有快速、简单的特点,可以轻松地扩展应用程序,并适用于最先进的机器学习库。使用 Ray,还是像以往那样运行 Python 代码,只需要做很小的改动。
下面会简要介绍 Ray 是如何轻松地并行化普通的 Python 代码的,但需要注意的是,Ray 及其生态系统也可以轻松地并行化其他库,如 scikit-learn,XGBoost, LightGBM, PyTorch, 等等。
首先要安装 Ray :
代码语言:javascript复制pip install ray
然后在前面的 parallelprocess.py
文件基础上进行修改,最后的完整代码如下(并命名为 rayprocess.py
文件)
import math
import numpy as np
from timebudget import timebudget
import ray
iterations_count = round(1e7)
@ray.remote
def complex_operation(input_index):
print(f"Complex operation. Input index: {input_index:2d}")
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]
@ray.remote
def complex_operation_numpy(input_index):
print(f"Complex operation (numpy). Input index: {input_index:2d}")
data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)
@timebudget
def run_complex_operations(operation, input):
ray.get([operation.remote(i) for i in input])
ray.init()
input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input)
print('NumPy')
run_complex_operations(complex_operation_numpy, input)
其中 ray.init()
的作用是启动所有相关的 Ray 进程。默认情况下,Ray 为每个 CPU 核创建一个进程。如果希望在集群上运行 Ray ,则需要传入一个类似于ray.init(address='insertAddressHere')
的集群地址。
用装饰器 @ray.remote
装饰一个普通的 Python 函数,从而实现创建一个 Ray 任务。这个操作可以在笔记本电脑 CPU 核之间(或 Ray 集群)实现任务调度。
在最后一步中,以 @timebudget
装饰 run_complex_operations()
函数,在 Ray 的调用时间内执行这些函数。
执行此程序后,会得到一个类似于下面的输出:
代码语言:javascript复制% python rayprocess.py
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/ray/_private/services.py:238: UserWarning: Not all Ray Dashboard dependencies were found. To use the dashboard please install Ray using `pip install ray[default]`. To disable this message, set RAY_DISABLE_IMPORT_WARNING env var to '1'.
warnings.warn(warning_message)
Without NumPy
(pid=9351) Complex operation. Input index: 7
(pid=9352) Complex operation. Input index: 5
(pid=9353) Complex operation. Input index: 6
(pid=9354) Complex operation. Input index: 1
(pid=9356) Complex operation. Input index: 2
(pid=9358) Complex operation. Input index: 0
(pid=9355) Complex operation. Input index: 3
(pid=9357) Complex operation. Input index: 4
(pid=9351) Complex operation. Input index: 8
(pid=9358) Complex operation. Input index: 9
run_complex_operations took 12.731sec
NumPy
(pid=9351) Complex operation (numpy). Input index: 1
(pid=9352) Complex operation (numpy). Input index: 7
(pid=9353) Complex operation (numpy). Input index: 3
(pid=9354) Complex operation (numpy). Input index: 5
(pid=9356) Complex operation (numpy). Input index: 2
(pid=9358) Complex operation (numpy). Input index: 0
(pid=9355) Complex operation (numpy). Input index: 4
(pid=9357) Complex operation (numpy). Input index: 6
(pid=9351) Complex operation (numpy). Input index: 9
(pid=9354) Complex operation (numpy). Input index: 8
run_complex_operations took 858.52ms
结果中显示了对于当前 Ray 任务而言的、使用和不使用 NumPy 的运行时间。这里似乎没有体现出 Ray 相对于前述其他并行计算方法的优势,这是因为我们在上面演示的属于小量的计算任务,如果遇到更大的业务,Ray 的优势就会非常显著,如下图所示。
在下面的表格中,对 Ray 给予简要总结。
Ray 的优点 | Ray 的劣势 |
---|---|
支持并行计算和分布计算 | 针对更大型的业务才会有显著效果 |
可以在 Jupyter 上使用 | |
能够应用于现有的常见机器学习和神经网络库 | |
整合了多个 Ray 库,如 RLlib(用于强化学习)、Ray Tune(超参数调优)、Ray Serve(可伸缩模式) |
结论
有多种方法可以让 Python 程序实现并行化执行,并且本文还介绍了它们的一些优缺点。并行化的代码通常会带来一些开销;并行化的好处在较大的业务中更明显,而不是在本文中的简短计算中。
特别是在处理典型的基于人工智能的任务时,你必须对你的模型进行重复的微调。在这种情况下,Ray 提供了最好的支持,因为它拥有丰富的生态系统、自动伸缩、容错和远程服务等能力。
参考文献
https://www.anyscale.com/blog/parallelizing-python-code