磐创AI分享
来源 | Smarter
作者 | 薰风初入弦
【导读】之前我在并行训练的时候一直用的是DataParallel,而不管是同门师兄弟还是其他大佬一直推荐Distributed DataParallel。前两天改代码的时候我终于碰到坑了,各种原因导致单进程多卡的时候只有一张卡在进行运算。痛定思痛,该学习一下传说中的分布式并行了。
基本上是一篇教程的翻译,原文链接:
Distributed data parallel training in Pytorchyangkky.github.io
后续等我把这些并行计算的内容捋清楚了,会再自己写一份更详细的tutorial~
注意:需要在每一个进程设置相同的随机种子,以便所有模型权重都初始化为相同的值。
1. 动机
加速神经网络训练最简单的办法就是上GPU,如果一块GPU还是不够,就多上几块。
事实上,比如BERT和GPT-2这样的大型语言模型甚至是在上百块GPU上训练的。
为了实现多GPU训练,我们必须想一个办法在多个GPU上分发数据和模型,并且协调训练过程。
2. Why Distributed Data Parallel?
Pytorch兼顾了主要神经网络结构的易用性和可控性。而其提供了两种办法在多GPU上分割数据和模型:即
nn.DataParallel 以及 nn.DistributedDataParallel。
nn.DataParallel 使用起来更加简单(通常只要封装模型然后跑训练代码就ok了)。但是在每个训练批次(batch)中,因为模型的权重都是在 一个进程上先算出来 然后再把他们分发到每个GPU上,所以网络通信就成为了一个瓶颈,而GPU使用率也通常很低。
除此之外,nn.DataParallel 需要所有的GPU都在一个节点(一台机器)上,且并不支持 Apex 的 混合精度训练.
3. 现有文档的局限性
总的来说,Pytorch的文档是全面且清晰的,特别是在1.0版本的那些。完全通过文档和教程就可以自学Pytorch,这并不是显示一个人有多大佬,而显然更多地反映了Pytorch的易用性和优秀的文档。
但是好巧不巧的,就是在(Distributed)DataParallel这个系列的文档讲的就不甚清楚,或者干脆没有/不完善/有很多无关内容。以下是一些例子(抱怨)。
- Pytorch提供了一个使用AWS(亚马逊网络服务)进行分布式训练的教程,这个教程在教你如何使用AWS方面很出色,但甚至没提到 nn.DistributedDataParallel 是干什么用的,这导致相关的代码块很难follow。
- 而另外一篇Pytorch提供的教程又太细了,它对于一个不是很懂Python中MultiProcessing的人(比如我)来说很难读懂。因为它花了大量的篇幅讲 nn.DistributedDataParallel 中的复制功能(数据是怎么复制的)。然而,他并没有在高层逻辑上总结一下都在扯啥,甚至没说这个DistributedDataParallel是咋用的?
- 这里还有一个Pytorch关于入门分布式数据并行的(Distributed data parallel)教程。这个教程展示了如何进行一些设置,但并没解释这些设置是干啥用的,之后也展示了一些讲模型分到各个GPU上并执行一个优化步骤(optimization step)。然而,这篇教程里的代码是跑不同的(函数名字都对不上),也没告诉你怎么跑这个代码。和之前的教程一样,他也没给一个逻辑上分布式训练的工作概括。
- 而官方给的最好的例子,无疑是ImageNet的训练,然而因为这个例子要 素 过 多,导致也看不出来哪个部分是用于分布式多GPU训练的。
- Apex提供了他们自己的ImageNet的训练例。例子的文档告诉大家他们的 nn.DistributedDataParallel 是自己重写的,但是如果连最初的版本都不会用,更别说重写的了。
- 而这个教程很好地描述了在底层, nn.DistributedDataParallel 和 nn.DataParallel 到底有什么不同。然而他并没有如何使用 nn.DataParallel 的例程。
4. 大纲
本教程实际上是针对那些已经熟悉在Pytorch中训练神经网络模型的人的,本文不会详细介绍这些代码的任何一部分。
本文将首先概述一下总体情况,然后展示一个最小的使用GPU训练MNIST数据集的例程。之后对这个例程进行修改,以便在多个gpu(可能跨多个节点)上进行训练,并逐行解释这些更改。重要的是,本文还将解释如何运行代码。
另外,本文还演示了如何使用Apex进行简单的混合精度分布式训练。
5.大图景(The big picture)
使用 nn.DistributedDataParallel 进行Multiprocessing可以在多个gpu之间复制该模型,每个gpu由一个进程控制。(如果你想,也可以一个进程控制多个GPU,但这会比控制一个慢得多。也有可能有多个工作进程为每个GPU获取数据,但为了简单起见,本文将省略这一点。)这些GPU可以位于同一个节点上,也可以分布在多个节点上。每个进程都执行相同的任务,并且每个进程与所有其他进程通信。
只有梯度会在进程/GPU之间传播,这样网络通信就不至于成为一个瓶颈了。
训练过程中,每个进程从磁盘加载自己的小批(minibatch)数据,并将它们传递给自己的GPU。每个GPU都做它自己的前向计算,然后梯度在GPU之间全部约简。每个层的梯度不仅仅依赖于前一层,因此梯度全约简与并行计算反向传播,进一步缓解网络瓶颈。在反向传播结束时,每个节点都有平均的梯度,确保模型权值保持同步(synchronized)。
上述的步骤要求需要多个进程,甚至可能是不同结点上的多个进程同步和通信。而Pytorch通过它的 distributed.init_process_group 函数实现。这个函数需要知道如何找到进程0(process 0),一边所有的进程都可以同步,也知道了一共要同步多少进程。每个独立的进程也要知道总共的进程数,以及自己在所有进程中的阶序(rank),当然也要知道自己要用那张GPU。总进程数称之为 world size。最后,每个进程都需要知道要处理的数据的哪一部分,这样批处理就不会重叠。而Pytorch通过 nn.utils.data.DistributedSampler 来实现这种效果。
6. 最小例程与解释
为了展示如何做到这些,这里有一个在MNIST上训练的例子,并且之后把它修改为可以在多节点多GPU上运行,最终修改的版本还可以支持混合精度运算。
首先,我们import所有我们需要的库
代码语言:javascript复制import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp
之后,我们训练了一个MNIST分类的简单卷积网络
代码语言:javascript复制 class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
这个 main() 函数会接受一些参数并运行训练函数。
代码语言:javascript复制 def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
train(0, args)
而这部分则是训练函数
代码语言:javascript复制 def train(gpu, args):
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Data loading code
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True)
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i 1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
epoch 1,
args.epochs,
i 1,
total_step,
loss.item())
)
if gpu == 0:
print("Training complete in: " str(datetime.now() - start))
最后,我们要确保 main() 函数会被调用
代码语言:javascript复制if __name__ == '__main__':
main()
上述代码中肯定有一些我们还不需要的额外的东西(例如gpu和节点的数量),但是将整个框架放置到位是很有帮助的。之后在命令行输入
代码语言:javascript复制python src/mnist.py -n 1 -g 1 -nr 0
就可以在一个结点上的单个GPU上训练啦~
7. 加上MultiProcessing
我们需要一个脚本,用来启动一个进程的每一个GPU。每个进程需要知道使用哪个GPU,以及它在所有正在运行的进程中的阶序(rank)。而且,我们需要在每个节点上运行脚本。
现在让我们康康每个函数的变化,这些改变将被单独框出方便查找。
代码语言:javascript复制 def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1,
type=int, metavar='N')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=2, type=int,
metavar='N',
help='number of total epochs to run')
args = parser.parse_args()
#########################################################
args.world_size = args.gpus * args.nodes #
os.environ['MASTER_ADDR'] = '10.57.23.164' #
os.environ['MASTER_PORT'] = '8888' #
mp.spawn(train, nprocs=args.gpus, args=(args,)) #
#########################################################
上一节中一些参数在这个地方才需要
- args.nodes 是我们使用的结点数
- args.gpus 是每个结点的GPU数.
- args.nr 是当前结点的阶序rank,这个值的取值范围是 0 到 args.nodes - 1.
OK,现在我们一行行看都改了什么
- Line 14:基于结点数以及每个结点的GPU数,我们可以计算 world_size 或者需要运行的总进程数,这和总GPU数相等。
- Line 15:告诉Multiprocessing模块去哪个IP地址找process 0以确保初始同步所有进程。
- Line 16:同样的,这个是process 0所在的端口
- Line 17:现在,我们需要生成 args.gpus 个进程, 每个进程都运行 train(i, args), 其中 i 从 0 到 args.gpus - 1。注意, main() 在每个结点上都运行, 因此总共就有 args.nodes * args.gpus = args.world_size 个进程.
除了14,15行的设置,也可以在终端中运行
export MASTER_ADDR=10.57.23.164 和 export MASTER_PORT=8888
接下来,需要修改的就是训练函数了,改动的地方依然被框出来啦。
代码语言:javascript复制 def train(gpu, args):
############################################################
rank = args.nr * args.gpus gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank
)
############################################################
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
###############################################################
# Wrap the model
model = nn.parallel.DistributedDataParallel(model,
device_ids=[gpu])
###############################################################
# Data loading code
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,
transform=transforms.ToTensor(),
download=True
)
################################################################
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=args.world_size,
rank=rank
)
################################################################
train_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
##############################
shuffle=False, #
##############################
num_workers=0,
pin_memory=True,
#############################
sampler=train_sampler) #
#############################
...
为了简单起见,上面的代码去掉了简单循环并用 ... 代替,不过你可以在这里看到完整脚本 。
- Line3:这里是该进程在所有进程中的全局rank(一个进程对应一个GPU)。这个rank在Line6会用到
- Line4~6:初始化进程并加入其他进程。这就叫做“blocking”,也就是说只有当所有进程都加入了,单个进程才会运行。这里使用了 nccl 后端,因为Pytorch文档说它是跑得最快的。init_method 让进程组知道去哪里找到它需要的设置。在这里,它就在寻找名为 MASTER_ADDR 以及 MASTER_PORT 的环境变量,这些环境变量在 main 函数中设置过。当然,本来可以把world_size 设置成一个全局变量,不过本脚本选择把它作为一个关键字参量(和当前进程的全局阶序global rank一样)
- Line23:将模型封装为一个 DistributedDataParallel 模型。这将把模型复制到GPU上进行处理。
- Line35~39:nn.utils.data.DistributedSampler 确保每个进程拿到的都是不同的训练数据切片。
- Line46/Line51:因为用了 nn.utils.data.DistributedSampler 所以不能用正常的办法做shuffle。
要在4个节点上运行它(每个节点上有8个gpu),我们需要4个终端(每个节点上有一个)。在节点0上(由 main 中的第13行设置):
python src/mnist-distributed.py -n 4 -g 8 -nr 0
而在其他的节点上:
python src/mnist-distributed.py -n 4 -g 8 -nr i
其中 i∈1,2,3. 换句话说,我们要把这个脚本在每个结点上运行脚本,让脚本运行 args.gpus 个进程以在训练开始之前同步每个进程。
注意,脚本中的batchsize设置的是每个GPU的batchsize,因此实际的batchsize要乘上总共的GPU数目(worldsize)。
8. 使用Apex进行混合混合精度训练
混合精度训练,即组合浮点数 (FP32)和半精度浮点数 (FP16)进行训练,允许我们使用更大的batchsize,并利用NVIDIA张量核进行更快的计算。AWS p3实例使用了8块带张量核的NVIDIA Tesla V100 GPU。
我们只需要修改 train 函数即可,为了简便表示,下面已经从示例中剔除了数据加载代码和反向传播之后的代码,并将它们替换为 ... ,不过你可以在这看到完整脚本。
代码语言:javascript复制 rank = args.nr * args.gpus gpu
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=args.world_size,
rank=rank)
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu)
model.cuda(gpu)
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda(gpu)
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Wrap the model
##############################################################
model, optimizer = amp.initialize(model, optimizer,
opt_level='O2')
model = DDP(model)
##############################################################
# Data loading code
...
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
##############################################################
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
##############################################################
optimizer.step()
...
这个脚本和之前的分布式训练脚本的运行方式相同。
代码语言:javascript复制扫码看好书,满100减50超值优惠活动等你哦
✄------------------------------------------------