batch-compute & GPU分布式机器学习

2020-07-20 10:33:38 浏览数 (1)

当用户提交一些机器学习任务时,往往需要大规模的计算资源,但是对于响应时间并没有严格的要求。在这种场景下,首先使用腾讯云的batch-compute(批量计算)产品来自动化提交用户的任务,然后使用分布式 gpu的方式解决算力问题,在任务完成后通知用户,是一个可行的解决方案。

本文将分成2部分:首先通过一个demo介绍上述过程的实现,从仅使用gpu、不考虑并行的简单情况开始,扩展至并行 gpu的情况,并简要介绍batch-compute的使用方法;然后介绍一些技术的实现原理(部分资料来源于知乎和博客,仅供参考)。

一个简单的Demo

使用pytorch,利用torch.Tensor对cuda的支持进行数据和模型的迁移。先不考虑并行,仅考虑如何将传统的基于cpu的机器学习任务迁移到gpu上。

  1. 定义一个简单的模型ConvNet:
代码语言:javascript复制
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1,             padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

2. 进行基于gpu的训练

代码语言:javascript复制
def train(gpu, args):
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)  # set default gpu
    model.cuda(gpu) # move model to gpu
    batch_size = 100
    criterion = nn.CrossEntropyLoss().cuda(gpu)     # move loss function to gpu
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                      transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

上面代码中的train函数接收一个gpu的编号gpu作为参数,并且在第4行用其指定torch默认使用的gpu。

在第5行,将模型迁移到gpu上。cuda()函数会返回将调用该函数的对象拷贝一份到cuda memory中并返回该拷贝。如果该对象已经存在cuda memory或是正确的gpu中,则直接返回原对象。

在第7行,将损失函数迁移到gpu上(如果不明白为什么函数也要迁移,可以查看github上这个issue)。

这样,机器学习任务就迁移到了gpu上。

然后来考虑并行。这里假设有多个节点,每个节点上有多个gpu,每个进程使用一块gpu。pytorch提供了分布式训练的包torch.distributed,并且支持跨节点训练。

  1. 在脚本中设置master节点的ip和port
代码语言:javascript复制
import torch.multiprocessing as mp
def main():
    ...
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))

第5,6行通过环境变量的方式设置了master的ip和端口,之后master将在该端口监听worker的连接请求并完成初始化、广播等操作。

第7行通过spawn函数在本地启动了数量等于gpu数的进程,并且每个进程中运行相同的函数train。如果一个进程异常退出,那么其他进程也会被终止。

2. 初始化本地进程,并等待其他进程初始化完毕

代码语言:javascript复制
import torch.distributed as dist
def train(gpu, args):
    rank = args.nr * args.gpus   gpu
    print('starting making group.......')
    dist.init_process_group(
        backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)
    print('all processes have been started!')
    torch.manual_seed(0)
    ...

第5行的init_process_group是一个阻塞函数,在所有进程启动完毕且socket连接建立成功后返回。这里使用了nccl作为后端(也就是通信架构),可以参考pytorch官方给出的最佳指南;init_method参数表示通过环境变量发现master;rank表示当前进程在进程组中的优先级,rank=0的进程是master进程;world_size表示进程组中总共有多少进程。

2. 模型梯度同步

代码语言:javascript复制
model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

参与训练的数据集被分成多份,每个进程取一份input输入神经网络,独立计算梯度,然后将各个进程的梯度求平均值,用平均值更新模型参数。

3. 将数据划分到各个gpu上

代码语言:javascript复制
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset,
    num_replicas=args.world_size,
    rank=rank)

DistributedSampler将输入按照batch_size划分到不同的gpu上,使得每个进程能读到不同的batch,且不同进程间不会读到重复的batch。

这样,机器学习任务就可以在不同节点的多个gpu上并行地执行,不同的进程只需指定不同的rank即可。

最后将任务通过batch-compute实现自动化的任务提交和执行。

首先介绍batch-compute的概念。现代云计算有多种形式,其中常见的2种是流式计算(stream computing)和批量计算(batch computing)。流式计算处理对实时性要求高的请求,具有低延迟、持续性等特征,一般用于实时推荐、监控等服务;批量计算处理对实时性要求低但需要大量计算资源的请求,往往是耗时较长的一次性作业。机器学习任务就是一种很典型的批量计算。

利用腾讯云的batch-compute(批量计算)产品,开发者需要提供计算执行的环境、命令和输入输出存放的位置,由该产品自动去根据负载获取腾讯云的弹性资源,并自动调度作业执行流程,将企业和科研机构的双手从架设和配置数据中心中解放出来。

本文中使用batch-compute的python SDK,分为2步:先创建计算环境,然后提交计算作业。

  1. 创建计算环境
代码语言:javascript复制
req = batch_models.CreateComputeEnvRequest()
params = '{"ComputeEnv":{"EnvName":"batch-concurrent-test","EnvData":{"InstanceType":"GN10X.2XLARGE40","ImageId":"img-xxxxxx","SystemDisk":{"DiskSize":120},"DesiredComputeNodeCount":2},"Placement":{"Zone":"ap-guangzhou-3"}}'
print(params)
req.from_json_string(params)
resp = self.batch_client.CreateComputeEnv(req)
self.computeEnvId = json.loads(resp.to_json_string())["EnvId"]

第2行指定了创建2个节点,使用带gpu的机型GN10X.2XLARGE40;通过ImageId指定cvm的镜像,在这个镜像中部署了anaconda,pytorch,nvidia driver,cuda等。

如果需要获取创建节点的ip地址,可以通过第6行获取计算环境的id查看环境的详细信息。

2. 提交计算作业

代码语言:javascript复制
commands = [
    'sudo service docker restart',
    'sudo service docker status',
    'set -x',
    'docker run -t --network host --gpus all <image-name> bash concurrent/task.sh',
]
params = '{"Placement":{"Zone":"ap-guangzhou-3"},"Job":{"JobName":"test-job","Tasks":[{"TaskName":"concurrent-task","InputMappings":[{"SourcePath":"%s","DestinationPath":"%s"}],"TaskInstanceNum":2,"Application":{"Command":"%s"},"EnvId":"%s","RedirectInfo":{"StdoutRedirectPath":"%s","StderrRedirectPath":"%s"}}]}}' % (self.inPath, self.destPath, " && ".join(commands), self.computeEnvId, self.outPath, self.errPath)
req.from_json_string(params)
resp = self.batch_client.SubmitJob(req)

在第5行启动了一个docker容器并使用容器内装好的cuda。此处将网络设置为host模式使得可以在容器内通过host ip直接访问另一个节点上的容器;设置-t参数使得运行结果与在终端通过命令行手动执行的输出保持一致;但是不能设置-i参数,因为输入设备并不是一个真正的tty;设置cmd参数使得容器启动后执行task.sh脚本:

代码语言:javascript复制
[[ $(hostname -I | cut -d ' ' -f 1) == "xxx.xxx.xxx.xxx" ]];
python3 concurrent/mnist-distributed.py -n 2 -nr $?

第1行判断当前节点的ip是否为master节点的ip;第二行运行执行机器学习任务的python脚本,并传入rank参数,如果是master节点则传入0,否则,传入1

3. 运行结果

为了直观地演示并行机器学习的输出结果,笔者在两台cvm上手动执行了脚本:

如图,首先通过ip地址判断脚本输入参数中的rank值,并且等待所有进程启动成功。

然后开始训练,可以看见每个节点上进行了3个epoch,batch_size为300,耗时8秒左右。

为了对比使用并行前后的差距,在一个节点上启动任务。

如图,进行了3个epoch,batch_size为600,耗时为12秒左右。

至此,机器学习的任务就通过batch-compute产品提交并且在2台云服务器上并行地执行了,以下搬运一些pytorch文档/博客/知乎上关于分布式训练的原理实现。

原理

  1. DDP(DistributedDataParallel)的构造函数

每个进程都有一个模型(module)。在构造函数中,DDP首先获得该module的引用,然后将module.state_dict()从master进程广播到全体进程,使得所有进程具有相同的初始状态。state_dict的返回值是buffer等不在参数列表中但是代表了网络状态的数据,例如batch normalization中的running_mean。

不同进程间梯度的汇总、求和和同步是通过一个Reducer类实现的。在构造函数中初始化了一个Reducer对象,并通过该对象管理梯度计算。

在Reducer对象的构造函数中,首先将所有的参数装进若干个bucket(桶),之后一桶一桶地计算可以提高效率。参数进入桶的顺序和其在数组Model.parameters中的顺序相反,后向传播中最后一层的梯度是最先被计算完毕的,因此应该最先参加求和。

然后,Reducer为每个参数注册了一个autograd_hook,在该参数被计算完毕后触发。

2. 前向传播

前向传播没有涉及梯度计算,但是设计一个corner case——如果用户定义了某些参数但是没有将其加入模型之中(即神经网络中存在孤立节点),那么autograd_hook永远不会被触发。为此,DDP的构造函数中提供了find_unused_parameters,如果被设置为True,则在前向传播完毕后会找出这些节点并直接将其标记为已完成计算。当然这一操作会引入额外的开销,因此作为一个参数。

3. 后向传播

当所有节点上的同一编号的bucket中所有梯度均计算完成后,启动异步函数all_reduce求和。本地计算梯度和跨节点求平均值可以并行地进行,因为后向传播中用到的只是本地的计算结果(因为前向传播中的output就是只用local input算出来的)。

4. all_reduce实现细节

all_reduce实现了跨节点的求和计算。一种主流的实现方式是Parameter Server,即一个master节点接收其他节点发送的数值并求和,然后将结果发送给其他节点。

但是这样会引入单点故障,因此Pytorch 1.x使用了一种名为Ring AllReduce的算法(Uber的开源分布式框架Horovord也采用了这一算法)。正如其名字所表现的,所有节点排成一个环,每个节点从作邻居接收数据,在本地完成一部分求和工作,然后向右邻居发送数据。所有节点是平等的,没有master节点。

Ring AllReduce算法分成2个阶段:Share-Reduce阶段和Share-Only阶段。

在Share-Reduce阶段结束后,每个节点上会得到一部分位置的求和结果。

如图,经过3轮迭代后,WorkerA将得到a1 b1 c1 d1,WorkerB将得到a2 b2 c2 d2,WorkerC将得到a3 b3 c3 d3,WorkerD将得到a0 b0 c0 d0。

在Share-Only阶段,节点间共享这些和,使得所有节点最终拥有所有位置的求和结果。

如图,经过3轮迭代后,每个节点都会拥有全部4个位置的和。

5. Master进程有何意义?

既然使用了Ring AllReduce算法,那么在使用torch.distributed包时一定要指定的master ip&port有什么作用呢?

Master的主要作用时在初始化时为各个进程建立连接。具体而言,Master会创建一个守护线程,在这个线程中为所有worker各自创建一个socket,然后等待worker的连接,并在连上后发送其他进程所在的位置。

Worker则创建和master通信的socket,并主动连接master,在连上后获取其他进程的位置信息并报告自己的位置,然后和其他进程建立连接。

参考文献

1. https://pytorch.org/docs/stable/data.html

2. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

3. https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html

4. https://pytorch.org/docs/stable/notes/ddp.html

5. https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da

6. https://zhuanlan.zhihu.com/p/76638962

7. https://www.zhihu.com/question/306242771/answer/825668022

0 人点赞