分布式训练代码

2022-09-02 10:42:08 浏览数 (1)

Pytorch 分布式模式介绍_Wanderer001的博客-CSDN博客_pytorch 分布式

1、分布式训练代码

代码语言:python
import torch
from config import Config
from dataset import create_wf_datasets, my_collate_fn
from model import Net
from trainer import Trainer
from voc_dataset import create_voc_datasets
import argparse
import torch.distributed as dist
import torch.utils.data
import torch.utils.data.distributed

def _parse_args():
    """Declare the command-line options of the training script and parse them.

    Returns:
        argparse.Namespace with ``seed``, ``log_interval``, ``batch_size``,
        ``init_method``, ``rank``, ``world_size`` and ``no_cuda``.
    """
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=500, metavar='N',
                        help='how many batches to wait before logging training status')
    # main() reads args.batch_size, so the option has to be declared here.
    # NOTE(review): 64 is an arbitrary default -- tune for the dataset/model.
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')
    # Default to rank 0 so a single-process run (world size 1) works without
    # flags; every process in a multi-process job must pass its own --rank.
    parser.add_argument('--rank', type=int, default=0)
    parser.add_argument('--world-size', default=1, type=int)
    parser.add_argument('--no-cuda', action='store_true',
                        help='disables CUDA training')
    return parser.parse_args()


def main():
    """Run distributed training for the dataset selected in ``Config``.

    Steps: parse the command line, join the process group exactly once using
    the rank given on the command line, seed the RNGs, build the train and
    validation loaders (the train loader is sharded with a
    ``DistributedSampler``), wrap the model in ``DistributedDataParallel``
    and hand everything to ``Trainer``.

    Raises:
        RuntimeError: if ``Config.DATASETS`` is neither ``'VOC'`` nor ``'WF'``.
    """
    args = _parse_args()
    print(args)

    torch.backends.cudnn.benchmark = True
    torch.set_default_tensor_type('torch.FloatTensor')
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    # The default process group may only be initialised once per process, and
    # each process must join with its own rank rather than a hard-coded one.
    dist.init_process_group(init_method=args.init_method, backend="gloo",
                            world_size=args.world_size, rank=args.rank,
                            group_name="pytorch_test")

    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed(args.seed)

    if Config.DATASETS == 'VOC':
        train_dataset, val_dataset = create_voc_datasets(Config.VOC_DATASET_DIR)
    elif Config.DATASETS == 'WF':
        train_dataset, val_dataset = create_wf_datasets(Config.WF_DATASET_DIR)
    else:
        raise RuntimeError('Select a dataset to train in config.py.')

    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}
    # DataLoader rejects shuffle=True together with an explicit sampler;
    # the DistributedSampler already shuffles each rank's shard.
    # NOTE(review): the validation loader passes collate_fn=my_collate_fn but
    # this one does not -- confirm whether training batches need it too.
    train_dataloader = torch.utils.data.DataLoader(train_dataset,
                                                   batch_size=args.batch_size,
                                                   sampler=train_sampler,
                                                   **kwargs)
    val_dataloader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=2,
        num_workers=Config.DATALOADER_WORKER_NUM,
        shuffle=False,
        collate_fn=my_collate_fn
    )

    model = Net()
    if use_cuda:
        # Parameters have to live on the GPU before DDP wraps the module.
        model = model.cuda()
    # Wrap on CPU as well (gloo supports it); otherwise the ranks would train
    # independent, unsynchronised copies of the model.
    model = torch.nn.parallel.DistributedDataParallel(model)

    optimizer = torch.optim.SGD(model.parameters(), lr=Config.LEARNING_RATE,
                                weight_decay=Config.WEIGHT_DECAY)
    trainer = Trainer(
        optimizer,
        model,
        train_dataloader,
        val_dataloader,
        resume=Config.RESUME_FROM,
        log_dir=Config.LOG_DIR,
        persist_stride=Config.MODEL_SAVE_STRIDE,
        max_epoch=Config.EPOCHS)
    trainer.train()


if __name__ == "__main__":
    main()

2、给train的标签重命名的代码

代码语言:python
import os

# Rename srcFile to dstFile and report the outcome on stdout.
srcFile = './actwork/linkFile/allExtLinks - 副本.txt'
dstFile = './actwork/linkFile/allExtLinks - copy.txt'
try:
    os.rename(srcFile, dstFile)
except OSError as e:
    # Filesystem failures from os.rename (missing source, permissions, ...)
    # are OSError subclasses; anything else is a programming error and
    # should not be swallowed here.
    print(e)
    # The trailing "rn" in the published snippet was a "\r\n" escape whose
    # backslashes were lost; restored here.
    print('rename file fail\r\n')
else:
    print('rename file success\r\n')



import os

# Directory whose entries are listed.
path = '/Users/apple/Desktop/OCR'

# '.DS_Store' is the hidden macOS Finder metadata file. Filter it out instead
# of calling list.remove(), which raises ValueError when the entry is absent
# (e.g. on non-macOS machines or in a folder Finder never opened).
path_list = [name for name in os.listdir(path) if name != '.DS_Store']

print(path_list)
代码语言:python
主要两种方式:DataParallel和DistributedDataParallel
DataParallel实现简单,但速度较慢,且存在负载不均衡的问题。
DistributedDataParallel本身是实现多机多卡的,但单机多卡也可以使用,配置稍复杂。demo如下:
DataParallel

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os

# Width of each random sample; also used as the Linear layer's input width.
input_size = 5
# Output width of the Linear layer.
output_size = 2
# Number of samples the DataLoader puts in one batch.
batch_size = 30
# Total number of samples generated by RandomDataset.
data_size = 90

class RandomDataset(Dataset):
    """In-memory dataset of ``length`` random rows, each holding ``size`` values."""

    def __init__(self, size, length):
        # All samples are drawn up front from a standard normal distribution.
        self.data = torch.randn(length, size)
        self.len = length

    def __len__(self):
        """Return the number of samples fixed at construction time."""
        return self.len

    def __getitem__(self, index):
        """Return the sample stored at ``index``."""
        return self.data[index]

# Shuffled mini-batch loader over a freshly generated random dataset.
rand_loader = DataLoader(
    RandomDataset(input_size, data_size),
    batch_size=batch_size,
    shuffle=True,
)

class Model(nn.Module):
    """Single fully connected layer that prints the tensor sizes of each call."""

    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        """Apply the linear layer to ``input`` and return the result.

        The sizes are printed on every call so the per-replica batch split is
        visible when the model is wrapped in a parallel container.
        """
        result = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", result.size())
        return result
model = Model(input_size, output_size)

# Query CUDA availability once instead of on every batch.
use_cuda = torch.cuda.is_available()
if use_cuda:
    model.cuda()

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # The only line needed to split each batch across all visible GPUs.
    model = nn.DataParallel(model)

for data in rand_loader:
    # torch.autograd.Variable is deprecated: tensors carry autograd state
    # themselves, so the batch is passed to the model directly.
    input_var = data.cuda() if use_cuda else data
    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())


DistributedDataParallel

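运行: CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 distributedDataParallel.py
(新版 PyTorch 中 torch.distributed.launch 已被弃用,推荐使用: CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 distributedDataParallel.py)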

# distributedDataParallel.py 
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) Initialise the default process group; rank and world size come from the
#    environment variables set by the launcher.
torch.distributed.init_process_group(backend="nccl")

# Sample width / Linear input width, Linear output width, samples per batch,
# and total number of samples.
input_size = 5
output_size = 2
batch_size = 30
data_size = 90

# 2) Bind this process to one GPU.
#    get_rank() is the *global* rank, which only equals the GPU index on the
#    first node. Prefer the per-node LOCAL_RANK exported by the launcher and
#    fall back to the global rank folded onto the visible GPUs (same value as
#    before on a single node).
local_rank = int(os.environ.get(
    "LOCAL_RANK",
    torch.distributed.get_rank() % max(torch.cuda.device_count(), 1),
))
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    """Random dataset of ``length`` rows with ``size`` values each, kept on the GPU."""

    def __init__(self, size, length):
        # Draw every sample up front, then move the tensor to the CUDA device.
        samples = torch.randn(length, size)
        self.data = samples.to('cuda')
        self.len = length

    def __len__(self):
        """Return the number of samples fixed at construction time."""
        return self.len

    def __getitem__(self, index):
        """Return the sample stored at ``index``."""
        return self.data[index]

dataset = RandomDataset(input_size, data_size)
# 3) A DistributedSampler gives each process its own share of the dataset.
rand_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=DistributedSampler(dataset),
)

class Model(nn.Module):
    """Single fully connected layer that prints the tensor sizes of each call."""

    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        """Apply the linear layer to ``input``, print both sizes, return the result."""
        result = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", result.size())
        return result

model = Model(input_size, output_size)

# 4) The model must already be on this process's GPU before it is wrapped.
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) Wrap the model, pinning the replica to this process's GPU.
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    # Both arms of the former is_available() check assigned `data` unchanged.
    # Placing the batch on `device` explicitly is a no-op when it is already
    # there and keeps the loop correct if the dataset yields CPU tensors.
    input_var = data.to(device)

    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

0 人点赞