技术背景
在之前的博客中我们介绍过concurrent等python多进程任务的方案,而之所以我们又在考虑MPI等方案来实现python并行计算的原因,其实是将python的计算任务与并行计算的任务调度分层实现。在concurrent和multiprocessing等方案中,我们的python计算任务和调度任务是一体化的,而且还有一个比较大的限制是没办法跨节点操作的,这对于任务与环境的定制化程度要求是比较高的。而MPI的方案在设计初期就考虑到了多节点之间通信的问题,而这种分层式的任务调度解决方案其实在架构上看也更加的合理。做计算的人只要考虑单个进程下的任务如何执行就可以了,至于任务如何并行如何调度,那就是上层的MPI该做的事情了。
mpi4py的安装
这里推荐使用conda直接安装,如果采用pip安装的话,可能会有些环境依赖的问题出现:
代码语言:javascript复制$ conda install mpi4py
Collecting package metadata (current_repodata.json): done
Solving environment: done
## Package Plan ##
environment location: /home/dechin/anaconda3
added / updated specs:
- mpi4py
The following packages will be downloaded:
package | build
---------------------------|-----------------
mpi-1.0 | mpich 13 KB defaults
mpi4py-3.0.3 | py38h028fd6f_0 572 KB defaults
mpich-3.3.2 | hc856adb_0 3.8 MB defaults
------------------------------------------------------------
Total: 4.4 MB
The following NEW packages will be INSTALLED:
mpi pkgs/main/linux-64::mpi-1.0-mpich
mpi4py pkgs/main/linux-64::mpi4py-3.0.3-py38h028fd6f_0
mpich pkgs/main/linux-64::mpich-3.3.2-hc856adb_0
The following packages will be UPDATED:
ca-certificates 2021.9.30-h06a4308_1 --> 2021.10.26-h06a4308_2
Proceed ([y]/n)? y
Downloading and Extracting Packages
mpi4py-3.0.3 | 572 KB | ############################################## | 100%
mpich-3.3.2 | 3.8 MB | ############################################## | 100%
mpi-1.0 | 13 KB | ############################################## | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
安装完成后可以通过python3 -c "from mpi4py import MPI"
来检查是否安装成功,下面我们来看一些具体的使用案例。
使用案例
首先了解下mpi的基本使用方法,如果我们使用mpirun -n 3 python3 test.py
这样的指令去运行一个程序,那么就会给每一个不同的test.py
中发送一个互不相同的rank,这个rank的范围是从0开始数的。比如如下案例我们使用Get_rank()
方法就可以获取到mpi所传递下来的rank id,这样进程就知道了自己所处的进程编号,我们写好每个编号下所需要执行的任务即可:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print('My rank is ',rank)
执行效果如下:
代码语言:javascript复制$ mpirun -n 4 python3 mpi_test.py
My rank is 2
My rank is 1
My rank is 0
My rank is 3
当然,因为每个任务都是平级的关系,因此mpi的rank id也是随机发放的,这个没办法控制,但是我们可以用如下的方法在进程间通信:
代码语言:javascript复制from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
idata = 1
comm.send(idata, dest=1)
print ('This is process {}'.format(rank), 'nData send to process 1 successfully!')
elif rank == 1:
idata = comm.recv(source=0)
print ('This is process {}, data is '.format(rank),idata)
在这个案例中,我们从rank id为0的进程向rank id为1的进程发送了一个整数变量的数据。因为我们并不知道这个脚本什么时候会被分配到rank 0什么时候会被分配到rank 1,因此在同一个脚本内我们就需要分别对这两种可能发生的情况进行针对性的处理。运行结果如下:
代码语言:javascript复制$ mpirun -n 2 python3 mpi_test.py
This is process 0
Data send to process 1 successfully!
This is process 1, data is 1
整数型的变量被成功的传递,当然,这里面其实还有个更加重要的意义是,rank为1的进程实际上是对rank为0的进程有时间序列上的依赖的,我们必须执行完rank 0中的任务,才能再执行rank 1中的任务,这个是有可能在实际的应用过程中被频繁使用的功能,尤其是任务之间互相有依赖的情况下。当然,进程之间的通信不仅仅可以传递整数型的变量,还可以传递其他类型,比如字典或者一个numpy的数组:
代码语言:javascript复制from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
numData = 10
comm.send(numData, dest=1)
data = np.linspace(0.0, 3.14, numData)
comm.Send(data, dest=1)
print ('This is process {}'.format(rank), 'nData send to process 1 successfully!')
elif rank == 1:
numData = comm.recv(source=0)
print('Number of data to receive: ', numData)
data = np.empty(numData, dtype='d')
comm.Recv(data, source=0)
print ('This is process {}, data is '.format(rank),data)
运行结果如下:
代码语言:javascript复制$ mpirun -n 2 python3 mpi_test.py
This is process 0
Data send to process 1 successfully!
Number of data to receive: 10
This is process 1, data is [0. 0.34888889 0.69777778 1.04666667 1.39555556 1.74444444
2.09333333 2.44222222 2.79111111 3.14 ]
可以看到结果是被成功的接收了的。
总结概要
在这篇文章中,我们并没有介绍非常多的MPI的应用,其实在普通的并行或者分布式任务中,掌握了各个进程的任务处理过程以及进程间的通信方法就足够了。总体来说,MPI是一个非常通用也非常高效的并行计算软件。有了这些专业的并行化任务调度软件,我们就可以专注于专业任务的代码和算法上,而不需要过多的去关注并行任务的调度和分配问题。
版权声明
本文首发链接为:https://cloud.tencent.com/developer/article/1910122
作者ID:DechinPhy