The residual network (ResNet), proposed by Kaiming He et al., won the 2015 ImageNet Large Scale Visual Recognition Challenge and has deeply influenced the design of deep neural networks ever since. Its core idea: after making the network deeper, the enlarged family of functions should still contain the original function (the mapping from an input tensor to an output tensor computed by the shallower network) as one of its members, so adding depth can never make the network's fitting ability worse. This is exactly what the residual block provides, and the design has had a profound influence on how deep networks are built.
The ResNet paper is available here:
https://www.cv-foundation.org/openaccess/content_cvpr_2016/papers/He_Deep_Residual_Learning_CVPR_2016_paper.pdf
The residual block adds a direct shortcut (bypass) around a small stack of layers, which guarantees that making the network deeper cannot degrade its fitting ability: in the worst case the block only has to learn to pass its input through unchanged. Concretely, the main branch applies two 3×3 convolutions, each followed by batch normalization, with an activation in between; the output of this branch is added element-wise to the shortcut before the final activation.
If the number of channels (or the spatial size) is to change, an extra 1×1 convolutional layer is introduced on the shortcut to transform the input into the required shape before the addition.
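A minimal, self-contained sketch of what a residual block computes (the names main_branch, x and y here are illustrative only; the complete Residual class actually used for training appears in the listing below):

import torch
from torch import nn

# Main branch F(x): two 3x3 convolutions with batch normalization (channel count unchanged here).
main_branch = nn.Sequential(
    nn.Conv2d(64, 64, kernel_size=3, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
    nn.Conv2d(64, 64, kernel_size=3, padding=1), nn.BatchNorm2d(64))

x = torch.rand(1, 64, 8, 8)
y = torch.relu(main_branch(x) + x)  # shortcut: add the input back before the final activation
print(y.shape)                      # torch.Size([1, 64, 8, 8]) -- the branch only has to learn the residual y - x

When the channel count or spatial size changes, the shortcut is first passed through the 1×1 convolution described above so that the addition remains shape-compatible.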
For a Chinese-language introduction to residual networks, see Mu Li's online book (Dive into Deep Learning):
http://zh-v2.d2l.ai/chapter_convolutional-modern/resnet.html
import os
import glob
import numpy as np
import random
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
# import torch.nn.functional as F
from torchvision import transforms,datasets
# from load_CIFAR10_dataset import train_inputs, train_labels, test_inputs, test_labels, names
transform1 = transforms.Compose(
    [transforms.RandomHorizontalFlip(p=0.5),
     # transforms.RandomVerticalFlip(p=0.5),
     # transforms.RandomRotation((-5, 5)),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
transform2 = transforms.Compose(
    [# transforms.RandomHorizontalFlip(p=0.5),
     # transforms.RandomVerticalFlip(p=0.5),
     # transforms.RandomRotation((-5, 5)),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
train_set = datasets.CIFAR10(root='K:/datasets/CIFAR-10', train=True, download=False, transform=transform1)
test_set = datasets.CIFAR10(root='K:/datasets/CIFAR-10', train=False, download=False, transform=transform2)
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
train_dataloader = DataLoader(train_set, batch_size=8, shuffle=True)
test_dataloader = DataLoader(test_set, batch_size=1024, shuffle=False)
# If the dataset is loaded directly with datasets.CIFAR10 (as above), this class is not needed;
# it is kept here as an example of a custom Dataset.
class MyCifar10Dataset(Dataset):
    def __init__(self, inputs, labels, n=10):
        self.n = n
        self.inputs = inputs  # [Channels, H, W]
        self.labels = labels

    def onehoted(self, label):  # one-hot encoding of an integer label
        label_onehot = np.zeros(self.n)
        label_onehot[label] = 1
        return label_onehot

    def normalized(self, input_):
        mean = np.mean(input_)
        std = np.std(input_)
        return (input_ - mean) / std

    def flip(self, input_):  # horizontal flip: reverse the width axis of a [C, H, W] array
        return input_[:, :, ::-1]

    def __len__(self):  # must be defined!
        return len(self.inputs)

    def __getitem__(self, index):  # must be defined!
        label = self.onehoted(self.labels[index])
        input_ = self.normalized(self.inputs[index])
        r = random.random()
        # if r >= 0.5:
        #     input_ = self.flip(input_)
        return input_, label
class Residual(nn.Module):  # the residual block
    def __init__(self, in_channels, out_channels, use_1x1conv=False, strides=1):
        super(Residual, self).__init__()
        # Typically, when the number of channels is doubled, the height and width are halved
        # (i.e. out_channels = 2 * in_channels, strides=2).
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(3, 3), stride=(strides, strides), padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(1, 1), stride=(strides, strides))
        else:
            self.conv3 = None
        self.relu = nn.LeakyReLU(inplace=True)

    def forward(self, X):
        Y = self.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        Y += X  # the residual addition: add the (possibly transformed) input to the main branch
        Y = self.relu(Y)
        return Y
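# Quick shape check of the residual block (illustrative only, left commented out so the
# training script is unchanged): with use_1x1conv=True and strides=2 the block halves the
# spatial size while changing the channel count.
# blk = Residual(3, 6, use_1x1conv=True, strides=2)
# print(blk(torch.rand(4, 3, 32, 32)).shape)  # expected: torch.Size([4, 6, 16, 16])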
# Builds one stage of the network by stacking num_residuals residual blocks.
def resnet_block(in_channels, out_channels, num_residuals, is_first_block=False):
    block = []
    for i in range(num_residuals):
        if i == 0 and not is_first_block:
            block.append(Residual(in_channels, out_channels, use_1x1conv=True, strides=2))  # halve height/width, change channels
        else:
            block.append(Residual(out_channels, out_channels))  # keep height/width (already halved by the first block of the stage)
    return block
# Overall network structure
class NeuralNetwork(nn.Module):  # Define model
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        out1 = 64
        b1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=out1, kernel_size=(4, 4), stride=(2, 2), padding=1),
            nn.BatchNorm2d(out1),
            nn.LeakyReLU(inplace=True))
        b2 = nn.Sequential(*resnet_block(out1, out1, num_residuals=2, is_first_block=True))
        b3 = nn.Sequential(*resnet_block(out1, 128, num_residuals=2, is_first_block=False))
        b4 = nn.Sequential(*resnet_block(128, 256, num_residuals=2, is_first_block=False))
        b5 = nn.Sequential(*resnet_block(256, 512, num_residuals=2, is_first_block=False))
        self.net = nn.Sequential(b1,
                                 b2,
                                 b3,
                                 b4,
                                 b5,
                                 # Only output_size needs to be specified; the pooling kernel and stride are
                                 # derived automatically. A larger output_size means more weights in the
                                 # following linear layer.
                                 nn.AdaptiveAvgPool2d(output_size=(1, 1)),
                                 nn.Flatten(),
                                 nn.Linear(512, 128),
                                 nn.LeakyReLU(inplace=True),
                                 nn.Dropout(p=0.5),
                                 nn.Linear(128, 10))

    def forward(self, X):
        return self.net(X)
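# Quick sanity check of the overall architecture (illustrative only, left commented out):
# a CIFAR-10 sized input of shape [1, 3, 32, 32] should yield 10 class scores.
# print(NeuralNetwork()(torch.rand(1, 3, 32, 32)).shape)  # expected: torch.Size([1, 10])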
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    train_loss, correct = 0.0, 0.0
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)
        # y holds integer class labels here; with one-hot labels this comparison raises
        # "The size of tensor a must match the size of tensor b" -- use the commented form instead.
        correct += (pred.argmax(axis=1) == y).type(torch.float).sum().item()
        # correct += (pred.argmax(axis=1) == y.argmax(axis=1)).type(torch.float32).sum().item()
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item()  # only for monitoring
        # if batch % 100 == 0:  # training progress monitoring
        #     loss, current = loss.item(), batch * len(X)
        #     print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
    train_loss /= num_batches
    correct /= size
    train_Avg_loss.append(train_loss)
    train_Accuracy.append(correct := 100 * correct)
    print(f"Train Accuracy: {correct:>0.6f}%, train Avg loss: {train_loss:>8f}")
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0.0, 0.0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(axis=1) == y).type(torch.float32).sum().item()
            # correct += (pred.argmax(axis=1) == y.argmax(axis=1)).type(torch.float32).sum().item()
    test_loss /= num_batches
    correct /= size
    test_Avg_loss.append(test_loss)
    test_Accuracy.append(correct := 100 * correct)
    print(f"Test Accuracy: {correct:>0.6f}%, test Avg loss: {test_loss:>8f}\n")
if __name__ == "__main__":
    from matplotlib import pyplot as plt
    from matplotlib import ticker
    # training_data = MyCifar10Dataset(train_inputs, train_labels)
    # test_data = MyCifar10Dataset(test_inputs, test_labels)
    # train_dataloader = DataLoader(training_data, batch_size=1024, shuffle=True)
    # test_dataloader = DataLoader(test_data, batch_size=1024, shuffle=False)
    for X, y in test_dataloader:
        print(f"Shape of X [batch, C, H, W]: {X.shape} {X.dtype}")
        print(f"Shape of y: {y.shape} {y.dtype}")
        break
    model = NeuralNetwork()
    if torch.cuda.is_available():  # Get cpu or gpu device for training.
        device = "cuda"
        model = model.to(device)
    else:
        device = "cpu"
    print(f"device = {device}")
    print(model)
    loss_fn = nn.CrossEntropyLoss()  # cross-entropy loss, suitable for classification problems
    # If you see RuntimeError: "nll_loss_forward_reduce_cuda_kernel_2d_index" not implemented for 'Int',
    # the labels have a dtype the loss function does not accept and must be converted
    # (e.g. to torch.long, or to the one-hot form used by the custom dataset above).
    # optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.1, weight_decay=0.01)
    # optimizer = torch.optim.SGD(model.parameters(), lr=0.1, weight_decay=1e-2)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, weight_decay=1e-3)
    # Dynamic learning rate: after every step_size epochs, lr *= gamma.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.95)
    train_Accuracy = []
    train_Avg_loss = []
    test_Accuracy = []
    test_Avg_loss = []
    epochs = 400
    for t in range(epochs):
        print(f"Epoch {t + 1}:")
        # momentum = 0 if t < 10 else 0.9
        # optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=momentum, weight_decay=1e-5)
        # optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
        # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
        print(f"current learning rate is {scheduler.get_last_lr()[0]}")
        train(train_dataloader, model, loss_fn, optimizer)
        scheduler.step()
        test(test_dataloader, model, loss_fn)
    print("Done.")
    model_path = "CIFAR10_mode.pth"
    torch.save(model.state_dict(), model_path)
    print(f"Saved PyTorch Model State to {model_path}")
    # Plot accuracy and average loss
    plt.subplot(2, 1, 1)
    plt.plot(range(1, epochs + 1), train_Accuracy, "r-", label="train_Accuracy")
    plt.plot(range(1, epochs + 1), test_Accuracy, "b-", label="test_Accuracy")
    plt.xlabel("Epoch")
    xticker_formatter = ticker.FuncFormatter(lambda x, pos: "%d" % x)
    plt.gca().xaxis.set_major_formatter(xticker_formatter)
    plt.ylabel("Accuracy[%]")
    plt.legend(loc="lower right")
    plt.grid()
    plt.subplot(2, 1, 2)
    plt.plot(range(1, epochs + 1), train_Avg_loss, "r-", label="train_Avg_loss")
    plt.plot(range(1, epochs + 1), test_Avg_loss, "b-", label="test_Avg_loss")
    plt.xlabel("Epoch")
    plt.gca().xaxis.set_major_formatter(xticker_formatter)
    plt.ylabel("Avg_loss")
    plt.legend(loc="upper right")
    plt.grid()
    plt.savefig("Accuracy and loss plot small batch size2.png")
    # plt.show()
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}
    net = NeuralNetwork()
    net.load_state_dict(torch.load(model_path))
    # os.system("shutdown -s -t 1")  # -t xx sets the shutdown timeout to xx seconds
With a batch size of 1024, the model overfits very easily. The resulting accuracy/loss curves show textbook overfitting: test accuracy stalls at only about 72% while training accuracy reaches 100%, so the model can no longer learn anything new; it has simply memorized the answers for the training set and generalizes poorly.
Reducing the batch size to 8 lowers computational efficiency, but test accuracy rises to about 90%. A smaller batch adds sampling noise to the gradient estimates, which alleviates overfitting to some extent.
The following code reports the test-set accuracy for each of the 10 classes:
# again no gradients needed
with torch.no_grad():
    for data in test_dataloader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predictions = torch.max(outputs, 1)
        # collect the correct predictions for each class
        for label, prediction in zip(labels, predictions):
            if label == prediction:
                correct_pred[classes[label]] += 1
            total_pred[classes[label]] += 1

# print accuracy for each class
for classname, correct_count in correct_pred.items():
    accuracy = 100 * float(correct_count) / total_pred[classname]
    print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')
Although none of these per-class accuracies look particularly high, with 10 classes a blind guess would only be right 10% of the time, so this residual network has clearly learned quite a lot.