首先,我们先参考Tensorflow深度学习算法整理 中卷积神经网络回忆一下2D卷积。
3D卷积如上图所示,3D卷积输入多了深度C这个维度,输入是高度H*宽度W*深度C的三维矩阵。3D卷积核的深度小于输入层深度,这是3D卷积核跟2D卷积核最本质的区别。因此,3D 卷积核可以在所有三个方向(图像的高度、宽度、通道)上移动,而2D卷积核只能在特征图的高、宽平面上移动。在每个位置,逐元素的乘法和加法都会提供一个数值。因为3D卷积核是滑过一个 3D 空间,所以输出数值也按 3D 空间排布。也就是说输出是一个 3D 数据。3D卷积被普遍用在视频分类,三维医学图像分割等场景中。
我们将时间维度看成是第三维,这里是对连续的四帧图像进行卷积操作,3D卷积是通过堆叠多个连续的帧组成一个立方体,然后在立方体中运用3D卷积核。在这个结构中,卷积层中每一个特征map都会与上一层中多个邻近的连续帧相连,因此捕捉运动信息。
3D卷积和多通道卷积的区别
多通道卷积属于2D卷积范畴,它的卷积核一定是一个2D卷积核,无论输入的feature map的有多少个通道,通过2D卷积核的输出一定是一个单通道的结果,其计算方式为将单通道的卷积核对所有通道同时进行卷积,将所有卷积结果再相加。如果需要输出多个通道的结果只能够通过增加卷积核来完成,输出多少个通道就增加多少个卷积核。
3D卷积核本身就是多通道的,其本身的通道数一定是小于输入的feature map的通道数的。3D卷积运算的时候不是对feature map的所有通道同时进行卷积,而是将feature map的所有通道作为一个整体进行卷积,得到的也是一个多通道的输出结果。
视频分类
虽然视频本质上是连续帧的二维图像,但是如果将一段视频切片当做一个整体,将其数据升级到三维,三维卷积神经网络在视频方面应用最广泛的就是进行视频分类。与二维神经网络相同,三维神经网络也包括输入层,卷积层,池化层,全连接层,损失函数层等网络层。
光流(optical flow):
通过时序上相邻帧计算像素移动的方向和速度。
通过计算视频帧沿水平、竖直和时间方向的梯度进行推断。在右图中,不同颜色表示不同方向的运动,颜色深浅表示速度快慢。
input—>H1:
神经网络的输入为7张大小为60*40的连续帧(每帧都是单通道灰度图),7张帧通过事先设定硬核(hardwired kernels,一种编码方式,注意不是3D卷积核)获得5种不同特征:灰度、x方向梯度、y方向梯度、x方向光流、y方向光流,前面三个通道的信息可以直接对每帧分别操作获取,后面的光流(x,y)则需要利用两帧的信息才能提取,因此H1层的特征maps数量:(7 7 7 6 6=33)解释:7个灰度(输入是7个),7个x方向梯度,7个y方向梯度,6个x方向光流(因为是两帧作差得到的,所以7个,相互两个作差就是6个),6个y方向光流,特征maps的大小依然是60* 40。所以这里才是对网络的初始输入特征图为33*60*40.
H1—>C2:
用两个7*7*3的3D卷积核对5种特征分别进行卷积,由原来的7 7 7 6 6变成了5 5 5 4 4=23,获得两个系列,每个系列5个通道(7*7表示空间维度,3表示时间维度,也就是每次操作3帧图像),同时,为了增加特征maps的个数,在这一层采用了两种不同的3D卷积核,因此C2层的特征maps数量为:(((7-3) 1)*3 ((6-3) 1)*2)*2=23*2。这里右乘的2表示两种卷积核,这是时间上的卷积。而空间上的卷积特征maps的大小为:((60-7) 1)* ((40-7) 1)=54*34。然后为卷积结果加上偏置套一个tanh函数进行输出。(典型神经网。)
C2—>S3:
2x2的2D池化,下采样。下采样之后的特征maps数量保持不变,因此S3层的特征maps数量为:23*2。特征maps的空间大小为:((54/2)*(34/2)=27*17
S3—>C4:
用三个7*6*3的3D卷积核分别对各个系列各个通道进行卷积,由原来的5 5 5 4 4变成了3 3 3 2 2=13,获得6个系列,每个系列依旧5个通道的大量maps。在这一层采用了三种不同的3D卷积核,因此C4层的特征maps数量为:(((5-3) 1)*3 ((4-3) 1)*2)*6=13*6,这里的6表示3种卷积核对2组特征变成了6组。而空间上的卷积特征maps的大小为:((27-7) 1)* ((17-6) 1)=21*12。然后为卷积结果加上偏置套一个tanh函数进行输出。
C4—>S5:
3x3的2D池化,下采样。下采样之后的特征maps数量保持不变,因此S3层的特征maps数量为:13*6。特征maps的空间大小为:((21/3)*(12/3)=7*4
S5—>C6:
7x4的2D卷积,直接消除空间维度变成1*1。C6的特征maps数量保持不变,依然为13*6。再经过flatten之后变成128维的向量。加上偏置套一个tanh函数进行输出。
C6—>output:
最后经过全连接层进行输出。
这个是一个早期的3D卷积模型,只适合一些比较简单的数据集。一般现在3D模型不会这么去配置
这个是在TRECVID,KTH数据集上的实验结果,我们可以看到它比2D卷积具有一定的优势。
深度3DCNN模型
该模型包括了8个卷积层(3*3*3),5个最大池化层,2个全连接层,模型的输入为3*16*112*112(长112、宽112,3通道,16帧),pool1是一个2D池化,pool2到5都是3D池化。
- 时间卷积核大小的比较
上图中左图使用的时间卷积核大小都是相同的,当时间卷积核大小为1的时候,3D卷积就退化成了2D卷积,我们可以看到它的精度是最低的。而使用时间卷积核大小为3的时候,模型精度是最高的。上图中右图中使用的是时间卷积核大小为3与时间卷积核大小不断递增和递减两种情况的比较,我们可以看到模型的精度依然是时间卷积核大小为3的时候最高。
PyTorch代码
代码语言:javascript复制import torch
import torch.nn as nn
class C3D(nn.Module):
def __init__(self, num_classes):
super(C3D, self).__init__()
self.conv1 = nn.Conv3d(3, 64, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn1 = nn.BatchNorm3d(64)
self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))
self.conv2 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn2 = nn.BatchNorm3d(128)
self.pool2 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
self.conv3a = nn.Conv3d(128, 256, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn3a = nn.BatchNorm3d(256)
self.conv3b = nn.Conv3d(256, 256, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn3b = nn.BatchNorm3d(256)
self.pool3 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
self.conv4a = nn.Conv3d(256, 512, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn4a = nn.BatchNorm3d(512)
self.conv4b = nn.Conv3d(512, 512, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn4b = nn.BatchNorm3d(512)
self.pool4 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
self.conv5a = nn.Conv3d(512, 512, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn5a = nn.BatchNorm3d(512)
self.conv5b = nn.Conv3d(512, 512, kernel_size=(3, 3, 3), padding=(1, 1, 1))
self.bn5b = nn.BatchNorm3d(512)
self.pool5 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=(0, 1, 1))
self.fc6 = nn.Linear(8192, 4096)
self.fc7 = nn.Linear(4096, 4096)
self.fc8 = nn.Linear(4096, num_classes)
self.dropout = nn.Dropout(p=0.5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.bn1(self.conv1(x)))
x = self.pool1(x)
x = self.relu(self.bn2(self.conv2(x)))
x = self.pool2(x)
x = self.relu(self.bn3a(self.conv3a(x)))
x = self.relu(self.bn3b(self.conv3b(x)))
x = self.pool3(x)
x = self.relu(self.bn4a(self.conv4a(x)))
x = self.relu(self.bn4b(self.conv4b(x)))
x = self.pool4(x)
x = self.relu(self.bn5a(self.conv5a(x)))
x = self.relu(self.bn5b(self.conv5b(x)))
x = self.pool5(x)
x = x.view(-1, 8192)
x = self.relu(self.fc6(x))
x = self.dropout(x)
x = self.relu(self.fc7(x))
x = self.dropout(x)
x = self.fc8(x)
return x
if __name__ == '__main__':
inputs = torch.rand(1, 3, 16, 112, 112)
net = C3D(num_classes=101)
outputs = net(inputs)
print(outputs.size())
运行结果
代码语言:javascript复制torch.Size([1, 101])
Tensorflow代码
代码语言:javascript复制import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models
class C3D(models.Model):
def __init__(self, num_classes):
super(C3D, self).__init__()
self.conv1 = layers.Conv3D(64, kernel_size=(3, 3, 3), padding='same')
self.bn1 = layers.BatchNormalization()
self.pool1 = layers.MaxPool3D(pool_size=(1, 2, 2), strides=(1, 2, 2), padding='valid')
self.conv2 = layers.Conv3D(128, kernel_size=(3, 3, 3), padding='same')
self.bn2 = layers.BatchNormalization()
self.pool2 = layers.MaxPool3D(pool_size=(2, 2, 2), strides=(2, 2, 2), padding='valid')
self.conv3a = layers.Conv3D(256, kernel_size=(3, 3, 3), padding='same')
self.bn3a = layers.BatchNormalization()
self.conv3b = layers.Conv3D(256, kernel_size=(3, 3, 3), padding='same')
self.bn3b = layers.BatchNormalization()
self.pool3 = layers.MaxPool3D(pool_size=(2, 2, 2), strides=(2, 2, 2), padding='valid')
self.conv4a = layers.Conv3D(512, kernel_size=(3, 3, 3), padding='same')
self.bn4a = layers.BatchNormalization()
self.conv4b = layers.Conv3D(512, kernel_size=(3, 3, 3), padding='same')
self.bn4b = layers.BatchNormalization()
self.pool4 = layers.MaxPool3D(pool_size=(2, 2, 2), strides=(2, 2, 2), padding='valid')
self.conv5a = layers.Conv3D(512, kernel_size=(3, 3, 3), padding='same')
self.bn5a = layers.BatchNormalization()
self.conv5b = layers.Conv3D(512, kernel_size=(3, 3, 3), padding='same')
self.bn5b = layers.BatchNormalization()
self.padding = layers.ZeroPadding3D(padding=(0, 1, 1))
self.pool5 = layers.MaxPool3D(pool_size=(2, 2, 2), strides=(2, 2, 2), padding='valid')
self.flatten = layers.Flatten()
self.fc6 = layers.Dense(4096, activation='relu')
self.fc7 = layers.Dense(4096, activation='relu')
self.fc8 = layers.Dense(num_classes)
self.relu = layers.ReLU()
self.dropout = layers.Dropout(0.5)
def call(self, x):
x = self.relu(self.bn1(self.conv1(x)))
x = self.pool1(x)
x = self.relu(self.bn2(self.conv2(x)))
x = self.pool2(x)
x = self.relu(self.bn3a(self.conv3a(x)))
x = self.relu(self.bn3b(self.conv3b(x)))
x = self.pool3(x)
x = self.relu(self.bn4a(self.conv4a(x)))
x = self.relu(self.bn4b(self.conv4b(x)))
x = self.pool4(x)
x = self.relu(self.bn5a(self.conv5a(x)))
x = self.relu(self.bn5b(self.conv5b(x)))
x = self.padding(x)
x = self.pool5(x)
x = self.flatten(x)
x = self.fc6(x)
x = self.dropout(x)
x = self.fc7(x)
x = self.dropout(x)
x = self.fc8(x)
return x
if __name__ == '__main__':
inputs = tf.constant(np.random.rand(1, 16, 112, 112, 3))
net = C3D(num_classes=101)
outputs = net(inputs)
print(outputs.shape)
运行结果
代码语言:javascript复制(1, 101)
3D卷积模型分解
- 卷积拆分与低秩近似原理
将高维卷积拆分成低维卷积,在上图中左边的矩形体是一个3D卷积(C是深度、Y是高度、X是宽度),而在右边图中,我们将其拆分为三个一维卷积。
卷积拆分的有效性
卷积拆分可以减少参数量和计算量,上图的3D卷积核为DXY(宽X、高Y,深度为D),输出特征大小为THW(高H、宽W、时间为T),那么卷积核的参数量为DXY相乘,计算量为THW(DXY)相乘;我们将这个3D卷积核分解为一个XY的2D卷积和一个时间维度为D的1D卷积,这样两个卷积核的总的参数量为XY D,计算量为THW(D XY),我们可以看到一个是做乘法,一个是做加法,那么自然做加法的量要小的多,参数量和计算量自然要小的多。
实际上我们早就接触过了卷积的拆分,在Inception V3中
我们就是将一个n*n的2D卷积给拆分成了1*n和n*1的1D卷积,从而减少了参数量和计算量来达到缩减模型大小的作用,从而可以使用在更小的移动设备中。
- 分解3D卷积
在上图中是一个叫做
,这里我们假设3D卷积为K,我们可以将其拆分成空间卷积
和时间卷积
这两组卷积。在上图的蓝色长条块就是空间卷积层,实际上就是我们经常用到的2D卷积,位于网络的浅层,离输入比较近,它是用来学习每帧图像中的语义信息的,它不关心时序,所以它学习到的是表征特征,一般我们将图像RGB语义信息称为表征信息。
经过空间卷积层之后经过了两个分支,上面的淡绿色的分支,它包含了转换层,时序层和全连接层,总体上就是一个时间分支。它是学习时间相关的信息,就是运动相关的特征。我们来看一下网络的输入
网络的输入分为两部分,一部分是RGB的帧,另外一部分是帧差(两帧图像的差值)。它有一个时间上的采样的步长(stride),根据采样的步长来进行采样(每隔多少帧进行采样)。我们将采样的RGB的帧称为
,帧差就为
,计算公式为
这里的
就为时间间隔。取相隔
时序的帧,去计算它们俩的距离。
包含了比较短的时序上的动作,而把所有的
拼接起来又反映了一个长程的动作,称为
。将
和
拼接起来作为整体的输入。
- 空间转换 排列层
经过空间卷积提取完特征之后,接下来要输入到时间卷积层。但是格式需要转换,因为早期的框架不支持3D卷积,我们依然要使用2D卷积来实现时间上的卷积,将4D(通道、时间、高度、宽度)的tensor转换成3D的tensor,通过reshape将高度、宽度合并,只保留时间维度和通道维度,再对其进行2D卷积,这样就可以使用原有的框架。这里还对通道的维度进行了学习,使用学习的p变换(初始为高斯正态分布)将通道f通过加权得到f'。当然现在无需这一步,因为现在的框架早已经支持3D卷积。
- 双通道时间卷积,学习不同快慢的时间信息
经过上面的空间转换,空间的维度已经压缩成了1个,故使用2D卷积就可以来实现。在进入时间卷积后,我们看到它分成了两个分支,一个是3*3的卷积,一个是5*5的卷积。现在用于去学习的一个维度是时间,卷积核的参数越大,可以看作是时间维度的感受野越大,时间维度的感受野越大可以看作是可以看到更长程的动作,更长程的动作可以理解成更慢的动作。而更快的动作只需要更小的感受野就可以了,故3*3卷积又称为fast分支,而5*5卷积称为slow分支。
经过空间卷积层之后经过了两个分支,下面的深绿色的分支是随机从T帧中采样一帧学习图像特征,实际上这里只是起到一个辅助模型学习的作用,并不是必须的。它的网络的深度要更深一些,相比于与时间卷积层共用的空间卷积层,意味着可以学习更加抽象的视觉特征。
- 加权融合策略(Sparsity Concentration Index (SCI))
上图中SCI(p)中的
表示某一段视频,将该视频分为第j类的概率,max表示取最高的概率,值越大越好,最好等于1。SCI(p)为可靠性因子,反映了模型的可靠性,通过SCI(
)
进行一个加权,它用到了多个图像值来进行训练的,C表示空间裁剪数量(9个位置 2种翻转=18个图像块),每一个都可以预测出来一个概率向量,需要对这些概率向量进行融合,不同的图像块的可靠性SCI(p)不一样。将不同图像的可靠性SCI(p)计算出来之后进行融合,得到一个更好的
。然后再计算有M个p,M表示切片的数量,切片就是之前说的按照一个步长进行采样,采样的数量就是切片的数量。基于每一个切片计算出来的
,再在这么多个
中找出一个概率最大的
当作最终的分类结果。
- 不同策略的比较
上图的左边的表是该模型不同配置的不同数据集的精度。我们之前介绍的是最下面的SCI的融合策略,它还有其他的融合策略——倒数第三的只选择单独的图像块的策略、倒数第二的对所有图像块的平均融合、正数第一的仅使用空间卷积的策略和正数第二的只使用时间卷积的策略等。经过该表我们看到综合了所有策略的SCI融合策略所取得的精度是最好的。
上图的右边的表是不同的主流模型的比较,比如倒数第四的双流法,还有一些传统方法,
方法能取得更好的性能。
- 特征降维和可视化分析
作者将
进行了降维,上图的左边的左上部分是空间维度的降维图形,右上部分是时间维度的降维图形,而下面的部分则是包含了空间维度和时间维度的降维图形,我们可以看到下面的图形具有更好的聚分辨识性。上图的右边是反卷积可视化方法,反卷积可视化方法可以看到什么样的特征可以激活卷积核。
残差3D卷积Pseudo-3D
相比于
,还有更多的3D卷积模型效能比它要好。比如ResNet的3D版本。
上图中的a图就是一个标准的2D ResBlock,b图就是3D版本的ResBlock,而c图和d图都是3D ResBlock的变形。在b图中,1*3*3是空间卷积,3*1*1是时间卷积,它们是一种串行的方式;而在c图中,它们是一种并行的方式;d图是增加了分支,与b图相比,b图的输出结果只与时间卷积有关系,而d图是将时间卷积的结果加上空间卷积的残差。这三种3D卷积各有特点,实际上作者最终采用的架构是将这三种单元进行交替的串联使用。它们其实都是一种伪3D模型。
- 不同架构比较,特征降维与可视化
上图中左边的表第一行表示2D卷积的ResNet-50,P3D-A、P3D-B、P3D-C表示分别只使用A、B、C模块,而P3D ResNet为使用A、B、C串行的架构。它们的参数量相当,速度也相当。从结果来看,使用三种结构串行的P3D ResNet的精度最高。
上图中右边的图为降维可视化,各个颜色簇分的越开表示特征越有效。
- 与其他模型比较
上面的表分成3部分,第一部分是端到端模型;第二种是用3D卷积提取特征,用SVM进行分类训练;第三种是和一些方法进行融合。总的来说P3D ResNet取得了最好的效果。
Pytorch代码
代码语言:javascript复制import torch
import torch.nn as nn
import collections
from itertools import repeat
class SpatioTemporalConv(nn.Module):
# 时空卷积
def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=False):
super(SpatioTemporalConv, self).__init__()
_triple = self._ntuple(3)
kernel_size = _triple(kernel_size)
stride = _triple(stride)
padding = _triple(padding)
self.temporal_spatial_conv = nn.Conv3d(in_channels, out_channels, kernel_size,
stride=stride, padding=padding, bias=bias)
self.bn = nn.BatchNorm3d(out_channels)
self.relu = nn.ReLU()
def _ntuple(self, n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return tuple(x)
return tuple(repeat(x, n))
return parse
def forward(self, x):
x = self.bn(self.temporal_spatial_conv(x))
x = self.relu(x)
return x
class SpatioTemporalResBlock(nn.Module):
# 时空ResBlock
def __init__(self, in_channels, out_channels, kernel_size, downsample=False):
super(SpatioTemporalResBlock, self).__init__()
# 是否下采样
self.downsample = downsample
# same padding
padding = kernel_size // 2
if self.downsample:
# 1*1*1卷积,并且进行降采样
self.downsampleconv = SpatioTemporalConv(in_channels, out_channels, 1, stride=2)
self.downsamplebn = nn.BatchNorm3d(out_channels)
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding, stride=2)
else:
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding)
self.bn1 = nn.BatchNorm3d(out_channels)
self.relu1 = nn.ReLU()
# standard conv->batchnorm->ReLU
self.conv2 = SpatioTemporalConv(out_channels, out_channels, kernel_size, padding=padding)
self.bn2 = nn.BatchNorm3d(out_channels)
self.outrelu = nn.ReLU()
def forward(self, x):
res = self.relu1(self.bn1(self.conv1(x)))
res = self.bn2(self.conv2(res))
if self.downsample:
x = self.downsamplebn(self.downsampleconv(x))
return self.outrelu(x res)
class SpatioTemporalResLayer(nn.Module):
# 时空ResLayer
def __init__(self, in_channels, out_channels, kernel_size, layer_size, block_type=SpatioTemporalResBlock,
downsample=False):
super(SpatioTemporalResLayer, self).__init__()
# 第一个时空ResBlock
self.block1 = block_type(in_channels, out_channels, kernel_size, downsample)
# 后续的时空ResBlock
self.blocks = nn.ModuleList([])
for i in range(layer_size - 1):
self.blocks = [block_type(out_channels, out_channels, kernel_size)]
def forward(self, x):
x = self.block1(x)
for block in self.blocks:
x = block(x)
return x
class R3DNet(nn.Module):
def __init__(self, layer_sizes, num_classes):
super(R3DNet, self).__init__()
# 普通3D卷积
self.conv1 = SpatioTemporalConv(3, 64, [3, 7, 7], stride=[1, 2, 2], padding=[1, 3, 3])
# Res3D卷积
self.conv2 = SpatioTemporalResLayer(64, 64, 3, layer_sizes[0])
# 带降采样的Res3D卷积
self.conv3 = SpatioTemporalResLayer(64, 128, 3, layer_sizes[1], downsample=True)
self.conv4 = SpatioTemporalResLayer(128, 256, 3, layer_sizes[2], downsample=True)
self.conv5 = SpatioTemporalResLayer(256, 512, 3, layer_sizes[3], downsample=True)
self.pool = nn.AdaptiveAvgPool3d(1)
self.fc = nn.Linear(512, num_classes)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = self.conv5(x)
x = self.pool(x)
x = x.view(-1, 512)
x = self.fc(x)
return x
if __name__ == '__main__':
inputs = torch.rand(1, 3, 16, 112, 112)
net = R3DNet((2, 2, 2, 2), 101)
outputs = net(inputs)
print(outputs.size())
运行结果
代码语言:javascript复制torch.Size([1, 101])
Tensorflow代码
代码语言:javascript复制import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models, Sequential
import collections
from itertools import repeat
class SpatioTemporalConv(layers.Layer):
def __init__(self, out_channels, kernel_size, stride=1):
super(SpatioTemporalConv, self).__init__()
_triple = self._ntuple(3)
kernel_size = _triple(kernel_size)
stride = _triple(stride)
self.temporal_spatial_conv = layers.Conv3D(out_channels, kernel_size, strides=stride, padding='same')
self.bn = layers.BatchNormalization()
self.relu = layers.ReLU()
def _ntuple(self, n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return tuple(x)
return tuple(repeat(x, n))
return parse
def call(self, x):
x = self.relu(self.bn(self.temporal_spatial_conv(x)))
return x
class SpatioTemporalResBlock(layers.Layer):
def __init__(self, out_channels, kernel_size, downsample=False):
super(SpatioTemporalResBlock, self).__init__()
self.downsample = downsample
if self.downsample:
self.downsampleconv = SpatioTemporalConv(out_channels, 1, stride=2)
self.downsamplebn = layers.BatchNormalization()
self.conv1 = SpatioTemporalConv(out_channels, kernel_size, stride=2)
else:
self.conv1 = SpatioTemporalConv(out_channels, kernel_size)
self.bn1 = layers.BatchNormalization()
self.relu1 = layers.ReLU()
self.conv2 = SpatioTemporalConv(out_channels, kernel_size)
self.bn2 = layers.BatchNormalization()
self.outrelu = layers.ReLU()
def call(self, x):
res = self.relu1(self.bn1(self.conv1(x)))
res = self.bn2(self.conv2(res))
if self.downsample:
x = self.downsamplebn(self.downsampleconv(x))
return self.outrelu(x res)
class SpatioTemporalResLayer(layers.Layer):
def __init__(self, out_channels, kernel_size, layer_size, block_type=SpatioTemporalResBlock,
downsample=False):
super(SpatioTemporalResLayer, self).__init__()
self.block1 = block_type(out_channels, kernel_size, downsample)
self.blocks = Sequential([])
for i in range(layer_size - 1):
self.blocks.add(block_type(out_channels, kernel_size))
def call(self, x):
x = self.block1(x)
x = self.blocks(x)
return x
class R3DNet(models.Model):
def __init__(self, layer_sizes, num_classes):
super(R3DNet, self).__init__()
self.padding = layers.ZeroPadding3D(padding=(1, 3, 3))
self.conv1 = SpatioTemporalConv(64, [3, 7, 7], stride=[1, 2, 2])
self.conv2 = SpatioTemporalResLayer(64, 3, layer_sizes[0])
self.conv3 = SpatioTemporalResLayer(128, 3, layer_sizes[1], downsample=True)
self.conv4 = SpatioTemporalResLayer(256, 3, layer_sizes[2], downsample=True)
self.conv5 = SpatioTemporalResLayer(512, 3, layer_sizes[3], downsample=True)
self.pool = layers.GlobalAveragePooling3D()
self.flatten = layers.Flatten()
self.fc = layers.Dense(num_classes)
def call(self, x):
x = self.padding(x)
x = self.conv1(x)
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = self.conv5(x)
x = self.pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
if __name__ == '__main__':
inputs = tf.constant(np.random.rand(1, 16, 112, 112, 3))
net = R3DNet((2, 2, 2, 2), 101)
outputs = net(inputs)
print(outputs.shape)
运行结果
代码语言:javascript复制(1, 101)
R(2 1)D
将不同的卷积进行堆叠有这么一些常见的策略,上图中a表示的是全部用2D卷积进行特征学习,然后在顶层进行特征的融合,这种策略跟3D没有太大的关系。b、c、d、e都是3D模型,对于b来说是把3D模块放在底层,c是把3D模块放在顶层,d全部都是3D模块,e全部都是(2 1)D模块(3D模块的分解,空间维度2D,时间维度1D)。
- 不同子结构比较(Kinetics validation set)
上图是不同结构的比较,在输入上还用到了8帧和16帧。MC2、MC3、MC4、MC5表示底层用到的3D卷积的数量。综合来看R(2 1)D表现了最好的性能,它和R3D保持了相当的参数量。右边的图的横轴表示参数量,纵轴为精度来进行可视化。
- 训练难度比较,不同训练策略
R(2 1)D相比R3D更加容易训练,体现在上图中就是R(2 1)D训练的损失值和验证的损失值比R3D有更低的值。在相同的网络深度的情况下,有更低的损失值就表明模型更加容易训练。右边的表展示了不同的训练加finetune(就是用别人训练好的模型,加上我们自己的数据,来训练新的模型。finetune相当于使用别人的模型的前几层,来提取浅层特征,然后在最后再落入我们自己的分类中)的策略。train表示训练用的长度,finetune表示使用预训练模型的长度。作者发现在长的序列上进行训练不适用finetune需要花费很长的时间,而在短的序列上训练,在长的序列上进行finetune具有好的性价比,比如最下面的一行,使用训练好的8序列的模型对32序列进行finetune比不finetune,训练的时间少了将近2/3。而模型精度只损失了大概1%。
- 与其他模型比较
上图中左边的表主要是两类模型进行比较,一个是I3D模型,一个是双流模型。右边的表也是多种模型的比较,其中包括P3D,总的来看R(2 1)D虽然不是最好的,但是和最好的模型旗鼓相当。
- 第一层卷积核权重可视化
上图中第一行是空间卷积获取的特征可视化,它的维度是2*2。下面是时间卷积核的可视化,它是一个2维的,而在时间维度是一个1维的序列,将其进行拉伸。可以供我们观察模型学习到的参数的有效性如何。
Pytorch代码
代码语言:javascript复制import torch
import torch.nn as nn
import math
import collections
from itertools import repeat
class SpatioTemporalConv(nn.Module):
# 时空卷积
def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=False, first_conv=False):
super(SpatioTemporalConv, self).__init__()
_triple = self._ntuple(3)
kernel_size = _triple(kernel_size)
stride = _triple(stride)
padding = _triple(padding)
# 如果是第一次卷积
if first_conv:
# 空间卷积核参数
spatial_kernel_size = kernel_size
spatial_stride = (1, stride[1], stride[2])
spatial_padding = padding
# 时间卷积核参数
temporal_kernel_size = (3, 1, 1)
temporal_stride = (stride[0], 1, 1)
temporal_padding = (1, 0, 0)
# 空间卷积过度到时间卷积的中间通道数
intermed_channels = 45
# 空间卷积
self.spatial_conv = nn.Conv3d(in_channels, intermed_channels, spatial_kernel_size,
stride=spatial_stride, padding=spatial_padding, bias=bias)
self.bn1 = nn.BatchNorm3d(intermed_channels)
# 时间卷积
self.temporal_conv = nn.Conv3d(intermed_channels, out_channels, temporal_kernel_size,
stride=temporal_stride, padding=temporal_padding, bias=bias)
self.bn2 = nn.BatchNorm3d(out_channels)
self.relu = nn.ReLU()
else: # 非第一次卷积
# 空间卷积参数
spatial_kernel_size = (1, kernel_size[1], kernel_size[2])
spatial_stride = (1, stride[1], stride[2])
spatial_padding = (0, padding[1], padding[2])
# 时间卷积参数
temporal_kernel_size = (kernel_size[0], 1, 1)
temporal_stride = (stride[0], 1, 1)
temporal_padding = (padding[0], 0, 0)
# 空间卷积过度到时间卷积的中间通道数
intermed_channels = int(math.floor((kernel_size[0] * kernel_size[1] * kernel_size[2] * in_channels * out_channels) /
(kernel_size[1] * kernel_size[2] * in_channels kernel_size[0] * out_channels)))
# 空间卷积
self.spatial_conv = nn.Conv3d(in_channels, intermed_channels, spatial_kernel_size,
stride=spatial_stride, padding=spatial_padding, bias=bias)
self.bn1 = nn.BatchNorm3d(intermed_channels)
# 时间卷积
self.temporal_conv = nn.Conv3d(intermed_channels, out_channels, temporal_kernel_size,
stride=temporal_stride, padding=temporal_padding, bias=bias)
self.bn2 = nn.BatchNorm3d(out_channels)
self.relu = nn.ReLU()
def _ntuple(self, n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return tuple(x)
return tuple(repeat(x, n))
return parse
def forward(self, x):
x = self.relu(self.bn1(self.spatial_conv(x)))
x = self.relu(self.bn2(self.temporal_conv(x)))
return x
class SpatioTemporalResBlock(nn.Module):
# 时空卷积ResBlock
def __init__(self, in_channels, out_channels, kernel_size, downsample=False):
super(SpatioTemporalResBlock, self).__init__()
self.downsample = downsample
# same padding
padding = kernel_size // 2
if self.downsample:
# 1*1*1卷积,并且进行降采样
self.downsampleconv = SpatioTemporalConv(in_channels, out_channels, 1, stride=2)
self.downsamplebn = nn.BatchNorm3d(out_channels)
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding, stride=2)
else:
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding)
self.bn1 = nn.BatchNorm3d(out_channels)
self.relu = nn.ReLU()
self.conv2 = SpatioTemporalConv(out_channels, out_channels, kernel_size, padding=padding)
self.bn2 = nn.BatchNorm3d(out_channels)
def forward(self, x):
res = self.relu(self.bn1(self.conv1(x)))
res = self.bn2(self.conv2(res))
if self.downsample:
x = self.downsamplebn(self.downsampleconv(x))
return self.relu(x res)
class SpatioTemporalResLayer(nn.Module):
# 时空ResLayer
def __init__(self, in_channels, out_channels, kernel_size, layer_size, block_type=SpatioTemporalResBlock,
downsample=False):
super(SpatioTemporalResLayer, self).__init__()
# 第一个时空ResBlock
self.block1 = block_type(in_channels, out_channels, kernel_size, downsample)
# 后续的时空ResBlock
self.blocks = nn.ModuleList([])
for i in range(layer_size - 1):
self.blocks = [block_type(out_channels, out_channels, kernel_size)]
def forward(self, x):
x = self.block1(x)
for block in self.blocks:
x = block(x)
return x
class R2Plus1DNet(nn.Module):
def __init__(self, layer_sizes, num_classes):
super(R2Plus1DNet, self).__init__()
# 普通(2 1)D卷积
self.conv1 = SpatioTemporalConv(3, 64, (1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3), first_conv=True)
# Res(2 1)D卷积
self.conv2 = SpatioTemporalResLayer(64, 64, 3, layer_sizes[0])
# 带降采样的Res(2 1)D卷积
self.conv3 = SpatioTemporalResLayer(64, 128, 3, layer_sizes[1], downsample=True)
self.conv4 = SpatioTemporalResLayer(128, 256, 3, layer_sizes[2], downsample=True)
self.conv5 = SpatioTemporalResLayer(256, 512, 3, layer_sizes[3], downsample=True)
self.pool = nn.AdaptiveAvgPool3d(1)
self.fc = nn.Linear(512, num_classes)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = self.conv5(x)
x = self.pool(x)
x = x.view(-1, 512)
x = self.fc(x)
return x
if __name__ == '__main__':
inputs = torch.rand(1, 3, 16, 112, 112)
net = R2Plus1DNet((2, 2, 2, 2), 101)
outputs = net(inputs)
print(outputs.size())
运行结果
代码语言:javascript复制torch.Size([1, 101])
Tensorflow代码
代码语言:javascript复制import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models, Sequential
import collections
from itertools import repeat
import math
class SpatioTemporalConv(layers.Layer):
def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, first_conv=False):
super(SpatioTemporalConv, self).__init__()
_triple = self._ntuple(3)
kernel_size = _triple(kernel_size)
stride = _triple(stride)
padding = _triple(padding)
if first_conv:
spatial_kernel_size = kernel_size
spatial_stride = (1, stride[1], stride[2])
spatial_padding = padding
temporal_kernel_size = (3, 1, 1)
temporal_stride = (stride[0], 1, 1)
temporal_padding = (1, 0, 0)
intermed_channels = 45
self.padding1 = layers.ZeroPadding3D(padding=spatial_padding)
self.spatial_conv = layers.Conv3D(intermed_channels, spatial_kernel_size, strides=spatial_stride,
padding='valid')
self.bn1 = layers.BatchNormalization()
self.padding2 = layers.ZeroPadding3D(padding=temporal_padding)
self.temporal_conv = layers.Conv3D(out_channels, temporal_kernel_size, strides=temporal_stride,
padding='valid')
self.bn2 = layers.BatchNormalization()
self.relu = layers.ReLU()
else:
spatial_kernel_size = (1, kernel_size[1], kernel_size[2])
spatial_stride = (1, stride[1], stride[2])
spatial_padding = (0, padding[1], padding[2])
temporal_kernel_size = (kernel_size[0], 1, 1)
temporal_stride = (stride[0], 1, 1)
temporal_padding = (padding[0], 0, 0)
intermed_channels = int(math.floor((kernel_size[0] * kernel_size[1] * kernel_size[2] * in_channels * out_channels) /
(kernel_size[1] * kernel_size[2] * in_channels kernel_size[0] * out_channels)))
self.padding1 = layers.ZeroPadding3D(padding=spatial_padding)
self.spatial_conv = layers.Conv3D(intermed_channels, spatial_kernel_size, strides=spatial_stride,
padding='valid')
self.bn1 = layers.BatchNormalization()
self.padding2 = layers.ZeroPadding3D(padding=temporal_padding)
self.temporal_conv = layers.Conv3D(out_channels, temporal_kernel_size, strides=temporal_stride,
padding='valid')
self.bn2 = layers.BatchNormalization()
self.relu = layers.ReLU()
def _ntuple(self, n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return tuple(x)
return tuple(repeat(x, n))
return parse
def call(self, x):
x = self.padding1(x)
x = self.relu(self.bn1(self.spatial_conv(x)))
x = self.padding2(x)
x = self.relu(self.bn2(self.temporal_conv(x)))
return x
class SpatioTemporalResBlock(layers.Layer):
def __init__(self, in_channels, out_channels, kernel_size, downsample=False):
super(SpatioTemporalResBlock, self).__init__()
self.downsample = downsample
padding = kernel_size // 2
if self.downsample:
self.downsampleconv = SpatioTemporalConv(in_channels, out_channels, 1, stride=2)
self.downsamplebn = layers.BatchNormalization()
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, stride=2, padding=padding)
else:
self.conv1 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding)
self.bn1 = layers.BatchNormalization()
self.relu = layers.ReLU()
self.conv2 = SpatioTemporalConv(in_channels, out_channels, kernel_size, padding=padding)
self.bn2 = layers.BatchNormalization()
def call(self, x):
res = self.relu(self.bn1(self.conv1(x)))
res = self.bn2(self.conv2(res))
if self.downsample:
x = self.downsamplebn(self.downsampleconv(x))
return self.relu(x res)
class SpatioTemporalResLayer(layers.Layer):
def __init__(self, in_channels, out_channels, kernel_size, layer_size, block_type=SpatioTemporalResBlock,
downsample=False):
super(SpatioTemporalResLayer, self).__init__()
self.block1 = block_type(in_channels, out_channels, kernel_size, downsample)
self.blocks = Sequential([])
for i in range(layer_size - 1):
self.blocks.add(block_type(out_channels, out_channels, kernel_size))
def call(self, x):
x = self.block1(x)
x = self.blocks(x)
return x
class R2Plus1DNet(models.Model):
def __init__(self, layer_sizes, num_classes):
super(R2Plus1DNet, self).__init__()
self.conv1 = SpatioTemporalConv(3, 64, (1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3), first_conv=True)
self.conv2 = SpatioTemporalResLayer(64, 64, 3, layer_sizes[0])
self.conv3 = SpatioTemporalResLayer(64, 128, 3, layer_sizes[1], downsample=True)
self.conv4 = SpatioTemporalResLayer(128, 256, 3, layer_sizes[2], downsample=True)
self.conv5 = SpatioTemporalResLayer(256, 512, 3, layer_sizes[3], downsample=True)
self.pool = layers.GlobalAveragePooling3D()
self.flatten = layers.Flatten()
self.fc = layers.Dense(num_classes)
def call(self, x):
x = self.conv1(x)
x = self.conv2(x)
x = self.conv3(x)
x = self.conv4(x)
x = self.conv5(x)
x = self.pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
if __name__ == '__main__':
inputs = tf.constant(np.random.rand(1, 16, 112, 112, 3))
net = R2Plus1DNet((2, 2, 2, 2), 101)
outputs = net(inputs)
print(outputs.shape)
运行结果
代码语言:javascript复制(1, 101)
双流模型
基本双流模型
双流模型(Two stream model)有两个流。上图中的上半部分为空间分支,它的输入为单帧图像,就是RGB图像,这个分支就是我们非常熟悉的图像分类网络,它是从视频片段中随机采样一帧,使用预训练模型。上图的下半部分为时间分支,它的输入为多帧的光流,连续采样N帧,计算X和Y方向,得到2N-1通道(20)的光流,无预训练模型。这两个分支基本上是一样的,只是在时间分支上少部分没有批归一化层,其他的卷积核的大小,全连接层的参数量都是一样的。这两个分支的主要区别就是输入的不同。
- 不同时间模型与空间模型的比较
上图中左边的表是对空间模型参数的比较,我们看到对不同的训练方式进行了比较,第一种为从头开始训练,第二种是用了预训练模型 finetune整个模型,第三种是预训练模型 finetune最后一层。总的来看当dropout为0.5的时候,对于预训练模型 finetune最后一层是最好的;当dropout为0.9的时候,对于预训练模型 finetune整个模型的效果是最好的。但是不论如何,使用了预训练模型的效果都好过从头训练的模型。因为UCF-101这个数据集的量级比起ImageNet的数据集的量级要低的多,所以使用ImageNet的预训练模型效果要好的多。
上图中右边的表是时间模型参数的比较,它比较的是输入的不同,第一种是单帧光流,第二种是5帧光流,第三种是10帧光流,第四种是10帧的改进光流,第五种是双向的10帧光流。它们还比较了是否减去光流的均值,off为不减,on为减。我们看到减去了光流的均值精度有所提升,多帧光流比单帧光流精度也有所提升。
- 与其他模型比较
双流模型是一种比较早期的方法,它主要是和一些传统的方法进行比较,比如说IDT;另外和它自己单独的空间流和单独的时间流模型进行比较。对空间流和时间流进行融合的时候做了两种方案的比较,一个是通过平均方法(将两个流预测的分数直接进行平均融合),一个是通过SVM(将两个流最后提取出来的全连接层的特征串接起来用SVM模型来进行分类器的训练)。有关SVM的内容可以参考机器学习算法整理(三) 中的支撑向量机SVM。基于SVM分类器的训练的效果明显好过基于分数求平均效果,主要原因是因为UCF-101和HMDB-51这两个数据集都比较小,训练出来的模型比较容易过拟合,用SVM进行分类会更好一些。
- 多任务学习,解决数据集过小的问题
这里的多任务学习都是进行分类任务,只不过训练的时候分了两个分支,一个分支只用UCF-101数据集进行训练,另外一个分支是用HMDB-51数据集进行训练。可以看到用了多任务学习的模型精度是有所提升的,HMDB-51数据集比UCF-101数据集更小,用更多的数据集来进行训练有助于提高模型的泛化能力。
- 融合模型
最后对整个模型做了不同融合策略的比较。将空间分支给固定成了预训练模型 finetune最后一层,时间分支分为双向光流(bi-directional)、单向光流(uni-directional)、单向光流 多任务学习(multi-task)。使用平均融合和SVM来对特征进行分类训练。可以看到单向光流 多任务学习 SVM策略取得了综合最好的性能。最基本的双流模型主要是对特征进行融合,对中间的过程没有信息交互。
双流模型融合策略
- 基于3D卷积的融合
上图的下部代表了两个输入的分支,一个是1-τ的RGB图像,一个是1-τ( /-)L/2的光流。各自输入一个模型,得到各自的特征。得到特征之后,会进行拼接。为了方便拼接,这两个特征的维度都是D*T*H*W,拼接以后就是2D*T*H*W,再输入到融合模块中。上图中的上部有两个分支,左边的分支是时空分支(包含了空间和时间),右边是时间分支。这两个分支去掉任何一个分支都是可以的,都可以作为最终预测的输出。但作为一个整体,其中时空分支是主分支,时间分支是辅助分支,训练、测试的时候都会用到这两个流的结果进行融合使用。
在时空分支中,是将拼接后的2D*T*H*W特征图->3D卷积->3D池化->全连接;时间分支中,是将拼接后的2D*T*H*W特征图->3D池化->全连接层。