残差网络 在 CIFAR10上的简单应用

2022-11-18 14:00:09 浏览数 (2)

何恺明等人提出的残差网络(ResNet)在2015年的ImageNet图像识别挑战赛中夺魁,并深刻影响了后来的深度神经网络的设计。残差网络的核心思想是:增加网络深度之后,新网络所能表示的函数集合最好仍然包含原始函数(原始函数指的是增加深度之前的网络,它把一个input张量映射为一个output张量)作为其元素之一,这样加深网络就不会使拟合能力变差。于是,残差块(residual blocks)便诞生了,这个设计对如何建立深层神经网络产生了深远的影响。凭借它,ResNet赢得了2015年ImageNet大规模视觉识别挑战赛。

这里是ResNet论文的地址:

https://www.cv-foundation.org/openaccess/content_cvpr_2016/papers/He_Deep_Residual_Learning_CVPR_2016_paper.pdf

残差块模型如下(传播路径从上往下看):通过添加直通的旁路,来保证深层网络的拟合能力不会退化。

具体化后的结构如下(传播路径从下往上看):

如果想改变通道数,就需要引入一个额外的1×1卷积层来将输入变换成需要的形状后再做相加运算:

关于残差网络的中文介绍,可参考李沐的在线书籍:

http://zh-v2.d2l.ai/chapter_convolutional-modern/resnet.html

代码语言:python
import os
import glob
import numpy as np
import random
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
# import torch.nn.functional as F
from torchvision import transforms,datasets
# from load_CIFAR10_dataset import train_inputs, train_labels, test_inputs, test_labels, names
# Normalisation applied after ToTensor: (x - 0.5) / 0.5 on every channel.
_CHANNEL_MEAN = (0.5, 0.5, 0.5)
_CHANNEL_STD = (0.5, 0.5, 0.5)
# Local directory that already holds the CIFAR-10 archive (download=False below).
_CIFAR10_ROOT = 'K:/datasets/CIFAR-10'

# Training pipeline: random horizontal flip is the only augmentation in use.
# Vertical flips and small rotations are not applied.
transform1 = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

# Evaluation pipeline: no augmentation, same tensor conversion and normalisation.
transform2 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

train_set = datasets.CIFAR10(
    root=_CIFAR10_ROOT, train=True, download=False, transform=transform1)
test_set = datasets.CIFAR10(
    root=_CIFAR10_ROOT, train=False, download=False, transform=transform2)

# Human-readable class names, indexed by the integer label.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

# Small shuffled batches for training; one large ordered batch size for evaluation.
train_dataloader = DataLoader(train_set, shuffle=True, batch_size=8)
test_dataloader = DataLoader(test_set, shuffle=False, batch_size=1024)


# Custom dataset wrapper. It is not needed when the data is loaded directly
# through torchvision's datasets.CIFAR10.
class MyCifar10Dataset(Dataset):
    """Map-style dataset over in-memory CIFAR-10 style arrays.

    Each item is returned as ``(normalised_image, one_hot_label)``.

    Args:
        inputs: Indexable collection of images. Per the original note each
            sample is expected to be channel-first ``[Channels, H, W]``
            (NumPy array) -- TODO confirm against the loader that builds it.
        labels: Indexable collection of integer class indices.
        n: Number of classes, i.e. the length of the one-hot vector.
        augment: When True, each fetched image is mirrored left-right with
            probability 0.5. Defaults to False (no augmentation).
    """

    def __init__(self, inputs, labels, n=10, augment=False):
        self.n = n
        self.inputs = inputs  # per-sample layout: [Channels, H, W]
        self.labels = labels
        self.augment = augment

    def onehoted(self, label):
        """Return a length-``n`` float vector with a single 1 at ``label``."""
        label_onehot = np.zeros(self.n)
        label_onehot[label] = 1
        return label_onehot

    def normalized(self, input_):
        """Standardise one image to zero mean and unit variance.

        A constant image has zero standard deviation; in that case only the
        mean is removed so the result is all zeros instead of NaN/inf.
        """
        mean = np.mean(input_)
        std = np.std(input_)
        if std == 0:
            return input_ - mean
        return (input_ - mean) / std

    def flip(self, input_):
        """Mirror an image along its last (width) axis.

        The result is copied into contiguous memory because reversed NumPy
        views have negative strides, which ``torch.from_numpy`` rejects.
        """
        return np.ascontiguousarray(input_[..., ::-1])

    def __len__(self):  # required by the Dataset protocol
        return len(self.inputs)

    def __getitem__(self, index):  # required by the Dataset protocol
        label = self.onehoted(self.labels[index])
        input_ = self.normalized(self.inputs[index])
        if self.augment and random.random() >= 0.5:
            input_ = self.flip(input_)
        return input_, label


class Residual(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the block.
        use_1x1conv: When True, the skip path goes through a 1x1 convolution
            so its channel count and spatial size match the main path.
            Required whenever ``in_channels != out_channels`` or
            ``strides != 1``.
        strides: Stride of the first 3x3 convolution (and of the 1x1 skip
            convolution). The usual pattern is to double the channels while
            halving height and width, i.e. ``strides=2``.
    """

    def __init__(self, in_channels, out_channels, use_1x1conv=False, strides=1):
        super(Residual, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                               kernel_size=(3, 3), stride=(strides, strides), padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels,
                               kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                                   kernel_size=(1, 1), stride=(strides, strides))
        else:
            self.conv3 = None
        self.relu = nn.LeakyReLU(inplace=True)

    def forward(self, X):
        """Return ``relu(F(X) + shortcut(X))`` where F is the two-conv branch."""
        Y = self.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3 is not None:
            X = self.conv3(X)
        # Add the shortcut to the convolution branch. Out-of-place, so the
        # in-place activation below never writes into the caller's tensor.
        Y = Y + X
        return self.relu(Y)


# Builds one stage of the network as a list of residual blocks.
def resnet_block(in_channels, out_channels, num_residuals, is_first_block=False):
    """Return a list of ``num_residuals`` Residual blocks forming one stage.

    Args:
        in_channels: Channels of the feature map entering the stage.
        out_channels: Channels produced by every block of the stage.
        num_residuals: Number of Residual blocks in the stage.
        is_first_block: When False (default) the first block of the stage
            halves height and width (stride 2) and uses a 1x1 convolution on
            the skip path. When True the stage keeps the spatial size.

    Returns:
        A plain list of Residual modules, meant to be unpacked into
        ``nn.Sequential``.
    """
    block = []
    for i in range(num_residuals):
        if i > 0:
            # Later blocks keep shape: channels and size were set by block 0.
            block.append(Residual(out_channels, out_channels))
        elif not is_first_block:
            # Halve height and width while changing the channel count.
            block.append(Residual(in_channels, out_channels, use_1x1conv=True, strides=2))
        else:
            # First stage: keep the spatial size. Honour in_channels, and
            # project the skip path only if the channel count changes.
            block.append(Residual(in_channels, out_channels,
                                  use_1x1conv=in_channels != out_channels))
    return block

# Overall network: stem convolution, four residual stages, classifier head.
class NeuralNetwork(nn.Module):
    """ResNet-style classifier with a 3-channel input and 10 output logits."""

    def __init__(self):
        super().__init__()
        stem_width = 64

        # Stem: a strided 4x4 convolution followed by batch norm and activation.
        stem = nn.Sequential(
            nn.Conv2d(3, stem_width, kernel_size=(4, 4), stride=(2, 2), padding=1),
            nn.BatchNorm2d(stem_width),
            nn.LeakyReLU(inplace=True),
        )

        # (in_channels, out_channels, is_first_block) for each residual stage.
        stage_specs = (
            (stem_width, stem_width, True),
            (stem_width, 128, False),
            (128, 256, False),
            (256, 512, False),
        )
        stages = [
            nn.Sequential(*resnet_block(c_in, c_out, num_residuals=2, is_first_block=first))
            for c_in, c_out, first in stage_specs
        ]

        # Head. AdaptiveAvgPool2d only needs the desired output size; a larger
        # output size would mean more weights in the Linear layer after it.
        head = (
            nn.AdaptiveAvgPool2d(output_size=(1, 1)),
            nn.Flatten(),
            nn.Linear(512, 128),
            nn.LeakyReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(128, 10),
        )

        self.net = nn.Sequential(stem, *stages, *head)

    def forward(self, X):
        """Run the input batch through the whole stack and return the logits."""
        logits = self.net(X)
        return logits


def train(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one epoch and record the epoch's metrics.

    Args:
        dataloader: DataLoader yielding ``(inputs, labels)`` batches; labels
            are compared directly with ``argmax`` predictions, so they must
            be class indices, not one-hot vectors.
        model: Network to optimise; batches are moved to the device its
            parameters live on.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer bound to ``model``'s parameters.

    Side effects:
        Appends the mean per-batch loss to the module-level list
        ``train_Avg_loss`` and the accuracy in percent to ``train_Accuracy``
        (both are created by the script section), and prints both values.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Use the model's own device so the function does not depend on a global.
    device = next(model.parameters()).device

    train_loss, correct = 0.0, 0.0
    model.train()
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        # Forward pass and loss.
        pred = model(X)
        loss = loss_fn(pred, y)
        # Accumulate (not overwrite) the number of correct predictions.
        # For one-hot labels compare against y.argmax(dim=1) instead.
        correct += (pred.argmax(dim=1) == y).type(torch.float).sum().item()
        # Backpropagation.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()  # running sum, for monitoring only
    train_loss /= num_batches
    correct /= size

    train_Avg_loss.append(train_loss)
    train_Accuracy.append(correct := 100 * correct)
    print(f"Train Accuracy: {correct:>0.6f}%, train Avg loss: {train_loss:>8f}")


def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and record the metrics.

    Args:
        dataloader: DataLoader yielding ``(inputs, labels)`` batches with
            class-index labels.
        model: Network to evaluate; batches are moved to the device its
            parameters live on.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.

    Side effects:
        Appends the mean per-batch loss to the module-level list
        ``test_Avg_loss`` and the accuracy in percent to ``test_Accuracy``
        (both are created by the script section), and prints both values.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Use the model's own device so the function does not depend on a global.
    device = next(model.parameters()).device
    model.eval()
    test_loss, correct = 0.0, 0.0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            # Accumulate over all batches instead of keeping only the last one.
            test_loss += loss_fn(pred, y).item()
            # For one-hot labels compare against y.argmax(dim=1) instead.
            correct += (pred.argmax(dim=1) == y).type(torch.float32).sum().item()
    test_loss /= num_batches
    correct /= size

    test_Avg_loss.append(test_loss)
    test_Accuracy.append(correct := 100 * correct)

    print(f"Test  Accuracy: {correct:>0.6f}%, test  Avg loss: {test_loss:>8f}\n")



if __name__ == "__main__":
    from matplotlib import pyplot as plt
    from matplotlib import ticker

    # Alternative: train from the custom in-memory dataset class instead of
    # torchvision's loader.
    # training_data = MyCifar10Dataset(train_inputs, train_labels)
    # test_data = MyCifar10Dataset(test_inputs, test_labels)
    # train_dataloader = DataLoader(training_data, batch_size=1024, shuffle=True)
    # test_dataloader = DataLoader(test_data, batch_size=1024, shuffle=False)

    # Print the shape and dtype of one test batch as a sanity check.
    for X, y in test_dataloader:
        print(f"Shape of X [batch, C, H, W]: {X.shape} {X.dtype}")
        print(f"Shape of y: {y.shape} {y.dtype}")
        break

    model = NeuralNetwork()
    if torch.cuda.is_available():  # pick GPU when present, else CPU
        device = "cuda"
        model = model.to(device)
    else:
        device = "cpu"
    print(f"device = {device}")
    print(model)

    # Cross-entropy loss for multi-class classification.
    # If it raises `RuntimeError: "nll_loss_forward_reduce_cuda_kernel_2d_index"
    # not implemented for 'Int'`, the class-index labels are int32: cast them
    # to torch.long (or pass floating-point class-probability targets).
    loss_fn = nn.CrossEntropyLoss()

    # Other optimizer settings that were considered:
    # optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.1, weight_decay=0.01)
    # optimizer = torch.optim.SGD(model.parameters(), lr=0.1, weight_decay=1e-2)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, weight_decay=1e-3)
    # Step decay: every `step_size` epochs the learning rate is multiplied by `gamma`.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.95)

    # Per-epoch history; train() and test() append to these module-level lists.
    train_Accuracy = []
    train_Avg_loss = []
    test_Accuracy = []
    test_Avg_loss = []

    epochs = 400
    for t in range(epochs):
        print(f"Epoch {t + 1}:")
        # Alternatives that were tried per epoch (momentum warm-up, Adam):
        # momentum = 0 if t < 10 else 0.9
        # optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=momentum, weight_decay=1e-5)
        # optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
        # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
        print(f"current learning rate is {scheduler.get_last_lr()[0]}")
        train(train_dataloader, model, loss_fn, optimizer)
        scheduler.step()
        test(test_dataloader, model, loss_fn)
    print("Done.")

    model_path = "CIFAR10_mode.pth"
    torch.save(model.state_dict(), model_path)
    print(f"Saved PyTorch Model State to {model_path}")

    # Plot accuracy and average loss against the 1-based epoch number.
    epoch_axis = range(1, epochs + 1)

    plt.subplot(2, 1, 1)
    plt.plot(epoch_axis, train_Accuracy, "r-", label="train_Accuracy")
    plt.plot(epoch_axis, test_Accuracy, "b-", label="test_Accuracy")
    plt.xlabel("Epoch")
    # Show integer epoch numbers on the x axis.
    xticker_formatter = ticker.FuncFormatter(lambda x, pos: "%d" % x)
    plt.gca().xaxis.set_major_formatter(xticker_formatter)
    plt.ylabel("Accuracy[%]")
    plt.legend(loc="lower right")
    plt.grid()

    plt.subplot(2, 1, 2)
    plt.plot(epoch_axis, train_Avg_loss, "r-", label="train_Avg_loss")
    plt.plot(epoch_axis, test_Avg_loss, "b-", label="test_Avg_loss")
    plt.xlabel("Epoch")
    plt.gca().xaxis.set_major_formatter(xticker_formatter)
    plt.ylabel("Avg_loss")
    plt.legend(loc="upper right")
    plt.grid()

    plt.savefig("Accuracy and loss plot small batch size2.png")
    # plt.show()

    # Per-class counters, filled in by the per-class accuracy code.
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}

    # Reload the saved weights into a fresh network to check the checkpoint.
    net = NeuralNetwork()
    net.load_state_dict(torch.load(model_path))

    # Optionally shut the (Windows) machine down after the run; -t is the delay in seconds.
    # os.system("shutdown -s -t 1 ")
代码语言:javascript复制

当batch 大小为 1024时,很容易过拟合。如下图是典型的过拟合,测试集准确率只有72%,但训练集准确率高达100%,模型已不能再学习。模型只是记住了训练集的答案,泛化能力差

将 batch 大小降到 8 时,虽然计算效率降低,但是测试集准确率提升到90%。降低batch大小可在一定程度上增加抽样噪音,缓解过拟合。

用于查看10个类别每个类别的测试集准确率的代码

代码语言:python

 # again no gradients needed
    # Count correct and total predictions per class over the test set.
    model.eval()  # inference behaviour for dropout and batch norm
    with torch.no_grad():
        for images, labels in test_dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            predictions = outputs.argmax(dim=1)
            # Convert to plain ints once so they can index `classes`.
            for label, prediction in zip(labels.tolist(), predictions.tolist()):
                classname = classes[label]
                if label == prediction:
                    correct_pred[classname] += 1
                total_pred[classname] += 1
    # Report the accuracy of every class that occurred in the test set.
    for classname, correct_count in correct_pred.items():
        if total_pred[classname] == 0:
            continue  # class absent from the data: accuracy is undefined
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')

虽然看起来准确率都不够高,但考虑到有10个分类,盲猜的准确率只有10%,所以此残差网络还是学到了不少东西。

0 人点赞