在之前的环节,我们已经能够读取数据,并且构建了我们的Dataset类,处理了数据中各种异常情况,并把数据转换成PyTorch可以处理的样子。一般来说,到了这一步就开始训练模型了。先不要考虑模型的效果,也不用做什么优化,先把模型训练跑通,看一下我们的效果,这样这个结果就可以作为baseline,然后再考虑优化的事情,每进行一步优化,就可以看到它对比基线有没有效果上的提升。话不多说,我们这就来搞一个模型。
接下来我们需要实现的流程如下:
- 初始化模型并加载数据
- 设定迭代周期,并循环训练,执行以下步骤
- 遍历我们的LunaDataset数据集,获取每批训练数据
- 数据加载器将数据加载进来
- 把数据传入模型以获取结果
- 计算损失,也就是计算预测结果和标注的差距
- 记录模型的效果保存到临时数据结构中
- 通过误差反向传播更新参数
- 获取验证集数据
- 加载验证集数据
- 使用模型预测验证集数据,并计算损失
- 记录模型在验证集上的效果
- 输出当前周期的效果记录
先定义好大框架
这些步骤其实都比较熟悉了,跟我们在前几章节中学的流程是一致的,但是我们这里的代码会更规范一些,加入了日志记录,并实现了几个基础功能来帮助我们运行模型。我们先来定义一个程序类,上面的流程基本上都会在这个类里面实现。如前面这里的代码都没办法直接运行,等我们把这些分块讲完,再研究怎么把代码跑起来。
代码语言:javascript复制class LunaTrainingApp:#首先创建一个类,这个类就是用来训练用的
def __init__(self, sys_argv=None): #构建初始化方法,这里设定接收参数
if sys_argv is None: #如果没有传入参数,则需要从命令行输入参数
sys_argv = sys.argv[1:]
parser = argparse.ArgumentParser() #接下来是参数解析,这里面还有好几个参数,会在完整代码看到
parser.add_argument('--num-workers',
help='Number of worker processes for background data loading',
default=8,
type=int,
)…… self.cli_args = parser.parse_args(sys_argv)
self.time_str = datetime.datetime.now().strftime('%Y-%m-%d_%H.%M.%S')……def main(self): #定义一个主方法,把训练过程放在main方法里
log.info("Starting {}, {}".format(type(self).__name__, self.cli_args))……if __name__ == '__main__':
LunaTrainingApp().main()
有了训练类,并按照代码模板定义好了我们的主方法,这里还有一个就是编写一个调用方法,这样我们就可以在Jupyter Notebook或者在shell中都可以去运行我们的代码。
代码语言:javascript复制import datetimefrom util.util import importstrfrom util.logconf import logging
log = logging.getLogger('nb') #日志记录def run(app, *argv):
argv = list(argv)
argv.insert(0, '--num-workers=4') # 这里设定在一个4核8线程CPU上运行程序,可以根据你的硬件情况进行调整
log.info("Running: {}({!r}).main()".format(app, argv))
app_cls = importstr(*app.rsplit('.', 1)) #这是一种import方法
app_cls(argv).main()
log.info("Finished: {}.{!r}).main()".format(app, argv))
*args
和**kwargs
主要用于函数定义,你可以将不定数量的参数传递给某个函数。 *args是用来发送一个非键值对的可变数量的参数列表给一个函数。 *args的用法:当传入的参数个数未知,且不需要知道参数名称时。 **kwargs 传入键值对(例如:num1=11,num2=22) **kwargs 允许将不定长度的键值对作为参数传递给一个函数。如果想要在一个函数里处理带名字的参数,应该使用 **kwargs。
初始化模型
接下来就开始涉及训练的一些环节,先把前面提到的一些环节定义好,然后再逐一去实现。当然,在实际做一个项目的时候最开始可能不会写这么正式的代码,对于比较简单的模型训练,一般就直接写一个模型训练的方法,然后就开始运行了,等运行成功再对代码进行优化,把其中的各个模块拆出来,封装成独立的方法方便我们后续的优化和调试。这里我们就省去了那些环节,直接从一个比较良好的代码出发。
代码语言:javascript复制class LunaTrainingApp:#这块仍然是修改类初始化方法
def __init__(self, sys_argv=None):……#设置使用的设备,这个跟之前的一样,有GPU使用GPU没有的话使用CPU
self.use_cuda = torch.cuda.is_available()
self.device = torch.device("cuda" if self.use_cuda else "cpu")#对模型初始化
self.model = self.initModel()#对优化器初始化
self.optimizer = self.initOptimizer()#定义模型初始化方法
def initModel(self):
model = LunaModel() #实例化一个LunaModel,当然LunaModel具体怎么定义的我们马上会讲,不要着急
if self.use_cuda:#如果使用的是GPU,就把模型发送到GPU上去
log.info("Using CUDA; {} devices.".format(torch.cuda.device_count()))
if torch.cuda.device_count() > 1:#如果有2个或更多的GPU 开启并行
model = nn.DataParallel(model)
model = model.to(self.device)
return model def initOptimizer(self):
return SGD(self.model.parameters(), lr=0.001, momentum=0.99) #我们使用的是一个带有动量的SGD方法,这里的超参数有两个一个是lr学习率,一个是momentum动量,这里我们先不讨论动量,只知道它是SGD的一个参数就行了。
# return Adam(self.model.parameters()) #如果你想使用其他的优化方法可以等第一次训练结束后开始尝试新的方案
接下来定义我们的数据加载器。
代码语言:javascript复制#训练集数据加载器def initTrainDl(self):
train_ds = LunaDataset(#从LunaDataset获取数据,验证集取10个
val_stride=10,
isValSet_bool=False,
)
batch_size = self.cli_args.batch_size #批大小是从参数中获取的
if self.use_cuda:
batch_size *= torch.cuda.device_count() #如果使用GPU,在多个GPU的情况下每次可以获取多个batch
train_dl = DataLoader( #这个是PyTorch的方法
train_ds,
batch_size=batch_size,
num_workers=self.cli_args.num_workers,
pin_memory=self.use_cuda,
)
return train_dl def initValDl(self): #验证集数据加载 跟上面一样的就不介绍了
val_ds = LunaDataset(
val_stride=10,
isValSet_bool=True,
)
batch_size = self.cli_args.batch_size if self.use_cuda:
batch_size *= torch.cuda.device_count()
val_dl = DataLoader(
val_ds,
batch_size=batch_size,
num_workers=self.cli_args.num_workers,
pin_memory=self.use_cuda,
)
return val_dl
最后在main方法中加入数据加载
代码语言:javascript复制 def main(self):
log.info("Starting {}, {}".format(type(self).__name__, self.cli_args))
train_dl = self.initTrainDl()
val_dl = self.initValDl()
实现模型核心部分
这里我们继续使用卷积来处理我们的图像。
先来看一下这个模型的设计。尾部只使用了一个batchnorm对数据进行正则化。然后进入到主干部分,主干中包含四个同样的块,每个块的细节是右边的内容,其中包含了两次3d卷积两次ReLU激活,最后有一个最大池化。在主干上面,结果输出到一个线性层,然后经过softmax给出预测概率。其中主干的四个块,每个块输出的channel数都扩大一倍,相当于提取更多维度的特征。 至于为啥设计这样一个网络,其实并没有定论,如果随意的组合可以搞出无数种情况,我们这里就暂且认为它已经实现了我们需要的功能架构。一般来说修改模型架构是很困难的事情,幸运的是有很多前人已经趟出了一些路线,因为尝试很多也未必有好的效果,我们就先按这个架构来实现。
首先是主干的块设计。
代码语言:javascript复制class LunaBlock(nn.Module):
def __init__(self, in_channels, conv_channels): #初始化方法
super().__init__()
self.conv1 = nn.Conv3d(# 使用3d卷积
in_channels, conv_channels, kernel_size=3, padding=1, bias=True,#这里kernel_size=3就是3×3×3的卷积核了
)
self.relu1 = nn.ReLU(inplace=True)
self.conv2 = nn.Conv3d(
conv_channels, conv_channels, kernel_size=3, padding=1, bias=True,
)
self.relu2 = nn.ReLU(inplace=True) #ReLU激活函数
self.maxpool = nn.MaxPool3d(2, 2) #最大池化
def forward(self, input_batch): #前向传播
block_out = self.conv1(input_batch)
block_out = self.relu1(block_out)
block_out = self.conv2(block_out)
block_out = self.relu2(block_out)
return self.maxpool(block_out)
然后是对整个模型的构建。
代码语言:javascript复制class LunaModel(nn.Module):
def __init__(self, in_channels=1, conv_channels=8):
super().__init__()#就像我们前面说的先使用一个batchnorm
self.tail_batchnorm = nn.BatchNorm3d(1)#然后是主干四个块
self.block1 = LunaBlock(in_channels, conv_channels)
self.block2 = LunaBlock(conv_channels, conv_channels * 2)
self.block3 = LunaBlock(conv_channels * 2, conv_channels * 4)
self.block4 = LunaBlock(conv_channels * 4, conv_channels * 8)#最后应用线性层 softmax输出结果
self.head_linear = nn.Linear(1152, 2)
self.head_softmax = nn.Softmax(dim=1)#这里还有一个对权重的初始化 别急马上会有讲到
self._init_weights()
定义前向传播
代码语言:javascript复制 def forward(self, input_batch):
bn_output = self.tail_batchnorm(input_batch)
block_out = self.block1(bn_output)
block_out = self.block2(block_out)
block_out = self.block3(block_out)
block_out = self.block4(block_out)#改变输出的形状,转换成一维
conv_flat = block_out.view(
block_out.size(0),
-1,
)
linear_output = self.head_linear(conv_flat)
return linear_output, self.head_softmax(linear_output)
接下来就是我们刚才说的初始化参数权重,这大概是一个标准的初始化方法,良好的初始化有利于防止出现梯度爆炸或者梯度消失。
代码语言:javascript复制 def _init_weights(self):
for m in self.modules():
if type(m) in {
nn.Linear,
nn.Conv3d,
nn.Conv2d,
nn.ConvTranspose2d,
nn.ConvTranspose3d,
}:
nn.init.kaiming_normal_(#这个让随机生成的数值符合正态分布
m.weight.data, a=0, mode='fan_out', nonlinearity='relu',
)
if m.bias is not None:
fan_in, fan_out =
nn.init._calculate_fan_in_and_fan_out(m.weight.data)
bound = 1 / math.sqrt(fan_out)
nn.init.normal_(m.bias, -bound, bound)
写完模型的核心部分,我们在训练环节还有一些事情需要处理,比如说我们的循环,以及损失定义。在main方法中加入循环
代码语言:javascript复制 def main(self):
log.info("Starting {}, {}".format(type(self).__name__, self.cli_args))
train_dl = self.initTrainDl()
val_dl = self.initValDl()
for epoch_ndx in range(1, self.cli_args.epochs 1):#前面这一大坨都是日志记录,
log.info("Epoch {} of {}, {}/{} batches of size {}*{}".format(
epoch_ndx, #正在训练的代数
self.cli_args.epochs,#设置的总代数
len(train_dl),#训练集大小
len(val_dl),#验证集大小
self.cli_args.batch_size,#batch大小
(torch.cuda.device_count() if self.use_cuda else 1),#GPU数目
))#实际执行训练
trnMetrics_t = self.doTraining(epoch_ndx, train_dl)
self.logMetrics(epoch_ndx, 'trn', trnMetrics_t) #这个是记录结果数据
valMetrics_t = self.doValidation(epoch_ndx, val_dl)
self.logMetrics(epoch_ndx, 'val', valMetrics_t)#这个是记录结果数据
if hasattr(self, 'trn_writer'):
self.trn_writer.close()
self.val_writer.close()
看一下doTraining方法
代码语言:javascript复制 def doTraining(self, epoch_ndx, train_dl):#首先执行训练 就是刚才定义的lunamodel
self.model.train()#初始化训练结果指标
trnMetrics_g = torch.zeros(
METRICS_SIZE,
len(train_dl.dataset),
device=self.device,
)#这一段是用来估算运行时间的,有了估计的时间,你可以抽空去干点别的事情
batch_iter = enumerateWithEstimate(
train_dl,
"E{} Training".format(epoch_ndx),
start_ndx=train_dl.num_workers,
)#梯度归零 然后计算损失
for batch_ndx, batch_tup in batch_iter:
self.optimizer.zero_grad()
loss_var = self.computeBatchLoss(
batch_ndx,
batch_tup,
train_dl.batch_size,
trnMetrics_g )#损失反向传播
loss_var.backward()#用优化器更新权重
self.optimizer.step()
self.totalTrainingSamples_count = len(train_dl.dataset)
return trnMetrics_g.to('cpu')
定义损失计算方法。
代码语言:javascript复制#有几个全局静态变量需要预先定义,用于标明索引位置,以及结果数组的大小METRICS_LABEL_NDX=0METRICS_PRED_NDX=1METRICS_LOSS_NDX=2METRICS_SIZE = 3
def computeBatchLoss(self, batch_ndx, batch_tup, batch_size, metrics_g):
input_t, label_t, _series_list, _center_list = batch_tup
#数据传到GPU上
input_g = input_t.to(self.device, non_blocking=True)
label_g = label_t.to(self.device, non_blocking=True)#使用模型预测,其中logits_g是没有经过softmax的值
logits_g, probability_g = self.model(input_g)
#使用交叉熵损失
loss_func = nn.CrossEntropyLoss(reduction='none')#这里获取每个样本的损失结果
loss_g = loss_func(
logits_g,
label_g[:,1],
)
start_ndx = batch_ndx * batch_size
end_ndx = start_ndx label_t.size(0)#把结果数据放进metrics_g
metrics_g[METRICS_LABEL_NDX, start_ndx:end_ndx] =
label_g[:,1].detach()
metrics_g[METRICS_PRED_NDX, start_ndx:end_ndx] =
probability_g[:,1].detach()
metrics_g[METRICS_LOSS_NDX, start_ndx:end_ndx] =
loss_g.detach()
return loss_g.mean()
最后就是定义验证方法了,这个跟训练方法基本一致,就不再多介绍了。
代码语言:javascript复制 def doValidation(self, epoch_ndx, val_dl):
with torch.no_grad():
self.model.eval()
valMetrics_g = torch.zeros(
METRICS_SIZE,
len(val_dl.dataset),
device=self.device,
)
batch_iter = enumerateWithEstimate(
val_dl,
"E{} Validation ".format(epoch_ndx),
start_ndx=val_dl.num_workers,
)
for batch_ndx, batch_tup in batch_iter:
self.computeBatchLoss(
batch_ndx, batch_tup, val_dl.batch_size, valMetrics_g)
return valMetrics_g.to('cpu')
今天已经写了很长了,结果还没有到实际的训练环节。不过大部分训练的代码我们已经搞完了,这样一块一块的看可能有点混乱,等到最后我们把代码都组起来之后就会好多了。今天先到这里,下一篇争取把训练结果跑出来。