当用户提交一些机器学习任务时,往往需要大规模的计算资源,但是对于响应时间并没有严格的要求。在这种场景下,首先使用腾讯云的batch-compute(批量计算)产品来自动化提交用户的任务,然后使用分布式 gpu的方式解决算力问题,在任务完成后通知用户,是一个可行的解决方案。
本文将分成2部分:首先通过一个demo介绍上述过程的实现,从仅使用gpu、不考虑并行的简单情况开始,扩展至并行 gpu的情况,并简要介绍batch-compute的使用方法;然后介绍一些技术的实现原理(部分资料来源于知乎和博客,仅供参考)。
一个简单的Demo
使用pytorch,利用torch.Tensor对cuda的支持进行数据和模型的迁移。先不考虑并行,仅考虑如何将传统的基于cpu的机器学习任务迁移到gpu上。
- 定义一个简单的模型ConvNet:
class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
2. 进行基于gpu的训练
代码语言:javascript复制def train(gpu, args):
torch.manual_seed(0)
model = ConvNet()
torch.cuda.set_device(gpu) # set default gpu
model.cuda(gpu) # move model to gpu
batch_size = 100
criterion = nn.CrossEntropyLoss().cuda(gpu) # move loss function to gpu
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Data loading code
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True)
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
outputs = model(images)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
上面代码中的train函数接收一个gpu的编号gpu作为参数,并且在第4行用其指定torch默认使用的gpu。
在第5行,将模型迁移到gpu上。cuda()函数会返回将调用该函数的对象拷贝一份到cuda memory中并返回该拷贝。如果该对象已经存在cuda memory或是正确的gpu中,则直接返回原对象。
在第7行,将损失函数迁移到gpu上(如果不明白为什么函数也要迁移,可以查看github上这个issue)。
这样,机器学习任务就迁移到了gpu上。
然后来考虑并行。这里假设有多个节点,每个节点上有多个gpu,每个进程使用一块gpu。pytorch提供了分布式训练的包torch.distributed,并且支持跨节点训练。
- 在脚本中设置master节点的ip和port
import torch.multiprocessing as mp
def main():
...
args.world_size = args.gpus * args.nodes
os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'
os.environ['MASTER_PORT'] = '8888'
mp.spawn(train, nprocs=args.gpus, args=(args,))
第5,6行通过环境变量的方式设置了master的ip和端口,之后master将在该端口监听worker的连接请求并完成初始化、广播等操作。
第7行通过spawn函数在本地启动了数量等于gpu数的进程,并且每个进程中运行相同的函数train。如果一个进程异常退出,那么其他进程也会被终止。
2. 初始化本地进程,并等待其他进程初始化完毕
代码语言:javascript复制import torch.distributed as dist
def train(gpu, args):
rank = args.nr * args.gpus gpu
print('starting making group.......')
dist.init_process_group(
backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)
print('all processes have been started!')
torch.manual_seed(0)
...
第5行的init_process_group是一个阻塞函数,在所有进程启动完毕且socket连接建立成功后返回。这里使用了nccl作为后端(也就是通信架构),可以参考pytorch官方给出的最佳指南;init_method参数表示通过环境变量发现master;rank表示当前进程在进程组中的优先级,rank=0的进程是master进程;world_size表示进程组中总共有多少进程。
2. 模型梯度同步
代码语言:javascript复制model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
参与训练的数据集被分成多份,每个进程取一份input输入神经网络,独立计算梯度,然后将各个进程的梯度求平均值,用平均值更新模型参数。
3. 将数据划分到各个gpu上
代码语言:javascript复制train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=args.world_size,
rank=rank)
DistributedSampler将输入按照batch_size划分到不同的gpu上,使得每个进程能读到不同的batch,且不同进程间不会读到重复的batch。
这样,机器学习任务就可以在不同节点的多个gpu上并行地执行,不同的进程只需指定不同的rank即可。
最后将任务通过batch-compute实现自动化的任务提交和执行。
首先介绍batch-compute的概念。现代云计算有多种形式,其中常见的2种是流式计算(stream computing)和批量计算(batch computing)。流式计算处理对实时性要求高的请求,具有低延迟、持续性等特征,一般用于实时推荐、监控等服务;批量计算处理对实时性要求低但需要大量计算资源的请求,往往是耗时较长的一次性作业。机器学习任务就是一种很典型的批量计算。
利用腾讯云的batch-compute(批量计算)产品,开发者需要提供计算执行的环境、命令和输入输出存放的位置,由该产品自动去根据负载获取腾讯云的弹性资源,并自动调度作业执行流程,将企业和科研机构的双手从架设和配置数据中心中解放出来。
本文中使用batch-compute的python SDK,分为2步:先创建计算环境,然后提交计算作业。
- 创建计算环境
req = batch_models.CreateComputeEnvRequest()
params = '{"ComputeEnv":{"EnvName":"batch-concurrent-test","EnvData":{"InstanceType":"GN10X.2XLARGE40","ImageId":"img-xxxxxx","SystemDisk":{"DiskSize":120},"DesiredComputeNodeCount":2},"Placement":{"Zone":"ap-guangzhou-3"}}'
print(params)
req.from_json_string(params)
resp = self.batch_client.CreateComputeEnv(req)
self.computeEnvId = json.loads(resp.to_json_string())["EnvId"]
第2行指定了创建2个节点,使用带gpu的机型GN10X.2XLARGE40;通过ImageId指定cvm的镜像,在这个镜像中部署了anaconda,pytorch,nvidia driver,cuda等。
如果需要获取创建节点的ip地址,可以通过第6行获取计算环境的id查看环境的详细信息。
2. 提交计算作业
代码语言:javascript复制commands = [
'sudo service docker restart',
'sudo service docker status',
'set -x',
'docker run -t --network host --gpus all <image-name> bash concurrent/task.sh',
]
params = '{"Placement":{"Zone":"ap-guangzhou-3"},"Job":{"JobName":"test-job","Tasks":[{"TaskName":"concurrent-task","InputMappings":[{"SourcePath":"%s","DestinationPath":"%s"}],"TaskInstanceNum":2,"Application":{"Command":"%s"},"EnvId":"%s","RedirectInfo":{"StdoutRedirectPath":"%s","StderrRedirectPath":"%s"}}]}}' % (self.inPath, self.destPath, " && ".join(commands), self.computeEnvId, self.outPath, self.errPath)
req.from_json_string(params)
resp = self.batch_client.SubmitJob(req)
在第5行启动了一个docker容器并使用容器内装好的cuda。此处将网络设置为host模式使得可以在容器内通过host ip直接访问另一个节点上的容器;设置-t参数使得运行结果与在终端通过命令行手动执行的输出保持一致;但是不能设置-i参数,因为输入设备并不是一个真正的tty;设置cmd参数使得容器启动后执行task.sh脚本:
代码语言:javascript复制[[ $(hostname -I | cut -d ' ' -f 1) == "xxx.xxx.xxx.xxx" ]];
python3 concurrent/mnist-distributed.py -n 2 -nr $?
第1行判断当前节点的ip是否为master节点的ip;第二行运行执行机器学习任务的python脚本,并传入rank参数,如果是master节点则传入0,否则,传入1
3. 运行结果
为了直观地演示并行机器学习的输出结果,笔者在两台cvm上手动执行了脚本:
如图,首先通过ip地址判断脚本输入参数中的rank值,并且等待所有进程启动成功。
然后开始训练,可以看见每个节点上进行了3个epoch,batch_size为300,耗时8秒左右。
为了对比使用并行前后的差距,在一个节点上启动任务。
如图,进行了3个epoch,batch_size为600,耗时为12秒左右。
至此,机器学习的任务就通过batch-compute产品提交并且在2台云服务器上并行地执行了,以下搬运一些pytorch文档/博客/知乎上关于分布式训练的原理实现。
原理
- DDP(DistributedDataParallel)的构造函数
每个进程都有一个模型(module)。在构造函数中,DDP首先获得该module的引用,然后将module.state_dict()从master进程广播到全体进程,使得所有进程具有相同的初始状态。state_dict的返回值是buffer等不在参数列表中但是代表了网络状态的数据,例如batch normalization中的running_mean。
不同进程间梯度的汇总、求和和同步是通过一个Reducer类实现的。在构造函数中初始化了一个Reducer对象,并通过该对象管理梯度计算。
在Reducer对象的构造函数中,首先将所有的参数装进若干个bucket(桶),之后一桶一桶地计算可以提高效率。参数进入桶的顺序和其在数组Model.parameters中的顺序相反,后向传播中最后一层的梯度是最先被计算完毕的,因此应该最先参加求和。
然后,Reducer为每个参数注册了一个autograd_hook,在该参数被计算完毕后触发。
2. 前向传播
前向传播没有涉及梯度计算,但是设计一个corner case——如果用户定义了某些参数但是没有将其加入模型之中(即神经网络中存在孤立节点),那么autograd_hook永远不会被触发。为此,DDP的构造函数中提供了find_unused_parameters,如果被设置为True,则在前向传播完毕后会找出这些节点并直接将其标记为已完成计算。当然这一操作会引入额外的开销,因此作为一个参数。
3. 后向传播
当所有节点上的同一编号的bucket中所有梯度均计算完成后,启动异步函数all_reduce求和。本地计算梯度和跨节点求平均值可以并行地进行,因为后向传播中用到的只是本地的计算结果(因为前向传播中的output就是只用local input算出来的)。
4. all_reduce实现细节
all_reduce实现了跨节点的求和计算。一种主流的实现方式是Parameter Server,即一个master节点接收其他节点发送的数值并求和,然后将结果发送给其他节点。
但是这样会引入单点故障,因此Pytorch 1.x使用了一种名为Ring AllReduce的算法(Uber的开源分布式框架Horovord也采用了这一算法)。正如其名字所表现的,所有节点排成一个环,每个节点从作邻居接收数据,在本地完成一部分求和工作,然后向右邻居发送数据。所有节点是平等的,没有master节点。
Ring AllReduce算法分成2个阶段:Share-Reduce阶段和Share-Only阶段。
在Share-Reduce阶段结束后,每个节点上会得到一部分位置的求和结果。
如图,经过3轮迭代后,WorkerA将得到a1 b1 c1 d1,WorkerB将得到a2 b2 c2 d2,WorkerC将得到a3 b3 c3 d3,WorkerD将得到a0 b0 c0 d0。
在Share-Only阶段,节点间共享这些和,使得所有节点最终拥有所有位置的求和结果。
如图,经过3轮迭代后,每个节点都会拥有全部4个位置的和。
5. Master进程有何意义?
既然使用了Ring AllReduce算法,那么在使用torch.distributed包时一定要指定的master ip&port有什么作用呢?
Master的主要作用时在初始化时为各个进程建立连接。具体而言,Master会创建一个守护线程,在这个线程中为所有worker各自创建一个socket,然后等待worker的连接,并在连上后发送其他进程所在的位置。
Worker则创建和master通信的socket,并主动连接master,在连上后获取其他进程的位置信息并报告自己的位置,然后和其他进程建立连接。
参考文献
1. https://pytorch.org/docs/stable/data.html
2. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html
3. https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html
4. https://pytorch.org/docs/stable/notes/ddp.html
5. https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da
6. https://zhuanlan.zhihu.com/p/76638962
7. https://www.zhihu.com/question/306242771/answer/825668022