作者 | Tilman Krokotsch
编译 | ronghuaiyang
报道 | AI公园
导读
本文非常详细的介绍并演示了如何将单元测试用于深度学习,让你的代码更加可信。
深度学习是一门很难评估代码正确性的学科。随机初始化、庞大的数据集和权重的有限可解释性意味着,要找到模型为什么不能训练的确切问题,大多数时候都需要反复试验。在传统的软件开发中,自动化单元测试是确定代码是否完成预期任务的面包和黄油。它帮助开发人员信任他们的代码,并在引入更改时更加自信。一个破坏性的更改将会被单元测试检测到。
从GitHub上许多研究库的情况来看,深度学习的实践者们还不喜欢这种方法。从业者不知道他们的代码是否正常工作,他们能接受吗?通常,由于上述三个原因,学习系统的每个组件的预期行为并不容易定义。然而,我相信实践者和研究人员应该重新考虑他们对单元测试的厌恶,因为它可以帮助研究过程更加顺利。你只需要学习如何信任你的代码。
显然,我不是第一个,也不是最后一个谈论用于深度学习的单元测试的人。如果你对这个话题感兴趣,你可以看看这里:
- A Recipe for Training Neural Networks by Andrej Karpathy
- How to Unit Test Deep Learning by Sergios Karagiannakos
这篇文章的灵感来自于上面提到的,可能还有很多我现在想不起来的。为了在讨论中增加一些内容,我们将重点关注如何编写可重用的单元测试,这样就可以“不去自己重复自己“。
我们的例子将测试用PyTorch编写的系统的组件,该系统在MNIST 上训练可变自动编码器(VAE)。你可以在github.com/tilman151/unittest_dl上找到本文中的所有代码。
什么是单元测试?
如果您熟悉单元测试,可以跳过此部分。对于其他人,我们将看到Python中的单元测试是什么样子的。为了简单起见,我们将使用内置的包unittest
,而不是其他花哨的包。
一般来说,单元测试的目的是检查代码是否正确地运行。通常(我也为此感到内疚很长一段时间),你会看到这样的东西在一个文件的结尾:
代码语言:javascript复制if __name__ == 'main':
net = Network()
x = torch.randn(4, 1, 32, 32)
y = net(x)
print(y.shape)
如果直接执行该文件,则代码片段将构建一个网络,执行前向传递并打印输出的形状。这样,我们就可以看到向前传播是否会抛出错误,以及输出的形状是否可信。如果将代码分发到不同的文件中,则必须手动运行每个文件,并检查打印到控制台的内容。更糟糕的是,这个代码片段有时会在运行后被删除,当有变化时被重写。
原则上,这已经是一个基本的单元测试。我们所要做的就是将它形式化一点,使它能够轻松地自动运行。它看起来是这样的:
代码语言:javascript复制import unittest
class MyFirstTest(unittest.TestCase):
def test_shape(self):
net = Network()
x = torch.randn(4, 1, 32, 32)
y = net(x)
self.assertEqual(torch.Size((10,)), y.shape)
unittest
包的主要组件是类TestCase
。单个单元测试是TestCase
子类的成员函数。在我们的例子中,包将自动检测类MyFirstTest
并运行函数'test_shape
。如果满足assertEqual
调用的条件,则测试成功。否则,或者如果它崩溃,测试将失败。
我需要测试些什么?
现在我们已经了解了单元测试是如何工作的,下一个问题是我们应该测试什么。下面你可以看到我们的例子的代码结构:
代码语言:javascript复制|- src
|- dataset.py
|- model.py
|- trainer.py
|- run.py
我们将测试每个文件中的功能除了run.py
,因为它只是我们程序的入口点。
Dataset
我们在例子中使用的数据集是torchvision
MNIST类。因此,我们可以假设像加载图像和训练/测试分割这样的基本功能可以正常工作。然而,MNIST类为配置提供了充足的机会,因此我们应该测试是否正确配置了所有内容。dataset.py
文件包含一个名为MyMNIST
的类,它有两个成员变量。成员train_data
有torchvision
MNIST类的一个实例,该实例被配置为加载数据的训练部分,而test_data
中的实例加载测试部分。两种方法都将每幅图像每边填充2个像素,并将像素值归一化在[- 1,1]之间。此外,train_data
对每个图像应用随机旋转来增强数据。
数据的形状
为了继续使用上面的代码片段,我们将首先测试数据集是否输出了我们想要的形状。图像的填充意味着,它们现在的大小应该是32x32像素。我们的测试看起来是这样的:
代码语言:javascript复制def test_shape(self):
dataset = MyMNIST()
sample, _ = dataset.train_data[0]
self.assertEqual(torch.Shape((1, 32, 32)), sample.shape)
现在我们可以确定我们的padding是我们想要的。这可能看起来很琐碎,你们中的一些人可能会认为我在测试这个方面很迂腐,但是我不知道我有多少次因为我搞不清楚填充函数是如何工作的而导致了形状错误。像这样的简单测试编写起来很快,并且可以为你以后省去许多麻烦。
数据的缩放
我们配置的下一件事是数据的缩放。在我们的例子中,这非常简单。我们希望确保每个图像的像素值在[- 1,1]之间。与之前的测试相反,我们将对数据集中的所有图像运行测试。通过这种方式,我们可以确定我们关于如何缩放数据的假设对于整个数据集是有效的。
代码语言:javascript复制def test_scaling(self):
dataset = MyMNIST()
for sample, _ in dataset.train_data:
self.assertGreaterEqual(1, sample.max())
self.assertLessEqual(-1, sample.min())
self.assertTrue(torch.any(sample < 0))
self.assertTrue(torch.any(sample > 0))
如你所见,我们不仅要测试每个图像的最大值和最小值是否在范围内。我们还通过断言测试是否存在大于零和小于零的值,我们将值缩放到[0,1]。这个测试之所以有效,是因为我们可以假设MNIST中的每个图像都覆盖了整个范围的值。对于更复杂的数据,比如自然图像,我们需要一个更复杂的测试条件。如果你的缩放基于数据的统计信息,那么测试一下是否只使用训练部分来计算这些统计信息也是一个好主意。
数据增强
增加训练数据可以极大地帮助提高模型的性能,特别是在数据量有限的情况下。另一方面,我们不会增加我们的测试数据,因为我们想要保持我们的模型的评估确定性。这意味着,我们应该测试我们的训练数据是否增加了,而我们的测试数据没有。敏锐的读者会在这一点上注意到一些重要的东西。到目前为止,我们的测试只涵盖了训练数据。这是需要强调的一点:
始终在训练和测试数据上运行测试
仅仅因为你的代码在数据的一个部分上工作,并不能保证在另一个部分上不存在未检测到的bug。对于数据增强,我们甚至希望为每个部分断言代码的不同行为。
对于我们的增强问题,一个简单的测试现在是加载一个样本两次,然后检查两个版本是否相等。简单的解决方案是为我们的每一个部分写一个测试函数:
代码语言:javascript复制def test_augmentation_active_train_data(self):
dataset = MyMNIST()
are_same = []
for i in range(len(dataset.train_data)):
sample_1, _ = dataset.train_data[i]
sample_2, _ = dataset.train_data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
self.assertTrue(not all(are_same))
def test_augmentation_inactive_test_data(self):
dataset = MyMNIST()
are_same = []
for i in range(len(dataset.test_data)):
sample_1, _ = dataset.test_data[i]
sample_2, _ = dataset.test_data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
self.assertTrue(all(are_same))
这些函数测试我们想要测试的内容,但是,正如你所看到的,它们几乎就是重复的。这有两个主要的缺点。首先,如果在测试中需要更改某些内容,我们必须记住在两个函数中都要更改。其次,如果我们想添加另一个部分,例如一个验证部分,我们将不得不第三次复制测试。要解决这个问题,我们应该将测试功能提取到一个单独的函数中,然后由真正的测试函数调用两次。重构后的测试看起来像这样:
代码语言:javascript复制def test_augmentation(self):
dataset = MyMNIST()
self._check_augmentation(dataset.train_data, active=True)
self._check_augmentation(dataset.test_data, active=False)
def _check_augmentation(self, data, active):
are_same = []
for i in range(len(data)):
sample_1, _ = data[i]
sample_2, _ = data[i]
are_same.append(0 == torch.sum(sample_1 - sample_2))
if active:
self.assertTrue(not all(are_same))
else:
self.assertTrue(all(are_same))
_check_augmentation
函数断言给定的数据集是否进行了增强,并有效地删除代码中的重复。函数本身不会由unittest
包自动运行,因为它不是以test_
开头的。因为我们的测试函数现在真的很短,我们把它们合并成一个组合函数。它们测试了增强是如何工作的这一单一的概念,因此应该属于相同的测试函数。但是,通过这个组合,我们引入了另一个问题。如果测试失败了,现在很难直接看到哪一个部分失败了。这个包只告诉我们组合函数的名称。进入subTest
函数。TestCase
类有一个成员函数subTest
,它可以在一个测试函数中标记不同的测试组件。这样,包就可以准确地告诉我们测试的哪一部分失败了。最后的函数是这样的:
def test_augmentation(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_augmentation(dataset.train_data, active=True)
with self.subTest(split='test'):
self._check_augmentation(dataset.test_data, active=False)
现在我们有了一个无重复、精确定位、可重用的测试功能。我们在此所使用的核心原则可以应用到我们在前面几节中编写的所有其他单元测试中。你可以在附带的存储库中看到结果测试。
数据的加载
数据集的最后一种类型的单元测试与我们的例子并不完全相关,因为我们使用的是内置数据集。无论如何我们都会把它包括进来,因为它涵盖了我们学习系统的一个重要部分。通常,你将在dataloader类中使用数据集,该类处理批处理并可以并行化加载。因此,测试你的数据集在单进程和多进程模式下是否与dataloader一起工作是一个好主意。考虑到我们所学到的增强测试,测试函数如下所示:
代码语言:javascript复制def test_single_process_dataloader(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_dataloader(dataset.train_data, num_workers=0)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data, num_workers=0)
def test_multi_process_dataloader(self):
dataset = MyMNIST()
with self.subTest(split='train'):
self._check_dataloader(dataset.train_data, num_workers=2)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data, num_workers=2)
def _check_dataloader(self, data, num_workers):
loader = DataLoader(data, batch_size=4, num_workers=num_workers)
for _ in loader:
pass
函数_check_dataloader
不会对加载的数据进行任何测试。我们只是想检查加载过程是否没有抛出错误。理论上,ni 也可以检查诸如正确的批大小或填充的序列数据的不同长度。因为我们为dataloader使用了最基本的配置,所以可以省略这些检查。
同样,这个测试可能看起来琐碎而没有必要,但是让我给你一个例子,在这个简单的检查中节省了我的时间。这个项目需要从pandas的dataframes中加载序列数据,并从这些datafames上的滑动窗口中构造样本。我们的数据集太大了,无法装入内存,所以我们必须按需加载数据模型,并从中剪切出所请求的序列。为了提高加载速度,我们决定用一个LRU cache来缓存一些数据文件。它在我们早期的单进程实验中如预期的那样工作,因此我们决定将它包含在代码库中。结果是,这个缓存不能很好地用于多进程,但是我们的单元测试提前发现了这个问题。在使用多进程时,我们停用了缓存,避免了以后出现令人不快的意外。
最后要注意的
有些人可能已经在我们的单元测试中看到了另一个重复的模式。每个测试对训练数据运行一次,对测试数据运行一次,产生相同的四行代码:
代码语言:javascript复制with self.subTest(split='train'):
self._check_something(dataset.train_data)
with self.subTest(split='test'):
self._check_dataloader(dataset.test_data)
也完全有理由消除这种重复。不幸的是,这将涉及到创建一个高阶函数,以函数_check_something
作为参数。有时,例如对于增强测试,我们还需要向_check_something
函数传递额外的参数。最后,所需的编程构造将引入更多的复杂性,并模糊要测试的概念。一般的规则是,为了可读性和可重用性,让你的测试代码尽可能在需要的范围内变复杂。
Model
模型可以说是学习系统的核心组件,通常需要是完全可配置的。这意味着,还有很多东西需要测试。幸运的是,PyTorch中用于神经网络模型的API非常简洁,大多数实践者都非常严格地使用它。这使得为模型编写可重用的单元测试相当容易。
我们的模型是一个简单的VAE,由一个全连接的编码器和解码器组成。前向函数接受输入图像,对其进行编码,执行重新参数化操作,然后将隐编码解码为图像。虽然相对简单,但这种变换可以演示几个值得进行单元测试的方面。
模型的输出形状
我们在本文开头看到的第一段代码是几乎每个人都要做的测试。我们也已经知道这个测试是如何写成单元测试的。我们要做的唯一一件事就是添加要测试的正确形状。对于一个自动编码器,就简单的判断和输入的形状是否相同:
代码语言:javascript复制@torch.nograd()
def test_shape(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
inputs = torch.randn(4, 1, 32, 32)
outputs = net(x)
self.assertEqual(inputs.shape, outputs.shape)
同样,这很简单,但有助于找到一些最恼人的bug。例如,在将模型输出从拉平的表示中reshape时忘记添加通道维度。
我们最后增加的测试是torch.nograd
。它告诉PyTorch这个函数不需要记录梯度,并给我们一个小的加速。对于每个测试来说,它可能不是很多,但是你永远不知道需要编写多少。同样,这是另一个可引用的单元测试智慧:
让你的测试更快。否则,没有人会想要运行它们。
单元测试应该在开发期间非常频繁地运行。如果你的测试运行时间很长,那么你可以跳过它们。
模型的移动
在CPU上训练深度神经网络在大多数时候都非常慢。这就是为什么我们使用GPU来加速它。为此,我们所有的模型参数必须驻留在GPU上。因此,我们应该断言我们的模型可以在设备(CPU和多个GPU)之间正确地移动。
我们可以用一个常见的错误来说明我们的例子VAE中的问题。这里你可以看到bottleneck
函数,执行重新参数化的技巧:
def bottleneck(self, mu, log_sigma):
noise = torch.randn(mu.shape)
latent_code = log_sigma.exp() * noise mu
return latent_code
它取隐先验的参数,从标准高斯分布中采样一个噪声张量,并使用参数对其进行变换。这在CPU上运行没有问题,但当模型移动到GPU时失败。问题是噪音张量是在CPU内存中创建的,因为它是默认的,并没有移动到模型所在的设备上。一个简单的错误和一个简单的解决方案。我们用noise = torch.randn_like(mu)
替换了这行有问题的代码。这就产生了一个与张量mu
相同形状和在相同设备上的噪声张量。
帮助我们尽早捕获这些bug的测试:
代码语言:javascript复制@torch.no_grad()
@unittest.skipUnless(torch.cuda.is_available(), 'No GPU was detected')
def test_device_moving(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
net_on_gpu = net.to('cuda:0')
net_back_on_cpu = net_on_gpu.cpu()
inputs = torch.randn(4, 1, 32, 32)
torch.manual_seed(42)
outputs_cpu = net(inputs)
torch.manual_seed(42)
outputs_gpu = net_on_gpu(inputs.to('cuda:0'))
torch.manual_seed(42)
outputs_back_on_cpu = net_back_on_cpu(inputs)
self.assertAlmostEqual(0., torch.sum(outputs_cpu - outputs_gpu.cpu()))
self.assertAlmostEqual(0., torch.sum(outputs_cpu - outputs_back_on_cpu))
我们把网络从一个CPU移动到另一个CPU,然后再移动回来,只是为了确保正确。现在我们有了网络的三份拷贝(移动网络复制了它们),并使用相同的输入张量向前传递。如果网络被正确移动,前向传递应该在不抛出错误的情况下运行,并且每次产生相同的输出。
为了运行这个测试,我们显然需要一个GPU,但也许我们想在笔记本电脑上做一些快速测试。如果PyTorch没有检测到GPU,unittest.skipUnless
可以跳过测试。这样可以避免将测试结果与失败的测试混淆。
你还可以看到,我们在每次通过之前固定了torch的随机种子。我们必须这样做,因为VAEs是非确定性的,否则我们会得到不同的结果。这说明了深度学习代码单元测试的另一个重要概念:
在测试中控制随机性。
如果你不能确保你的模型能到边界情况,你如何测试你的模型的一个罕见边界条件?如何确保模型的输出是确定性的?你如何知道一个失败的测试是由于随机的偶然还是由于你引入的bug ?通过手动设置深度学习框架的种子,可以消除函数中的随机性。此外,还应该将CuDNN设置为确定性模式。这主要影响卷积,但无论如何是一个好主意。
注意确定正在使用的所有框架的种子。Numpy和内置的Python随机数生成器有它们自己的种子,必须分别设置。有一个这样的函数是很有用的:
代码语言:javascript复制def make_deterministic(seed=42):
# PyTorch
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Numpy
np.random.seed(seed)
# Built-in Python
random.seed(seed)
模型得到采样独立性
在99。99%的情况下,你都想用随机梯度下降的方式来训练你的模型。你给你的模型一个minibatch的样本,并计算他们的平均损失。批量处理训练样本假设你的模型可以处理每个样本,也就是你可以独立的把样本喂给模型。换句话说,你的batch中的样本在你的模型处理时不会相互影响。这个假设是很脆弱的,如果在一个错误的张量维度上进行错误的reshape或aggregation,就会打破这个假设。
下面的测试通过执行与输入相关的前向和后向传递来检查样本的独立性。在对这个batch做平均损失之前,我们把损失乘以零。如果我们的模型保持样本独立性,这将导致一个零梯度。唯一的事情,我们必须断言,如果只有masked的样本梯度是零:
代码语言:javascript复制def test_batch_independence(self):
inputs = torch.randn(4, 1, 32, 32)
inputs.requires_grad = True
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
# Compute forward pass in eval mode to deactivate batch norm
net.eval()
outputs = net(inputs)
net.train()
# Mask loss for certain samples in batch
batch_size = inputs[0].shape[0]
mask_idx = torch.randint(0, batch_size, ())
mask = torch.ones_like(outputs)
mask[mask_idx] = 0
outputs = outputs * mask
# Compute backward pass
loss = outputs.mean()
loss.backward()
# Check if gradient exists and is zero for masked samples
for i, grad in enumerate(inputs.grad):
if i == mask_idx:
self.assertTrue(torch.all(grad == 0).item())
else:
self.assertTrue(not torch.all(grad == 0))
如果你准确地阅读了代码片段,你会注意到我们将模型设置为evaluation模式。这是因为batch normalization违反了我们上面的假设。进程均值和标准差的处理交叉污染了我们batch中的样本,所以我们通过evaluation模式停止了对样本的更新。我们可以这样做,因为我们的模型在训练和评估模式中表现相同。如果你的模型不是这样的,你将不得不找到另一种方法来禁用它进行测试。一个选项是用instance normalization临时替换它。
上面的测试函数非常通用,可以按原样复制。例外情况是,如果你的模型接受多个输入。处理这个问题的附加代码是必要的。
模型的参数更新
下一个测试也与梯度有关。当你的网络架构变得更加复杂时,比如初始化,很容易构建死子图。死子图是网络中包含可学习参数的一部分,前向传递、后向传递或两者都不使用。这就像在构造函数中构建一个网络层,然后忘记在forward函数中应用它一样简单。
找到这些死子图可以通过运行优化步骤并检查梯度你的网络参数:
代码语言:javascript复制def test_all_parameters_updated(self):
net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
optim = torch.optim.SGD(net.parameters(), lr=0.1)
outputs = net(torch.randn(4, 1, 32, 32))
loss = outputs.mean()
loss.backward()
optim.step()
for param_name, param in self.net.named_parameters():
if param.requires_grad:
with self.subTest(name=param_name):
self.assertIsNotNone(param.grad)
self.assertNotEqual(0., torch.sum(param.grad ** 2))
参数函数返回的模型的所有参数在优化步骤后都应该有一个梯度张量。此外,对于我们所使用的损失,它不应该是零。测试假设模型中的所有参数都需要梯度。即使是那些不应该被更新的参数也会首先检查requires_grad
标志。如果任何参数在测试中失败,子测试的名称将提示你在哪里查找。
提高重用性
现在我们已经写出了模型的所有测试,我们可以将它们作为一个整体进行分析。我们将注意到这些测试有两个共同点。所有测试都从创建模型和定义示例输入批处理开始。与以往一样,这种冗余级别有可能导致拼写错误和不一致。此外,你不希望在更改模型的构造函数时分别更新每个测试。
幸运的是,unittest
为我们提供了一个简单的解决方案,即setUp
函数。这个函数在执行TestCase
中的每个测试函数之前被调用,通常为空。通过在setUp
中将模型和输入定义为TestCase
的成员变量,我们可以在一个地方初始化测试的组件。
class TestVAE(unittest.TestCase):
def setUp(self):
self.net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
self.test_input = torch.random(4, 1, 32, 32)
... # Test functions
现在我们用各自的成员变量替换出现的net
和inputs
,这样就完成了。如果你想更进一步,对所有测试使用相同的模型实例,您可以使用setUpClass
。这个函数在构造TestCase
时被调用一次。如果构建速度很慢,并且你不想多次进行构建,那么这是非常有用的。
在这一点上,我们有一个整洁的系统来测试我们的VAE模型。我们可以轻松地添加测试,并确保每次都测试模型的相同版本。但是如果你想引入一种新的卷积层,会发生什么呢?它将在相同的数据上运行,也应该具有相同的行为,因此将应用相同的测试。
仅仅复制整个TestCase
显然不是首选的解决方案,但是通过使用setUp
,我们已经在正确的轨道上了。我们将所有测试函数转移到一个基类中,而将setUp
保留为一个抽象函数。
class AbstractTestVAE(unittest.TestCase):
def setUp(self):
raise NotImplementedError
... # Test functions
你的IDE会提示类没有成员变量net
和test_inputs
,但是Python并不关心。只要子类添加了它们,它就可以工作。对于我们想要测试的每个模型,我们创建这个抽象类的一个子类,并在其中实现setUp
。为多个模型或同一个模型的多个配置创建TestCases
就像:
class TestCNNVAE(AbstractTestVAE):
def setUp(self):
self.test_inputs = torch.randn(4, 1, 32, 32)
self.net = model.CNNVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
class TestMLPVAE(AbstractTestVAE):
def setUp(self):
self.test_inputs = torch.randn(4, 1, 32, 32)
self.net = model.MLPVAE(input_shape=(1, 32, 32), bottleneck_dim=16)
只剩下一个问题了。unittest
包发现并运行unittest.TestCase
的所有子元素。因为这包括不能实例化的抽象基类,所以我们总是会有一个失败的测试。
解决方案是由一个流行的设计模式提出的。通过删除TestCase
作为AbstractTestVAE
的父类,它就不再被发现了。相反,我们让我们的具体测试有两个父类, TestCase
和AbstractTestVAE
。抽象类和具体类之间的关系不再是父类和子类之间的关系。相反,具体类使用抽象类提供的共享功能。这个模式称为MixIn。
class AbstractTestVAE:
...
class TestCNNVAE(unittest.TestCase, AbstractTestVAE):
...
class TestMLPVAE(unittest.TestCase, AbstractTestVAE):
...
父类的顺序很重要,因为方法查找是从左到右进行的。这意味着TestCase
将覆盖AbstractTestVAE
的共享方法。在我们的例子中,这不是一个问题,但无论如何知道都是好的。
Trainer
我们的学习系统的最后一部分是trainer类。它将你所有的组件(数据集、优化器和模型)放在一起,并使用它们来训练模型。此外,它还实现了一个评估函数,输出测试数据的平均损失。在训练时,所有的损失和指标都被写入一个TensorBoard event文件中以便可视化。
在这一部分中,编写可重用测试是最困难的,因为它允许最大程度的自由实现。有些人只在脚本文件中使用简单的代码进行训练,有些人将其封装在函数中,还有一些人试图保持更面向对象的风格。我不会判断你喜欢哪种方式。我唯一要说的是,在我的经验中,整洁封装的trainer类使单元测试变得最舒适。
然而,我们会发现我们之前学过的一些原则在这里也适用。
trainer的损失
大多数时候,你只需要从torch上选择一个预先实现的损失函数就可以了。但话说回来,你所选择的损失函数可能无法实现。这种情况可能是由于实现相对简单,函数太小众或者太新。无论如何,如果你自己实现了它,你也应该测试它。
我们的例子使用Kulback-Leibler (KL)散度作为整体损失函数的一部分,这在PyTorch中是不存在的(现在的版本里有了)。我们的实现是这样的:
代码语言:javascript复制def _kl_divergence(log_sigma, mu):
return 0.5 * torch.sum((2 * log_sigma).exp() mu ** 2 - 1 - 2 * log_sigma)
函数取多变量高斯分布的标准偏差和平均值的对数,并计算在封闭形式中的标准高斯分布的KL散度。
检查这种损失的一种方法是手工计算,然后硬编码以便比较。更好的方法是在另一个包中找到一个参考实现,并根据它的输出检查代码。幸运的是,scipy
包有一个离散KL散度的实现,我们可以使用:
@torch.no_grad()
def test_kl_divergence(self):
mu = np.random.randn(10) * 0.25 # means around 0.
sigma = np.random.randn(10) * 0.1 1. # stds around 1.
standard_normal_samples = np.random.randn(100000, 10)
transformed_normal_sample = standard_normal_samples * sigma mu
bins = 1000
bin_range = [-2, 2]
expected_kl_div = 0
for i in range(10):
standard_normal_dist, _ = np.histogram(standard_normal_samples[:, i], bins, bin_range)
transformed_normal_dist, _ = np.histogram(transformed_normal_sample[:, i], bins, bin_range)
expected_kl_div = scipy.stats.entropy(transformed_normal_dist, standard_normal_dist)
actual_kl_div = self.vae_trainer._kl_divergence(torch.tensor(sigma).log(), torch.tensor(mu))
self.assertAlmostEqual(expected_kl_div, actual_kl_div.numpy(), delta=0.05)
我们首先从标准高斯函数和一个不同均值和标准差的高斯函数中抽取一个足够大的样本。然后我们用np.histogram
函数,得到基本pdf的离散逼近。有了这些,我们就可以用scipy.stats.entropy
得到一个KL散度来比较。我们使用一个相对较大的delta
来进行比较,因为scipy.stats.entropy
只是一个近似值。
你可能已经注意到,我们没有创建Trainer
对象,而是使用TestCase
的成员。我们在这里使用了与模型测试相同的技巧,并在setUp
函数中创建了它。我们还固定了PyTorch和NumPy的种子。因为我们这里不需要任何梯度,所以我们用@torch.no_grad
来装饰函数。
trainer的日志记录
我们使用TensorBoard来记录我们的训练过程的损失和度量。为此,我们希望确保按预期写入所有日志。一种方法是在训练后打开event文件,查找正确的event。同样,这也是一个有效的选项,但我们将以另一种方式来看看unittest
包的一个有趣功能:mock
。
mock
允许你用一个监视其自身是如何调用的函数来打包一个函数或对象。我们将替换summary writer的add_scalar
函数,并确保以这种方式记录我们关心的所有损失和指标。
def test_logging(self):
with mock.patch.object(self.vae_trainer.summary, 'add_scalar') as add_scalar_mock:
self.vae_trainer.train(1)
expected_calls = [mock.call('train/recon_loss', mock.ANY, 0),
mock.call('train/kl_div_loss', mock.ANY, 0),
mock.call('train/loss', mock.ANY, 0),
mock.call('test/loss', mock.ANY, 0)]
add_scalar_mock.assert_has_calls(expected_calls)
assert_has_calls
函数匹配预期调用列表和实际记录的调用。mock.ANY
表示我们不关心记录的标量的值,因为无论如何我们都不知道它。
因为我们不需要对整个数据集执行完一个epoch,所以我们在setUp
中将训练数据配置为只有一个batch。这样,我们可以显著地加快我们的测试速度。
trainer的拟合
最后一个问题也是最难回答的。我的训练最终会收敛吗?要确切地回答这个问题,我们需要用我们所有的数据进行一次全面的训练并对其打分。
由于这非常耗时,我们将使用一种更快的方法。我们将看看我们的训练是否能使模型对单个batch的数据进行过拟合。测试函数相当简单:
代码语言:javascript复制def test_overfit_on_one_batch(self):
self.vae_trainer.train(500)
self.assertGreaterEqual(30, self.vae_trainer.eval())
如前一节所述,setUp
函数创建一个只包含一个batch的数据集的trainer。此外,我们也使用训练数据作为测试数据。通过这种方式,我们可以从 eval
函数中获得训练batch的损失,并将其与我们预期的损失进行比较。
对于一个分类问题,当我们完全过拟合时,我们期望损失为零。“VAE”的问题是,它是一个非确定性的生成模型,零损失是不现实的。这就是为什么我们预期的损失是30,这等于每像素的误差为0.04。
这是迄今为止运行时间最长的测试,它可以运行500 epochs。最后,在我的笔记本电脑上用1.5分钟左右就可以了,这仍然是合理的。为了在不降低对没有GPU的机器的支持的情况下进一步加速,我们可以简单地在setUp
中添加这一行:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
这样一来,如果我们有GPU,我们就可以利用它,如果没有,就利用CPU进行训练。
最后要注意的
在我们进行日志记录时,你可能会注意到,针对trainer的单元测试往往会使你的文件夹充满event文件。为了避免这种情况,我们使用tempfile
包为trainer创建一个临时日志目录。测试结束后,我们只需要再次删除它和它的内容。为此,我们使用了孪生函数setUp
,和tearDown
。在每个测试函数后调用此函数,清理过程简单如下:
def tearDown(self):
shutil.rmtree(self.log_dir)
总结
我们看完了这篇文章。让我们评估一下我们从整个磨难中得到了什么。
我们为我们的小例子编写的测试套件包含58个单元测试,整个运行大约需要3.5分钟。对于这58个测试,我们只编写了20个函数。所有测试都可以确定地、独立地运行。如果有GPU,我们可以运行额外的测试。大多数测试,例如数据集和模型测试,可以在其他项目中轻松重用。我们可以通过使用:
- 子测试为我们的数据集的多种配置运行一个测试
setUp
和tearDown
函数一致地初始化和清理我们的测试- 抽象测试类来测试VAE的不同实现
torch.no_grad
装饰器在可能的情况下禁用梯度计算mock
模块检查函数是否被正确调用
最后,我希望我能够说服至少有人在他们的深度学习项目中使用单元测试。本文的配套git仓库可以作为起点。
—END—
英文原文:https://krokotsch.eu/cleancode/2020/08/11/Unit-Tests-for-Deep-Learning.html
历史精华好文
- 专辑1:AI产品/工程落地
- 专辑2:AI核心算法
- 专辑3:AI课程/资源/数据