DeepSpeed Distributed Training for Large Models


DeepSpeed

DeepSpeed is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective.

Innovation pillars

[Figure: DeepSpeed innovation pillars]

Training

Argument Parsing
  • Turn the argparse parser into a DeepSpeed-aware parser:
parser = deepspeed.add_config_arguments(parser)
  • Example:
 import argparse
 import deepspeed

 def add_argument():

     parser=argparse.ArgumentParser(description='CIFAR')

     #data
     # cuda
     parser.add_argument('--with_cuda', default=False, action='store_true',
                         help="use CPU in case there's no GPU support")
     parser.add_argument('--use_ema', default=False, action='store_true',
                         help='whether use exponential moving average')

     # train
     parser.add_argument('-b', '--batch_size', default=32, type=int,
                         help='mini-batch size (default: 32)')
     parser.add_argument('-e', '--epochs', default=30, type=int,
                         help='number of total epochs (default: 30)')
     parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')

     # Include DeepSpeed configuration arguments
     parser = deepspeed.add_config_arguments(parser)

     args=parser.parse_args()

     return args
Initialization
  • args, the model structure, and its parameters must be converted into their DeepSpeed versions
  • The dataloader can either be made distributed by deepspeed.initialize (requires passing in trainset) or defined yourself (no trainset needs to be passed in)
  • API
def initialize(args,
               model,
               optimizer=None,
               model_parameters=None,
               training_data=None,
               lr_scheduler=None,
               mpu=None,
               dist_init_required=True,
               collate_fn=None):
  • Example:
 parameters = filter(lambda p: p.requires_grad, net.parameters())
 args=add_argument()

 # Initialize DeepSpeed to use the following features
 # 1) Distributed model
 # 2) Distributed data loader
 # 3) DeepSpeed optimizer
 
 # Option 1: pass `trainset` so DeepSpeed also builds a distributed dataloader
 model_engine, optimizer, trainloader, _ = deepspeed.initialize(
     args=args, model=net, model_parameters=parameters, training_data=trainset)
 # Option 2: omit `trainset` and build your own dataloader instead
 model_engine, optimizer, _, _ = deepspeed.initialize(
     args=args, model=net, model_parameters=parameters, training_data=None)
DataLoader
  • Because DeepSpeed is built on top of torch.distributed, a regular PyTorch DataLoader works; note that a custom dataloader needs a DistributedSampler for distributed sampling
  • When writing a custom distributed dataloader, watch how batch_size is defined: with distributed training enabled, batch_size is the per-GPU batch size; without it, batch_size is the total size across all GPUs
# Distributed training enabled: use DistributedSampler
if local_rank >= 0:
    if data_sampler is None:
        data_sampler = DistributedSampler(dataset)
    device_count = 1

# Distributed training disabled: use RandomSampler
else:
    if data_sampler is None:
        data_sampler = RandomSampler(dataset)
    device_count = torch.cuda.device_count()
    batch_size *= device_count

self.dataloader = DataLoader(self.dataset,
    batch_size=self.batch_size,
    pin_memory=self.pin_memory,
    sampler=self.data_sampler,
    num_workers=self.num_local_io_workers)
  • It is best to make the dataloader behave like a generator (similar to an IterableDataset):
    def __iter__(self):
        self._create_dataloader()
        return self

    def __len__(self):
        return self.len

    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)
  • Example:
import torch
import logging
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm


class DeepSpeedDataLoader(object):
    def __init__(self,
                 dataset,
                 batch_size,
                 pin_memory,
                 local_rank,
                 tput_timer,
                 collate_fn=None,
                 num_local_io_workers=None,
                 data_sampler=None):
        self.tput_timer = tput_timer
        self.batch_size = batch_size

        if local_rank >= 0:  # distributed training enabled: use DistributedSampler
            if data_sampler is None:
                data_sampler = DistributedSampler(dataset)
            device_count = 1
        else:  # distributed training disabled: use RandomSampler
            if data_sampler is None:
                data_sampler = RandomSampler(dataset)
            device_count = torch.cuda.device_count()
            batch_size *= device_count

        if num_local_io_workers is None:
            num_local_io_workers = 2 * device_count

        self.num_local_io_workers = num_local_io_workers
        self.data_sampler = data_sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        self.batch_size = batch_size
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None

    def __iter__(self):
        self._create_dataloader()
        return self

    def __len__(self):
        return self.len

    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)

    def _create_dataloader(self):
        if self.collate_fn is None:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         num_workers=self.num_local_io_workers)
        else:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_local_io_workers)
        self.data = (x for x in self.dataloader)

        return self.dataloader
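  • A minimal usage sketch of the class above (the toy TensorDataset and the argument values are made up for illustration; it assumes at least one visible GPU, since the non-distributed branch multiplies batch_size by torch.cuda.device_count()):
import torch
from torch.utils.data import TensorDataset

# Toy dataset: 100 samples with 10 features each and binary labels.
dataset = TensorDataset(torch.randn(100, 10), torch.randint(0, 2, (100,)))

loader = DeepSpeedDataLoader(dataset,
                             batch_size=8,
                             pin_memory=True,
                             local_rank=-1,   # -1 takes the non-distributed RandomSampler branch
                             tput_timer=None)

for inputs, labels in loader:
    # effective batch is 8 * torch.cuda.device_count() in the non-distributed branch
    print(inputs.shape, labels.shape)
    break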
Training API
  • Move each batch of distributed data onto the corresponding GPU: inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)
  • Note that the backward pass and the optimizer step become model_engine.backward(loss) and model_engine.step(); optimizer.zero_grad() is not needed. (Zeroing the gradients is handled automatically by DeepSpeed after the weights have been updated using a mini-batch.)
  • Example:
  for i, data in enumerate(trainloader):
      # get the inputs; data is a list of [inputs, labels]
      inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)

      outputs = model_engine(inputs)
      loss = criterion(outputs, labels)

      model_engine.backward(loss)
      model_engine.step()
Configuration
  • Define a JSON configuration file (ds_config.json):
 {
   "train_batch_size": 4,
   "steps_per_print": 2000,
   "optimizer": {
     "type": "Adam",
     "params": {
       "lr": 0.001,
       "betas": [
         0.8,
         0.999
       ],
       "eps": 1e-8,
       "weight_decay": 3e-7
     }
   },
   "scheduler": {
     "type": "WarmupLR",
     "params": {
       "warmup_min_lr": 0,
       "warmup_max_lr": 0.001,
       "warmup_num_steps": 1000
     }
   },
   "wall_clock_breakdown": false
 }
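  • When tuning these fields, keep DeepSpeed's batch-size relationship in mind; a small illustrative check in Python (the per-GPU micro-batch, accumulation steps, and GPU count below are made-up numbers, not values taken from the config above):
# DeepSpeed requires:
#   train_batch_size == train_micro_batch_size_per_gpu * gradient_accumulation_steps * num_gpus
num_gpus = 2                           # assumed GPU count for this example
train_micro_batch_size_per_gpu = 1
gradient_accumulation_steps = 2
train_batch_size = train_micro_batch_size_per_gpu * gradient_accumulation_steps * num_gpus
assert train_batch_size == 4           # consistent with "train_batch_size": 4 above on 2 GPUs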
Running
  • Launch with the deepspeed launcher; train.py below stands for your training script (avoid naming it deepspeed.py, which would shadow the deepspeed package on import)
$ deepspeed train.py --deepspeed --deepspeed_config ds_config.json

Inference

Inference API
  • deepspeed.init_inference() returns an inference engine of type InferenceEngine.
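  • A minimal sketch of wrapping a trained model, assuming net is an already-trained torch.nn.Module such as the CIFAR model above (the mp_size/dtype/replace_with_kernel_inject arguments follow older DeepSpeed releases; check the signature in your installed version):
import torch
import deepspeed

# Wrap the trained model in an InferenceEngine.
engine = deepspeed.init_inference(net,
                                  mp_size=1,         # model-parallel (tensor-parallel) degree
                                  dtype=torch.half,  # run inference in fp16
                                  replace_with_kernel_inject=False)  # fused kernels only apply to supported transformer models

# The engine forwards calls to the wrapped module.
device = next(engine.module.parameters()).device
inputs = torch.randn(1, 3, 32, 32, dtype=torch.half, device=device)
outputs = engine(inputs)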
