Pytorch多GPU的计算和Sync BatchNorm

2022-09-03 19:04:16 浏览数 (1)

nn.DataParallel

pytorch中使用GPU非常方便和简单:

代码语言:javascript复制
import torch
import torch.nn as nn

input_size = 5
output_size = 2

class Model(nn.Module):

    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("[In Model]: device",torch.cuda.current_device() ," input size", input.size()," output size", output.size())
        return output


device = torch.device('cuda:0')

model = Model(input_size, output_size)
model.to(device)

x = torch.Tensor(2,5)
x = x.to(device)
y = model(x)

这里需要注意的是,仅仅调用Tensor.to()只会在GPU上返回一个新的copy,并不会对原来的引用造成变化,因此需要通过赋值rewrite。

上述只是对单个GPU的使用方法,对于多个GPU,pytorch也提供了封装好的接口——DataParallel,只需要将model 对象放入容器中即可:

代码语言:javascript复制
model = Model(input_size, output_size)

print("Let's use", torch.cuda.device_count(), "GPUs!n")
model = nn.DataParallel(model)
model.to(device)

print(model)

# output
Let's use 2 GPUs!

DataParallel(
  (module): Model(
    (fc): Linear(in_features=5, out_features=2, bias=True)
  )
)

看到这次输出的model外面还有一层DataParallel,但这里并没有体现出存在多个GPU。

接下来构造一个Dummy DataSet,来跑一下模型:

代码语言:javascript复制
from torch.utils.data import Dataset, DataLoader

batch_size = 30
data_size = 100

class RandomDataset(Dataset):

    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)  # 有length个样本,每个样本是size长度的向量

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size, shuffle=True)

for data in rand_loader:
    input = data.to(device)
    output = model(input)
    print("[Outside]: input size", input.size(),
          "output_size", output.size())

# output
[In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
[In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
[In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])
[Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
[In Model]: device 0  input size torch.Size([5, 5])  output size torch.Size([5, 2])
[In Model]: device 1  input size torch.Size([5, 5])  output size torch.Size([5, 2])
[Outside]: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

可以看到这里每次从data loader中取数据后,在两个GPU上执行了forward,并且每个GPU上的batch size都只有原来的一半,所以DataParallel将输入数据平分到了每个GPU上,从而实现并行计算。

进一步了解 DataParallel

上述文字来自官方文档,在forward阶段,当前GPU上的module会被复制到其他GPU上,输入数据则会被切分,分别传到不同的GPU上进行计算;在backward阶段,每个GPU上的梯度会被求和并传回当前GPU上,并更新参数。也就是复制module -> forward -> 计算loss -> backward -> 汇总gradients -> 更新参数 -> 复制module -> ...的不断重复执行,示意图如下:

因为数据会被均分到不同的GPU上,所以要求batch_size大于GPU的数量。下面对DataParallel的forward函数做一个简单的解释:

代码语言:javascript复制
class DataParallel(Module):

    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

        if not torch.cuda.is_available():
            self.module = module
            self.device_ids = []
            return

        if device_ids is None:
            device_ids = list(range(torch.cuda.device_count()))
        if output_device is None:
            output_device = device_ids[0]

        self.dim = dim
        self.module = module   # 待并行计算的模型
        self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device("cuda:{}".format(self.device_ids[0]))

        _check_balance(self.device_ids)

        if len(self.device_ids) == 1:
            self.module.cuda(device_ids[0])

    def forward(self, *inputs, **kwargs):
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)  
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        '''replicate对输入模型的parameters、buffers、modules都一一进行copy,并返回copy的list,
        因为modules最终是以类似链表的形式存储的,所以list中只包含第一个module'''
        return replicate(module, device_ids)

    def scatter(self, inputs, kwargs, device_ids):
        '''scatter_kwargs内部调用名为scatter的函数,作用是将Tensor对象均分,以及复制其他类型对象的引用'''
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        '''内部调用python的Thread将分割好的input分配到不同的GPU上计算,并返回result dict'''
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        '''从不同GPU上取回结果'''
        return gather(outputs, output_device, dim=self.dim)

parallel_apply()之前都不能确定input数据会被分配到哪个GPU上,因此在forward之前的Tensor.to()或者Tensor.cuda()都会导致错误。

GatherScatter的进一步观察会发现(如下),两者在backward时,只会传递梯度信息。因此所有在forward期间的update都会被忽略(比如counter什么的),除非是在device[0]上。

代码语言:javascript复制
class Gather(Function):

    @staticmethod
    def forward(ctx, target_device, dim, *inputs):
        ...

    @staticmethod
    def backward(ctx, grad_output):
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None)   scattered_grads


class Scatter(Function):

    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        ...

    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)

GPU之间除了在scatter和gather时有交集,除此之外不会交换任何信息,这会阻碍一些功能的实现,比如Batch Normalization,如果只是模型加入torch.nn.BatchNorm2d(),那么在并行计算时,它只会统计当前GPU上这一部分数据的信息而不是所有的输入数据,有可能会使统计得到的均值和标准差出现偏差。

0 人点赞