DeepSpeed
DeepSpeed is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective.
Innovation pillars
[Figure: DeepSpeed innovation pillars]
Training
Argument Parsing
- Turn the argparse parser into a DeepSpeed-aware parser: parser = deepspeed.add_config_arguments(parser)
- Example:
import argparse
import deepspeed

def add_argument():
    parser = argparse.ArgumentParser(description='CIFAR')

    # data
    # cuda
    parser.add_argument('--with_cuda', default=False, action='store_true',
                        help='use CPU in case there is no GPU support')
    parser.add_argument('--use_ema', default=False, action='store_true',
                        help='whether to use exponential moving average')

    # train
    parser.add_argument('-b', '--batch_size', default=32, type=int,
                        help='mini-batch size (default: 32)')
    parser.add_argument('-e', '--epochs', default=30, type=int,
                        help='number of total epochs (default: 30)')
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')

    # Include DeepSpeed configuration arguments
    parser = deepspeed.add_config_arguments(parser)

    args = parser.parse_args()
    return args
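- After add_config_arguments, the parser also accepts DeepSpeed's launcher flags (--deepspeed and --deepspeed_config, as used in the Running section below). A quick sanity-check sketch, not part of the original example; the attribute names are assumed to follow argparse's usual dest rules for these flags:

args = add_argument()          # e.g. launched with: --deepspeed --deepspeed_config ds_config.json
print(args.deepspeed)          # True when the --deepspeed flag was passed
print(args.deepspeed_config)   # path to the JSON config consumed by deepspeed.initialize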
Initialization
- args, the model definition, and the model parameters must be converted into their DeepSpeed versions.
- For the dataloader, deepspeed.initialize can build a distributed one for you (pass in trainset), or you can define your own (no trainset needed).
- API:
def initialize(args,
               model,
               optimizer=None,
               model_parameters=None,
               training_data=None,
               lr_scheduler=None,
               mpu=None,
               dist_init_required=True,
               collate_fn=None):
- Example:
parameters = filter(lambda p: p.requires_grad, net.parameters())
args = add_argument()

# Initialize DeepSpeed to use the following features
# 1) Distributed model
# 2) Distributed data loader
# 3) DeepSpeed optimizer

# Pass in `trainset`: DeepSpeed builds the distributed data loader for you
model_engine, optimizer, trainloader, _ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=trainset)

# Do not pass `trainset`: build your own data loader instead
model_engine, optimizer, _, _ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=None)
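- When training_data is omitted (second call above), you build the dataloader yourself. A minimal sketch using a DistributedSampler, assuming the trainset and args objects from the examples above (the full custom class is shown in the DataLoader section below):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(trainset) if args.local_rank >= 0 else None
trainloader = DataLoader(trainset,
                         batch_size=args.batch_size,   # per-GPU batch size in distributed mode
                         sampler=sampler,
                         shuffle=(sampler is None),    # the sampler already shuffles per rank
                         pin_memory=True)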
DataLoader
- Because DeepSpeed is built on torch.distributed, a regular PyTorch DataLoader works; note that a custom dataloader must use distributed sampling via DistributedSampler.
- When writing a custom distributed dataloader, watch how batch_size is defined: in distributed mode batch_size is the per-GPU batch size, while without distribution it is the total batch size across all GPUs.
# Distributed: use DistributedSampler
if local_rank >= 0:
    if data_sampler is None:
        data_sampler = DistributedSampler(dataset)
    device_count = 1
# Not distributed: use RandomSampler
else:
    if data_sampler is None:
        data_sampler = RandomSampler(dataset)
    device_count = torch.cuda.device_count()
    batch_size *= device_count

self.dataloader = DataLoader(self.dataset,
                             batch_size=self.batch_size,
                             pin_memory=self.pin_memory,
                             sampler=self.data_sampler,
                             num_workers=self.num_local_io_workers)
- It is best to make the wrapper behave like an iterator (comparable to an IterableDataset):
def __iter__(self):
    self._create_dataloader()
    return self

def __len__(self):
    return self.len

def __next__(self):
    if self.tput_timer:
        self.tput_timer.start()
    return next(self.data)
- Example:
import torch
import logging
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm


class DeepSpeedDataLoader(object):
    def __init__(self,
                 dataset,
                 batch_size,
                 pin_memory,
                 local_rank,
                 tput_timer,
                 collate_fn=None,
                 num_local_io_workers=None,
                 data_sampler=None):
        self.tput_timer = tput_timer
        self.batch_size = batch_size

        if local_rank >= 0:  # distributed: use DistributedSampler
            if data_sampler is None:
                data_sampler = DistributedSampler(dataset)
            device_count = 1
        else:  # not distributed: use RandomSampler
            if data_sampler is None:
                data_sampler = RandomSampler(dataset)
            device_count = torch.cuda.device_count()
            batch_size *= device_count

        if num_local_io_workers is None:
            num_local_io_workers = 2 * device_count

        self.num_local_io_workers = num_local_io_workers
        self.data_sampler = data_sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        self.batch_size = batch_size
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None

    def __iter__(self):
        self._create_dataloader()
        return self

    def __len__(self):
        return self.len

    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)

    def _create_dataloader(self):
        if self.collate_fn is None:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         num_workers=self.num_local_io_workers)
        else:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_local_io_workers)
        self.data = (x for x in self.dataloader)
        return self.dataloader
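- A hypothetical usage of the class above, assuming a (image, label) style dataset such as the CIFAR trainset from earlier and args.local_rank from the argument parser; tput_timer can be left as None if you do not measure throughput:

train_loader = DeepSpeedDataLoader(dataset=trainset,
                                   batch_size=32,               # per-GPU size when local_rank >= 0
                                   pin_memory=True,
                                   local_rank=args.local_rank,
                                   tput_timer=None)

for inputs, labels in train_loader:
    # each rank iterates over its own shard thanks to DistributedSampler
    break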
Training API
- Move each batch coming out of the distributed dataloader onto the GPU assigned to this process: inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)
- Note that the backward pass and the optimizer update become model_engine.backward(loss) and model_engine.step(), and there is no optimizer.zero_grad(). (Zeroing the gradients is handled automatically by DeepSpeed after the weights have been updated using a mini-batch.)
- Example:
for i, data in enumerate(trainloader):
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)

    outputs = model_engine(inputs)
    loss = criterion(outputs, labels)

    model_engine.backward(loss)
    model_engine.step()
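- A sketch (not from the original tutorial) of the same loop wrapped in epochs with simple logging; args.epochs and criterion are the objects defined earlier, and the local_rank check only avoids duplicated prints on a single node:

for epoch in range(args.epochs):
    running_loss = 0.0
    for i, data in enumerate(trainloader):
        inputs = data[0].to(model_engine.local_rank)
        labels = data[1].to(model_engine.local_rank)

        loss = criterion(model_engine(inputs), labels)
        model_engine.backward(loss)
        model_engine.step()          # optimizer step; gradient zeroing is handled by DeepSpeed

        running_loss += loss.item()

    if model_engine.local_rank == 0:
        print(f'epoch {epoch}: mean loss {running_loss / (i + 1):.4f}')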
Configuration
- Define a JSON file (ds_config.json):
{
  "train_batch_size": 4,
  "steps_per_print": 2000,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 0.001,
      "betas": [0.8, 0.999],
      "eps": 1e-8,
      "weight_decay": 3e-7
    }
  },
  "scheduler": {
    "type": "WarmupLR",
    "params": {
      "warmup_min_lr": 0,
      "warmup_max_lr": 0.001,
      "warmup_num_steps": 1000
    }
  },
  "wall_clock_breakdown": false
}
Running
$ deepspeed deepspeed.py --deepspeed --deepspeed_config ds_config.json
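The launcher starts one process per visible GPU and passes --local_rank to each of them; the --deepspeed and --deepspeed_config flags after the script name are the ones added by add_config_arguments. To restrict the run to a subset of GPUs, the launcher's --num_gpus option can be used (shown here as an illustration, not part of the original command):

$ deepspeed --num_gpus=2 deepspeed.py --deepspeed --deepspeed_config ds_config.json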
Inference
Inference API
deepspeed.init_inference() returns an inference engine of type InferenceEngine.
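A minimal usage sketch, assuming net is the trained model from the training section; keyword arguments such as dtype and replace_with_kernel_inject vary across DeepSpeed versions, so check the installed release before relying on them:

import torch
import deepspeed

# wrap the trained model in an InferenceEngine (fp16, with fused kernels where supported)
ds_engine = deepspeed.init_inference(net,
                                     dtype=torch.half,
                                     replace_with_kernel_inject=True)

with torch.no_grad():
    outputs = ds_engine(inputs.half().cuda())   # the engine forwards calls to the wrapped model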