用 Python 实现并行计算

2021-10-11 17:09:46 浏览数 (1)

:不少学过点编程语言的人,都会抱怨 Python 语言的程序执行速度慢,因此对学习和使用此语言嗤之以鼻。暂且不论程序的执行速度是否是开发者追求的唯一目标(有意对此进行争论的,请参阅人民邮电出版社出版的《编程的原则》一书),单就提升 Python 计算速度而言,并行计算是一个重要的选项。本文即为这方面的入门资料。


Python 非常适合训练机器学习模型、进行数值计算、以及快速开发验证性的模型等。使用 Python ,所需要的辅助工具和也依赖项都很少。在执行这些任务时,你还希望尽可能多地使用底层硬件,以便获得更高的速度。Python 代码的并行化可以实现这一目标。但是,使用标准的 CPython 则无法充分使用底层硬件的计算能力,因为全局解释器锁(GIL)会阻止多个线程同时运行字节码。

本文汇总了一些用 Python 代码实现并行计算的常见方法,包括:

  • 基于进程的并行计算
  • 使用专用库实现并行计算
  • IPython 中的并行计算
  • 用第三方库 Ray 实现并行计算

对于每种实现并行计算的技术,本文都列出了一些优点和缺点,并展示了代码示例,以帮助你了解其使用情况。

并行化 Python 代码

有几种常见的方法可以让 Python 代码实现并行运行——可以说成“并行化”。例如启动多个应用程序实例或启动某个脚本来并行执行程序。若不需要在并行的进程之间交换数据时,这种方法非常有用。否则,在进程之间共享数据会在聚合数据时显著降低运算性能。

在同一个进程中启动多个线程可以更有效地在作业之间共享数据。在这种情况下,基于线程的并行化可以将一些工作转移到后台。然而,CPython 实现的全局解释器锁(GIL)阻止了字节码在多个线程中同时运行。

下面示例中的函数模拟了复杂计算(旨在模拟激活函数)。

代码语言:javascript复制
import math
import numpy as np
from timebudget import timebudget

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

为了更直观地计算时间,将函数 complex_operation() 执行多次。将输入的数据划分为几个子集,然后对这些子集并行计算。

下面调用函数 complex_operation() 的代码中,将其多次执行( input 的区间是 0~10 ),并使用 timebudget 包来度量执行时间( pip install timebudget

代码语言:javascript复制
@timebudget
def run_complex_operations(operation, input):
    for i in input:
        operation(i)

input = range(10)
run_complex_operations(complex_operation, input)

执行上述程序,输出结果如下:

代码语言:javascript复制
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  8
Complex operation. Input index:  9
run_complex_operations took 34.495sec

如你所见,在本文中使用的笔记本电脑上执行这段代码大约花了 34.5 秒。这是没有采用任何并行化技术的执行结果,下面就让我们看看如何用并行化方式优化。

基于进程的并行计算

第一种方法是基于进程的并行。使用这种方法,可以同时(即“并发”)启动多个进程,这样,它们就可以并发地执行计算。

从 Python 3开始,标准库中已经有了实现多进程的模块 multiprocessing ,用它可以非常便捷地实现多进程进程并发。multiprocessing 模块中的 Pool 类,能自动将输入划分为若干个子集,并将这些子集分配给多个进程。

在前述代码中,使用 Pool 启动 10 个进程,完整代码如下:

代码语言:javascript复制
import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@timebudget
def run_complex_operations(operation, input, pool):
    pool.map(operation, input)
    # for i in input:
    #     operation(i)

# input = range(10)
# run_complex_operations(complex_operation, input)

processes_count = 10

if __name__ == '__main__':
    processes_pool = Pool(processes_count)
    run_complex_operations(complex_operation, range(10), processes_pool)

每个进程同时执行 complex_operations() 函数,因此,从理论上讲,这些代码可以将总的执行时间减少 10 倍。然而,试试并非如此。以下是译者的执行结果(在翻译本文的时候,译者将所有代码重新执行,在文中显示的是译者的执行结果):

代码语言:javascript复制
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  8
Complex operation. Input index:  9
run_complex_operations took 10.645sec

与之前的运行结果比较,并没有将执行时间缩短 10 倍,其原因有多方面,首先要考察的是本地计算机中 CPU 的数量,它决定了最大进程数。

代码语言:javascript复制
>>> import os
>>> print('Number of CPUs in the system: {}'.format(os.cpu_count()))
Number of CPUs in the system: 8

os 模块中的 os.cpu_count() 函数能得到本地计算机中 CPU 的数量。

另外一个导致上述程序没有如预想那样大幅度降低运算时间的原因,跟程序汇总的计算量较小也有关系。这是因为进程之间必须通过进程间通信机制实现通信,这些计算开销,对于比较小的计算任务而言,并行计算通常比 Python 编写的普通程序所执行的串行计算更慢。

总结基于进程的并行计算的优劣:

优点

劣势

应用简单

性能不如 Ray (关于 Ray ,见后续)

摆脱了 GIL 限制

因共享数据而降低性能

对结果的聚合需要手动实现

利用专用库

NumPy 等专用于计算的库可以在许多计算上不受 GIL 的限制,于是就能用进程和其他技术实现并行计算。下面就介绍将 NumPy 用于并行计算的方式。

为了比较使用 Numpy 与否在计算中的差异,需要编写如下函数。

代码语言:javascript复制
def complex_operation_numpy(input_index):
      print(f"Complex operation (numpy). Input index: {input_index:2d}")

      data = np.ones(iterations_count)
      np.exp(data) * np.sinh(data)

函数中使用 NumPy 的 np.exp()np.sinh() 两个函数对输入数据执行计算。然后,使用进程池执行 complex_operation()complex_operation_numpy() 函数各十次,以比较它们的性能。

代码语言:javascript复制
processes_count = 10
input = range(10)

if __name__ == '__main__':
    processes_pool = Pool(processes_count)
    print(‘Without NumPy’)
    run_complex_operations(complex_operation, input, processes_pool)
    print(‘NumPy’)
    run_complex_operations(complex_operation_numpy, input, processes_pool)

以下为执行结果:

代码语言:javascript复制
Without Numpy
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  8
Complex operation. Input index:  7
Complex operation. Input index:  9
run_complex_operations took 11.874sec
Numpy
Complex operation (numpy). Input index:  1
Complex operation (numpy). Input index:  2
Complex operation (numpy). Input index:  3
Complex operation (numpy). Input index:  4
Complex operation (numpy). Input index:  0
Complex operation (numpy). Input index:  5
Complex operation (numpy). Input index:  6
Complex operation (numpy). Input index:  7
Complex operation (numpy). Input index:  8
Complex operation (numpy). Input index:  9
run_complex_operations took 845.87ms

NumPy 使性能得到了大幅度提升,846ms vs 12s 。之所 NumPy 能更快,其原因是其中的大多数处理都是向量化的。向量化实际上使底层代码可以“并行化”,因为该操作可以一次计算多个数组元素,而不是一次遍历一个数组元素。

NumPy 的优点

NumPy 的劣势

简单易用

对结果的聚合需要手动实现

多数 NumPy 计算不受 GIL 限制,但不是全部

有限的数值计算

支持向量化

自定义算法比较麻烦

###使用 IPython 的并行计算包

IPython 是数据科学研究者使用的一个工具,能够实现交互式操作,后来被更名为 Jupyter (参阅《跟老齐学 Python:数据分析》)。除了这些之外,它还提供了一个用于并行计算的包“IPython Parallel”,安装方法如下:

代码语言:javascript复制
pip install ipyparallel

的官方网站:https://ipyparallel.readthedocs.io/en/latest/ 。

IPython Parallel 有很多优点,其中最令人神往的可能是它允许以交互的方式开发、执行和监视并行应用程序。

一种使用 IPython Parallel 的方式是参考官方文档中的样式,在 Jupyter 中直接调用。

下面演示的是另外一种方式。首先准备好代码,如下所示(文件名称 parallelprocess.py

代码语言:javascript复制
import math
import numpy as np
from timebudget import timebudget
import ipyparallel as ipp

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))

    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

def complex_operation_numpy(input_index):
    print("Complex operation (numpy). Input index: {:2d}".format(input_index))

    data = np.ones(iterations_count)
    np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input, pool):
    pool.map(operation, input)

client_ids = ipp.Client()
pool = client_ids[:]

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input, pool)
print('NumPy')
run_complex_operations(complex_operation_numpy, input, pool)

然后打开一个终端,输入如下 ipcluster 命令(是在命令行状态):

代码语言:javascript复制
% ipcluster start -n 10
2021-09-17 13:21:24.805 [IPClusterStart] Starting ipcluster with [daemonize=False]
2021-09-17 13:21:25.898 [IPClusterStart] Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
2021-09-17 13:21:56.945 [IPClusterStart] Engines appear to have started successfully

出现上述 Engines appear to have started successfully 提示之后,再打开一个终端,执行前述程序文件,如下所示:

代码语言:javascript复制
% python parallelprocess.py
Without NumPy
run_complex_operations took 8.18ms
NumPy
run_complex_operations took 6.76ms

上述结果显示,对于使用和不使用 NumPy 两种情况下下,均用 IPython Parallel 进行并行处理,运算速度远远快于前述两种条件下的执行结果。

IPython 的有点

IPython 的劣势

支持并行和分布计算

适用于较短的作业内容

能用于 Jupyter notebook

如果要执行过程的输出,需要额外的配置

配置简单

Ray

Ray 是一款实现并行和分布计算的第三方库,它具有快速、简单的特点,可以轻松地扩展应用程序,并适用于最先进的机器学习库。使用 Ray,还是像以往那样运行 Python 代码,只需要做很小的改动。

下面会简要介绍 Ray 是如何轻松地并行化普通的 Python 代码的,但需要注意的是,Ray 及其生态系统也可以轻松地并行化其他库,如 scikit-learn,XGBoost, LightGBM, PyTorch, 等等。

首先要安装 Ray :

代码语言:javascript复制
pip install ray

然后在前面的 parallelprocess.py 文件基础上进行修改,最后的完整代码如下(并命名为 rayprocess.py 文件)

代码语言:javascript复制
import math
import numpy as np
from timebudget import timebudget
import ray

iterations_count = round(1e7)

@ray.remote
def complex_operation(input_index):
    print(f"Complex operation. Input index: {input_index:2d}")
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@ray.remote
def complex_operation_numpy(input_index):
    print(f"Complex operation (numpy). Input index: {input_index:2d}")
    data = np.ones(iterations_count)
    np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input):
    ray.get([operation.remote(i) for i in input])

ray.init()

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input)
print('NumPy')
run_complex_operations(complex_operation_numpy, input)

其中 ray.init() 的作用是启动所有相关的 Ray 进程。默认情况下,Ray 为每个 CPU 核创建一个进程。如果希望在集群上运行 Ray ,则需要传入一个类似于ray.init(address='insertAddressHere') 的集群地址。

用装饰器 @ray.remote 装饰一个普通的 Python 函数,从而实现创建一个 Ray 任务。这个操作可以在笔记本电脑 CPU 核之间(或 Ray 集群)实现任务调度。

在最后一步中,以 @timebudget 装饰 run_complex_operations() 函数,在 Ray 的调用时间内执行这些函数。

执行此程序后,会得到一个类似于下面的输出:

代码语言:javascript复制
% python rayprocess.py 
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/ray/_private/services.py:238: UserWarning: Not all Ray Dashboard dependencies were found. To use the dashboard please install Ray using `pip install ray[default]`. To disable this message, set RAY_DISABLE_IMPORT_WARNING env var to '1'.
  warnings.warn(warning_message)
Without NumPy
(pid=9351) Complex operation. Input index:  7
(pid=9352) Complex operation. Input index:  5
(pid=9353) Complex operation. Input index:  6
(pid=9354) Complex operation. Input index:  1
(pid=9356) Complex operation. Input index:  2
(pid=9358) Complex operation. Input index:  0
(pid=9355) Complex operation. Input index:  3
(pid=9357) Complex operation. Input index:  4
(pid=9351) Complex operation. Input index:  8
(pid=9358) Complex operation. Input index:  9
run_complex_operations took 12.731sec
NumPy
(pid=9351) Complex operation (numpy). Input index:  1
(pid=9352) Complex operation (numpy). Input index:  7
(pid=9353) Complex operation (numpy). Input index:  3
(pid=9354) Complex operation (numpy). Input index:  5
(pid=9356) Complex operation (numpy). Input index:  2
(pid=9358) Complex operation (numpy). Input index:  0
(pid=9355) Complex operation (numpy). Input index:  4
(pid=9357) Complex operation (numpy). Input index:  6
(pid=9351) Complex operation (numpy). Input index:  9
(pid=9354) Complex operation (numpy). Input index:  8
run_complex_operations took 858.52ms

结果中显示了对于当前 Ray 任务而言的、使用和不使用 NumPy 的运行时间。这里似乎没有体现出 Ray 相对于前述其他并行计算方法的优势,这是因为我们在上面演示的属于小量的计算任务,如果遇到更大的业务,Ray 的优势就会非常显著,如下图所示。

在下面的表格中,对 Ray 给予简要总结。

Ray 的优点

Ray 的劣势

支持并行计算和分布计算

针对更大型的业务才会有显著效果

可以在 Jupyter 上使用

能够应用于现有的常见机器学习和神经网络库

整合了多个 Ray 库,如 RLlib(用于强化学习)、Ray Tune(超参数调优)、Ray Serve(可伸缩模式)

结论

有多种方法可以让 Python 程序实现并行化执行,并且本文还介绍了它们的一些优缺点。并行化的代码通常会带来一些开销;并行化的好处在较大的业务中更明显,而不是在本文中的简短计算中。

特别是在处理典型的基于人工智能的任务时,你必须对你的模型进行重复的微调。在这种情况下,Ray 提供了最好的支持,因为它拥有丰富的生态系统、自动伸缩、容错和远程服务等能力。

参考文献

https://www.anyscale.com/blog/parallelizing-python-code

0 人点赞