Pytorch 分布式模式介绍

2022-09-03 19:11:15 浏览数 (1)

一  分布式训练策略

数据较多或者模型较大时,为提高机器学习模型训练效率,一般采用多GPU的分布式训练。

按照并行方式,分布式训练一般分为数据并行和模型并行两种, 模型并行:分布式系统中的不同GPU负责网络模型的不同部分。例如,神经网络模型的不同网络层被分配到不同的GPU,或者同一层内部的不同参数被分配到不同GPU;

数据并行:不同的GPU有同一个模型的多个副本,每个GPU分配到不同的数据,然后将所有GPU的计算结果按照某种方式合并。

注意,上述中的不用GPU可以是同一台机上的多个GPU,也可以是不用机上的GPU。

当然也有数据并行和模型并行的混合模式

因为模型并行各个部分存在一定的依赖,规模伸缩性差(意思是不能随意增加GPU的数量),在实际训练中用的不多。而数据并行,则各部分独立,规模伸缩性好,实际训练中更为常用,提速效果也更好。

数据并行会涉及到各个GPU之间同步模型参数,一般分为同步更新和异步更新。同步更新要等到所有GPU的梯度计算完成,再统一计算新权值,然后所有GPU同步新值后,才进行下一轮计算。异步更新,每个GPU梯度计算完后,无需等待其他GPU的梯度计算(有时可以设置需要等待的梯度个数),可立即更新整体权值,然后同步此权值,即可进行下一轮计算。同步更新有等待,异步更新基本没有等待,但异步更新涉及到梯度过时等更复杂问题。

1.模型并行

所谓模型并行指的是将模型部署到很多设备上(设备可能分布在不同机器上)运行,比如多个机器的GPUs。当神经网络模型很大时,由于显存限制,它是难以在跑在单个GPU上,这个时候就需要模型并行。比如Google的神经机器翻译系统,其可能采用深度LSTM模型,如下图所示,此时模型的不同部分需要分散到许多设备上进行并行训练。深度学习模型一般包含很多层,如果要采用模型并行策略,一般需要将不同的层运行在不同的设备上,但是实际上层与层之间的运行是存在约束的:前向运算时,后面的层需要等待前面层的输出作为输入,而在反向传播时,前面的层又要受限于后面层的计算结果。所以除非模型本身很大,一般不会采用模型并行,因为模型层与层之间存在串行逻辑。但是如果模型本身存在一些可以并行的单元,那么也是可以利用模型并行来提升训练速度,比如GoogLeNet的Inception模块。

2.数据并行

深度学习模型最常采用的分布式训练策略是数据并行,因为训练费时的一个重要原因是训练数据量很大。数据并行就是在很多设备上放置相同的模型,并且各个设备采用不同的训练样本对模型训练。训练深度学习模型常采用的是batch SGD方法,采用数据并行,可以每个设备都训练不同的batch,然后收集这些梯度用于模型参数更新。前面所说的Facebook训练Resnet50就是采用数据并行策略,使用256个GPUs,每个GPU读取32个图片进行训练,如下图所示,这样相当于采用非常大的batch(256 × 32 = 8192)来训练模型。

数据并行可以是同步的(synchronous),也可以是异步的(asynchronous)。所谓同步指的是所有的设备都是采用相同的模型参数来训练,等待所有设备的mini-batch训练完成后,收集它们的梯度然后取均值,然后执行模型的一次参数更新。这相当于通过聚合很多设备上的mini-batch形成一个很大的batch来训练模型,Facebook就是这样做的,但是他们发现当batch大小增加时,同时线性增加学习速率会取得不错的效果。同步训练看起来很不错,但是实际上需要各个设备的计算能力要均衡,而且要求集群的通信也要均衡,类似于木桶效应,一个拖油瓶会严重拖慢训练进度,所以同步训练方式相对来说训练速度会慢一些。异步训练中,各个设备完成一个mini-batch训练之后,不需要等待其它节点,直接去更新模型的参数,这样总体会训练速度会快很多。但是异步训练的一个很严重的问题是梯度失效问题(stale gradients),刚开始所有设备采用相同的参数来训练,但是异步情况下,某个设备完成一步训练后,可能发现模型参数其实已经被其它设备更新过了,此时这个梯度就过期了,因为现在的模型参数和训练前采用的参数是不一样的。由于梯度失效问题,异步训练虽然速度快,但是可能陷入次优解(sub-optimal training performance)。

异步训练和同步训练在TensorFlow中不同点如下图所示:

为了解决异步训练出现的梯度失效问题,微软提出了一种Asynchronous Stochastic Gradient Descent方法,主要是通过梯度补偿来提升训练效果。应该还有其他类似的研究,感兴趣的可以深入了解一下。

二 分布式训练系统架构

系统架构层包括两种架构:

Parameter Server Architecture(就是常见的PS架构,参数服务器)

Ring-allreduce Architecture

1.Parameter server架构

在Parameter server架构(PS架构)中,集群中的节点被分为两类:parameter server和worker。其中parameter server存放模型的参数,而worker负责计算参数的梯度。在每个迭代过程,worker从parameter sever中获得参数,然后将计算的梯度返回给parameter server,parameter server聚合从worker传回的梯度,然后更新参数,并将新的参数广播给worker。

PS架构是深度学习最常采用的分布式训练架构。采用同步SGD方式的PS架构如下图所示:

2.Ring-allreduce架构

在Ring-allreduce架构中,各个设备都是worker,并且形成一个环,如下图所示,没有中心节点来聚合所有worker计算的梯度。在一个迭代过程,每个worker完成自己的mini-batch训练,计算出梯度,并将梯度传递给环中的下一个worker,同时它也接收从上一个worker的梯度。对于一个包含N个worker的环,各个worker需要收到其它N-1个worker的梯度后就可以更新模型参数。其实这个过程需要两个部分:scatter-reduce和allgather,百度的教程对这个过程给出了详细的图文解释。百度开发了自己的allreduce框架,并将其用在了深度学习的分布式训练中。

相比PS架构,Ring-allreduce架构是带宽优化的,因为集群中每个节点的带宽都被充分利用。此外,在深度学习训练过程中,计算梯度采用BP算法,其特点是后面层的梯度先被计算,而前面层的梯度慢于前面层,Ring-allreduce架构可以充分利用这个特点,在前面层梯度计算的同时进行后面层梯度的传递,从而进一步减少训练时间。在百度的实验中,他们发现训练速度基本上线性正比于GPUs数目(worker数)。

一般的多卡gpu训练有一个很大的缺陷,就是因为每次都需要一个gpu(cpu)从其他gpu上收集训练的梯度,然后将新的模型分发到其他gpu上。这样的模型最大的缺陷是gpu 0的通信时间是随着gpu卡数的增长而线性增长的。

所以就有了ring-allreduce,如下图:

该算法的基本思想是取消Reducer,让数据在gpu形成的环内流动,整个ring-allreduce的过程分为两大步,第一步是scatter-reduce,第二步是allgather。

先说第一步:首先我们有n块gpu,那么我们把每个gpu上的数据(均等的)划分成n块,并给每个gpu指定它的左右邻居(图中0号gpu的左邻居是4号,右邻居是1号,1号gpu的左邻居是0号,右邻居是2号……),然后开始执行n-1次操作,在第i次操作时,gpu j会将自己的第(j - i)%n块数据发送给gpu j 1,并接受gpu j-1的(j - i - 1)%n块数据。并将接受来的数据进行reduce操作,示意图如下:

当n-1次操作完成后,ring-allreduce的第一大步scatter-reduce就已经完成了,此时,第i块gpu的第(i 1) % n块数据已经收集到了所有n块gpu的第(i 1) % n块数据,那么,再进行一次allgather就可以完成算法了。

第二步allgather做的事情很简单,就是通过n-1次传递,把第i块gpu的第(i 1) % n块数据传递给其他gpu,同样也是在i次传递时,gpu j把自己的第(j - i - 1)%n块数据发送给右邻居,接受左邻居的第(j - i - 2) % n数据,但是接受来的数据不需要像第一步那样做reduce,而是直接用接受来的数据代替自己的数据就好了。

最后每个gpu的数据就变成了这样:

首先是第一步,scatter-reduce:

然后是allgather的例子:

为什么需要分布式

众所周知,深度神经网络发展到现阶段,离不开GPU和数据。经过这么多年的积累,GPU的计算能力越来越强,数据也积累的越来越多,大家会发现在现有的单机单卡或者单机多卡上很难高效地复现模型,甚至对于有些新的数据集来讲,单机训练简直就是噩梦。

DatasetImages MS COCO115,000  Open Image dataset v4 1,740,000

为什么单机8卡也会是噩梦呢?我们拿COCO和Google最近Release出来的Open Image dataset v4来做比较,训练一个resnet152的检测模型,在COCO上大概需要40个小时,而在OIDV4上大概需要40天,这还是在各种超参数正确的情况下,如果加上调试的时间,可能一个模型调完就该过年了吧。

所以这个时候我们需要分布式。 Pytorch 分布式简介

PyTorch 1.0稳定版终于正式发布了!新版本增加了JIT编译器、全新的分布式包、C 前端,以及Torch Hub等新功能,支持AWS、谷歌云、微软Azure等云平台。

torch.distributed软件包和torch.nn.parallel.DistributedDataParallel模块由全新的、重新设计的分布式库提供支持。新的库的主要亮点有:

  • 新的 torch.distributed 是性能驱动的,并且对所有后端 (Gloo,NCCL 和 MPI) 完全异步操作
  • 显着的分布式数据并行性能改进,尤其适用于网络较慢的主机,如基于以太网的主机
  • 为torch.distributed  package中的所有分布式集合操作添加异步支持
  • 在Gloo后端添加以下CPU操作:send,recv,reduce,all_gather,gather,scatter
  • 在NCCL后端添加barrier操作
  • 为NCCL后端添加new_group支持

1.0的多机多卡的计算模型并没有采用主流的Parameter Server结构,而是直接用了Uber Horovod的形式,也是百度开源的RingAllReduce算法。

采用PS计算模型的分布式,通常会遇到网络的问题,随着worker数量的增加,其加速比会迅速的恶化,例如resnet50这样的模型,目前的TF在10几台机器的时候,加速比已经开始恶化的不可接受了。因此,经常要上RDMA、InfiniBand等技术,并且还带来了一波网卡的升级,有些大厂直接上了100GBs的网卡,有钱任性。而Uber的Horovod,采用的RingAllReduce的计算方案,其特点是网络通信量不随着worker(GPU)的增加而增加,是一个恒定值。简单看下图理解下,GPU 集群被组织成一个逻辑环,每个GPU有一个左邻居、一个右邻居,每个GPU只从左邻居接受数据、并发送数据给右邻居。即每次梯度每个gpu只获得部分梯度更新,等一个完整的Ring完成,每个GPU都获得了完整的参数。

这里引入了一个新的函数model = torch.nn.parallel.DistributedDataParallel(model)为的就是支持分布式模式

不同于原来在multiprocessing中的model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()函数,这个函数只是实现了在单机上的多GPU训练,根据官方文档的说法,甚至在单机多卡的模式下,新函数表现也会优于这个旧函数。

这里要提到两个问题:

  • 每个进程都有自己的Optimizer同时每个迭代中都进行完整的优化步骤,虽然这可能看起来是多余的,但由于梯度已经聚集在一起并跨进程平均,因此对于每个进程都是相同的,这意味着不需要参数广播步骤,从而减少了在节点之间传输张量tensor所花费的时间。
  • 另外一个问题是Python解释器的,每个进程都包含一个独立的Python解释器,消除了来自单个Python进程中的多个执行线程,模型副本或GPU的额外解释器开销和“GIL-thrashing”。 这对于大量使用Python运行时的模型尤其重要。

初始化

代码语言:javascript复制
torch.distributed.init_process_group(backend, init_method='env://', **kwargs)

参数说明:

  • backend(str): 后端选择,包括上面那几种 gloo,nccl,mpi
  • init_method(str,optional): 用来初始化包的URL, 我理解是一个用来做并发控制的共享方式
  • world_size(int, optional): 参与这个工作的进程数
  • rank(int,optional): 当前进程的rank
  • group_name(str,optional): 用来标记这组进程名的

Backends

Backends that come with PyTorch

PyTorch distributed currently only supports Linux. By default, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.)

Which backend to use?

In the past, we were often asked: “which backend should I use?”.

  • Rule of thumb
  • Use the NCCL backend for distributed GPU training.
  • Use the Gloo backend for distributed CPU training.

init_method分析

‘init_method’支持三种方式:

1. TCP initialization

tcp:// IP组播(要求所有进程都在同一个网络中)比较好理解,   以TCP协议的方式进行不同分布式进程之间的数据交流,需要设置一个端口,不同进程之间公用这一个端口,并且设置host的级别和host的数量。设计两个参数rank和world_size。其中rank为host的编号,默认0为主机,端口应该位于该主机上。world_size为分布式主机的个数。

用该方式,运行上面的代码可以使用如下指令:

在主机01上:

代码语言:javascript复制
python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 0 --world-size 2

在主机02上:

代码语言:javascript复制
python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 1 --world-size 2

这里没有设置backend参数,所以默认是gloo。22225是端口号,用一个没有没占用的就行。这两句指令的先后顺序没有要求,只有两条指令都输入,程序才会运行起来。

2. Shared file-system initialization

file:// 共享文件系统(要求所有进程可以访问单个文件系统)有共享文件系统可以选择

提供的第二种方式是文件共享,机器有共享的文件系统,故可以采用这种方式,也避免了基于TCP的网络传输。这里使用方式是使用绝对路径在指定一个共享文件系统下不存在的文件。

在主机01上:

代码语言:javascript复制
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2

在主机02上:

代码语言:javascript复制
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

这里相比于TCP的方式麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。

3. Environment variable initialization

env:// 环境变量(需要你手动分配等级并知道所有进程可访问节点的地址)默认是这个

代码语言:javascript复制
MASTER_PORT - required; has to be a free port on machine with rank 0
MASTER_ADDR - required (except for rank 0); address of rank 0 node
WORLD_SIZE - required; can be set either here, or in a call to init function
RANK - required; can be set either here, or in a call to init function

但是前两个并没有在init_process_group的参数里。这里有一个官方文档中的用例:

代码语言:javascript复制
Node 1: (IP: 192.168.1.1, and has a free port: 1234)
     
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
Node 2:
     
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

需要注意的点

  • 一定要加rank, world_size参数
  • train_dataset最好不要用自己写的sampler,否则还需要再实现一遍分布式的数据划分方式

Dataloader中的参数

如果你的选项刚好是最坏情况,优化这个有可能达到2倍左右的性能提升,解释一下DataLoader中其中两个参数:

  • num_worker:数据集加载的时候,控制用于同时加载数据的线程数(默认为0,即在主线程读取) 存在最优值,你会看到运行的时候pytorch会新建恰等于这个值的数据读取线程,我猜,线程多于必要的时候,数据读取线程返回到主线程反而会因为线程间通信减慢数据。因此大了不好小了也不好。建议把模型,loss,优化器全注释了只跑一下数据流速度,确定最优值
  • pin_memory:是否提前申请CUDA内存(默认为False,但有说法除非数据集很小,否则在N卡上推荐总是打开)在MNIST这样的小数据集上好像是关闭比较好,到底多小算小说不清楚,建议自己试一下。

pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。主机中的内存,有两种存在方式,一是锁页,二是不锁页,锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。因为pin_memory与电脑硬件性能有关,pytorch开发者不能确保每一个炼丹玩家都有高端设备,因此pin_memory默认为False。

如果机子的内存比较大,建议开启pin_memory=Ture,如果开启后发现有卡顿现象或者内存占用过高。

总之官方的默认值很有可能不是最好的。在MNIST这样的小数据集上,pin_memory关闭比较好。而且,num_workers需要调节,除了默认情况外,最快和最慢是有一定差距的,建议在自己的代码上只跑数据读取这一块,确定这两个参数的最优值。

分布式 Hello World

启动辅助工具 Launch utility

torch.distributed.launch 例子

torch.distributed包提供了一个启动实用程序torch.distributed.launch,此帮助程序可用于为每个节点启动多个进程以进行分布式训练,它在每个训练节点上产生多个分布式训练进程。

这个工具可以用作CPU或者GPU,如果被用于GPU,每个GPU产生一个进程Process

该工具既可以用来做单节点多GPU训练,也可用于多节点多GPU训练。如果是单节点多GPU,将会在单个GPU上运行一个分布式进程,据称可以非常好地改进单节点训练性能。如果用于多节点分布式训练,则通过在每个节点上产生多个进程来获得更好的多节点分布式训练性能。如果有Infiniband接口则加速比会更高。

在单节点分布式训练或多节点分布式训练的两种情况下,该工具将为每个节点启动给定数量的进程(--nproc_per_node)。如果用于GPU训练,则此数字需要小于或等于当前系统上的GPU数量(nproc_per_node),并且每个进程将在从GPU 0到GPU(nproc_per_node - 1)的单个GPU上运行。

1、Single-Node multi-process distributed training

代码语言:javascript复制
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
       YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
       arguments of your training script)

2、Multi-Node multi-process distributed training: (e.g. two nodes)

Node 1: (IP: 192.168.1.1, and has a free port: 1234)

代码语言:javascript复制
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
       --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
       --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
       and all other arguments of your training script)

Node 2:

代码语言:javascript复制
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
       --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
       --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
       and all other arguments of your training script)

需要注意的地方:

  • 后端最好用“NCCL”,才能获取最好的分布式性能
  • 训练代码必须从命令行解析--local_rank=LOCAL_PROCESS_RANK
代码语言:javascript复制
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
     
torch.cuda.set_device(arg.local_rank)

torch.distributed初始化方式:

代码语言:javascript复制
torch.distributed.init_process_group(backend='nccl',init_method='env://')

model

代码语言:javascript复制
model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[arg.local_rank],output_device=arg.local_rank)

其他地方一般就不用修改了,

我们的训练代码中这样写:

代码语言:javascript复制
import torch.distributed as dist
# 这个参数是torch.distributed.launch传递过来的,我们设置位置参数来接受,local_rank代表当前程序进程使用的GPU标号
parser.add_argument("--local_rank", type=int, default=0)
     
def synchronize():
    """
    Helper function to synchronize (barrier) among all processes when
    using distributed training
    """
    if not dist.is_available():
       return
    if not dist.is_initialized():
       return
    world_size = dist.get_world_size()
    if world_size == 1:
       return
    dist.barrier()
     
     
## WORLD_SIZE 由torch.distributed.launch.py产生 具体数值为 nproc_per_node*node(主机数,这里为1)
num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1
     
is_distributed = num_gpus > 1
     
if is_distributed:
   torch.cuda.set_device(args.local_rank)  # 这里设定每一个进程使用的GPU是一定的
   torch.distributed.init_process_group(
   backend="nccl", init_method="env://"
    )
synchronize()
     
# 将模型移至到DistributedDataParallel中,此时就可以进行训练了
if is_distributed:
model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[args.local_rank], output_device=args.local_rank,
        # this should be removed if we update BatchNorm stats
        broadcast_buffers=False)

# 注意,在测试的时候需要执行 model = model.module

WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

https://github.com/pytorch/examples/tree/master/imagenet 这里,常规的操作就不多叙述了,主要讲一下和分布式相关的代码部分。

代码语言:javascript复制
parser.add_argument('--world-size', default=2, type=int, help='number of distributed processes')
parser.add_argument('--dist-url', default='tcp://172.16.1.186:2222', type=str, help='url used to set up distributed training')
parser.add_argument('--dist-backend', default='gloo', type=str, help='distributed backend')
parser.add_argument('--dist-rank', default=0, type=int, help='rank of distributed processes')

这几个是必要的参数设置,其中最后一个是官网没有的

代码语言:javascript复制
if args.distributed:
   dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size,rank=args.dist_rank)

这个是分布式的初始化,同样,最后添加一个rank

代码语言:javascript复制
model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model)

这里,把我们平时使用的单机多卡,数据并行的API

代码语言:javascript复制
model = torch.nn.DataParallel(model).cuda()

换掉即可。

代码语言:javascript复制
if args.distributed:
            train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

最后使用这个官方给的划分方法,把数据集划分即可。

MNIST 分布式例子

MNIST 分布式 (2台 GPU机器,每台一张GPU):

  • GPU1:   python mnist_dist.py --init-method=file:///home/workspace/share/1 --rank=0 --world-size=2
  • GPU2:   python mnist_dist.py --init-method=file:///home/workspace/share/1 --rank=1 --world-size=2
代码语言:javascript复制
from __future__ import print_function
import argparse
import time
     
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
     
import torch.distributed as dist
import torch.utils.data
import torch.utils.data.distributed
     
     
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)
     
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x)
     
     
def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))
     
     
def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        test_loss  = F.nll_loss(output, target, size_average=False).item()  
        # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1]  
        # get the index of the max log-probability
        correct  = pred.eq(target.data.view_as(pred)).cpu().sum()
     
    test_loss /= len(test_loader.dataset)
     
    print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))
     
     
def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                            help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                            help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                            help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                            help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                            help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true',
                            help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                            help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=500, metavar='N',
                            help='how many batches to wait before logging training status')
     
    parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')
    parser.add_argument('--rank', type=int)
    parser.add_argument('--world-size', type=int)
     
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    print(args)
     
    # 初始化
    dist.init_process_group(init_method=args.init_method, backend="gloo", world_size=args.world_size, rank=args.rank,
                                group_name="pytorch_test")
     
    torch.manual_seed(args.seed)
    if use_cuda:
       torch.cuda.manual_seed(args.seed)
     
    train_dataset = datasets.MNIST('./data', train=True, download=False,
                                       transform=transforms.Compose([
                                           transforms.ToTensor(),
                                           transforms.Normalize((0.1307,), (0.3081,))
                                       ]))
    # 分发数据
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
     
    kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}
     
    train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=args.batch_size,   shuffle=True, **kwargs)
     
    test_loader = torch.utils.data.DataLoader(
            datasets.MNIST('data', train=False, transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ])),
            batch_size=args.test_batch_size, shuffle=True, **kwargs)
     
     device = torch.device("cuda" if use_cuda else "cpu")
     print(device)
     model = Net().to(device)
     if use_cuda:
        model = torch.nn.parallel.DistributedDataParallel(
                model) if use_cuda else torch.nn.parallel.DistributedDataParallelCPU(model)
     
     optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
     
     total_time = 0
     for epoch in range(1, args.epochs   1):
         # 设置epoch位置,这应该是个为了同步所做的工作
         train_sampler.set_epoch(epoch)
     
         start_cpu_secs = time.time()
         train(args, model, device, train_loader, optimizer, epoch)
         end_cpu_secs = time.time()
     
         print("Epoch {} of {} took {:.3f}s".format(
                epoch, args.epochs, end_cpu_secs - start_cpu_secs))
         total_time  = end_cpu_secs - start_cpu_secs
     
         test(args, model, device, test_loader)
     
     print("Total time= {:.3f}s".format(total_time))
     
     
if __name__ == '__main__':
   main()

单机例子(一台GPU机器一张GPU卡)

python mnist_no_dist.py

代码语言:javascript复制
from __future__ import print_function
import argparse
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
     
     
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
         self.fc2 = nn.Linear(500, 10)
     
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
     
     
def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
           print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(
                 epoch, batch_idx * len(data), len(train_loader.dataset),
                 100. * batch_idx / len(train_loader), loss.item()))
     
     
def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss  = F.nll_loss(output, target, reduction='sum').item()  
            # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  
            # get the index of the max log-probability
            correct  = pred.eq(target.view_as(pred)).sum().item()
     
    test_loss /= len(test_loader.dataset)
     
    print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))
     
     
def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                            help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                            help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                            help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                            help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                            help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                            help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                            help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=500, metavar='N',
                            help='how many batches to wait before logging training status')
     
    # parser.add_argument('--save-model', action='store_true', default=False,
    #                     help='For Saving the current Model')
    args = parser.parse_args()
    print(args)
     
    use_cuda = not args.no_cuda and torch.cuda.is_available()
     
    torch.manual_seed(args.seed)
     
    kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                        ])),
        batch_size=args.batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
            datasets.MNIST('./data', train=False, transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ])),
         batch_size=args.test_batch_size, shuffle=True, **kwargs)
     
    device = torch.device("cuda" if use_cuda else "cpu")
    print(device)
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
     
    total_time = 0
    for epoch in range(1, args.epochs   1):
        start_cpu_secs = time.time()
        train(args, model, device, train_loader, optimizer, epoch)
        end_cpu_secs = time.time()
     
        print("Epoch {} of {} took {:.3f}s".format(
                epoch, args.epochs, end_cpu_secs - start_cpu_secs))
        total_time  = end_cpu_secs - start_cpu_secs
     
        test(args, model, device, test_loader)
     
    # print("Total time= {:.3f}s".format(total_time))
    # if (args.save_model):
    #     torch.save(model.state_dict(), "mnist_cnn.pt")
     
     
if __name__ == '__main__':
   main()

0 人点赞