六、神经网络
在本章中,我们将介绍神经网络以及如何在 TensorFlow 中实现它们。大多数后续章节将基于神经网络,因此学习如何在 TensorFlow 中使用它们非常重要。在开始使用多层网络之前,我们将首先介绍神经网络的基本概念。在上一节中,我们将创建一个神经网络,学习如何玩井字棋。
在本章中,我们将介绍以下秘籍:
- 实现操作门
- 使用门和激活函数
- 实现单层神经网络
- 实现不同的层
- 使用多层网络
- 改进线性模型的预测
- 学习玩井字棋
读者可以在 Github 和 Packt 仓库中找到本章中的所有代码。
介绍
神经网络目前在诸如图像和语音识别,阅读手写,理解文本,图像分割,对话系统,自动驾驶汽车等任务中打破记录。虽然这些上述任务中的一些将在后面的章节中介绍,但重要的是将神经网络作为一种易于实现的机器学习算法引入,以便我们以后可以对其进行扩展。
神经网络的概念已经存在了几十年。然而,它最近才获得牵引力,因为我们现在具有训练大型网络的计算能力,因为处理能力,算法效率和数据大小的进步。
神经网络基本上是应用于输入数据矩阵的一系列操作。这些操作通常是加法和乘法的集合,然后是非线性函数的应用。我们已经看到的一个例子是逻辑回归,我们在第 3 章,线性回归中看到了这一点。逻辑回归是部分斜率 - 特征乘积的总和,其后是应用 Sigmoid 函数,这是非线性的。神经网络通过允许操作和非线性函数的任意组合(包括绝对值,最大值,最小值等的应用)来进一步概括这一点。
神经网络的重要技巧称为反向传播。反向传播是一种允许我们根据学习率和损失函数输出更新模型变量的过程。我们使用反向传播来更新第 3 章,线性回归和第 4 章,支持向量机中的模型变量。
关于神经网络的另一个重要特征是非线性激活函数。由于大多数神经网络只是加法和乘法运算的组合,因此它们无法对非线性数据集进行建模。为了解决这个问题,我们在神经网络中使用了非线性激活函数。这将允许神经网络适应大多数非线性情况。
重要的是要记住,正如我们在许多算法中所看到的,神经网络对我们选择的超参数敏感。在本章中,我们将探讨不同学习率,损失函数和优化程序的影响。
学习神经网络的资源更多,更深入,更详细地涵盖了该主题。这些资源如下:
- 描述反向传播的开创性论文是 Yann LeCun 等人的 Efficient Back Prop
- CS231,用于视觉识别的卷积神经网络,由斯坦福大学提供。
- CS224d,斯坦福大学自然语言处理的深度学习。
- [深度学习,麻省理工学院出版社出版的一本书,Goodfellow 等人,2016]http://www.deeplearningbook.org)。
- 迈克尔·尼尔森(Michael Nielsen)有一本名为“神经网络与深度学习”的在线书籍。
- 对于一个更实用的方法和神经网络的介绍,Andrej Karpathy 用 JavaScript 实例写了一个很棒的总结,称为黑客的神经网络指南。
- 另一个总结深度学习的网站被 Ian Goodfellow,Yoshua Bengio 和 Aaron Courville 称为初学者深度学习。
实现操作门
神经网络最基本的概念之一是作为操作门操作。在本节中,我们将从乘法操作开始作为门,然后再继续考虑嵌套门操作。
准备
我们将实现的第一个操作门是f(x) = a · x
。为优化此门,我们将a
输入声明为变量,将x
输入声明为占位符。这意味着 TensorFlow 将尝试更改a
值而不是x
值。我们将创建损失函数作为输出和目标值之间的差异,即 50。
第二个嵌套操作门将是f(x) = a · x b
。同样,我们将a
和b
声明为变量,将x
声明为占位符。我们再次将输出优化到目标值 50。值得注意的是,第二个例子的解决方案并不是唯一的。有许多模型变量组合可以使输出为 50.对于神经网络,我们并不关心中间模型变量的值,而是更加强调所需的输出。
将这些操作视为我们计算图上的操作门。下图描绘了前面两个示例:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HdAgokLV-1681566911063)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/d954838e-bc84-4659-be40-91028c951c64.png)]
图 1:本节中的两个操作门示例
操作步骤
要在 TensorFlow 中实现第一个操作门f(x) = a · x
并将输出训练为值 50,请按照下列步骤操作:
- 首先加载
TensorFlow
并创建图会话,如下所示:
import tensorflow as tf
sess = tf.Session()
- 现在我们需要声明我们的模型变量,输入数据和占位符。我们使输入数据等于值
5
,因此得到 50 的乘法因子将为 10(即5X10=50
),如下所示:
a = tf.Variable(tf.constant(4.))
x_val = 5.
x_data = tf.placeholder(dtype=tf.float32)
- 接下来,我们使用以下输入将操作添加到计算图中:
multiplication = tf.multiply(a, x_data)
- 我们现在将损失函数声明为输出与
50
的期望目标值之间的 L2 距离,如下所示:
loss = tf.square(tf.subtract(multiplication, 50.))
- 现在我们初始化我们的模型变量并将我们的优化算法声明为标准梯度下降,如下所示:
init = tf.global_variables_initializer()
sess.run(init)
my_opt = tf.train.GradientDescentOptimizer(0.01)
train_step = my_opt.minimize(loss)
- 我们现在可以将模型输出优化到
50
的期望值。我们通过连续输入 5 的输入值并反向传播损失来将模型变量更新为10
的值,如下所示:
print('Optimizing a Multiplication Gate Output to 50.')
for i in range(10):
sess.run(train_step, feed_dict={x_data: x_val})
a_val = sess.run(a)
mult_output = sess.run(multiplication, feed_dict={x_data: x_val})
print(str(a_val) ' * ' str(x_val) ' = ' str(mult_output))
- 上一步应该产生以下输出:
Optimizing a Multiplication Gate Output to 50.
7.0 * 5.0 = 35.0
8.5 * 5.0 = 42.5
9.25 * 5.0 = 46.25
9.625 * 5.0 = 48.125
9.8125 * 5.0 = 49.0625
9.90625 * 5.0 = 49.5312
9.95312 * 5.0 = 49.7656
9.97656 * 5.0 = 49.8828
9.98828 * 5.0 = 49.9414
9.99414 * 5.0 = 49.9707
接下来,我们将对两个嵌套的操作门f(x) = a · x b
进行相同的操作。
- 我们将以与前面示例完全相同的方式开始,但将初始化两个模型变量
a
和b
,如下所示:
from tensorflow.python.framework import ops
ops.reset_default_graph()
sess = tf.Session()
a = tf.Variable(tf.constant(1.))
b = tf.Variable(tf.constant(1.))
x_val = 5.
x_data = tf.placeholder(dtype=tf.float32)
two_gate = tf.add(tf.multiply(a, x_data), b)
loss = tf.square(tf.subtract(two_gate, 50.))
my_opt = tf.train.GradientDescentOptimizer(0.01)
train_step = my_opt.minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
- 我们现在优化模型变量以将输出训练到
50
的目标值,如下所示:
print('Optimizing Two Gate Output to 50.')
for i in range(10):
# Run the train step
sess.run(train_step, feed_dict={x_data: x_val})
# Get the a and b values
a_val, b_val = (sess.run(a), sess.run(b))
# Run the two-gate graph output
two_gate_output = sess.run(two_gate, feed_dict={x_data: x_val})
print(str(a_val) ' * ' str(x_val) ' ' str(b_val) ' = ' str(two_gate_output))
- 上一步应该产生以下输出:
Optimizing Two Gate Output to 50.
5.4 * 5.0 1.88 = 28.88
7.512 * 5.0 2.3024 = 39.8624
8.52576 * 5.0 2.50515 = 45.134
9.01236 * 5.0 2.60247 = 47.6643
9.24593 * 5.0 2.64919 = 48.8789
9.35805 * 5.0 2.67161 = 49.4619
9.41186 * 5.0 2.68237 = 49.7417
9.43769 * 5.0 2.68754 = 49.876
9.45009 * 5.0 2.69002 = 49.9405
9.45605 * 5.0 2.69121 = 49.9714
这里需要注意的是,第二个例子的解决方案并不是唯一的。这在神经网络中并不重要,因为所有参数都被调整为减少损失。这里的最终解决方案将取决于
a
和b
的初始值。如果这些是随机初始化的,而不是值 1,我们会看到每次迭代的模型变量的不同结束值。
工作原理
我们通过 TensorFlow 的隐式反向传播实现了计算门的优化。 TensorFlow 跟踪我们的模型的操作和变量值,并根据我们的优化算法规范和损失函数的输出进行调整。
我们可以继续扩展操作门,同时跟踪哪些输入是变量,哪些输入是数据。这对于跟踪是很重要的,因为 TensorFlow 将更改所有变量以最小化损失而不是数据,这被声明为占位符。
每个训练步骤自动跟踪计算图并自动更新模型变量的隐式能力是 TensorFlow 的强大功能之一,也是它如此强大的原因之一。
使用门和激活函数
现在我们可以将操作门连接在一起,我们希望通过激活函数运行计算图输出。在本节中,我们将介绍常见的激活函数。
准备
在本节中,我们将比较和对比两种不同的激活函数:Sigmoid 和整流线性单元(ReLU)。回想一下,这两个函数由以下公式给出:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dn1Y6ZrA-1681566911064)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/99793b89-d872-4349-adf9-0b04b07b05dd.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rMfXutIV-1681566911065)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/e27c24a9-4e98-404e-a085-9019936fe3d4.png)]
在这个例子中,我们将创建两个具有相同结构的单层神经网络,除了一个将通过 sigmoid 激活并且一个将通过 ReLU 激活。损失函数将由距离值 0.75 的 L2 距离控制。我们将从正态分布(Normal(mean=2, sd=0.1))
中随机抽取批量数据,然后将输出优化为 0.75。
操作步骤
我们按如下方式处理秘籍:
- 我们将首先加载必要的库并初始化图。这也是我们可以提出如何使用 TensorFlow 设置随机种子的好点。由于我们将使用 NumPy 和 TensorFlow 中的随机数生成器,因此我们需要为两者设置随机种子。使用相同的随机种子集,我们应该能够复制结果。我们通过以下输入执行此操作:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
sess = tf.Session()
tf.set_random_seed(5)
np.random.seed(42)
- 现在我们需要声明我们的批量大小,模型变量,数据和占位符来输入数据。我们的计算图将包括将我们的正态分布数据输入到两个相似的神经网络中,这两个神经网络的区别仅在于激活函数。结束,如下所示:
batch_size = 50
a1 = tf.Variable(tf.random_normal(shape=[1,1]))
b1 = tf.Variable(tf.random_uniform(shape=[1,1]))
a2 = tf.Variable(tf.random_normal(shape=[1,1]))
b2 = tf.Variable(tf.random_uniform(shape=[1,1]))
x = np.random.normal(2, 0.1, 500)
x_data = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 接下来,我们将声明我们的两个模型,即 sigmoid 激活模型和 ReLU 激活模型,如下所示:
sigmoid_activation = tf.sigmoid(tf.add(tf.matmul(x_data, a1), b1))
relu_activation = tf.nn.relu(tf.add(tf.matmul(x_data, a2), b2))
- 损失函数将是模型输出与值 0.75 之间的平均 L2 范数,如下所示:
loss1 = tf.reduce_mean(tf.square(tf.subtract(sigmoid_activation, 0.75)))
loss2 = tf.reduce_mean(tf.square(tf.subtract(relu_activation, 0.75)))
- 现在我们需要声明我们的优化算法并初始化我们的变量,如下所示:
my_opt = tf.train.GradientDescentOptimizer(0.01)
train_step_sigmoid = my_opt.minimize(loss1)
train_step_relu = my_opt.minimize(loss2)
init = tf.global_variable_initializer()
sess.run(init)
- 现在,我们将针对两个模型循环我们的 750 次迭代训练,如下面的代码块所示。我们还将保存损失输出和激活输出值,以便稍后进行绘图:
loss_vec_sigmoid = []
loss_vec_relu = []
activation_sigmoid = []
activation_relu = []
for i in range(750):
rand_indices = np.random.choice(len(x), size=batch_size)
x_vals = np.transpose([x[rand_indices]])
sess.run(train_step_sigmoid, feed_dict={x_data: x_vals})
sess.run(train_step_relu, feed_dict={x_data: x_vals})
loss_vec_sigmoid.append(sess.run(loss1, feed_dict={x_data: x_vals}))
loss_vec_relu.append(sess.run(loss2, feed_dict={x_data: x_vals}))
activation_sigmoid.append(np.mean(sess.run(sigmoid_activation, feed_dict={x_data: x_vals})))
activation_relu.append(np.mean(sess.run(relu_activation, feed_dict={x_data: x_vals})))
- 要绘制损失和激活输出,我们需要输入以下代码:
plt.plot(activation_sigmoid, 'k-', label='Sigmoid Activation')
plt.plot(activation_relu, 'r--', label='Relu Activation')
plt.ylim([0, 1.0])
plt.title('Activation Outputs')
plt.xlabel('Generation')
plt.ylabel('Outputs')
plt.legend(loc='upper right')
plt.show()
plt.plot(loss_vec_sigmoid, 'k-', label='Sigmoid Loss')
plt.plot(loss_vec_relu, 'r--', label='Relu Loss')
plt.ylim([0, 1.0])
plt.title('Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.show()
激活输出需要绘制,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u5DtPK1j-1681566911065)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/e8f656c0-3ccb-4dee-b6ff-1f05dcb0a1d6.png)]
图 2:来自具有 Sigmoid 激活的网络和具有 ReLU 激活的网络的计算图输出
两个神经网络使用类似的架构和目标(0.75),但有两个不同的激活函数,sigmoid 和 ReLU。重要的是要注意 ReLU 激活网络收敛到比 sigmoid 激活所需的 0.75 目标更快,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bafT8Qi2-1681566911065)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/59a4dcbb-73a7-4598-80e9-28690b520a8d.png)]
图 3:该图描绘了 Sigmoid 和 ReLU 激活网络的损耗值。注意迭代开始时 ReLU 损失的极端程度
工作原理
由于 ReLU 激活函数的形式,它比 sigmoid 函数更频繁地返回零值。我们认为这种行为是一种稀疏性。这种稀疏性导致收敛速度加快,但失去了受控梯度。另一方面,Sigmoid 函数具有非常良好控制的梯度,并且不会冒 ReLU 激活所带来的极值的风险,如下图所示:
激活函数 | 优点 | 缺点 |
---|---|---|
Sigmoid | 不太极端的输出 | 收敛速度较慢 |
RELU | 更快地收敛 | 可能有极端的输出值 |
更多
在本节中,我们比较了神经网络的 ReLU 激活函数和 Sigmoid 激活函数。还有许多其他激活函数通常用于神经网络,但大多数属于两个类别之一;第一类包含形状类似于 sigmoid 函数的函数,如 arctan,hypertangent,heavyiside step 等;第二类包含形状的函数,例如 ReLU 函数,例如 softplus,leaky ReLU 等。我们在本节中讨论的关于比较这两个函数的大多数内容都适用于任何类别的激活。然而,重要的是要注意激活函数的选择对神经网络的收敛和输出有很大影响。
实现单层神经网络
我们拥有实现对真实数据进行操作的神经网络所需的所有工具,因此在本节中我们将创建一个神经网络,其中一个层在Iris
数据集上运行。
准备
在本节中,我们将实现一个具有一个隐藏层的神经网络。重要的是要理解完全连接的神经网络主要基于矩阵乘法。因此,重要的是数据和矩阵的大小正确排列。
由于这是一个回归问题,我们将使用均方误差作为损失函数。
操作步骤
我们按如下方式处理秘籍:
- 要创建计算图,我们首先加载以下必要的库:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn import datasets
- 现在我们将加载
Iris
数据并将长度存储为目标值。然后我们将使用以下代码启动图会话:
iris = datasets.load_iris()
x_vals = np.array([x[0:3] for x in iris.data])
y_vals = np.array([x[3] for x in iris.data])
sess = tf.Session()
- 由于数据集较小,我们需要设置种子以使结果可重现,如下所示:
seed = 2
tf.set_random_seed(seed)
np.random.seed(seed)
- 为了准备数据,我们将创建一个 80-20 训练测试分割,并通过最小 - 最大缩放将 x 特征标准化为 0 到 1 之间,如下所示:
train_indices = np.random.choice(len(x_vals), round(len(x_vals)*0.8), replace=False)
test_indices = np.array(list(set(range(len(x_vals))) - set(train_indices)))
x_vals_train = x_vals[train_indices]
x_vals_test = x_vals[test_indices]
y_vals_train = y_vals[train_indices]
y_vals_test = y_vals[test_indices]
def normalize_cols(m):
col_max = m.max(axis=0)
col_min = m.min(axis=0)
return (m-col_min) / (col_max - col_min)
x_vals_train = np.nan_to_num(normalize_cols(x_vals_train))
x_vals_test = np.nan_to_num(normalize_cols(x_vals_test))
- 现在,我们将使用以下代码声明数据和目标的批量大小和占位符:
batch_size = 50
x_data = tf.placeholder(shape=[None, 3], dtype=tf.float32)
y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 重要的是要用适当的形状声明我们的模型变量。我们可以将隐藏层的大小声明为我们希望的任何大小;在下面的代码块中,我们将其设置为有五个隐藏节点:
hidden_layer_nodes = 5
A1 = tf.Variable(tf.random_normal(shape=[3,hidden_layer_nodes]))
b1 = tf.Variable(tf.random_normal(shape=[hidden_layer_nodes]))
A2 = tf.Variable(tf.random_normal(shape=[hidden_layer_nodes,1]))
b2 = tf.Variable(tf.random_normal(shape=[1]))
- 我们现在分两步宣布我们的模型。第一步是创建隐藏层输出,第二步是创建模型的
final_output
,如下所示:
代码语言:javascript复制请注意,我们的模型从三个输入特征到五个隐藏节点,最后到一个输出值。
hidden_output = tf.nn.relu(tf.add(tf.matmul(x_data, A1), b1))
final_output = tf.nn.relu(tf.add(tf.matmul(hidden_output, A2), b2))
- 我们作为
loss
函数的均方误差如下:
loss = tf.reduce_mean(tf.square(y_target - final_output))
- 现在我们将声明我们的优化算法并使用以下代码初始化我们的变量:
my_opt = tf.train.GradientDescentOptimizer(0.005)
train_step = my_opt.minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
- 接下来,我们循环我们的训练迭代。我们还将初始化两个列表,我们可以存储我们的训练和
test_loss
函数。在每个循环中,我们还希望从训练数据中随机选择一个批量以适合模型,如下所示:
# First we initialize the loss vectors for storage.
loss_vec = []
test_loss = []
for i in range(500):
# We select a random set of indices for the batch.
rand_index = np.random.choice(len(x_vals_train), size=batch_size)
# We then select the training values
rand_x = x_vals_train[rand_index]
rand_y = np.transpose([y_vals_train[rand_index]])
# Now we run the training step
sess.run(train_step, feed_dict={x_data: rand_x, y_target: rand_y})
# We save the training loss
temp_loss = sess.run(loss, feed_dict={x_data: rand_x, y_target: rand_y})
loss_vec.append(np.sqrt(temp_loss))
# Finally, we run the test-set loss and save it.
test_temp_loss = sess.run(loss, feed_dict={x_data: x_vals_test, y_target: np.transpose([y_vals_test])})
test_loss.append(np.sqrt(test_temp_loss))
if (i 1)P==0:
print('Generation: ' str(i 1) '. Loss = ' str(temp_loss))
- 我们可以用
matplotlib
和以下代码绘制损失:
plt.plot(loss_vec, 'k-', label='Train Loss')
plt.plot(test_loss, 'r--', label='Test Loss')
plt.title('Loss (MSE) per Generation')
plt.xlabel('Generation')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.show()
我们通过绘制下图来继续秘籍:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fbd0QHmh-1681566911066)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/e9f5eb6c-586d-4ad8-8ac0-9a4e00482688.png)]
图 4:我们绘制了训练和测试装置的损失(MSE)。请注意,我们在 200 代之后略微过拟合模型,因为测试 MSE 不会进一步下降,但训练 MSE 确实
工作原理
我们的模型现已可视化为神经网络图,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oSaVZUEy-1681566911066)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/52077e5c-eee3-4684-97fa-cdac28fceec6.png)]
图 5:上图是我们的神经网络的可视化,在隐藏层中有五个节点。我们馈送三个值:萼片长度(S.L),萼片宽度(S.W.)和花瓣长度(P.L.)。目标将是花瓣宽度。总的来说,模型中总共有 26 个变量
更多
请注意,通过查看测试和训练集上的loss
函数,我们可以确定模型何时开始过拟合训练数据。我们还可以看到训练损失并不像测试装置那样平稳。这是因为有两个原因:第一个原因是我们使用的批量小于测试集,尽管不是很多;第二个原因是由于我们正在训练训练组,而测试装置不会影响模型的变量。
实现不同的层
了解如何实现不同的层非常重要。在前面的秘籍中,我们实现了完全连接的层。在本文中,我们将进一步扩展我们对各层的了解。
准备
我们已经探索了如何连接数据输入和完全连接的隐藏层,但是 TensorFlow 中有更多类型的层是内置函数。最常用的层是卷积层和最大池化层。我们将向您展示如何使用输入数据和完全连接的数据创建和使用此类层。首先,我们将研究如何在一维数据上使用这些层,然后在二维数据上使用这些层。
虽然神经网络可以以任何方式分层,但最常见的用途之一是使用卷积层和完全连接的层来首先创建特征。如果我们有太多的特征,通常会有一个最大池化层。在这些层之后,通常引入非线性层作为激活函数。我们将在第 8 章卷积神经网络中考虑的卷积神经网络(CNN)通常具有卷积,最大池化,激活,卷积,最大池化和激活形式。
操作步骤
我们将首先看一维数据。我们需要使用以下步骤为此任务生成随机数据数组:
- 我们首先加载我们需要的库并启动图会话,如下所示:
import tensorflow as tf
import numpy as np
sess = tf.Session()
- 现在我们可以初始化我们的数据(长度为
25
的 NumPy 数组)并创建占位符,我们将通过以下代码提供它:
data_size = 25
data_1d = np.random.normal(size=data_size)
x_input_1d = tf.placeholder(dtype=tf.float32, shape=[data_size])
- 接下来,我们将定义一个将构成卷积层的函数。然后我们将声明一个随机过滤器并创建卷积层,如下所示:
代码语言:javascript复制请注意,许多 TensorFlow 的层函数都是为处理 4D 数据而设计的(
4D = [batch size, width, height, and channels]
)。我们需要修改输入数据和输出数据,以扩展或折叠所需的额外维度。对于我们的示例数据,我们的批量大小为 1,宽度为 1,高度为 25,通道大小为 1。要扩展大小,我们使用expand_dims()
函数,并且为了折叠大小,我们使用squeeze()
函数。另请注意,我们可以使用output_size=(W-F 2P)/S 1
公式计算卷积层的输出大小,其中W
是输入大小,F
是滤镜大小,P
是填充大小,S
是步幅大小。
def conv_layer_1d(input_1d, my_filter):
# Make 1d input into 4d
input_2d = tf.expand_dims(input_1d, 0)
input_3d = tf.expand_dims(input_2d, 0)
input_4d = tf.expand_dims(input_3d, 3)
# Perform convolution
convolution_output = tf.nn.conv2d(input_4d, filter=my_filter, strides=[1,1,1,1], padding="VALID")
# Now drop extra dimensions
conv_output_1d = tf.squeeze(convolution_output)
return(conv_output_1d)
my_filter = tf.Variable(tf.random_normal(shape=[1,5,1,1]))
my_convolution_output = conv_layer_1d(x_input_1d, my_filter)
- 默认情况下,TensorFlow 的激活函数将按元素方式执行。这意味着我们只需要在感兴趣的层上调用激活函数。我们通过创建激活函数然后在图上初始化它来完成此操作,如下所示:
def activation(input_1d):
return tf.nn.relu(input_1d)
my_activation_output = activation(my_convolution_output)
- 现在我们将声明一个最大池化层函数。此函数将在我们的一维向量上的移动窗口上创建一个最大池化。对于此示例,我们将其初始化为宽度为 5,如下所示:
代码语言:javascript复制TensorFlow 的最大池化参数与卷积层的参数非常相似。虽然最大池化参数没有过滤器,但它确实有
size
,stride
和padding
选项。由于我们有一个带有有效填充的 5 的窗口(没有零填充),因此我们的输出数组将减少 4 个条目。
def max_pool(input_1d, width):
# First we make the 1d input into 4d.
input_2d = tf.expand_dims(input_1d, 0)
input_3d = tf.expand_dims(input_2d, 0)
input_4d = tf.expand_dims(input_3d, 3)
# Perform the max pool operation
pool_output = tf.nn.max_pool(input_4d, ksize=[1, 1, width, 1], strides=[1, 1, 1, 1], padding='VALID')
pool_output_1d = tf.squeeze(pool_output)
return pool_output_1d
my_maxpool_output = max_pool(my_activation_output, width=5)
- 我们将要连接的最后一层是完全连接的层。在这里,我们想要创建一个多特征函数,输入一维数组并输出指示的数值。还要记住,要使用 1D 数组进行矩阵乘法,我们必须将维度扩展为 2D,如下面的代码块所示:
def fully_connected(input_layer, num_outputs):
# Create weights
weight_shape = tf.squeeze(tf.stack([tf.shape(input_layer), [num_outputs]]))
weight = tf.random_normal(weight_shape, stddev=0.1)
bias = tf.random_normal(shape=[num_outputs])
# Make input into 2d
input_layer_2d = tf.expand_dims(input_layer, 0)
# Perform fully connected operations
full_output = tf.add(tf.matmul(input_layer_2d, weight), bias)
# Drop extra dimensions
full_output_1d = tf.squeeze(full_output)
return full_output_1d
my_full_output = fully_connected(my_maxpool_output, 5)
- 现在我们将初始化所有变量,运行图并打印每个层的输出,如下所示:
init = tf.global_variable_initializer()
sess.run(init)
feed_dict = {x_input_1d: data_1d}
# Convolution Output
print('Input = array of length 25')
print('Convolution w/filter, length = 5, stride size = 1, results in an array of length 21:')
print(sess.run(my_convolution_output, feed_dict=feed_dict))
# Activation Output
print('Input = the above array of length 21')
print('ReLU element wise returns the array of length 21:')
print(sess.run(my_activation_output, feed_dict=feed_dict))
# Maxpool Output
print('Input = the above array of length 21')
print('MaxPool, window length = 5, stride size = 1, results in the array of length 17:')
print(sess.run(my_maxpool_output, feed_dict=feed_dict))
# Fully Connected Output
print('Input = the above array of length 17')
print('Fully connected layer on all four rows with five outputs:')
print(sess.run(my_full_output, feed_dict=feed_dict))
- 上一步应该产生以下输出:
Input = array of length 25
Convolution w/filter, length = 5, stride size = 1, results in an array of length 21:
[-0.91608119 1.53731811 -0.7954089 0.5041104 1.88933098
-1.81099761 0.56695032 1.17945457 -0.66252393 -1.90287709
0.87184119 0.84611893 -5.25024986 -0.05473572 2.19293165
-4.47577858 -1.71364677 3.96857905 -2.0452652 -1.86647367
-0.12697852]
Input = the above array of length 21
ReLU element wise returns the array of length 21:
[ 0. 1.53731811 0. 0.5041104 1.88933098
0. 0. 1.17945457 0. 0.
0.87184119 0.84611893 0. 0. 2.19293165
0. 0. 3.96857905 0. 0.
0. ]
Input = the above array of length 21
MaxPool, window length = 5, stride size = 1, results in the array of length 17:
[ 1.88933098 1.88933098 1.88933098 1.88933098 1.88933098
1.17945457 1.17945457 1.17945457 0.87184119 0.87184119
2.19293165 2.19293165 2.19293165 3.96857905 3.96857905
3.96857905 3.96857905]
Input = the above array of length 17
Fully connected layer on all four rows with five outputs:
[ 1.23588216 -0.42116445 1.44521213 1.40348077 -0.79607368]
对于神经网络,一维数据非常重要。时间序列,信号处理和一些文本嵌入被认为是一维的并且经常在神经网络中使用。
我们现在将以相同的顺序考虑相同类型的层,但是对于二维数据:
- 我们将从清除和重置计算图开始,如下所示:
ops.reset_default_graph()
sess = tf.Session()
- 然后我们将初始化我们的输入数组,使其为
10x10
矩阵,然后我们将为具有相同形状的图初始化占位符,如下所示:
data_size = [10,10]
data_2d = np.random.normal(size=data_size)
x_input_2d = tf.placeholder(dtype=tf.float32, shape=data_size)
- 就像在一维示例中一样,我们现在需要声明卷积层函数。由于我们的数据已经具有高度和宽度,我们只需要将其扩展为二维(批量大小为 1,通道大小为 1),以便我们可以使用
conv2d()
函数对其进行操作。对于滤波器,我们将使用随机2x2
滤波器,两个方向的步幅为 2,以及有效填充(换句话说,没有零填充)。因为我们的输入矩阵是10x10
,我们的卷积输出将是5x5
,如下所示:
def conv_layer_2d(input_2d, my_filter):
# First, change 2d input to 4d
input_3d = tf.expand_dims(input_2d, 0)
input_4d = tf.expand_dims(input_3d, 3)
# Perform convolution
convolution_output = tf.nn.conv2d(input_4d, filter=my_filter, strides=[1,2,2,1], padding="VALID")
# Drop extra dimensions
conv_output_2d = tf.squeeze(convolution_output)
return(conv_output_2d)
my_filter = tf.Variable(tf.random_normal(shape=[2,2,1,1]))
my_convolution_output = conv_layer_2d(x_input_2d, my_filter)
- 激活函数在逐个元素的基础上工作,因此我们现在可以创建激活操作并使用以下代码在图上初始化它:
def activation(input_2d):
return tf.nn.relu(input_2d)
my_activation_output = activation(my_convolution_output)
- 我们的最大池化层与一维情况非常相似,只是我们必须声明最大池化窗口的宽度和高度。就像我们的卷积 2D 层一样,我们只需要扩展到两个维度,如下所示:
def max_pool(input_2d, width, height):
# Make 2d input into 4d
input_3d = tf.expand_dims(input_2d, 0)
input_4d = tf.expand_dims(input_3d, 3)
# Perform max pool
pool_output = tf.nn.max_pool(input_4d, ksize=[1, height, width, 1], strides=[1, 1, 1, 1], padding='VALID')
# Drop extra dimensions
pool_output_2d = tf.squeeze(pool_output)
return pool_output_2d
my_maxpool_output = max_pool(my_activation_output, width=2, height=2)
- 我们的全连接层与一维输出非常相似。我们还应该注意到,此层的 2D 输入被视为一个对象,因此我们希望每个条目都连接到每个输出。为了实现这一点,我们需要完全展平二维矩阵,然后将其展开以进行矩阵乘法,如下所示:
def fully_connected(input_layer, num_outputs):
# Flatten into 1d
flat_input = tf.reshape(input_layer, [-1])
# Create weights
weight_shape = tf.squeeze(tf.stack([tf.shape(flat_input), [num_outputs]]))
weight = tf.random_normal(weight_shape, stddev=0.1)
bias = tf.random_normal(shape=[num_outputs])
# Change into 2d
input_2d = tf.expand_dims(flat_input, 0)
# Perform fully connected operations
full_output = tf.add(tf.matmul(input_2d, weight), bias)
# Drop extra dimensions
full_output_2d = tf.squeeze(full_output)
return full_output_2d
my_full_output = fully_connected(my_maxpool_output, 5)
- 现在我们需要初始化变量并使用以下代码为我们的操作创建一个馈送字典:
init = tf.global_variables_initializer()
sess.run(init)
feed_dict = {x_input_2d: data_2d}
- 每个层的输出应如下所示:
# Convolution Output
print('Input = [10 X 10] array')
print('2x2 Convolution, stride size = [2x2], results in the [5x5] array:')
print(sess.run(my_convolution_output, feed_dict=feed_dict))
# Activation Output
print('Input = the above [5x5] array')
print('ReLU element wise returns the [5x5] array:')
print(sess.run(my_activation_output, feed_dict=feed_dict))
# Max Pool Output
print('Input = the above [5x5] array')
print('MaxPool, stride size = [1x1], results in the [4x4] array:')
print(sess.run(my_maxpool_output, feed_dict=feed_dict))
# Fully Connected Output
print('Input = the above [4x4] array')
print('Fully connected layer on all four rows with five outputs:')
print(sess.run(my_full_output, feed_dict=feed_dict))
- 上一步应该产生以下输出:
Input = [10 X 10] array
2x2 Convolution, stride size = [2x2], results in the [5x5] array:
[[ 0.37630892 -1.41018617 -2.58821273 -0.32302785 1.18970704]
[-4.33685207 1.97415686 1.0844903 -1.18965471 0.84643292]
[ 5.23706436 2.46556497 -0.95119286 1.17715418 4.1117816 ]
[ 5.86972761 1.2213701 1.59536231 2.66231227 2.28650784]
[-0.88964868 -2.75502229 4.3449688 2.67776585 -2.23714781]]
Input = the above [5x5] array
ReLU element wise returns the [5x5] array:
[[ 0.37630892 0. 0. 0. 1.18970704]
[ 0. 1.97415686 1.0844903 0. 0.84643292]
[ 5.23706436 2.46556497 0. 1.17715418 4.1117816 ]
[ 5.86972761 1.2213701 1.59536231 2.66231227 2.28650784]
[ 0. 0. 4.3449688 2.67776585 0. ]]
Input = the above [5x5] array
MaxPool, stride size = [1x1], results in the [4x4] array:
[[ 1.97415686 1.97415686 1.0844903 1.18970704]
[ 5.23706436 2.46556497 1.17715418 4.1117816 ]
[ 5.86972761 2.46556497 2.66231227 4.1117816 ]
[ 5.86972761 4.3449688 4.3449688 2.67776585]]
Input = the above [4x4] array
Fully connected layer on all four rows with five outputs:
[-0.6154139 -1.96987963 -1.88811922 0.20010889 0.32519674]
工作原理
我们现在应该知道如何在 TensorFlow 中使用一维和二维数据中的卷积和最大池化层。无论输入的形状如何,我们最终都得到相同的大小输出。这对于说明神经网络层的灵活性很重要。本节还应该再次向我们强调形状和大小在神经网络操作中的重要性。
使用多层神经网络
我们现在将通过在低出生体重数据集上使用多层神经网络将我们对不同层的知识应用于实际数据。
准备
现在我们知道如何创建神经网络并使用层,我们将应用此方法,以预测低出生体重数据集中的出生体重。我们将创建一个具有三个隐藏层的神经网络。低出生体重数据集包括实际出生体重和出生体重是否高于或低于 2,500 克的指标变量。在这个例子中,我们将目标设为实际出生体重(回归),然后在最后查看分类的准确率。最后,我们的模型应该能够确定出生体重是否小于 2,500 克。
操作步骤
我们按如下方式处理秘籍:
- 我们将首先加载库并初始化我们的计算图,如下所示:
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import csv
import requests
import numpy as np
sess = tf.Session()
- 我们现在将使用
requests
模块从网站加载数据。在此之后,我们将数据拆分为感兴趣的特征和目标值,如下所示:
# Name of data file
birth_weight_file = 'birth_weight.csv'
birthdata_url = 'https://github.com/nfmcclure/tensorflow_cookbook/raw/master'
'/01_Introduction/07_Working_with_Data_Sources/birthweight_data/birthweight.dat'
# Download data and create data file if file does not exist in current directory
if not os.path.exists(birth_weight_file):
birth_file = requests.get(birthdata_url)
birth_data = birth_file.text.split('rn')
birth_header = birth_data[0].split('t')
birth_data = [[float(x) for x in y.split('t') if len(x) >= 1]
for y in birth_data[1:] if len(y) >= 1]
with open(birth_weight_file, "w") as f:
writer = csv.writer(f)
writer.writerows([birth_header])
writer.writerows(birth_data)
# Read birth weight data into memory
birth_data = []
with open(birth_weight_file, newline='') as csvfile:
csv_reader = csv.reader(csvfile)
birth_header = next(csv_reader)
for row in csv_reader:
birth_data.append(row)
birth_data = [[float(x) for x in row] for row in birth_data]
# Pull out target variable
y_vals = np.array([x[0] for x in birth_data])
# Pull out predictor variables (not id, not target, and not birthweight)
x_vals = np.array([x[1:8] for x in birth_data])
- 为了帮助实现可重复性,我们现在需要为 NumPy 和 TensorFlow 设置随机种子。然后我们声明我们的批量大小如下:
seed = 4
tf.set_random_seed(seed)
np.random.seed(seed)
batch_size = 100
- 接下来,我们将数据分成 80-20 训练测试分组。在此之后,我们需要正则化我们的输入特征,使它们在 0 到 1 之间,具有最小 - 最大缩放比例,如下所示:
train_indices = np.random.choice(len(x_vals), round(len(x_vals)*0.8), replace=False)
test_indices = np.array(list(set(range(len(x_vals))) - set(train_indices)))
x_vals_train = x_vals[train_indices]
x_vals_test = x_vals[test_indices]
y_vals_train = y_vals[train_indices]
y_vals_test = y_vals[test_indices]
# Normalize by column (min-max norm)
def normalize_cols(m, col_min=np.array([None]), col_max=np.array([None])):
if not col_min[0]:
col_min = m.min(axis=0)
if not col_max[0]:
col_max = m.max(axis=0)
return (m-col_min) / (col_max - col_min), col_min, col_max
x_vals_train, train_min, train_max = np.nan_to_num(normalize_cols(x_vals_train))
x_vals_test, _, _ = np.nan_to_num(normalize_cols(x_vals_test), train_min, train_max)
归一化输入特征是一种常见的特征转换,尤其适用于神经网络。如果我们的数据以 0 到 1 的中心为激活函数,它将有助于收敛。
- 由于我们有多个层具有相似的初始化变量,我们现在需要创建一个函数来初始化权重和偏差。我们使用以下代码执行此操作:
def init_weight(shape, st_dev):
weight = tf.Variable(tf.random_normal(shape, stddev=st_dev))
return weight
def init_bias(shape, st_dev):
bias = tf.Variable(tf.random_normal(shape, stddev=st_dev))
return bias
- 我们现在需要初始化占位符。将有八个输入特征和一个输出,出生重量以克为单位,如下所示:
x_data = tf.placeholder(shape=[None, 8], dtype=tf.float32)
y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 对于所有三个隐藏层,完全连接的层将使用三次。为了防止重复代码,我们将在初始化模型时创建一个层函数,如下所示:
def fully_connected(input_layer, weights, biases):
layer = tf.add(tf.matmul(input_layer, weights), biases)
return tf.nn.relu(layer)
- 现在是时候创建我们的模型了。对于每个层(和输出层),我们将初始化权重矩阵,偏置矩阵和完全连接的层。对于此示例,我们将使用大小为 25,10 和 3 的隐藏层:
代码语言:javascript复制我们使用的模型将有 522 个变量适合。为了得到这个数字,我们可以看到数据和第一个隐藏层之间有
8*25 25=225
变量。如果我们以这种方式继续添加它们,我们将有225 260 33 4=522
变量。这远远大于我们在逻辑回归模型中使用的九个变量。
# Create second layer (25 hidden nodes)
weight_1 = init_weight(shape=[8, 25], st_dev=10.0)
bias_1 = init_bias(shape=[25], st_dev=10.0)
layer_1 = fully_connected(x_data, weight_1, bias_1)
# Create second layer (10 hidden nodes)
weight_2 = init_weight(shape=[25, 10], st_dev=10.0)
bias_2 = init_bias(shape=[10], st_dev=10.0)
layer_2 = fully_connected(layer_1, weight_2, bias_2)
# Create third layer (3 hidden nodes)
weight_3 = init_weight(shape=[10, 3], st_dev=10.0)
bias_3 = init_bias(shape=[3], st_dev=10.0)
layer_3 = fully_connected(layer_2, weight_3, bias_3)
# Create output layer (1 output value)
weight_4 = init_weight(shape=[3, 1], st_dev=10.0)
bias_4 = init_bias(shape=[1], st_dev=10.0)
final_output = fully_connected(layer_3, weight_4, bias_4)
- 我们现在将使用 L1 损失函数(绝对值),声明我们的优化器(使用 Adam 优化),并按如下方式初始化变量:
loss = tf.reduce_mean(tf.abs(y_target - final_output))
my_opt = tf.train.AdamOptimizer(0.05)
train_step = my_opt.minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
虽然我们在前一步骤中用于 Adam 优化函数的学习率是 0.05,但有研究表明较低的学习率始终产生更好的结果。对于这个秘籍,由于数据的一致性和快速收敛的需要,我们使用了更大的学习率。
- 接下来,我们需要训练我们的模型进行 200 次迭代。我们还将包含存储
train
和test
损失的代码,选择随机批量大小,并每 25 代打印一次状态,如下所示:
# Initialize the loss vectors
loss_vec = []
test_loss = []
for i in range(200):
# Choose random indices for batch selection
rand_index = np.random.choice(len(x_vals_train), size=batch_size)
# Get random batch
rand_x = x_vals_train[rand_index]
rand_y = np.transpose([y_vals_train[rand_index]])
# Run the training step
sess.run(train_step, feed_dict={x_data: rand_x, y_target: rand_y})
# Get and store the train loss
temp_loss = sess.run(loss, feed_dict={x_data: rand_x, y_target: rand_y})
loss_vec.append(temp_loss)
# Get and store the test loss
test_temp_loss = sess.run(loss, feed_dict={x_data: x_vals_test, y_target: np.transpose([y_vals_test])})
test_loss.append(test_temp_loss)
if (i 1)%==0:
print('Generation: ' str(i 1) '. Loss = ' str(temp_loss))
- 上一步应该产生以下输出:
Generation: 25. Loss = 5922.52
Generation: 50. Loss = 2861.66
Generation: 75. Loss = 2342.01
Generation: 100. Loss = 1880.59
Generation: 125. Loss = 1394.39
Generation: 150. Loss = 1062.43
Generation: 175. Loss = 834.641
Generation: 200. Loss = 848.54
- 以下是使用
matplotlib
绘制训练和测试损失的代码片段:
plt.plot(loss_vec, 'k-', label='Train Loss')
plt.plot(test_loss, 'r--', label='Test Loss')
plt.title('Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.show()
我们通过绘制下图来继续秘籍:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u23ehDtt-1681566911066)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/2a730c9a-4bf5-4244-bfef-73ccca82eaa7.png)]
图 6:在上图中,我们绘制了我们训练的神经网络的训练和测试损失,以克数表示出生体重。请注意,大约 30 代后我们已经达到了良好的模型
- 我们现在想将我们的出生体重结果与我们之前的后勤结果进行比较。使用逻辑线性回归(如第 3 章中的实现逻辑回归秘籍,线性回归),我们在数千次迭代后获得了大约 60% 的准确率结果。为了将其与我们在上一节中所做的进行比较,我们需要输出训练并测试回归结果,并通过创建指标(如果它们高于或低于 2,500 克)将其转换为分类结果。要找出模型的准确率,我们需要使用以下代码:
actuals = np.array([x[1] for x in birth_data])
test_actuals = actuals[test_indices]
train_actuals = actuals[train_indices]
test_preds = [x[0] for x in sess.run(final_output, feed_dict={x_data: x_vals_test})]
train_preds = [x[0] for x in sess.run(final_output, feed_dict={x_data: x_vals_train})]
test_preds = np.array([1.0 if x<2500.0 else 0.0 for x in test_preds])
train_preds = np.array([1.0 if x<2500.0 else 0.0 for x in train_preds])
# Print out accuracies
test_acc = np.mean([x==y for x,y in zip(test_preds, test_actuals)])
train_acc = np.mean([x==y for x,y in zip(train_preds, train_actuals)])
print('On predicting the category of low birthweight from regression output (<2500g):')
print('Test Accuracy: {}'.format(test_acc))
print('Train Accuracy: {}'.format(train_acc))
- 上一步应该产生以下输出:
Test Accuracy: 0.631578947368421
Train Accuracy: 0.7019867549668874
工作原理
在这个秘籍中,我们创建了一个回归神经网络,其中包含三个完全连接的隐藏层,以预测低出生体重数据集的出生体重。当将其与物流输出进行比较以预测高于或低于 2,500 克时,我们获得了类似的结果并且在更少的几代中实现了它们。在下一个方案中,我们将尝试通过使其成为多层逻辑类神经网络来改进逻辑回归。
改进线性模型的预测
在前面的秘籍中,我们注意到我们拟合的参数数量远远超过等效的线性模型。在这个秘籍中,我们将尝试通过使用神经网络来改进我们的低出生体重的逻辑模型。
准备
对于这个秘籍,我们将加载低出生体重数据,并使用神经网络与两个隐藏的完全连接的层与 sigmoid 激活,以适应低出生体重的概率。
操作步骤
我们按如下方式处理秘籍:
- 我们首先加载库并初始化我们的计算图,如下所示:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import requests
sess = tf.Session()
- 接下来,我们按照前面的秘籍加载,提取和标准化我们的数据,除了在这里我们将使用低出生体重指示变量作为我们的目标而不是实际出生体重,如下所示:
# Name of data file
birth_weight_file = 'birth_weight.csv'
birthdata_url = 'https://github.com/nfmcclure/tensorflow_cookbook/raw/master'
'/01_Introduction/07_Working_with_Data_Sources/birthweight_data/birthweight.dat'
# Download data and create data file if file does not exist in current directory
if not os.path.exists(birth_weight_file):
birth_file = requests.get(birthdata_url)
birth_data = birth_file.text.split('rn')
birth_header = birth_data[0].split('t')
birth_data = [[float(x) for x in y.split('t') if len(x) >= 1]
for y in birth_data[1:] if len(y) >= 1]
with open(birth_weight_file, "w") as f:
writer = csv.writer(f)
writer.writerows([birth_header])
writer.writerows(birth_data)
# read birth weight data into memory
birth_data = []
with open(birth_weight_file, newline='') as csvfile:
csv_reader = csv.reader(csvfile)
birth_header = next(csv_reader)
for row in csv_reader:
birth_data.append(row)
birth_data = [[float(x) for x in row] for row in birth_data]
# Pull out target variable
y_vals = np.array([x[0] for x in birth_data])
# Pull out predictor variables (not id, not target, and not birthweight)
x_vals = np.array([x[1:8] for x in birth_data])
train_indices = np.random.choice(len(x_vals), round(len(x_vals)*0.8), replace=False)
test_indices = np.array(list(set(range(len(x_vals))) - set(train_indices)))
x_vals_train = x_vals[train_indices]
x_vals_test = x_vals[test_indices]
y_vals_train = y_vals[train_indices]
y_vals_test = y_vals[test_indices]
def normalize_cols(m, col_min=np.array([None]), col_max=np.array([None])):
if not col_min[0]:
col_min = m.min(axis=0)
if not col_max[0]:
col_max = m.max(axis=0)
return (m - col_min) / (col_max - col_min), col_min, col_max
x_vals_train, train_min, train_max = np.nan_to_num(normalize_cols(x_vals_train))
x_vals_test, _, _ = np.nan_to_num(normalize_cols(x_vals_test, train_min, train_max))
- 接下来,我们需要声明我们的批量大小和数据的占位符,如下所示:
batch_size = 90
x_data = tf.placeholder(shape=[None, 7], dtype=tf.float32)
y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 如前所述,我们现在需要声明在模型中初始化变量和层的函数。为了创建更好的逻辑函数,我们需要创建一个在输入层上返回逻辑层的函数。换句话说,我们将使用完全连接的层并为每个层返回一个 sigmoid 元素。重要的是要记住我们的损失函数将包含最终的 sigmoid,因此我们要在最后一层指定我们不会返回输出的 sigmoid,如下所示:
def init_variable(shape):
return tf.Variable(tf.random_normal(shape=shape))
# Create a logistic layer definition
def logistic(input_layer, multiplication_weight, bias_weight, activation = True):
linear_layer = tf.add(tf.matmul(input_layer, multiplication_weight), bias_weight)
if activation:
return tf.nn.sigmoid(linear_layer)
else:
return linear_layer
- 现在我们将声明三个层(两个隐藏层和一个输出层)。我们将首先为每个层初始化权重和偏差矩阵,并按如下方式定义层操作:
# First logistic layer (7 inputs to 14 hidden nodes)
A1 = init_variable(shape=[7,14])
b1 = init_variable(shape=[14])
logistic_layer1 = logistic(x_data, A1, b1)
# Second logistic layer (14 hidden inputs to 5 hidden nodes)
A2 = init_variable(shape=[14,5])
b2 = init_variable(shape=[5])
logistic_layer2 = logistic(logistic_layer1, A2, b2)
# Final output layer (5 hidden nodes to 1 output)
A3 = init_variable(shape=[5,1])
b3 = init_variable(shape=[1])
final_output = logistic(logistic_layer2, A3, b3, activation=False)
- 接下来,我们声明我们的损失(交叉熵)和优化算法,并初始化以下变量:
# Create loss function
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=final_output, labels=y_target))
# Declare optimizer
my_opt = tf.train.AdamOptimizer(learning_rate = 0.002)
train_step = my_opt.minimize(loss)
# Initialize variables
init = tf.global_variables_initializer()
sess.run(init)
交叉熵是一种测量概率之间距离的方法。在这里,我们想要测量确定性(0 或 1)与模型概率(
0 < x < 1
)之间的差异。 TensorFlow 使用内置的 sigmoid 函数实现交叉熵。这也是超参数调整的一部分,因为我们更有可能找到最佳的损失函数,学习率和针对当前问题的优化算法。为简洁起见,我们不包括超参数调整。
- 为了评估和比较我们的模型与以前的模型,我们需要在图上创建预测和精度操作。这将允许我们提供整个测试集并确定准确率,如下所示:
prediction = tf.round(tf.nn.sigmoid(final_output))
predictions_correct = tf.cast(tf.equal(prediction, y_target), tf.float32)
accuracy = tf.reduce_mean(predictions_correct)
- 我们现在准备开始我们的训练循环。我们将训练 1500 代并保存模型损失并训练和测试精度以便以后进行绘图。我们的训练循环使用以下代码启动:
# Initialize loss and accuracy vectors loss_vec = [] train_acc = [] test_acc = []
for i in range(1500):
# Select random indicies for batch selection
rand_index = np.random.choice(len(x_vals_train), size=batch_size)
# Select batch
rand_x = x_vals_train[rand_index]
rand_y = np.transpose([y_vals_train[rand_index]])
# Run training step
sess.run(train_step, feed_dict={x_data: rand_x, y_target: rand_y})
# Get training loss
temp_loss = sess.run(loss, feed_dict={x_data: rand_x, y_target: rand_y})
loss_vec.append(temp_loss)
# Get training accuracy
temp_acc_train = sess.run(accuracy, feed_dict={x_data: x_vals_train, y_target: np.transpose([y_vals_train])})
train_acc.append(temp_acc_train)
# Get test accuracy
temp_acc_test = sess.run(accuracy, feed_dict={x_data: x_vals_test, y_target: np.transpose([y_vals_test])})
test_acc.append(temp_acc_test)
if (i 1)0==0:
print('Loss = '' str(temp_loss))
- 上一步应该产生以下输出:
Loss = 0.696393
Loss = 0.591708
Loss = 0.59214
Loss = 0.505553
Loss = 0.541974
Loss = 0.512707
Loss = 0.590149
Loss = 0.502641
Loss = 0.518047
Loss = 0.502616
- 以下代码块说明了如何使用
matplotlib
绘制交叉熵损失以及训练和测试集精度:
# Plot loss over time
plt.plot(loss_vec, 'k-')
plt.title('Cross Entropy Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Cross Entropy Loss')
plt.show()
# Plot train and test accuracy
plt.plot(train_acc, 'k-', label='Train Set Accuracy')
plt.plot(test_acc, 'r--', label='Test Set Accuracy')
plt.title('Train and Test Accuracy')
plt.xlabel('Generation')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.show()
我们得到每代交叉熵损失的图如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OsOLskCG-1681566911067)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/c3e4b9e2-8d06-45d0-b19f-890dbcef6c4a.png)]
图 7:超过 1500 次迭代的训练损失
在大约 50 代之内,我们已经达到了良好的模式。在我们继续训练时,我们可以看到在剩余的迭代中获得的很少,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z7bhUPKq-1681566911067)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/77fdb4ad-5bdf-4013-9768-1beca691cf5b.png)]
图 8:训练组和测试装置的准确率
正如您在上图中所看到的,我们很快就找到了一个好模型。
工作原理
在考虑使用神经网络建模数据时,您必须考虑优缺点。虽然我们的模型比以前的模型融合得更快,并且可能具有更高的准确率,但这需要付出代价;我们正在训练更多的模型变量,并且更有可能过拟合。为了检查是否发生过拟合,我们会查看测试和训练集的准确率。如果训练集的准确率继续增加而测试集的精度保持不变或甚至略微下降,我们可以假设过拟合正在发生。
为了对抗欠拟合,我们可以增加模型深度或训练模型以进行更多迭代。为了解决过拟合问题,我们可以为模型添加更多数据或添加正则化技术。
同样重要的是要注意我们的模型变量不像线性模型那样可解释。神经网络模型具有比线性模型更难解释的系数,因为它们解释了模型中特征的重要性。
学习玩井字棋
为了展示适应性神经网络的可用性,我们现在将尝试使用神经网络来学习井字棋的最佳动作。我们将知道井字棋是一种确定性游戏,并且最佳动作已经知道。
准备
为了训练我们的模型,我们将使用一系列的棋盘位置,然后对许多不同的棋盘进行最佳的最佳响应。我们可以通过仅考虑在对称性方面不同的棋盘位置来减少要训练的棋盘数量。井字棋棋盘的非同一性变换是 90 度,180 度和 270 度的旋转(在任一方向上),水平反射和垂直反射。鉴于这个想法,我们将使用最佳移动的候选棋盘名单,应用两个随机变换,然后将其输入神经网络进行学习。
由于井字棋是一个确定性的游戏,值得注意的是,无论谁先走,都应该赢或抽。我们希望能够以最佳方式响应我们的动作并最终获得平局的模型。
如果我们将X
标注为 1,将O
标注为 -1,将空格标注为 0,则下图说明了我们如何将棋盘位置和最佳移动视为一行数据:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-17St0ZeF-1681566911067)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/e71cb35d-172e-4f0b-bc3b-e50bd46ca956.png)]Figure 9: Here, we illustrate how to consider a board and an optimal move as a row of data. Note that X = 1, O = -1, and empty spaces are 0, and we start indexing at 0
除了模型损失,要检查我们的模型如何执行,我们将做两件事。我们将执行的第一项检查是从训练集中删除位置和最佳移动行。这将使我们能够看到神经网络模型是否可以推广它以前从未见过的移动。我们将评估模型的第二种方法是在最后实际对抗它。
可以在此秘籍的 GitHub 目录 和 Packt 仓库中找到可能的棋盘列表和最佳移动。
操作步骤
我们按如下方式处理秘籍:
- 我们需要从为此脚本加载必要的库开始,如下所示:
import tensorflow as tf
import matplotlib.pyplot as plt
import csv
import random
import numpy as np
import random
- 接下来,我们声明以下批量大小来训练我们的模型:
batch_size = 50
- 为了使棋盘更容易可视化,我们将创建一个输出带
X
和O
的井字棋棋盘的函数。这是通过以下代码完成的:
def print_board(board):
symbols = ['O', ' ', 'X']
board_plus1 = [int(x) 1 for x in board]
board_line1 = ' {} | {} | {}'.format(symbols[board_plus1[0]],
symbols[board_plus1[1]],
symbols[board_plus1[2]])
board_line2 = ' {} | {} | {}'.format(symbols[board_plus1[3]],
symbols[board_plus1[4]],
symbols[board_plus1[5]])
board_line3 = ' {} | {} | {}'.format(symbols[board_plus1[6]],
symbols[board_plus1[7]],
symbols[board_plus1[8]])
print(board_line1)
print('___________')
print(board_line2)
print('___________')
print(board_line3)
- 现在我们必须创建一个函数,它将返回一个新的棋盘和一个转换下的最佳响应位置。这是通过以下代码完成的:
def get_symmetry(board, response, transformation):
'''
:param board: list of integers 9 long:
opposing mark = -1
friendly mark = 1
empty space = 0
:param transformation: one of five transformations on a board:
rotate180, rotate90, rotate270, flip_v, flip_h
:return: tuple: (new_board, new_response)
'''
if transformation == 'rotate180':
new_response = 8 - response
return board[::-1], new_response
elif transformation == 'rotate90':
new_response = [6, 3, 0, 7, 4, 1, 8, 5, 2].index(response)
tuple_board = list(zip(*[board[6:9], board[3:6], board[0:3]]))
return [value for item in tuple_board for value in item], new_response
elif transformation == 'rotate270':
new_response = [2, 5, 8, 1, 4, 7, 0, 3, 6].index(response)
tuple_board = list(zip(*[board[0:3], board[3:6], board[6:9]]))[::-1]
return [value for item in tuple_board for value in item], new_response
elif transformation == 'flip_v':
new_response = [6, 7, 8, 3, 4, 5, 0, 1, 2].index(response)
return board[6:9] board[3:6] board[0:3], new_response
elif transformation == 'flip_h':
# flip_h = rotate180, then flip_v
new_response = [2, 1, 0, 5, 4, 3, 8, 7, 6].index(response)
new_board = board[::-1]
return new_board[6:9] new_board[3:6] new_board[0:3], new_response
else:
raise ValueError('Method not implmented.')
- 棋盘列表及其最佳响应位于目录中的
.csv
文件中,可从 github 仓库或 Packt 仓库获得。我们将创建一个函数,它将使用棋盘和响应加载文件,并将其存储为元组列表,如下所示:
def get_moves_from_csv(csv_file):
'''
:param csv_file: csv file location containing the boards w/ responses
:return: moves: list of moves with index of best response
'''
moves = []
with open(csv_file, 'rt') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
for row in reader:
moves.append(([int(x) for x in row[0:9]],int(row[9])))
return moves
- 现在我们需要将所有内容组合在一起以创建一个函数,该函数将返回随机转换的棋盘和响应。这是通过以下代码完成的:
def get_rand_move(moves, rand_transforms=2):
# This function performs random transformations on a board.
(board, response) = random.choice(moves)
possible_transforms = ['rotate90', 'rotate180', 'rotate270', 'flip_v', 'flip_h']
for i in range(rand_transforms):
random_transform = random.choice(possible_transforms)
(board, response) = get_symmetry(board, response, random_transform)
return board, response
- 接下来,我们需要初始化图会话,加载数据,并创建一个训练集,如下所示:
sess = tf.Session()
moves = get_moves_from_csv('base_tic_tac_toe_moves.csv')
# Create a train set:
train_length = 500
train_set = []
for t in range(train_length):
train_set.append(get_rand_move(moves))
- 请记住,我们希望从我们的训练集中删除一个棋盘和一个最佳响应,以查看该模型是否可以推广以实现最佳移动。以下棋盘的最佳举措将是在第 6 号指数进行:
test_board = [-1, 0, 0, 1, -1, -1, 0, 0, 1]
train_set = [x for x in train_set if x[0] != test_board]
- 我们现在可以创建函数来创建模型变量和模型操作。请注意,我们在以下模型中不包含
softmax()
激活函数,因为它包含在损失函数中:
def init_weights(shape):
return tf.Variable(tf.random_normal(shape))
def model(X, A1, A2, bias1, bias2):
layer1 = tf.nn.sigmoid(tf.add(tf.matmul(X, A1), bias1))
layer2 = tf.add(tf.matmul(layer1, A2), bias2)
return layer2
- 现在我们需要声明我们的占位符,变量和模型,如下所示:
X = tf.placeholder(dtype=tf.float32, shape=[None, 9])
Y = tf.placeholder(dtype=tf.int32, shape=[None])
A1 = init_weights([9, 81])
bias1 = init_weights([81])
A2 = init_weights([81, 9])
bias2 = init_weights([9])
model_output = model(X, A1, A2, bias1, bias2)
- 接下来,我们需要声明我们的
loss
函数,它将是最终输出对率的平均 softmax(非标准化输出)。然后我们将声明我们的训练步骤和优化器。如果我们希望将来能够对抗我们的模型,我们还需要创建一个预测操作,如下所示:
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=model_output, labels=Y))
train_step = tf.train.GradientDescentOptimizer(0.025).minimize(loss)
prediction = tf.argmax(model_output, 1)
- 我们现在可以使用以下代码初始化变量并循环遍历神经网络的训练:
# Initialize variables
init = tf.global_variables_initializer()
sess.run(init)
loss_vec = []
for i in range(10000):
# Select random indices for batch
rand_indices = np.random.choice(range(len(train_set)), batch_size, replace=False)
# Get batch
batch_data = [train_set[i] for i in rand_indices]
x_input = [x[0] for x in batch_data]
y_target = np.array([y[1] for y in batch_data])
# Run training step
sess.run(train_step, feed_dict={X: x_input, Y: y_target})
# Get training loss
temp_loss = sess.run(loss, feed_dict={X: x_input, Y: y_target})
loss_vec.append(temp_loss)
代码语言:javascript复制 if iP0==0:
print('iteration ' str(i) ' Loss: ' str(temp_loss))
- 以下是绘制模型训练损失所需的代码:
plt.plot(loss_vec, 'k-', label='Loss')
plt.title('Loss (MSE) per Generation')
plt.xlabel('Generation')
plt.ylabel('Loss')
plt.show()
我们应该得到以下每代损失的绘图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sMTOOQmn-1681566911067)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/598821e9-0461-40de-bb4e-d6c72b9d6a28.png)]
图 10:超过 10,000 次迭代的井字棋训练组损失
在上图中,我们绘制了训练步骤的损失。
- 为了测试模型,我们需要看看它是如何在我们从训练集中删除的测试棋盘上执行的。我们希望模型可以推广和预测移动的最佳索引,这将是索引号 6。大多数时候模型将成功,如下所示:
test_boards = [test_board]
feed_dict = {X: test_boards}
logits = sess.run(model_output, feed_dict=feed_dict)
predictions = sess.run(prediction, feed_dict=feed_dict)
print(predictions)
- 上一步应该产生以下输出:
[6]
- 为了评估我们的模型,我们需要与我们训练的模型进行对比。要做到这一点,我们必须创建一个能够检查胜利的函数。这样,我们的程序将知道何时停止要求更多动作。这是通过以下代码完成的:
def check(board):
wins = [[0,1,2], [3,4,5], [6,7,8], [0,3,6], [1,4,7], [2,5,8], [0,4,8], [2,4,6]]
for i in range(len(wins)):
if board[wins[i][0]]==board[wins[i][1]]==board[wins[i][2]]==1.:
return 1
elif board[wins[i][0]]==board[wins[i][1]]==board[wins[i][2]]==-1.:
return 1
return 0
- 现在我们可以使用我们的模型循环播放游戏。我们从一个空白棋盘(全零)开始,我们要求用户输入一个索引(0-8),然后我们将其输入到模型中进行预测。对于模型的移动,我们采用最大的可用预测,也是一个开放空间。从这个游戏中,我们可以看到我们的模型并不完美,如下所示:
game_tracker = [0., 0., 0., 0., 0., 0., 0., 0., 0.]
win_logical = False
num_moves = 0
while not win_logical:
player_index = input('Input index of your move (0-8): ')
num_moves = 1
# Add player move to game
game_tracker[int(player_index)] = 1.
# Get model's move by first getting all the logits for each index
[potential_moves] = sess.run(model_output, feed_dict={X: [game_tracker]})
# Now find allowed moves (where game tracker values = 0.0)
allowed_moves = [ix for ix,x in enumerate(game_tracker) if x==0.0]
# Find best move by taking argmax of logits if they are in allowed moves
model_move = np.argmax([x if ix in allowed_moves else -999.0 for ix,x in enumerate(potential_moves)])
# Add model move to game
game_tracker[int(model_move)] = -1.
print('Model has moved')
print_board(game_tracker)
# Now check for win or too many moves
if check(game_tracker)==1 or num_moves>=5:
print('Game Over!')
win_logical = True
- 上一步应该产生以下交互输出:
Input index of your move (0-8): 4
Model has moved
O | |
___________
| X |
___________
| |
Input index of your move (0-8): 6
Model has moved
O | |
___________
| X |
___________
X | | O
Input index of your move (0-8): 2
Model has moved
O | | X
___________
O | X |
___________
X | | O
Game Over!
工作原理
在本节中,我们通过馈送棋盘位置和九维向量训练神经网络来玩井字棋,并预测最佳响应。我们只需要喂几个可能的井字棋棋盘并对每个棋盘应用随机变换以增加训练集大小。
为了测试我们的算法,我们删除了一个特定棋盘的所有实例,并查看我们的模型是否可以推广以预测最佳响应。最后,我们针对我们的模型玩了一个示例游戏。虽然它还不完善,但仍有不同的架构和训练程序可用于改进它。
七、自然语言处理
在本章中,我们将向您介绍如何使用 TensorFlow 中的文本。我们将首先介绍单词嵌入如何使用词袋方法,然后我们将继续实现更高级的嵌入,如 word2vec 和 doc2vec。
在本章中,我们将介绍以下主题:
- 使用词袋
- 实现 TF-IDF
- 使用 Skip-Gram 嵌入
- 使用 CBOW 嵌入
- 使用 word2vec 进行预测
- 使用 doc2vec 进行情感分析
请注意,读者可以在 Github 和 Packt 仓库中找到本章的所有代码。
介绍
到目前为止,我们只考虑过主要使用数字输入的机器学习算法。如果我们想要使用文本,我们必须找到一种方法将文本转换为数字。有很多方法可以做到这一点,我们将在本章中探讨一些常用的方法。
如果我们考虑句子TensorFlow makes machine learning easy
,我们可以按照我们观察它们的顺序将单词转换为数字。这将使句子成为1 2 3 4 5
。然后,当我们看到一个新句子machine learning is easy
时,我们可以将其翻译为3 4 0 5,
,表示我们没有看到的索引为零的单词。通过这两个例子,我们将词汇量限制为六个数字。对于大块文本,我们可以选择我们想要保留多少单词,并且通常保留最常用的单词,用零索引标记其他所有单词。
如果单词learning
的数值为 4,单词makes
的数值为 2,则自然会认为learning
是makes
的两倍。由于我们不希望单词之间存在这种类型的数字关系,我们可以假设这些数字代表的是类别,而不是关系数字。
另一个问题是这两个句子的大小不同。我们所做的每个观察(在这种情况下,句子)需要具有与我们希望创建的模型相同的大小输入。为了解决这个问题,我们必须在稀疏向量中创建每个句子,如果该单词出现在该索引中,则该特定索引中的值为 1:
TensorFlow | makes | machine | learning | easy |
---|---|---|---|---|
1 | 2 | 3 | 4 | 5 |
first_sentence = [0,1,1,1,1,1]
为了进一步解释前面的向量,我们的词汇由六个不同的单词组成(五个已知单词和一个未知单词)。对于这些单词中的每一个,我们要么具有零值或 1 值。零表示单词不出现在我们的句子中,1 表示它至少出现一次。因此值为零表示该单词不会出现,值为 1 表示它出现
machine | learning | is | easy |
---|---|---|---|
3 | 4 | 0 | 5 |
second_sentence = [1,0,0,1,1,1]
这种方法的缺点是我们失去了任何词序的指示。两个句子TensorFlow makes machine learning easy
和machine learning makes TensorFlow easy
将产生相同的句子向量。
值得注意的是,这些向量的长度等于我们选择的词汇量的大小。选择非常大的词汇量是很常见的,因此这些句子向量可能非常稀疏。这种类型的嵌入称为词袋。我们将在下一节中实现这一点。
另一个缺点是单词is
和TensorFlow
具有相同的数字索引值:1。有意义的是,单词is
可能不如单词TensorFlow
的出现重要。
我们将在本章中探索不同类型的嵌入,试图解决这些问题,但首先我们将开始实现字袋算法。
使用词袋嵌入
在本节中,我们将首先向您展示如何使用 TensorFlow 中的词袋嵌入。这种映射是我们在介绍中介绍的。在这里,我们将向您展示如何使用此类嵌入进行垃圾邮件预测。
准备
为了说明如何在文本数据集中使用词袋,我们将使用来自 UCI 机器学习数据仓库的垃圾邮件电话文本数据库。这是垃圾邮件或非垃圾邮件(非垃圾邮件)的电话短信集合。我们将下载此数据,将其存储以备将来使用,然后继续使用词袋方法来预测文本是否为垃圾邮件。将在词袋算法上运行的模型将是没有隐藏层的逻辑模型。我们将使用批量大小为 1 的随机训练,并在最后的保持测试集上计算精度。
操作步骤
对于这个例子,我们将首先获取数据,正则化和分割文本,通过嵌入函数运行它,并训练逻辑函数来预测垃圾邮件:
- 第一项任务是为此任务导入必要的库。在通常的库中,我们需要一个
.zip
文件库来解压缩来自 UCI 机器学习网站的数据,我们从中检索它:
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import numpy as np
import csv
import string
import requests
import io
from zipfile import ZipFile
from tensorflow.contrib import learn
sess = tf.Session()
- 我们不会在每次运行脚本时下载文本数据,而是保存它并检查文件之前是否已保存。如果我们想要更改脚本的参数,这可以防止我们反复下载数据。下载此数据后,我们将提取输入和目标数据,并将目标更改为
1
以查找垃圾邮件,将0
更改为非垃圾邮件:
save_file_name = os.path.join('temp','temp_spam_data.csv')
if os.path.isfile(save_file_name):
text_data = []
with open(save_file_name, 'r') as temp_output_file:
reader = csv.reader(temp_output_file)
for row in reader:
text_data.append(row)
else:
zip_url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip'
r = requests.get(zip_url)
z = ZipFile(io.BytesIO(r.content))
file = z.read('SMSSpamCollection')
# Format Data
text_data = file.decode()
text_data = text_data.encode('ascii',errors='ignore')
text_data = text_data.decode().split('n')
text_data = [x.split('t') for x in text_data if len(x)>=1]
# And write to csv
with open(save_file_name, 'w') as temp_output_file:
writer = csv.writer(temp_output_file)
writer.writerows(text_data)
texts = [x[1] for x in text_data]
target = [x[0] for x in text_data]
# Relabel 'spam' as 1, 'ham' as 0
target = [1 if x=='spam' else 0 for x in target]
- 为了减少潜在的词汇量,我们将文本正则化。为此,我们消除了文本中大小写和数字的影响。使用以下代码:
# Convert to lower case
texts = [x.lower() for x in texts]
# Remove punctuation
texts = [''.join(c for c in x if c not in string.punctuation) for x in texts]
# Remove numbers
texts = [''.join(c for c in x if c not in '0123456789') for x in texts]
# Trim extra whitespace
texts = [' '.join(x.split()) for x in texts]
- 我们还必须确定最大句子大小。为此,我们将查看数据集中文本长度的直方图。我们可以看到一个很好的截止可能是 25 个字左右。使用以下代码:
# Plot histogram of text lengths
text_lengths = [len(x.split()) for x in texts]
text_lengths = [x for x in text_lengths if x < 50]
plt.hist(text_lengths, bins=25)
plt.title('Histogram of # of Words in Texts')
sentence_size = 25
min_word_freq = 3
由此,我们将得到以下绘图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A3jMCM35-1681566911068)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/da702c2c-702c-4ea7-bcf6-e778e7cfad70.png)]
图 1:数据中每个文本中单词数的直方图。
We use this to establish a maximum length of words to consider in each text. We set this to 25 words, but it can easily be set to 30 or 40 as well.
- TensorFlow 有一个内置的处理工具,用于确定名为
VocabularyProcessor()
的词汇嵌入,它位于learn.preprocessing
库中。请注意,您可能会使用此函数获得已弃用的警告:
vocab_processor = learn.preprocessing.VocabularyProcessor(sentence_size, min_frequency=min_word_freq)
vocab_processor.fit_transform(texts)
transformed_texts = np.array([x for x in vocab_processor.transform(texts)])
embedding_size = len(np.unique(transformed_texts))
- 现在我们将数据分成 80-20 训练和测试集:
train_indices = np.random.choice(len(texts), round(len(texts)*0.8), replace=False)
test_indices = np.array(list(set(range(len(texts))) - set(train_indices)))
texts_train = [x for ix, x in enumerate(texts) if ix in train_indices]
texts_test = [x for ix, x in enumerate(texts) if ix in test_indices]
target_train = [x for ix, x in enumerate(target) if ix in train_indices]
target_test = [x for ix, x in enumerate(target) if ix in test_indices]
- 接下来,我们声明单词的嵌入矩阵。句子词将被翻译成指数。这些索引将被转换为单热编码的向量,我们可以使用单位矩阵创建,这将是我们的单词嵌入的大小。我们将使用此矩阵查找每个单词的稀疏向量,并将它们一起添加到稀疏句子向量中。使用以下代码执行此操作:
identity_mat = tf.diag(tf.ones(shape=[embedding_size]))
- 由于我们最终会执行逻辑回归来预测垃圾邮件的概率,因此我们需要声明逻辑回归变量。然后我们也可以声明我们的数据占位符。值得注意的是,
x_data
输入占位符应该是整数类型,因为它将用于查找我们的单位矩阵的行索引。 TensorFlow 要求此查找为整数:
A = tf.Variable(tf.random_normal(shape=[embedding_size,1]))
b = tf.Variable(tf.random_normal(shape=[1,1]))
# Initialize placeholders
x_data = tf.placeholder(shape=[sentence_size], dtype=tf.int32)
y_target = tf.placeholder(shape=[1, 1], dtype=tf.float32)
- 现在我们将使用 TensorFlow 的嵌入查找函数,它将句子中单词的索引映射到我们单位矩阵的单热编码向量。当我们有这个矩阵时,我们通过总结上述单词向量来创建句子向量。使用以下代码执行此操作:
x_embed = tf.nn.embedding_lookup(identity_mat, x_data)
x_col_sums = tf.reduce_sum(x_embed, 0)
- 现在我们为每个句子都有固定长度的句子向量,我们想要进行逻辑回归。为此,我们需要声明实际的模型操作。由于我们一次只做一个数据点(随机训练),我们将扩展输入的维度并对其进行线性回归操作。请记住,TensorFlow 具有包含 sigmoid 函数的损失函数,因此我们不需要在此输出中包含它:
x_col_sums_2D = tf.expand_dims(x_col_sums, 0)
model_output = tf.add(tf.matmul(x_col_sums_2D, A), b)
- 我们现在将声明损失函数,预测操作和优化函数来训练模型。使用以下代码执行此操作:
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=model_output, labels=y_target))
# Prediction operation
prediction = tf.sigmoid(model_output)
# Declare optimizer
my_opt = tf.train.GradientDescentOptimizer(0.001)
train_step = my_opt.minimize(loss)
- 接下来,我们将在开始训练生成之前初始化图变量:
init = tf.global_variables_initializer()
sess.run(init)
- 现在我们将开始对句子进行迭代。 TensorFlow 的
vocab_processor.fit()
函数是一次运行一个句子的生成器。我们将利用这一优势,以便我们可以对物流模型进行随机训练。为了更好地了解准确率趋势,我们将保留过去 50 个训练步骤的平均值。如果我们只是绘制当前的一个,我们会看到 1 或 0,这取决于我们是否预测训练数据是否正确。使用以下代码执行此操作:
loss_vec = []
train_acc_all = []
train_acc_avg = []
for ix, t in enumerate(vocab_processor.fit_transform(texts_train)):
y_data = [[target_train[ix]]]
sess.run(train_step, feed_dict={x_data: t, y_target: y_data})
temp_loss = sess.run(loss, feed_dict={x_data: t, y_target: y_data})
loss_vec.append(temp_loss)
if (ix 1)==0:
print('Training Observation #{}: Loss= {}'.format(ix 1, temp_loss))
# Keep trailing average of past 50 observations accuracy
# Get prediction of single observation
[[temp_pred]] = sess.run(prediction, feed_dict={x_data:t, y_target:y_data})
# Get True/False if prediction is accurate
train_acc_temp = target_train[ix]==np.round(temp_pred)
train_acc_all.append(train_acc_temp)
if len(train_acc_all) >= 50:
train_acc_avg.append(np.mean(train_acc_all[-50:]))
- 这产生以下输出:
Starting Training Over 4459 Sentences.
Training Observation #10: Loss = 5.45322
Training Observation #20: Loss = 3.58226
Training Observation #30: Loss = 0.0
...
Training Observation #4430: Loss = 1.84636
Training Observation #4440: Loss = 1.46626e-05
Training Observation #4450: Loss = 0.045941
- 为了获得测试集的准确率,我们重复前面的过程,但仅限于预测操作,而不是测试集的训练操作:
print('Getting Test Set Accuracy')
test_acc_all = []
for ix, t in enumerate(vocab_processor.fit_transform(texts_test)):
y_data = [[target_test[ix]]]
if (ix 1)P==0:
print('Test Observation #{}'.format(ix 1))
# Keep trailing average of past 50 observations accuracy
# Get prediction of single observation
[[temp_pred]] = sess.run(prediction, feed_dict={x_data:t, y_target:y_data})
# Get True/False if prediction is accurate
test_acc_temp = target_test[ix]==np.round(temp_pred)
test_acc_all.append(test_acc_temp)
print('nOverall Test Accuracy: {}'.format(np.mean(test_acc_all)))
Getting Test Set Accuracy For 1115 Sentences.
Test Observation #10
Test Observation #20
Test Observation #30
...
Test Observation #1000
Test Observation #1050
Test Observation #1100
Overall Test Accuracy: 0.8035874439461883
工作原理
在本例中,我们使用了来自 UCI 机器学习库的垃圾邮件文本数据。我们使用 TensorFlow 的词汇处理函数来创建标准化词汇表来处理和创建句子向量,这些句子向量是每个文本的单词向量的总和。我们使用这个句子向量与逻辑回归并获得 80% 准确率模型来预测特定文本是否是垃圾邮件。
更多
值得一提的是限制句子(或文本)大小的动机。在此示例中,我们将文本大小限制为 25 个单词。这是词袋的常见做法,因为它限制了文本长度对预测的影响。你可以想象,如果我们找到一个单词,例如meeting
,它可以预测文本是非垃圾邮件(而不是垃圾邮件),那么垃圾邮件可能会通过在最后输入该单词的多次出现来实现。实际上,这是目标数据不平衡的常见问题。在这种情况下可能会出现不平衡的数据,因为垃圾邮件可能很难找到,而非垃圾邮件可能很容易找到。由于这个事实,我们创建的词汇可能严重偏向于我们数据的非垃圾邮件部分中表示的单词(更多非垃圾邮件意味着更多的单词在非垃圾邮件中表示而不是垃圾邮件)。如果我们允许无限长度的文本,那么垃圾邮件发送者可能会利用这一点并创建非常长的文本,这些文本在我们的逻辑模型中触发非垃圾邮件词因素的概率更高。
在下一节中,我们将尝试通过使用单词出现的频率来更好地解决此问题,以确定单词嵌入的值。
实现 TF-IDF
由于我们可以为每个单词选择嵌入,我们可能会决定更改某些单词的加权。一种这样的策略是增加有用的单词和减轻过度常见或罕见单词的权重。我们将在此秘籍中探索的嵌入是尝试实现此目的。
准备
TF-IDF 是一个缩写,代表文本频率 - 反向文档频率。该术语基本上是每个单词的文本频率和反向文档频率的乘积。
在前面的秘籍中,我们介绍了词袋方法,它为句子中每个单词的出现赋值为 1。这可能并不理想,因为每个类别的句子(前一个秘籍中的垃圾邮件和非垃圾邮件)很可能具有the
,and
和其他单词的相同频率,而诸如Viagra
和sale
之类的单词]可能应该更加重视查明文本是否是垃圾邮件。
首先,我们要考虑词频。在这里,我们考虑单个条目中单词出现的频率。这部分(TF)的目的是找到在每个条目中看起来很重要的项。
但是the
和and
等词可能会在每个条目中频繁出现。我们希望减轻这些单词的重要性,因此将前面的文本频率(TF)乘以整个文档频率的倒数可能有助于找到重要的单词。然而,由于文本集(语料库)可能非常大,因此通常采用逆文档频率的对数。这为我们留下了每个文档条目中每个单词的 TF-IDF 的以下公式:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d1ggXhlW-1681566911068)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/6d9df72e-4e84-45f4-864f-32730248af36.png)]
这里w[tf]
是文档中的单词频率,w[df]
是所有文档中这些单词的总频率。有意义的是,TF-IDF 的高值可能表示在确定文档内容时非常重要的单词。
创建 TF-IDF 向量要求我们将所有文本加载到内存中,并在开始训练模型之前计算每个单词的出现次数。因此,它没有在 TensorFlow 中完全实现,因此我们将使用 scikit-learn 来创建我们的 TF-IDF 嵌入,但是使用 TensorFlow 来适应逻辑模型。
操作步骤
我们将按如下方式处理秘籍:
- 我们将从加载必要的库开始。这次,我们正在为我们的文本加载 scikit-learn TF-IDF 预处理库。使用以下代码执行此操作:
import tensorflow as tf
import matplotlib.pyplot as plt
import csv
import numpy as np
import os
import string
import requests
import io
import nltk
from zipfile import ZipFile
from sklearn.feature_extraction.text import TfidfVectorizer
- 我们将开始一个图会话,并为我们的词汇表声明我们的批量大小和最大特征大小:
sess = tf.Session()
batch_size= 200
max_features = 1000
- 接下来,我们将从 Web 或我们的
temp
数据文件夹中加载数据(如果我们之前已保存过)。使用以下代码执行此操作:
save_file_name = os.path.join('temp','temp_spam_data.csv')
if os.path.isfile(save_file_name):
text_data = []
with open(save_file_name, 'r') as temp_output_file:
reader = csv.reader(temp_output_file)
for row in reader:
text_data.append(row)
else:
zip_url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip'
r = requests.get(zip_url)
z = ZipFile(io.BytesIO(r.content))
file = z.read('SMSSpamCollection')
# Format Data
text_data = file.decode()
text_data = text_data.encode('ascii',errors='ignore')
text_data = text_data.decode().split('n')
text_data = [x.split('t') for x in text_data if len(x)>=1]
# And write to csv
with open(save_file_name, 'w') as temp_output_file:
writer = csv.writer(temp_output_file)
writer.writerows(text_data)
texts = [x[1] for x in text_data]
target = [x[0] for x in text_data]
# Relabel 'spam' as 1, 'ham' as 0
target = [1. if x=='spam' else 0. for x in target]
- 就像前面的秘籍一样,我们将通过将所有内容转换为小写,删除标点符号并删除数字来减少词汇量:
# Lower case
texts = [x.lower() for x in texts]
# Remove punctuation
texts = [''.join(c for c in x if c not in string.punctuation) for x in texts]
# Remove numbers
texts = [''.join(c for c in x if c not in '0123456789') for x in texts]
# Trim extra whitespace
texts = [' '.join(x.split()) for x in texts]
- 为了使用 scikt-learn 的 TF-IDF 处理函数,我们必须告诉它如何标记我们的句子。通过这个,我们只是指如何将句子分解为相应的单词。我们已经为我们构建了一个很好的标记器:
nltk
包可以很好地将句子分解为相应的单词:
def tokenizer(text):
words = nltk.word_tokenize(text)
return words
# Create TF-IDF of texts
tfidf = TfidfVectorizer(tokenizer=tokenizer, stop_words='english', max_features=max_features)
sparse_tfidf_texts = tfidf.fit_transform(texts)
- 接下来,我们将数据集分解为测试和训练集。使用以下代码执行此操作:
train_indices = np.random.choice(sparse_tfidf_texts.shape[0], round(0.8*sparse_tfidf_texts.shape[0]), replace=False)
test_indices = np.array(list(set(range(sparse_tfidf_texts.shape[0])) - set(train_indices)))
texts_train = sparse_tfidf_texts[train_indices]
texts_test = sparse_tfidf_texts[test_indices]
target_train = np.array([x for ix, x in enumerate(target) if ix in train_indices])
target_test = np.array([x for ix, x in enumerate(target) if ix in test_indices])
- 现在我们声明我们的逻辑回归模型变量和我们的数据占位符:
A = tf.Variable(tf.random_normal(shape=[max_features,1]))
b = tf.Variable(tf.random_normal(shape=[1,1]))
# Initialize placeholders
x_data = tf.placeholder(shape=[None, max_features], dtype=tf.float32)
y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 我们现在可以声明模型操作和损失函数。请记住,逻辑回归的 sigmoid 部分在我们的损失函数中。使用以下代码执行此操作:
model_output = tf.add(tf.matmul(x_data, A), b)
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=model_output, labels=y_target))
- 我们将预测和精度函数添加到图中,以便在我们的模型训练时我们可以看到训练和测试集的准确率:
prediction = tf.round(tf.sigmoid(model_output))
predictions_correct = tf.cast(tf.equal(prediction, y_target), tf.float32)
accuracy = tf.reduce_mean(predictions_correct)
- 然后我们将声明一个优化器并初始化我们的图变量:
my_opt = tf.train.GradientDescentOptimizer(0.0025)
train_step = my_opt.minimize(loss)
# Intitialize Variables
init = tf.global_variables_initializer()
sess.run(init)
- 我们现在将训练我们的模型超过 10,000 代,并记录每 100 代的测试/训练损失和准确率,每 500 代打印一次。使用以下代码执行此操作:
train_loss = []
test_loss = []
train_acc = []
test_acc = []
i_data = []
for i in range(10000):
rand_index = np.random.choice(texts_train.shape[0], size=batch_size)
rand_x = texts_train[rand_index].todense()
rand_y = np.transpose([target_train[rand_index]])
sess.run(train_step, feed_dict={x_data: rand_x, y_target: rand_y})
# Only record loss and accuracy every 100 generations
if (i 1)0==0:
i_data.append(i 1)
train_loss_temp = sess.run(loss, feed_dict={x_data: rand_x, y_target: rand_y})
train_loss.append(train_loss_temp)
test_loss_temp = sess.run(loss, feed_dict={x_data: texts_test.todense(), y_target: np.transpose([target_test])})
test_loss.append(test_loss_temp)
train_acc_temp = sess.run(accuracy, feed_dict={x_data: rand_x, y_target: rand_y})
train_acc.append(train_acc_temp)
test_acc_temp = sess.run(accuracy, feed_dict={x_data: texts_test.todense(), y_target: np.transpose([target_test])})
test_acc.append(test_acc_temp)
if (i 1)P0==0:
acc_and_loss = [i 1, train_loss_temp, test_loss_temp, train_acc_temp, test_acc_temp]
acc_and_loss = [np.round(x,2) for x in acc_and_loss]
print('Generation # {}. Train Loss (Test Loss): {:.2f} ({:.2f}). Train Acc (Test Acc): {:.2f} ({:.2f})'.format(*acc_and_loss))
- 这产生以下输出:
Generation # 500. Train Loss (Test Loss): 0.69 (0.73). Train Acc (Test Acc): 0.62 (0.57)
Generation # 1000. Train Loss (Test Loss): 0.62 (0.63). Train Acc (Test Acc): 0.68 (0.66)
...
Generation # 9500. Train Loss (Test Loss): 0.39 (0.45). Train Acc (Test Acc): 0.89 (0.85)
Generation # 10000. Train Loss (Test Loss): 0.48 (0.45). Train Acc (Test Acc): 0.84 (0.85)
以下是绘制训练和测试装置的准确率和损耗的绘图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0nGx8Vd0-1681566911068)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/46d64e8f-a3e6-42db-abd6-d58abebce4c8.png)]
图 2:根据 TF-IDF 值构建的物流垃圾邮件模型的交叉熵损失
训练和测试精度图如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4wayuE1j-1681566911069)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/cfd1bce3-764c-4ddf-9f6c-847e76d7fdf7.png)]
图 3:根据 TF-IDF 值构建的逻辑垃圾邮件模型的训练和测试集精度
工作原理
使用模型的 TF-IDF 值增加了我们对先前的词袋模型的预测,从 80% 的准确率到接近 90% 的准确率。我们通过使用 scikit-learn 的 TF-IDF 词汇处理函数并使用这些 TF-IDF 值进行逻辑回归来实现这一目标。
更多
虽然我们可能已经解决了重要性这个问题,但我们还没有解决字序问题。词袋和 TF-IDF 都没有考虑句子中的单词的顺序特征。我们将在接下来的几节中尝试解决这个问题,这将向我们介绍 word2vec 技术。
使用 Skip-Gram 嵌入
在之前的秘籍中,我们在训练模型之前决定了我们的文本嵌入。使用神经网络,我们可以使嵌入值成为训练过程的一部分。我们将探索的第一个这样的方法叫做 Skip-Gram 嵌入。
准备
在此秘籍之前,我们没有考虑与创建单词嵌入相关的单词顺序。 2013 年初,Tomas Mikolov 和谷歌的其他研究人员撰写了一篇关于创建解决这个问题的单词嵌入的论文,他们将他们的方法命名为 word2vec。
基本思想是创建捕获单词关系方面的单词嵌入。我们试图了解各种单词如何相互关联。这些嵌入可能如何表现的一些示例如下:
king - man woman = queen
India pale ale - hops malt = stout
如果我们只考虑它们之间的位置关系,我们可能会实现这样的数字表示。如果我们能够分析足够大的相关文档来源,我们可能会发现在我们的文本中,king
,man
和queen
这两个词在彼此之间相互提及。如果我们也知道man
和woman
以不同的方式相关,那么我们可以得出结论,man
是king
,因为woman
是queen
,依此类推。
为了找到这样的嵌入,我们将使用一个神经网络来预测给定输入字的周围单词。我们可以轻松地切换它并尝试在给定一组周围单词的情况下预测目标单词,但我们将从前面的方法开始。两者都是 word2vec 过程的变体,但是从目标词预测周围词(上下文)的前述方法称为 Skip-Gram 模型。在下一个秘籍中,我们将实现另一个方法,从上下文预测目标词,这称为连续词袋方法(CBOW):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BPZeDUTP-1681566911069)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/33e5cd5b-a002-43db-af41-8826c3cd5fc2.png)]
图 4:word2vec 的 Skip-Gram 实现的图示。 Skip-Gram 预测目标词的上下文窗口(每侧窗口大小为 1)。
对于这个秘籍,我们将在康奈尔大学的一组电影评论数据上实现 Skip-Gram 模型。 word2vec 的 CBOW 方法将在下一个秘籍中实现。
操作步骤
对于这个秘籍,我们将创建几个辅助函数。这些函数将加载数据,正则化文本,生成词汇表并生成数据批量。只有在这之后我们才开始训练我们的单词嵌入。为了清楚起见,我们不是预测任何目标变量,而是我们将拟合单词嵌入:
- 首先,我们将加载必要的库并启动图会话:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import string
import requests
import collections
import io
import tarfile
import urllib.request
from nltk.corpus import stopwords
sess = tf.Session()
- 然后我们声明一些模型参数。我们将一次查看 50 对单词嵌入(批量大小)。每个单词的嵌入大小将是一个长度为 200 的向量,我们只考虑 10,000 个最常用的单词(每隔一个单词将被归类为未知单词)。我们将训练 5 万代,并每 500 代打印一次。然后我们将声明一个我们将在损失函数中使用的
num_sampled
变量(我们将在后面解释),并且我们还声明了我们的 Skip-Gram 窗口大小。在这里,我们将窗口大小设置为 2,因此我们将查看目标每侧的周围两个单词。我们将通过名为nltk
的 Python 包设置我们的停用词。我们还想要一种方法来检查我们的单词嵌入是如何执行的,因此我们将选择一些常见的电影评论单词并从每 2,000 次迭代中打印出最近的邻居单词:
batch_size = 50
embedding_size = 200
vocabulary_size = 10000
generations = 50000
print_loss_every = 500
num_sampled = int(batch_size/2)
window_size = 2
stops = stopwords.words('english')
print_valid_every = 2000
valid_words = ['cliche', 'love', 'hate', 'silly', 'sad']
- 接下来,我们将声明我们的数据加载函数,该函数会检查以确保在下载之前我们没有下载数据。否则,如果之前保存了数据,它将从磁盘加载数据。使用以下代码执行此操作:
def load_movie_data():
save_folder_name = 'temp'
pos_file = os.path.join(save_folder_name, 'rt-polarity.pos')
neg_file = os.path.join(save_folder_name, 'rt-polarity.neg')
# Check if files are already downloaded
if os.path.exists(save_folder_name):
pos_data = []
with open(pos_file, 'r') as temp_pos_file:
for row in temp_pos_file:
pos_data.append(row)
neg_data = []
with open(neg_file, 'r') as temp_neg_file:
for row in temp_neg_file:
neg_data.append(row)
else: # If not downloaded, download and save
movie_data_url = 'http://www.cs.cornell.edu/people/pabo/movie-review-data/rt-polaritydata.tar.gz'
stream_data = urllib.request.urlopen(movie_data_url)
tmp = io.BytesIO()
while True:
s = stream_data.read(16384)
if not s:
break
tmp.write(s)
stream_data.close()
tmp.seek(0)
tar_file = tarfile.open(fileobj=tmp, mode='r:gz')
pos = tar_file.extractfile('rt-polaritydata/rt-polarity.pos')
neg = tar_file.extractfile('rt-polaritydata/rt-polarity.neg')
# Save pos/neg reviews
pos_data = []
for line in pos:
pos_data.append(line.decode('ISO-8859-1').encode('ascii',errors='ignore').decode())
neg_data = []
for line in neg:
neg_data.append(line.decode('ISO-8859-1').encode('ascii',errors='ignore').decode())
tar_file.close()
# Write to file
if not os.path.exists(save_folder_name):
os.makedirs(save_folder_name)
# Save files
with open(pos_file, 'w') as pos_file_handler:
pos_file_handler.write(''.join(pos_data))
with open(neg_file, 'w') as neg_file_handler:
neg_file_handler.write(''.join(neg_data))
texts = pos_data neg_data
target = [1]*len(pos_data) [0]*len(neg_data)
return(texts, target)
texts, target = load_movie_data()
- 接下来,我们将为文本创建正则化函数。此函数将输入字符串列表并使其为小写,删除标点,删除数字,删除额外的空格,并删除停用词。使用以下代码执行此操作:
def normalize_text(texts, stops):
# Lower case
texts = [x.lower() for x in texts]
# Remove punctuation
texts = [''.join(c for c in x if c not in string.punctuation) for x in texts]
# Remove numbers
texts = [''.join(c for c in x if c not in '0123456789') for x in texts]
# Remove stopwords
texts = [' '.join([word for word in x.split() if word not in (stops)]) for x in texts]
# Trim extra whitespace
texts = [' '.join(x.split()) for x in texts]
return(texts)
texts = normalize_text(texts, stops)
- 为了确保我们所有的电影评论都能提供信息,我们应该确保它们足够长,以包含重要的单词关系。我们会随意将其设置为三个或更多单词:
target = [target[ix] for ix, x in enumerate(texts) if len(x.split()) > 2]
texts = [x for x in texts if len(x.split()) > 2]
- 为了构建我们的词汇表,我们将创建一个函数来创建一个带有计数的单词字典。任何不常见的词都不会使我们的词汇量大小被截止,将被标记为
RARE
。使用以下代码执行此操作:
def build_dictionary(sentences, vocabulary_size):
# Turn sentences (list of strings) into lists of words
split_sentences = [s.split() for s in sentences]
words = [x for sublist in split_sentences for x in sublist]
# Initialize list of [word, word_count] for each word, starting with unknown
count = [['RARE', -1]]
# Now add most frequent words, limited to the N-most frequent (N=vocabulary size)
count.extend(collections.Counter(words).most_common(vocabulary_size-1))
# Now create the dictionary
word_dict = {}
# For each word, that we want in the dictionary, add it, then make it the value of the prior dictionary length
for word, word_count in count:
word_dict[word] = len(word_dict)
return(word_dict)
- 我们需要一个函数将一个句子列表转换为单词索引列表,我们可以将它们传递给嵌入查找函数。使用以下代码执行此操作:
def text_to_numbers(sentences, word_dict):
# Initialize the returned data
data = []
for sentence in sentences:
sentence_data = []
# For each word, either use selected index or rare word index
for word in sentence:
if word in word_dict:
word_ix = word_dict[word]
else:
word_ix = 0
sentence_data.append(word_ix)
data.append(sentence_data)
return data
- 现在我们可以实际创建我们的字典并将我们的句子列表转换为单词索引列表:
word_dictionary = build_dictionary(texts, vocabulary_size)
word_dictionary_rev = dict(zip(word_dictionary.values(), word_dictionary.keys()))
text_data = text_to_numbers(texts, word_dictionary)
- 从前面的单词字典中,我们可以查找我们在步骤 2 中选择的验证字的索引。使用以下代码执行此操作:
valid_examples = [word_dictionary[x] for x in valid_words]
- 我们现在将创建一个将返回 Skip-Gram 批次的函数。我们想训练一对单词,其中一个单词是训练输入(来自我们窗口中心的目标单词),另一个单词是从窗口中选择的。例如,句子
the cat in the hat
可能导致(输入,输出)对,如下所示:(the
,in
),(cat
,in
),(the
,in
),(hat
,in
)如果是目标词,我们每个方向的窗口大小为 2:
def generate_batch_data(sentences, batch_size, window_size, method='skip_gram'):
# Fill up data batch
batch_data = []
label_data = []
while len(batch_data) < batch_size:
# select random sentence to start
rand_sentence = np.random.choice(sentences)
# Generate consecutive windows to look at
window_sequences = [rand_sentence[max((ix-window_size),0):(ix window_size 1)] for ix, x in enumerate(rand_sentence)]
# Denote which element of each window is the center word of interest
label_indices = [ix if ix<window_size else window_size for ix,x in enumerate(window_sequences)]
# Pull out center word of interest for each window and create a tuple for each window
if method=='skip_gram':
batch_and_labels = [(x[y], x[:y] x[(y 1):]) for x,y in zip(window_sequences, label_indices)]
# Make it in to a big list of tuples (target word, surrounding word)
tuple_data = [(x, y_) for x,y in batch_and_labels for y_ in y]
else:
raise ValueError('Method {} not implmented yet.'.format(method))
# extract batch and labels
batch, labels = [list(x) for x in zip(*tuple_data)]
batch_data.extend(batch[:batch_size])
label_data.extend(labels[:batch_size])
# Trim batch and label at the end
batch_data = batch_data[:batch_size]
label_data = label_data[:batch_size]
# Convert to numpy array
batch_data = np.array(batch_data)
label_data = np.transpose(np.array([label_data]))
return batch_data, label_data
- 我们现在可以初始化嵌入矩阵,声明占位符,并初始化嵌入查找函数。使用以下代码执行此操作:
embeddings = tf.Variable(tf.random_uniform([vocabulary_size,
embedding_size], -1.0, 1.0))
# Create data/target placeholders
x_inputs = tf.placeholder(tf.int32, shape=[batch_size])
y_target = tf.placeholder(tf.int32, shape=[batch_size, 1])
valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
# Lookup the word embedding:
embed = tf.nn.embedding_lookup(embeddings, x_inputs)
- 损失函数应该是诸如
softmax
之类的东西,它计算预测错误单词类别时的损失。但由于我们的目标有 10,000 个不同的类别,因此非常稀疏。这种稀疏性导致关于模型的拟合或收敛的问题。为了解决这个问题,我们将使用称为噪声对比误差的损失函数。这种 NCE 损失函数通过预测单词类与随机噪声预测将我们的问题转化为二元预测。num_sampled
参数指定批量变成随机噪声的程度:
nce_weights = tf.Variable(tf.truncated_normal([vocabulary_size,
embedding_size], stddev=1.0 / np.sqrt(embedding_size)))
nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,
biases=nce_biases,
inputs=embed,
labels=y_target,
num_sampled=num_sampled,
num_classes=vocabulary_size))
- 现在我们需要创建一种方法来查找附近的单词到我们的验证单词。我们将通过计算验证集和所有单词嵌入之间的余弦相似性来完成此操作,然后我们可以为每个验证字打印出最接近的单词集。使用以下代码执行此操作:
norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keepdims=True))
normalized_embeddings = embeddings / norm
valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)
- 我们现在声明我们的优化函数并初始化我们的模型变量:
optimizer = tf.train.GradientDescentOptimizer(learning_rate=1.0).minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
- 现在我们可以训练我们的嵌入并在训练期间打印损失和最接近我们验证集的单词。使用以下代码执行此操作:
loss_vec = []
loss_x_vec = []
for i in range(generations):
batch_inputs, batch_labels = generate_batch_data(text_data, batch_size, window_size)
feed_dict = {x_inputs : batch_inputs, y_target : batch_labels}
# Run the train step
sess.run(optimizer, feed_dict=feed_dict)
# Return the loss
if (i 1) % print_loss_every == 0:
loss_val = sess.run(loss, feed_dict=feed_dict)
loss_vec.append(loss_val)
loss_x_vec.append(i 1)
print("Loss at step {} : {}".format(i 1, loss_val))
# Validation: Print some random words and top 5 related words
if (i 1) % print_valid_every == 0:
sim = sess.run(similarity, feed_dict=feed_dict)
for j in range(len(valid_words)):
valid_word = word_dictionary_rev[valid_examples[j]]
top_k = 5 # number of nearest neighbors
nearest = (-sim[j, :]).argsort()[1:top_k 1]
log_str = "Nearest to {}:".format(valid_word)
for k in range(top_k):
close_word = word_dictionary_rev[nearest[k]]
log_str = "%s %s," % (log_str, close_word)
print(log_str)
在前面的代码中,我们在调用
argsort
方法之前采用相似矩阵的否定。我们这样做是因为我们想要找到从最高相似度值到最低相似度值的索引,而不是相反。
- 这产生以下输出:
Loss at step 500 : 13.387781143188477
Loss at step 1000 : 7.240757465362549
Loss at step 49500 : 0.9395825862884521
Loss at step 50000 : 0.30323168635368347
Nearest to cliche: walk, intrigue, brim, eileen, dumber,
Nearest to love: plight, fiction, complete, lady, bartleby,
Nearest to hate: style, throws, players, fearlessness, astringent,
Nearest to silly: delivers, meow, regain, nicely, anger,
Nearest to sad: dizzying, variety, existing, environment, tunney,
工作原理
我们通过Skip-Gram
方法在电影评论数据集上训练了一个 word2vec 模型。我们下载了数据,将单词转换为带有字典的索引,并将这些索引号用作嵌入查找,我们对其进行了训练,以便附近的单词可以相互预测。
更多
乍一看,我们可能期望验证集的附近单词集合是同义词。事实并非如此,因为很少有同义词实际上在句子中彼此相邻。我们真正得到的是预测我们的数据集中哪些单词彼此接近。我们希望使用这样的嵌入将使预测更容易。
为了使用这些嵌入,我们必须使它们可重用并保存它们。我们将通过实现 CBOW 嵌入在下一个秘籍中执行此操作。
使用 CBOW 嵌入
在这个秘籍中,我们将实现 word2vec 的 CBOW(连续词袋)方法。它与Skip-Gram
方法非常相似,除了我们预测来自环境词周围窗口的单个目标词。
准备
在这个秘籍中,我们将实现 word2vec 的CBOW
方法。它与Skip-Gram
方法非常相似,只是我们预测来自环境词周围窗口的单个目标词。
在前面的示例中,我们将窗口和目标的每个组合视为一组配对的输入和输出,但是使用 CBOW,我们将周围的窗口嵌入添加到一起以获得一个嵌入来预测目标字嵌入:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nlbw1Y9X-1681566911069)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/5e2d25dc-7309-4f36-bb6a-b192653c4c62.png)]
图 5:如何在一个例子的窗口上创建 CBOW 嵌入数据的描述(每侧窗口大小为 1)
大多数代码都保持不变,除了我们需要改变我们创建嵌入的方式以及如何从句子生成数据。
为了使代码更易于阅读,我们已将所有主要函数移动到同一目录中名为text_helpers.py
的单独文件中。此函数保存数据加载,文本正则化,字典创建和批量生成函数。除非另有说明,否则这些函数与使用 Skip-Gram 嵌入秘籍中显示的完全相同。
操作步骤
我们将按如下方式处理秘籍:
- 我们将首先加载必要的库,包括前面提到的
text_helpers.py
脚本,我们将把我们的函数用于文本加载和操作。然后我们将开始一个图会话:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import pickle
import string
import requests
import collections
import io
import tarfile
import urllib.request
import text_helpers
from nltk.corpus import stopwords
sess = tf.Session()
- 我们要确保在开始保存之前存在临时数据和参数保存文件夹。使用以下代码检查:
# Make a saving directory if it doesn't exist
data_folder_name = 'temp'
if not os.path.exists(data_folder_name):
os.makedirs(data_folder_name)
- 然后我们将声明模型的参数,这与我们在上一个秘籍中对
Skip-Gram
方法所做的类似:
# Declare model parameters
batch_size = 500
embedding_size = 200
vocabulary_size = 2000
generations = 50000
model_learning_rate = 0.001
num_sampled = int(batch_size/2
window_size = 3
# Add checkpoints to training
save_embeddings_every = 5000
print_valid_every = 5000
print_loss_every = 100
# Declare stop words
stops = stopwords.words('english')
# We pick some test words. We are expecting synonyms to appear
valid_words = ['love', 'hate', 'happy', 'sad', 'man', 'woman']
- 我们已将数据加载和文本正则化函数移动到我们在开始时导入的单独文件中,此文件在 Github 仓库和 Packt 仓库中都可用。现在我们可以打电话给他们我们也只想要包含三个或更多单词的评论。使用以下代码:
texts, target = text_helpers.load_movie_data(data_folder_name) texts = text_helpers.normalize_text(texts, stops) # Texts must contain at least 3 words target = [target[ix] for ix, x in enumerate(texts) if len(x.split()) > 2] texts = [x for x in texts if len(x.split()) > 2]
- 现在我们将创建我们的词汇词典,这将帮助我们查找单词。当我们想要打印出最接近我们验证集的单词时,我们还需要一个反向字典来查找索引中的单词:
word_dictionary = text_helpers.build_dictionary(texts,
vocabulary_size)
word_dictionary_rev = dict(zip(word_dictionary.values(), word_dictionary.keys()))
text_data = text_helpers.text_to_numbers(texts, word_dictionary)
# Get validation word keys
valid_examples = [word_dictionary[x] for x in valid_words]
- 接下来,我们将初始化我们想要拟合的单词嵌入,并声明模型数据占位符。使用以下代码执行此操作:
embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
# Create data/target placeholders
x_inputs = tf.placeholder(tf.int32, shape=[batch_size,
2*window_size])
y_target = tf.placeholder(tf.int32, shape=[batch_size, 1])
valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
- 我们现在可以创建一种处理嵌入一词的方法。由于 CBOW 模型添加了上下文窗口的嵌入,我们将创建一个循环并将所有嵌入添加到窗口中:
# Lookup the word embeddings and
# Add together window embeddings:
embed = tf.zeros([batch_size, embedding_size])
for element in range(2*window_size):
embed = tf.nn.embedding_lookup(embeddings, x_inputs[:, element])
- 我们将使用 TensorFlow 中内置的噪声对比误差损失函数,因为我们的分类输出太稀疏,无法使 softmax 收敛,如下所示:
# NCE loss parameters
nce_weights = tf.Variable(tf.truncated_normal([vocabulary_size,
embedding_size], stddev=1.0 / np.sqrt(embedding_size)))
nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
# Declare loss function (NCE)
loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,
biases=nce_biases,
inputs=embed,
labels=y_target,
num_sampled=num_sampled,
num_classes=vocabulary_size))
- 就像我们在 Skip-Gram 秘籍中所做的那样,我们将使用余弦相似性来打印离我们的验证字数据集最近的单词,以了解我们的嵌入如何工作。使用以下代码执行此操作:
norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keepdims=True))
normalized_embeddings = embeddings / norm
valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)
- 要保存嵌入,我们必须加载 TensorFlow
train.Saver
方法。这个方法默认保存整个图,但是我们可以给它一个参数来保存嵌入变量,我们也可以给它一个特定的名称。在这里,我们给它的名称与图中的变量名称相同:
saver = tf.train.Saver({"embeddings": embeddings})
- 我们现在将声明我们的优化函数并初始化我们的模型变量。使用以下代码执行此操作:
optimizer = tf.train.GradientDescentOptimizer(learning_rate=model_learning_rate).minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
- 最后,我们可以遍历我们的训练步骤,打印出损失,并将我们指定的嵌入和字典保存到:
loss_vec = []
loss_x_vec = []
for i in range(generations):
batch_inputs, batch_labels = text_helpers.generate_batch_data(text_data, batch_size, window_size, method='cbow')
feed_dict = {x_inputs : batch_inputs, y_target : batch_labels}
# Run the train step
sess.run(optimizer, feed_dict=feed_dict)
# Return the loss
if (i 1) % print_loss_every == 0:
loss_val = sess.run(loss, feed_dict=feed_dict)
loss_vec.append(loss_val)
loss_x_vec.append(i 1)
print('Loss at step {} : {}'.format(i 1, loss_val))
# Validation: Print some random words and top 5 related words
if (i 1) % print_valid_every == 0:
sim = sess.run(similarity, feed_dict=feed_dict)
for j in range(len(valid_words)):
valid_word = word_dictionary_rev[valid_examples[j]]
top_k = 5 # number of nearest neighbors
nearest = (-sim[j, :]).argsort()[1:top_k 1]
log_str = "Nearest to {}:".format(valid_word)
for k in range(top_k):
close_word = word_dictionary_rev[nearest[k]]
print_str = '{} {},'.format(log_str, close_word)
print(print_str)
# Save dictionary embeddings
if (i 1) % save_embeddings_every == 0:
# Save vocabulary dictionary
with open(os.path.join(data_folder_name,'movie_vocab.pkl'), 'wb') as f:
pickle.dump(word_dictionary, f)
# Save embeddings
model_checkpoint_path = os.path.join(os.getcwd(),data_folder_name,'cbow_movie_embeddings.ckpt')
save_path = saver.save(sess, model_checkpoint_path)
print('Model saved in file: {}'.format(save_path))
- 这产生以下输出:
Loss at step 100 : 62.04829025268555
Loss at step 200 : 33.182334899902344
...
Loss at step 49900 : 1.6794960498809814
Loss at step 50000 : 1.5071022510528564
Nearest to love: clarity, cult, cliched, literary, memory,
Nearest to hate: bringing, gifted, almost, next, wish,
Nearest to happy: ensemble, fall, courage, uneven, girls,
Nearest to sad: santa, devoid, biopic, genuinely, becomes,
Nearest to man: project, stands, none, soul, away,
Nearest to woman: crush, even, x, team, ensemble,
Model saved in file: .../temp/cbow_movie_embeddings.ckpt
text_helpers.py
文件中除了一个函数之外的所有函数都具有直接来自上一个秘籍的函数。我们将通过添加cbow
方法对generate_batch_data()
函数稍加补充,如下所示:
elif method=='cbow':
batch_and_labels = [(x[:y] x[(y 1):], x[y]) for x,y in zip(window_sequences, label_indices)]
# Only keep windows with consistent 2*window_size
batch_and_labels = [(x,y) for x,y in batch_and_labels if len(x)==2*window_size]
batch, labels = [list(x) for x in zip(*batch_and_labels)]
工作原理
此秘籍与使用 Skip-Gram 创建嵌入非常相似。主要区别在于我们如何生成数据并组合嵌入。
对于这个秘籍,我们加载数据,正则化文本,创建词汇词典,使用字典查找嵌入,组合嵌入,并训练神经网络来预测目标词。
更多
值得注意的是,CBOW
方法训练周围窗口的累加嵌入以预测目标字。这样做的一个结果是来自 word2vec 的CBOW
方法具有Skip-Gram
方法缺乏的平滑效果,并且认为这对于较小的文本数据集可能是优选的是合理的。
使用 word2vec 进行预测
在本文中,我们将使用先前学习的嵌入策略来执行分类。
准备
现在我们已经创建并保存了 CBOW 字嵌入,我们需要使用它们来对电影数据集进行情感预测。在本文中,我们将学习如何加载和使用预先训练的嵌入,并使用这些嵌入来通过训练逻辑线性模型来预测好的或坏的评论来执行情感分析。
情感分析是一项非常艰巨的任务,因为人类语言使得很难掌握所谓意义的真实含义的微妙之处和细微差别。讽刺,笑话和含糊不清的引用都使这项任务成倍增加。我们将在电影评论数据集上创建一个简单的逻辑回归,以查看我们是否可以从我们在上一个秘籍中创建并保存的 CBOW 嵌入中获取任何信息。由于本文的重点是加载和使用已保存的嵌入,我们不会追求更复杂的模型。
操作步骤
我们将按如下方式处理秘籍:
- 我们将首先加载必要的库并启动图会话:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import pickle
import string
import requests
import collections
import io
import tarfile
import urllib.request
import text_helpers
from nltk.corpus import stopwords
sess = tf.Session()
- 现在我们将声明模型参数。嵌入大小应与我们用于创建前面的 CBOW 嵌入的嵌入大小相同。使用以下代码执行此操作:
embedding_size = 200
vocabulary_size = 2000
batch_size = 100
max_words = 100
stops = stopwords.words('english')
- 我们将从我们创建的
text_helpers.py
文件加载和转换文本数据。使用以下代码执行此操作:
texts, target = text_helpers.load_movie_data()
# Normalize text
print('Normalizing Text Data')
texts = text_helpers.normalize_text(texts, stops)
# Texts must contain at least 3 words
target = [target[ix] for ix, x in enumerate(texts) if len(x.split()) > 2]
texts = [x for x in texts if len(x.split()) > 2]
train_indices = np.random.choice(len(target), round(0.8*len(target)), replace=False)
test_indices = np.array(list(set(range(len(target))) - set(train_indices)))
texts_train = [x for ix, x in enumerate(texts) if ix in train_indices]
texts_test = [x for ix, x in enumerate(texts) if ix in test_indices]
target_train = np.array([x for ix, x in enumerate(target) if ix in train_indices])
target_test = np.array([x for ix, x in enumerate(target) if ix in test_indices])
- 我们现在加载我们在拟合 CBOW 嵌入时创建的单词字典。重要的是我们加载它以便我们具有从单词到嵌入索引的完全相同的映射,如下所示:
dict_file = os.path.join(data_folder_name, 'movie_vocab.pkl')
word_dictionary = pickle.load(open(dict_file, 'rb'))
- 我们现在可以使用我们的单词字典将我们加载的句子数据转换为数字
numpy
数组:
text_data_train = np.array(text_helpers.text_to_numbers(texts_train, word_dictionary))
text_data_test = np.array(text_helpers.text_to_numbers(texts_test, word_dictionary))
- 由于电影评论的长度不同,我们将它们标准化,因此它们的长度都相同。在我们的例子中,我们将其设置为 100 个单词。如果评论少于 100 个单词,我们将用零填充它。使用以下代码执行此操作:
text_data_train = np.array([x[0:max_words] for x in [y [0]*max_words for y in text_data_train]])
text_data_test = np.array([x[0:max_words] for x in [y [0]*max_words for y in text_data_test]])
- 现在我们将声明我们的模型变量和占位符以进行逻辑回归。使用以下代码执行此操作:
A = tf.Variable(tf.random_normal(shape=[embedding_size,1]))
b = tf.Variable(tf.random_normal(shape=[1,1]))
# Initialize placeholders
x_data = tf.placeholder(shape=[None, max_words], dtype=tf.int32)
y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)
- 为了让 TensorFlow 恢复我们预先训练的嵌入,我们必须首先给
Saver
方法一个变量来恢复,所以我们将创建一个嵌入变量,其形状与我们将加载的嵌入相同:
embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
- 现在我们将
embedding_lookup
函数放在图上,并将句子中所有单词的平均嵌入。使用以下代码执行此操作:
embed = tf.nn.embedding_lookup(embeddings, x_data)
# Take average of all word embeddings in documents
embed_avg = tf.reduce_mean(embed, 1)
- 接下来,我们将声明我们的模型操作和损失函数,记住我们的损失函数已经内置了 sigmoid 操作,如下所示:
model_output = tf.add(tf.matmul(embed_avg, A), b)
# Declare loss function (Cross Entropy loss)
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=model_output, labels=y_target))
- 现在我们将向图添加预测和精度函数,以便我们可以在使用以下代码训练模型时评估精度:
prediction = tf.round(tf.sigmoid(model_output))
predictions_correct = tf.cast(tf.equal(prediction, y_target), tf.float32)
accuracy = tf.reduce_mean(predictions_correct)
- 我们将声明我们的优化函数并初始化以下模型变量:
my_opt = tf.train.AdagradOptimizer(0.005)
train_step = my_opt.minimize(loss)
init = tf.global_variables_initializer()
sess.run(init)
- 现在我们有一个随机初始化嵌入,我们可以告诉
Saver
方法将我们之前的 CBOW 嵌入加载到嵌入变量中。使用以下代码执行此操作:
model_checkpoint_path = os.path.join(data_folder_name,'cbow_movie_embeddings.ckpt')
saver = tf.train.Saver({"embeddings": embeddings})
saver.restore(sess, model_checkpoint_path)
- 现在我们可以开始训练几代。请注意,我们每 100 代就可以节省训练和测试损失和准确率。我们只会每 500 代打印一次模型状态,如下所示:
train_loss = []
test_loss = []
train_acc = []
test_acc = []
i_data = []
for i in range(10000):
rand_index = np.random.choice(text_data_train.shape[0], size=batch_size)
rand_x = text_data_train[rand_index]
rand_y = np.transpose([target_train[rand_index]])
sess.run(train_step, feed_dict={x_data: rand_x, y_target: rand_y})
# Only record loss and accuracy every 100 generations
if (i 1)0==0:
i_data.append(i 1)
train_loss_temp = sess.run(loss, feed_dict={x_data: rand_x, y_target: rand_y})
train_loss.append(train_loss_temp)
test_loss_temp = sess.run(loss, feed_dict={x_data: text_data_test, y_target: np.transpose([target_test])})
test_loss.append(test_loss_temp)
train_acc_temp = sess.run(accuracy, feed_dict={x_data: rand_x, y_target: rand_y})
train_acc.append(train_acc_temp)
test_acc_temp = sess.run(accuracy, feed_dict={x_data: text_data_test, y_target: np.transpose([target_test])})
test_acc.append(test_acc_temp)
if (i 1)P0==0:
acc_and_loss = [i 1, train_loss_temp, test_loss_temp, train_acc_temp, test_acc_temp]
acc_and_loss = [np.round(x,2) for x in acc_and_loss]
print('Generation # {}. Train Loss (Test Loss): {:.2f} ({:.2f}). Train Acc (Test Acc): {:.2f} ({:.2f})'.format(*acc_and_loss))
- 结果如下:
Generation # 500. Train Loss (Test Loss): 0.70 (0.71). Train Acc (Test Acc): 0.52 (0.48)
Generation # 1000. Train Loss (Test Loss): 0.69 (0.72). Train Acc (Test Acc): 0.56 (0.47)
...
Generation # 9500. Train Loss (Test Loss): 0.69 (0.70). Train Acc (Test Acc): 0.57 (0.55)
Generation # 10000. Train Loss (Test Loss): 0.70 (0.70). Train Acc (Test Acc): 0.59 (0.55)
- 以下是绘制训练和测试损失和准确率的代码,我们每 100 代保存一次:
# Plot loss over time
plt.plot(i_data, train_loss, 'k-', label='Train Loss')
plt.plot(i_data, test_loss, 'r--', label='Test Loss', linewidth=4)
plt.title('Cross Entropy Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Cross Entropy Loss')
plt.legend(loc='upper right')
plt.show()
# Plot train and test accuracy
plt.plot(i_data, train_acc, 'k-', label='Train Set Accuracy')
plt.plot(i_data, test_acc, 'r--', label='Test Set Accuracy', linewidth=4)
plt.title('Train and Test Accuracy')
plt.xlabel('Generation')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.show()
每代交叉熵损失的图如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0QmX2gmU-1681566911070)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/f7717031-b704-4976-9b75-27fcf2042a9a.png)]Figure 6: Here we observe the train and test loss over 10,000 generations
上述代码的训练图和测试精度如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w1C6X39z-1681566911070)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/3d330ede-7ab7-4ef5-8d54-43de2455437c.png)]
图 7:我们可以观察到训练和测试装置的准确率正在缓慢提高 10,000 代。值得注意的是,该模型表现非常差,并且仅比随机预测器略好。
工作原理
我们加载了我们之前的 CBOW 嵌入并对平均嵌入评论进行了逻辑回归。这里要注意的重要方法是我们如何将模型变量从磁盘加载到当前模型中已经初始化的变量。我们还必须记住在训练嵌入之前存储和加载我们创建的词汇表。使用相同的嵌入时,从单词到嵌入索引具有相同的映射非常重要。
更多
我们可以看到,我们在预测情感方面几乎达到了 60% 的准确率。例如,要知道单词great;
背后的含义是一项艰巨的任务,它可以在评论中用于消极或积极的背景。
为了解决这个问题,我们希望以某种方式为文档本身创建嵌入并解决情感问题。通常,整个评论是积极的,或者整个评论是否定的。我们可以利用这个优势,我们将在下面的使用 doc2vec 以获取情感分析方法中查看如何执行此操作。
使用 doc2vec 进行情感分析
既然我们知道如何训练单词嵌入,我们也可以扩展这些方法以进行文档嵌入。我们将在以下部分中探讨如何执行此操作。
准备
在前面关于 word2vec 方法的部分中,我们设法捕获了单词之间的位置关系。我们没有做的是捕捉单词与它们来自的文档(或电影评论)之间的关系。 word2vec 的一个扩展来捕获文档效果,称为 doc2vec。
doc2vec 的基本思想是引入文档嵌入,以及可能有助于捕获文档基调的单词嵌入。例如,只知道单词movie
和love
彼此接近可能无法帮助我们确定评论的情感。评论可能是谈论他们如何热爱电影或他们如何不爱电影。但是如果评论足够长并且在文档中找到了更多否定词,那么我们可以采用可以帮助我们预测后续词语的整体语气。
Doc2vec 只是为文档添加了一个额外的嵌入矩阵,并使用一个单词窗口加上文档索引来预测下一个单词。文档中的所有文字窗口都具有相同的文档索引。值得一提的是,考虑如何将文档嵌入与单词嵌入相结合是很重要的。我们通过对它们求和来将单词嵌入组合在单词窗口中。将这些嵌入与文档嵌入相结合有两种主要方式:通常,文档嵌入要么添加到单词嵌入中,要么连接到单词嵌入的末尾。如果我们添加两个嵌入,我们将文档嵌入大小限制为与嵌入字大小相同的大小。如果我们连接,我们解除了这个限制,但增加了逻辑回归必须处理的变量数量。为了便于说明,我们将向您展示如何处理此秘籍中的连接。但总的来说,对于较小的数据集,添加是更好的选择。
第一步是将文档和单词嵌入适用于整个电影评论集。然后我们将进行训练测试分组,训练逻辑模型,看看我们是否可以更准确地预测评论情感。
操作步骤
我们将按如下方式处理秘籍:
- 我们将从加载必要的库并启动图会话开始,如下所示:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import pickle
import string
import requests
import collections
import io
import tarfile
import urllib.request
import text_helpers
from nltk.corpus import stopwords
sess = tf.Session()
- 我们将加载电影评论语料库,就像我们在前两个秘籍中所做的那样。使用以下代码执行此操作:
texts, target = text_helpers.load_movie_data()
- 我们将声明模型参数,如下所示:
batch_size = 500
vocabulary_size = 7500
generations = 100000
model_learning_rate = 0.001
embedding_size = 200 # Word embedding size
doc_embedding_size = 100 # Document embedding size
concatenated_size = embedding_size doc_embedding_size
num_sampled = int(batch_size/2)
window_size = 3 # How many words to consider to the left.
# Add checkpoints to training
save_embeddings_every = 5000
print_valid_every = 5000
print_loss_every = 100
# Declare stop words
stops = stopwords.words('english')
# We pick a few test words.
valid_words = ['love', 'hate', 'happy', 'sad', 'man', 'woman']
- 我们将正则化电影评论,并确保每个电影评论都大于所需的窗口大小。使用以下代码执行此操作:
texts = text_helpers.normalize_text(texts, stops)
# Texts must contain at least as much as the prior window size
target = [target[ix] for ix, x in enumerate(texts) if len(x.split()) > window_size]
texts = [x for x in texts if len(x.split()) > window_size]
assert(len(target)==len(texts))
- 现在我们将创建我们的单词字典。请务必注意,我们不必创建文档字典。文件索引只是文件的索引;每个文档都有一个唯一的索引:
word_dictionary = text_helpers.build_dictionary(texts, vocabulary_size)
word_dictionary_rev = dict(zip(word_dictionary.values(), word_dictionary.keys()))
text_data = text_helpers.text_to_numbers(texts, word_dictionary)
# Get validation word keys
valid_examples = [word_dictionary[x] for x in valid_words]
- 接下来,我们将定义单词嵌入和文档嵌入。然后我们将声明我们的噪声对比损失参数。使用以下代码执行此操作:
embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
doc_embeddings = tf.Variable(tf.random_uniform([len(texts), doc_embedding_size], -1.0, 1.0))
# NCE loss parameters
nce_weights = tf.Variable(tf.truncated_normal([vocabulary_size, concatenated_size],
stddev=1.0 / np.sqrt(concatenated_size)))
nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
- 我们现在将声明 doc2vec 索引和目标词索引的占位符。请注意,输入索引的大小是窗口大小加 1。这是因为我们生成的每个数据窗口都有一个附加的文档索引,如下所示:
x_inputs = tf.placeholder(tf.int32, shape=[None, window_size 1])
y_target = tf.placeholder(tf.int32, shape=[None, 1])
valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
- 现在我们必须创建嵌入函数,它将单词嵌入加在一起,然后在最后连接文档嵌入。使用以下代码执行此操作:
embed = tf.zeros([batch_size, embedding_size])
for element in range(window_size):
embed = tf.nn.embedding_lookup(embeddings, x_inputs[:, element])
doc_indices = tf.slice(x_inputs, [0,window_size],[batch_size,1])
doc_embed = tf.nn.embedding_lookup(doc_embeddings,doc_indices)
# concatenate embeddings
final_embed = tf.concat(axis=1, values=)
- 我们还需要声明一组验证词的余弦距离,我们可以经常打印出来以观察 doc2vec 模型的进度。使用以下代码执行此操作:
loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,
biases=nce_biases,
labels=y_target,
inputs=final_embed,
num_sampled=num_sampled,
num_classes=vocabulary_size))
# Create optimizer
optimizer =
tf.train.GradientDescentOptimizer(learning_rate=model_learning_rate)
train_step = optimizer.minimize(loss)
- 我们还需要从一组验证单词中声明余弦距离,我们可以经常打印出来以观察 doc2vec 模型的进度。使用以下代码执行此操作:
norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1,
keep_dims=True))
normalized_embeddings = embeddings / norm
valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings,
valid_dataset)
similarity = tf.matmul(valid_embeddings, normalized_embeddings,
transpose_b=True)
- 为了以后保存我们的嵌入,我们将创建一个模型
saver
函数。然后我们可以初始化变量,这是我们开始训练单词嵌入之前的最后一步:
saver = tf.train.Saver({"embeddings": embeddings, "doc_embeddings":
doc_embeddings})
init = tf.global_variables_initializer()
sess.run(init)
loss_vec = []
loss_x_vec = []
for i in range(generations):
batch_inputs, batch_labels = text_helpers.generate_batch_data(text_data, batch_size,
window_size, method='doc2vec')
feed_dict = {x_inputs : batch_inputs, y_target : batch_labels}
# Run the train step
sess.run(train_step, feed_dict=feed_dict)
# Return the loss
if (i 1) % print_loss_every == 0:
loss_val = sess.run(loss, feed_dict=feed_dict)
loss_vec.append(loss_val)
loss_x_vec.append(i 1)
print('Loss at step {} : {}'.format(i 1, loss_val))
# Validation: Print some random words and top 5 related words
if (i 1) % print_valid_every == 0:
sim = sess.run(similarity, feed_dict=feed_dict)
for j in range(len(valid_words)):
valid_word = word_dictionary_rev[valid_examples[j]]
top_k = 5 # number of nearest neighbors
nearest = (-sim[j, :]).argsort()[1:top_k 1]
log_str = "Nearest to {}:".format(valid_word)
for k in range(top_k):
close_word = word_dictionary_rev[nearest[k]]
log_str = '{} {},'.format(log_str, close_word)
print(log_str)
# Save dictionary embeddings
if (i 1) % save_embeddings_every == 0:
# Save vocabulary dictionary
with open(os.path.join(data_folder_name,'movie_vocab.pkl'), 'wb') as f:
pickle.dump(word_dictionary, f)
# Save embeddings
model_checkpoint_path = os.path.join(os.getcwd(),data_folder_name,'doc2vec_movie_embeddings.ckpt')
save_path = saver.save(sess, model_checkpoint_path)
print('Model saved in file: {}'.format(save_path))
- 这产生以下输出:
Loss at step 100 : 126.176816940307617
Loss at step 200 : 89.608322143554688
...
Loss at step 99900 : 17.733346939086914
Loss at step 100000 : 17.384489059448242
Nearest to love: ride, with, by, its, start,
Nearest to hate: redundant, snapshot, from, performances, extravagant,
Nearest to happy: queen, chaos, them, succumb, elegance,
Nearest to sad: terms, pity, chord, wallet, morality,
Nearest to man: of, teen, an, our, physical,
Nearest to woman: innocuous, scenes, prove, except, lady,
Model saved in file: /.../temp/doc2vec_movie_embeddings.ckpt
- 现在我们已经训练了 doc2vec 嵌入,我们可以在逻辑回归中使用这些嵌入来预测评论情感。首先,我们为逻辑回归设置了一些参数。使用以下代码执行此操作:
max_words = 20 # maximum review word length
logistic_batch_size = 500 # training batch size
- 我们现在将数据集拆分为训练集和测试集:
train_indices = np.sort(np.random.choice(len(target),
round(0.8*len(target)), replace=False))
test_indices = np.sort(np.array(list(set(range(len(target))) -
set(train_indices))))
texts_train = [x for ix, x in enumerate(texts) if ix in train_indices]
texts_test = [x for ix, x in enumerate(texts) if ix in test_indices]
target_train = np.array([x for ix, x in enumerate(target) if ix in train_indices])
target_test = np.array([x for ix, x in enumerate(target) if ix in test_indices])
- 接下来,我们将评论转换为数字单词索引,并将每个评论填充或裁剪为 20 个单词,如下所示:
text_data_train = np.array(text_helpers.text_to_numbers(texts_train, word_dictionary)) text_data_test = np.array(text_helpers.text_to_numbers(texts_test, word_dictionary)) # Pad/crop movie reviews to specific length text_data_train = np.array([x[0:max_words] for x in [y [0]*max_words for y in text_data_train]]) text_data_test = np.array([x[0:max_words] for x in [y [0]*max_words for y in text_data_test]])
- 现在我们将声明图中与逻辑回归模型相关的部分。我们将添加数据占位符,变量,模型操作和损失函数,如下所示:
# Define Logistic placeholders
log_x_inputs = tf.placeholder(tf.int32, shape=[None, max_words 1])
log_y_target = tf.placeholder(tf.int32, shape=[None, 1])
A = tf.Variable(tf.random_normal(shape=[concatenated_size,1]))
b = tf.Variable(tf.random_normal(shape=[1,1]))
# Declare logistic model (sigmoid in loss function)
model_output = tf.add(tf.matmul(log_final_embed, A), b)
# Declare loss function (Cross Entropy loss)
logistic_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=model_output,
labels=tf.cast(log_y_target, tf.float32)))
- 我们需要创建另一个嵌入函数。前半部分中的嵌入函数在三个单词(和文档索引)的较小窗口上进行训练,以预测下一个单词。在这里,我们将采用相同的方式进行 20 字复习。使用以下代码执行此操作:
# Add together element embeddings in window:
log_embed = tf.zeros([logistic_batch_size, embedding_size])
for element in range(max_words):
log_embed = tf.nn.embedding_lookup(embeddings, log_x_inputs[:, element])
log_doc_indices = tf.slice(log_x_inputs, [0,max_words],[logistic_batch_size,1])
log_doc_embed = tf.nn.embedding_lookup(doc_embeddings,log_doc_indices)
# concatenate embeddings
log_final_embed = tf.concat(1, [log_embed, tf.squeeze(log_doc_embed)])
- 接下来,我们将在图上创建预测和准确率函数,以便我们可以在训练生成过程中评估模型的表现。然后我们将声明一个优化函数并初始化所有变量:
prediction = tf.round(tf.sigmoid(model_output))
predictions_correct = tf.cast(tf.equal(prediction, tf.cast(log_y_target, tf.float32)), tf.float32)
accuracy = tf.reduce_mean(predictions_correct)
# Declare optimizer
logistic_opt = tf.train.GradientDescentOptimizer(learning_rate=0.01)
logistic_train_step = logistic_opt.minimize(logistic_loss, var_list=[A, b])
# Intitialize Variables
init = tf.global_variables_initializer()
sess.run(init)
- 现在我们可以开始 Logistic 模型训练了:
train_loss = []
test_loss = []
train_acc = []
test_acc = []
i_data = []
for i in range(10000):
rand_index = np.random.choice(text_data_train.shape[0], size=logistic_batch_size)
rand_x = text_data_train[rand_index]
# Append review index at the end of text data
rand_x_doc_indices = train_indices[rand_index]
rand_x = np.hstack((rand_x, np.transpose([rand_x_doc_indices])))
rand_y = np.transpose([target_train[rand_index]])
feed_dict = {log_x_inputs : rand_x, log_y_target : rand_y}
sess.run(logistic_train_step, feed_dict=feed_dict)
# Only record loss and accuracy every 100 generations
if (i 1)0==0:
rand_index_test = np.random.choice(text_data_test.shape[0], size=logistic_batch_size)
rand_x_test = text_data_test[rand_index_test]
# Append review index at the end of text data
rand_x_doc_indices_test = test_indices[rand_index_test]
rand_x_test = np.hstack((rand_x_test, np.transpose([rand_x_doc_indices_test])))
rand_y_test = np.transpose([target_test[rand_index_test]])
test_feed_dict = {log_x_inputs: rand_x_test, log_y_target: rand_y_test}
i_data.append(i 1)
train_loss_temp = sess.run(logistic_loss, feed_dict=feed_dict)
train_loss.append(train_loss_temp)
test_loss_temp = sess.run(logistic_loss, feed_dict=test_feed_dict)
test_loss.append(test_loss_temp)
train_acc_temp = sess.run(accuracy, feed_dict=feed_dict)
train_acc.append(train_acc_temp)
test_acc_temp = sess.run(accuracy, feed_dict=test_feed_dict)
test_acc.append(test_acc_temp)
if (i 1)P0==0:
acc_and_loss = [i 1, train_loss_temp, test_loss_temp, train_acc_temp, test_acc_temp]
acc_and_loss = [np.round(x,2) for x in acc_and_loss]
print('Generation # {}. Train Loss (Test Loss): {:.2f} ({:.2f}). Train Acc (Test Acc): {:.2f} ({:.2f})'.format(*acc_and_loss))
- 这产生以下输出:
Generation # 500. Train Loss (Test Loss): 5.62 (7.45). Train Acc (Test Acc): 0.52 (0.48) Generation # 10000. Train Loss (Test Loss): 2.35 (2.51). Train Acc (Test Acc): 0.59 (0.58)
- 我们还应该注意到,我们在名为 doc2vec 的
text_helpers.generate_batch_data()
函数中创建了一个单独的数据批量生成方法,我们在本文的第一部分中使用它来训练 doc2vec 嵌入。以下是与该方法有关的该函数的摘录:
def generate_batch_data(sentences, batch_size, window_size, method='skip_gram'):
# Fill up data batch
batch_data = []
label_data = []
while len(batch_data) < batch_size:
# select random sentence to start
rand_sentence_ix = int(np.random.choice(len(sentences), size=1))
rand_sentence = sentences[rand_sentence_ix]
# Generate consecutive windows to look at
window_sequences = [rand_sentence[max((ix-window_size),0):(ix window_size 1)] for ix, x in enumerate(rand_sentence)]
# Denote which element of each window is the center word of interest
label_indices = [ix if ix<window_size else window_size for ix,x in enumerate(window_sequences)]
# Pull out center word of interest for each window and create a tuple for each window
if method=='skip_gram':
...
elif method=='cbow':
...
elif method=='doc2vec':
# For doc2vec we keep LHS window only to predict target word
batch_and_labels = [(rand_sentence[i:i window_size], rand_sentence[i window_size]) for i in range(0, len(rand_sentence)-window_size)]
batch, labels = [list(x) for x in zip(*batch_and_labels)]
# Add document index to batch!! Remember that we must extract the last index in batch for the doc-index
batch = [x [rand_sentence_ix] for x in batch]
else:
raise ValueError('Method {} not implmented yet.'.format(method))
# extract batch and labels
batch_data.extend(batch[:batch_size])
label_data.extend(labels[:batch_size])
# Trim batch and label at the end
batch_data = batch_data[:batch_size]
label_data = label_data[:batch_size]
# Convert to numpy array
batch_data = np.array(batch_data)
label_data = np.transpose(np.array([label_data]))
return batch_data, label_data
工作原理
在这个秘籍中,我们进行了两个训练循环。第一个是适合 doc2vec 嵌入,第二个循环是为了适应电影情感的逻辑回归。
虽然我们没有大幅度提高情感预测准确率(它仍然略低于 60%),但我们在电影语料库中成功实现了 doc2vec 的连接版本。为了提高我们的准确率,我们应该为 doc2vec 嵌入和可能更复杂的模型尝试不同的参数,因为逻辑回归可能无法捕获自然语言中的所有非线性行为。
八、卷积神经网络
卷积神经网络(CNN)负责过去几年中图像识别的重大突破。在本章中,我们将介绍以下主题:
- 实现简单的 CNN
- 实现高级的 CNN
- 重新训练现有的 CNN 模型
- 应用 Stylenet 和神经式项目
- 实现 DeepDream
提醒一下,读者可以在这里,以及 Packt 仓库找到本章的所有代码。
介绍
在数学中,卷积是应用于另一个函数的输出的函数。在我们的例子中,我们将考虑在图像上应用矩阵乘法(滤波器)。出于我们的目的,我们将图像视为数字矩阵。这些数字可以表示像素或甚至图像属性。我们将应用于这些矩阵的卷积运算包括在图像上移动固定宽度的滤波器并应用逐元素乘法来得到我们的结果。
有关图像卷积如何工作的概念性理解,请参见下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B7nIxhob-1681566911070)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/1a969461-cf3c-433e-9a1c-db07eec5db88.png)]
图 1:如何在图像上应用卷积滤镜(长度与宽度之间的深度),以创建新的特征层。这里,我们有一个2x2
卷积滤波器,在5x5
输入的有效空间中操作,两个方向的步幅为 1。结果是4x4
矩阵
CNN 还具有满足更多要求的其他操作,例如引入非线性(ReLU)或聚合参数(最大池化)以及其他类似操作。上图是在5x5
数组上应用卷积运算的示例,其中卷积滤波器是2x2
矩阵。步长为 1,我们只考虑有效的展示位置。此操作中的可训练变量将是2x2
滤波器权重。在卷积之后,通常会跟进聚合操作,例如最大池化。如果我们在两个方向上采用步幅为 2 的2x2
区域的最大值,下图提供了最大池如何操作的示例:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AE9JPgHf-1681566911070)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/1cc7f69a-e794-4bb1-b857-7a857482777f.png)]
图 2:最大池化操作如何运行的示例。这里,我们有一个2x2
窗口,在4x4
输入的有效空间上操作,两个方向的步幅为 2。结果是2x2
矩阵
虽然我们将首先创建自己的 CNN 进行图像识别,但强烈建议您使用现有的架构,我们将在本章的其余部分中进行操作。
通常采用预先训练好的网络并使用新数据集对其进行重新训练,并在最后使用新的完全连接层。这种方法非常有用,我们将在重新训练现有的 CNN 模型秘籍中进行说明,我们将重新训练现有的架构以改进我们的 CIFAR-10 预测。
实现简单的 CNN
在本文中,我们将开发一个四层卷积神经网络,以提高我们预测 MNIST 数字的准确率。前两个卷积层将各自由卷积-ReLU-最大池化操作组成,最后两个层将是完全连接的层。
准备
为了访问 MNIST 数据,TensorFlow 有一个examples.tutorials
包,它具有很好的数据集加载函数。加载数据后,我们将设置模型变量,创建模型,批量训练模型,然后可视化损失,准确率和一些样本数字。
操作步骤
执行以下步骤:
- 首先,我们将加载必要的库并启动图会话:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.python.framework import ops
ops.reset_default_graph()
sess = tf.Session()
- 接下来,我们将加载数据并将图像转换为
28x28
数组:
data_dir = 'temp'
mnist = input_data.read_data_sets(data_dir, one_hot=False)
train_xdata = np.array([np.reshape(x, (28,28)) for x in mnist.train.images])
test_xdata = np.array([np.reshape(x, (28,28)) for x in mnist.test.images])
train_labels = mnist.train.labels
test_labels = mnist.test.labels
请注意,此处下载的 MNIST 数据集还包括验证集。此验证集通常与测试集的大小相同。如果我们进行任何超参数调整或模型选择,最好将其加载到其他测试中。
- 现在我们将设置模型参数。请记住,图像的深度(通道数)为 1,因为这些图像是灰度的:
batch_size = 100
learning_rate = 0.005
evaluation_size = 500
image_width = train_xdata[0].shape[0]
image_height = train_xdata[0].shape[1]
target_size = max(train_labels) 1
num_channels = 1
generations = 500
eval_every = 5
conv1_features = 25
conv2_features = 50
max_pool_size1 = 2
max_pool_size2 = 2
fully_connected_size1 = 100
- 我们现在可以声明数据的占位符。我们将声明我们的训练数据变量和测试数据变量。我们将针对训练和评估规模使用不同的批量大小。您可以根据可用于训练和评估的物理内存来更改这些内容:
x_input_shape = (batch_size, image_width, image_height, num_channels)
x_input = tf.placeholder(tf.float32, shape=x_input_shape)
y_target = tf.placeholder(tf.int32, shape=(batch_size))
eval_input_shape = (evaluation_size, image_width, image_height, num_channels)
eval_input = tf.placeholder(tf.float32, shape=eval_input_shape)
eval_target = tf.placeholder(tf.int32, shape=(evaluation_size))
- 我们将使用我们在前面步骤中设置的参数声明我们的卷积权重和偏差:
conv1_weight = tf.Variable(tf.truncated_normal([4, 4, num_channels, conv1_features], stddev=0.1, dtype=tf.float32))
conv1_bias = tf.Variable(tf.zeros([conv1_features],dtype=tf.float32))
conv2_weight = tf.Variable(tf.truncated_normal([4, 4, conv1_features, conv2_features], stddev=0.1, dtype=tf.float32))
conv2_bias = tf.Variable(tf.zeros([conv2_features],dtype=tf.float32))
- 接下来,我们将为模型的最后两层声明完全连接的权重和偏差:
resulting_width = image_width // (max_pool_size1 * max_pool_size2)
resulting_height = image_height // (max_pool_size1 * max_pool_size2)
full1_input_size = resulting_width * resulting_height*conv2_features
full1_weight = tf.Variable(tf.truncated_normal([full1_input_size, fully_connected_size1], stddev=0.1, dtype=tf.float32))
full1_bias = tf.Variable(tf.truncated_normal([fully_connected_size1], stddev=0.1, dtype=tf.float32))
full2_weight = tf.Variable(tf.truncated_normal([fully_connected_size1, target_size], stddev=0.1, dtype=tf.float32))
full2_bias = tf.Variable(tf.truncated_normal([target_size], stddev=0.1, dtype=tf.float32))
- 现在我们将宣布我们的模型。我们首先创建一个模型函数。请注意,该函数将在全局范围内查找所需的层权重和偏差。此外,为了使完全连接的层工作,我们将第二个卷积层的输出展平,这样我们就可以在完全连接的层中使用它:
def my_conv_net(input_data):
# First Conv-ReLU-MaxPool Layer
conv1 = tf.nn.conv2d(input_data, conv1_weight, strides=[1, 1, 1, 1], padding='SAME')
relu1 = tf.nn.relu(tf.nn.bias_add(conv1, conv1_bias))
max_pool1 = tf.nn.max_pool(relu1, ksize=[1, max_pool_size1, max_pool_size1, 1], strides=[1, max_pool_size1, max_pool_size1, 1], padding='SAME')
# Second Conv-ReLU-MaxPool Layer
conv2 = tf.nn.conv2d(max_pool1, conv2_weight, strides=[1, 1, 1, 1], padding='SAME')
relu2 = tf.nn.relu(tf.nn.bias_add(conv2, conv2_bias))
max_pool2 = tf.nn.max_pool(relu2, ksize=[1, max_pool_size2, max_pool_size2, 1], strides=[1, max_pool_size2, max_pool_size2, 1], padding='SAME')
# Transform Output into a 1xN layer for next fully connected layer
final_conv_shape = max_pool2.get_shape().as_list()
final_shape = final_conv_shape[1] * final_conv_shape[2] * final_conv_shape[3]
flat_output = tf.reshape(max_pool2, [final_conv_shape[0], final_shape])
# First Fully Connected Layer
fully_connected1 = tf.nn.relu(tf.add(tf.matmul(flat_output, full1_weight), full1_bias))
# Second Fully Connected Layer
final_model_output = tf.add(tf.matmul(fully_connected1, full2_weight), full2_bias)
return final_model_output
- 接下来,我们可以在训练和测试数据上声明模型:
model_output = my_conv_net(x_input)
test_model_output = my_conv_net(eval_input)
- 我们将使用的损失函数是 softmax 函数。我们使用稀疏 softmax,因为我们的预测只是一个类别,而不是多个类别。我们还将使用一个对对率而不是缩放概率进行操作的损失函数:
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=model_output, labels=y_target))
- 接下来,我们将创建一个训练和测试预测函数。然后我们还将创建一个准确率函数来确定模型在每个批次上的准确率:
prediction = tf.nn.softmax(model_output)
test_prediction = tf.nn.softmax(test_model_output)
# Create accuracy function
def get_accuracy(logits, targets):
batch_predictions = np.argmax(logits, axis=1)
num_correct = np.sum(np.equal(batch_predictions, targets))
return 100. * num_correct/batch_predictions.shape[0]
- 现在我们将创建我们的优化函数,声明训练步骤,并初始化所有模型变量:
my_optimizer = tf.train.MomentumOptimizer(learning_rate, 0.9)
train_step = my_optimizer.minimize(loss)
# Initialize Variables
init = tf.global_variables_initializer()
sess.run(init)
- 我们现在可以开始训练我们的模型。我们以随机选择的批次循环数据。我们经常选择在训练上评估模型并测试批次并记录准确率和损失。我们可以看到,经过 500 代,我们可以在测试数据上快速达到 96%-97% 的准确率:
train_loss = []
train_acc = []
test_acc = []
for i in range(generations):
rand_index = np.random.choice(len(train_xdata), size=batch_size)
rand_x = train_xdata[rand_index]
rand_x = np.expand_dims(rand_x, 3)
rand_y = train_labels[rand_index]
train_dict = {x_input: rand_x, y_target: rand_y}
sess.run(train_step, feed_dict=train_dict)
temp_train_loss, temp_train_preds = sess.run([loss, prediction], feed_dict=train_dict)
temp_train_acc = get_accuracy(temp_train_preds, rand_y)
if (i 1) % eval_every == 0:
eval_index = np.random.choice(len(test_xdata), size=evaluation_size)
eval_x = test_xdata[eval_index]
eval_x = np.expand_dims(eval_x, 3)
eval_y = test_labels[eval_index]
test_dict = {eval_input: eval_x, eval_target: eval_y}
test_preds = sess.run(test_prediction, feed_dict=test_dict)
temp_test_acc = get_accuracy(test_preds, eval_y)
# Record and print results
train_loss.append(temp_train_loss)
train_acc.append(temp_train_acc)
test_acc.append(temp_test_acc)
acc_and_loss = [(i 1), temp_train_loss, temp_train_acc, temp_test_acc]
acc_and_loss = [np.round(x,2) for x in acc_and_loss]
print('Generation # {}. Train Loss: {:.2f}. Train Acc (Test Acc): {:.2f} ({:.2f})'.format(*acc_and_loss))
- 这产生以下输出:
Generation # 5. Train Loss: 2.37. Train Acc (Test Acc): 7.00 (9.80)
Generation # 10. Train Loss: 2.16. Train Acc (Test Acc): 31.00 (22.00)
Generation # 15. Train Loss: 2.11. Train Acc (Test Acc): 36.00 (35.20)
...
Generation # 490. Train Loss: 0.06. Train Acc (Test Acc): 98.00 (97.40)
Generation # 495. Train Loss: 0.10. Train Acc (Test Acc): 98.00 (95.40)
Generation # 500. Train Loss: 0.14. Train Acc (Test Acc): 98.00 (96.00)
- 以下是使用
Matplotlib
绘制损耗和精度的代码:
eval_indices = range(0, generations, eval_every)
# Plot loss over time
plt.plot(eval_indices, train_loss, 'k-')
plt.title('Softmax Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Softmax Loss')
plt.show()
# Plot train and test accuracy
plt.plot(eval_indices, train_acc, 'k-', label='Train Set Accuracy')
plt.plot(eval_indices, test_acc, 'r--', label='Test Set Accuracy')
plt.title('Train and Test Accuracy')
plt.xlabel('Generation')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.show()
然后我们得到以下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MZrLQfi3-1681566911071)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/1092cc3c-6d21-4b82-bf79-af63e1a4fa3d.png)]
图 3:左图是我们 500 代训练中的训练和测试集精度。右图是超过 500 代的 softmax 损失值。
- 如果我们想要绘制最新批次结果的样本,下面是绘制由六个最新结果组成的样本的代码:
# Plot the 6 of the last batch results:
actuals = rand_y[0:6]
predictions = np.argmax(temp_train_preds,axis=1)[0:6]
images = np.squeeze(rand_x[0:6])
Nrows = 2
Ncols = 3
for i in range(6):
plt.subplot(Nrows, Ncols, i 1)
plt.imshow(np.reshape(images[i], [28,28]), cmap='Greys_r')
plt.title('Actual: ' str(actuals[i]) ' Pred: ' str(predictions[i]), fontsize=10)
frame = plt.gca()
frame.axes.get_xaxis().set_visible(False)
frame.axes.get_yaxis().set_visible(False)
我们得到前面代码的以下输出:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0m18X32d-1681566911071)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/5e821fd6-fba1-48c6-9b72-d7ac6238f1ff.png)]
图 4:六个随机图像的绘图,标题中包含实际值和预测值。右下图预计是 3,而事实上它是 1
工作原理
我们提高了 MNIST 数据集的表现,并构建了一个模型,在从头开始训练时,可快速达到约 97% 的准确率。我们的前两层是卷积,ReLU 和最大池化的组合。第二层是完全连接的层。我们以 100 个批次进行了训练,并研究了我们训练的几代的准确率和损失。最后,我们还绘制了六个随机数字和每个数字的预测/实际值。
CNN 非常适合图像识别。造成这种情况的部分原因是卷积层创建了自己的低级特征,当它们遇到重要的部分图像时会被激活。这种类型的模型自己创建特征并将其用于预测。
更多
在过去几年中,CNN 模型在图像识别方面取得了巨大进步。正在探索许多新颖的想法,并且经常发现新的架构。该领域的一个很好的论文库是一个名为 Arxiv.org 的仓库网站,由康奈尔大学创建和维护。 Arxiv.org 包括许多领域的一些最新论文,包括计算机科学和计算机科学子领域,如计算机视觉和图像识别。
另见
以下列出了一些可用于了解 CNN 的优秀资源:
- 斯坦福大学有一个很棒的维基
- 迈克尔·尼尔森的深度学习
- 吴建新介绍卷积神经网络
实现高级的 CNN
能够扩展 CNN 模型以进行图像识别非常重要,这样我们才能理解如何增加网络的深度。如果我们有足够的数据,这可能会提高我们预测的准确率。扩展 CNN 网络的深度是以标准方式完成的:我们只是重复卷积,最大池和 ReLU,直到我们对深度感到满意为止。许多更精确的图像识别网络以这种方式操作。
准备
在本文中,我们将实现一种更先进的读取图像数据的方法,并使用更大的 CNN 在 CIFAR10 数据集上进行图像识别。该数据集具有 60,000 个32x32
图像,这些图像恰好属于十个可能类别中的一个。图像的潜在类别是飞机,汽车,鸟,猫,鹿,狗,青蛙,马,船和卡车。另见“另见”部分中的第一个要点。
大多数图像数据集太大而无法放入内存中。我们可以使用 TensorFlow 设置一个图像管道,一次从一个文件中一次读取。我们通过设置图像阅读器,然后创建在图像阅读器上运行的批量队列来完成此操作。
此外,对于图像识别数据,通常在将图像发送之前随机扰动图像以进行训练。在这里,我们将随机裁剪,翻转和更改亮度。
此秘籍是TensorFlow CIFAR-10 官方教程的改编版本,可在本章末尾的“另见”部分中找到。我们将教程浓缩为一个脚本,我们将逐行完成并解释所有必要的代码。我们还将一些常量和参数恢复为原始引用的纸张值;我们将在适当的步骤中标记这一点。
操作步骤
执行以下步骤:
- 首先,我们加载必要的库并启动图会话:
import os
import sys
import tarfile
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from six.moves import urllib
sess = tf.Session()
- 现在我们将声明一些模型参数。我们的批量大小为 128(用于训练和测试)。我们将每 50 代输出一次状态,总共运行 20,000 代。每 500 代,我们将评估一批测试数据。然后我们将声明一些图像参数,高度和宽度,以及随机裁剪图像的大小。有三个通道(红色,绿色和蓝色),我们有十个不同的目标。然后我们将声明我们将从队列中存储数据和图像批次的位置:
batch_size = 128
output_every = 50
generations = 20000
eval_every = 500
image_height = 32
image_width = 32
crop_height = 24
crop_width = 24
num_channels = 3
num_targets = 10
data_dir = 'temp'
extract_folder = 'cifar-10-batches-bin'
- 建议您在我们向好的模型迈进时降低学习率,因此我们将以指数方式降低学习率:初始学习率将设置为 0.1,并且我们将以 250% 的指数方式将其降低 10% 代。确切的公式将由
0.1 · 0.9^(x / 250)
给出,其中x
是当前世代号。默认情况下,此值会持续降低,但 TensorFlow 会接受仅更新学习率的阶梯参数。这里我们设置一些参数供将来使用:
learning_rate = 0.1
lr_decay = 0.9
num_gens_to_wait = 250.
- 现在我们将设置参数,以便我们可以读取二进制 CIFAR-10 图像:
image_vec_length = image_height * image_width * num_channels
record_length = 1 image_vec_length
- 接下来,我们将设置数据目录和 URL 以下载 CIFAR-10 图像,如果我们还没有它们:
data_dir = 'temp'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
cifar10_url = 'http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz'
data_file = os.path.join(data_dir, 'cifar-10-binary.tar.gz')
if not os.path.isfile(data_file):
# Download file
filepath, _ = urllib.request.urlretrieve(cifar10_url, data_file)
# Extract file
tarfile.open(filepath, 'r:gz').extractall(data_dir)
- 我们将设置记录阅读器并使用以下
read_cifar_files()
函数返回随机失真的图像。首先,我们需要声明一个读取固定字节长度的记录读取器对象。在我们读取图像队列之后,我们将图像和标签分开。最后,我们将使用 TensorFlow 的内置图像修改函数随机扭曲图像:
def read_cifar_files(filename_queue, distort_images = True):
reader = tf.FixedLengthRecordReader(record_bytes=record_length)
key, record_string = reader.read(filename_queue)
record_bytes = tf.decode_raw(record_string, tf.uint8)
# Extract label
image_label = tf.cast(tf.slice(record_bytes, [0], [1]), tf.int32)
# Extract image
image_extracted = tf.reshape(tf.slice(record_bytes, [1], [image_vec_length]), [num_channels, image_height, image_width])
# Reshape image
image_uint8image = tf.transpose(image_extracted, [1, 2, 0])
reshaped_image = tf.cast(image_uint8image, tf.float32)
# Randomly Crop image
final_image = tf.image.resize_image_with_crop_or_pad(reshaped_image, crop_width, crop_height)
if distort_images:
# Randomly flip the image horizontally, change the brightness and contrast
final_image = tf.image.random_flip_left_right(final_image)
final_image = tf.image.random_brightness(final_image,max_delta=63)
final_image = tf.image.random_contrast(final_image,lower=0.2, upper=1.8)
# Normalize whitening
final_image = tf.image.per_image_standardization(final_image)
return final_image, image_label
- 现在我们将声明一个函数,它将填充我们的图像管道以供批量器使用。我们首先需要设置一个我们想要读取的图像文件列表,并定义如何使用通过预构建的 TensorFlow 函数创建的输入生成器对象来读取它们。输入生成器可以传递给我们在上一步中创建的读取函数:
read_cifar_files()
。然后我们将在队列中设置批量阅读器:shuffle_batch()
:
def input_pipeline(batch_size, train_logical=True):
if train_logical:
files = [os.path.join(data_dir, extract_folder, 'data_batch_{}.bin'.format(i)) for i in range(1,6)]
else:
files = [os.path.join(data_dir, extract_folder, 'test_batch.bin')]
filename_queue = tf.train.string_input_producer(files)
image, label = read_cifar_files(filename_queue)
min_after_dequeue = 1000
capacity = min_after_dequeue 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch([image, label], batch_size, capacity, min_after_dequeue)
return example_batch, label_batch
正确设置
min_after_dequeue
很重要。此参数负责设置用于采样的图像缓冲区的最小大小。TensorFlow 官方文档建议将其设置为(#threads error margin)*batch_size
。请注意,将其设置为更大的大小会导致更均匀的混洗,因为它正在从队列中的更大数据集进行混洗,但是在此过程中也将使用更多内存。
- 接下来,我们可以声明我们的模型函数。我们将使用的模型有两个卷积层,后面是三个完全连接的层。为了使变量声明更容易,我们首先声明两个变量函数。两个卷积层将分别创建 64 个特征。第一个完全连接的层将第二个卷积层与 384 个隐藏节点连接起来。第二个完全连接的操作将这 384 个隐藏节点连接到 192 个隐藏节点。最后的隐藏层操作将 192 个节点连接到我们试图预测的 10 个输出类。请参阅以下
#
前面的内联注释:
def cifar_cnn_model(input_images, batch_size, train_logical=True):
def truncated_normal_var(name, shape, dtype):
return tf.get_variable(name=name, shape=shape, dtype=dtype, initializer=tf.truncated_normal_initializer(stddev=0.05))
def zero_var(name, shape, dtype):
return tf.get_variable(name=name, shape=shape, dtype=dtype, initializer=tf.constant_initializer(0.0))
# First Convolutional Layer
with tf.variable_scope('conv1') as scope:
# Conv_kernel is 5x5 for all 3 colors and we will create 64 features
conv1_kernel = truncated_normal_var(name='conv_kernel1', shape=[5, 5, 3, 64], dtype=tf.float32)
# We convolve across the image with a stride size of 1
conv1 = tf.nn.conv2d(input_images, conv1_kernel, [1, 1, 1, 1], padding='SAME')
# Initialize and add the bias term
conv1_bias = zero_var(name='conv_bias1', shape=[64], dtype=tf.float32)
conv1_add_bias = tf.nn.bias_add(conv1, conv1_bias)
# ReLU element wise
relu_conv1 = tf.nn.relu(conv1_add_bias)
# Max Pooling
pool1 = tf.nn.max_pool(relu_conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1],padding='SAME', name='pool_layer1')
# Local Response Normalization
norm1 = tf.nn.lrn(pool1, depth_radius=5, bias=2.0, alpha=1e-3, beta=0.75, name='norm1')
# Second Convolutional Layer
with tf.variable_scope('conv2') as scope:
# Conv kernel is 5x5, across all prior 64 features and we create 64 more features
conv2_kernel = truncated_normal_var(name='conv_kernel2', shape=[5, 5, 64, 64], dtype=tf.float32)
# Convolve filter across prior output with stride size of 1
conv2 = tf.nn.conv2d(norm1, conv2_kernel, [1, 1, 1, 1], padding='SAME')
# Initialize and add the bias
conv2_bias = zero_var(name='conv_bias2', shape=[64], dtype=tf.float32)
conv2_add_bias = tf.nn.bias_add(conv2, conv2_bias)
# ReLU element wise
relu_conv2 = tf.nn.relu(conv2_add_bias)
# Max Pooling
pool2 = tf.nn.max_pool(relu_conv2, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='SAME', name='pool_layer2')
# Local Response Normalization (parameters from paper)
norm2 = tf.nn.lrn(pool2, depth_radius=5, bias=2.0, alpha=1e-3, beta=0.75, name='norm2')
# Reshape output into a single matrix for multiplication for the fully connected layers
reshaped_output = tf.reshape(norm2, [batch_size, -1])
reshaped_dim = reshaped_output.get_shape()[1].value
# First Fully Connected Layer
with tf.variable_scope('full1') as scope:
# Fully connected layer will have 384 outputs.
full_weight1 = truncated_normal_var(name='full_mult1', shape=[reshaped_dim, 384], dtype=tf.float32)
full_bias1 = zero_var(name='full_bias1', shape=[384], dtype=tf.float32)
full_layer1 = tf.nn.relu(tf.add(tf.matmul(reshaped_output, full_weight1), full_bias1))
# Second Fully Connected Layer
with tf.variable_scope('full2') as scope:
# Second fully connected layer has 192 outputs.
full_weight2 = truncated_normal_var(name='full_mult2', shape=[384, 192], dtype=tf.float32)
full_bias2 = zero_var(name='full_bias2', shape=[192], dtype=tf.float32)
full_layer2 = tf.nn.relu(tf.add(tf.matmul(full_layer1, full_weight2), full_bias2))
# Final Fully Connected Layer -> 10 categories for output (num_targets)
with tf.variable_scope('full3') as scope:
# Final fully connected layer has 10 (num_targets) outputs.
full_weight3 = truncated_normal_var(name='full_mult3', shape=[192, num_targets], dtype=tf.float32)
full_bias3 = zero_var(name='full_bias3', shape=[num_targets], dtype=tf.float32)
final_output = tf.add(tf.matmul(full_layer2, full_weight3), full_bias3)
return final_output
我们的本地响应标准化参数取自本文,并在本文的“另见”部分中引用。
- 现在我们将创建损失函数。我们将使用 softmax 函数,因为图片只能占用一个类别,因此输出应该是十个目标的概率分布:
def cifar_loss(logits, targets):
# Get rid of extra dimensions and cast targets into integers
targets = tf.squeeze(tf.cast(targets, tf.int32))
# Calculate cross entropy from logits and targets
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets)
# Take the average loss across batch size
cross_entropy_mean = tf.reduce_mean(cross_entropy)
return cross_entropy_mean
- 接下来,我们宣布我们的训练步骤。学习率将以指数阶跃函数降低:
def train_step(loss_value, generation_num):
# Our learning rate is an exponential decay (stepped down)
model_learning_rate = tf.train.exponential_decay(learning_rate, generation_num, num_gens_to_wait, lr_decay, staircase=True)
# Create optimizer
my_optimizer = tf.train.GradientDescentOptimizer(model_learning_rate)
# Initialize train step
train_step = my_optimizer.minimize(loss_value)
return train_step
- 我们还必须具有精确度函数,以计算一批图像的准确率。我们将输入对率目标向量,并输出平均精度。然后我们可以将它用于训练和测试批次:
def accuracy_of_batch(logits, targets):
# Make sure targets are integers and drop extra dimensions
targets = tf.squeeze(tf.cast(targets, tf.int32))
# Get predicted values by finding which logit is the greatest
batch_predictions = tf.cast(tf.argmax(logits, 1), tf.int32)
# Check if they are equal across the batch
predicted_correctly = tf.equal(batch_predictions, targets)
# Average the 1's and 0's (True's and False's) across the batch size
accuracy = tf.reduce_mean(tf.cast(predicted_correctly, tf.float32))
return accuracy
- 现在我们有了一个图像管道函数,我们可以初始化训练图像管道和测试图像管道:
images, targets = input_pipeline(batch_size, train_logical=True)
test_images, test_targets = input_pipeline(batch_size, train_logical=False)
- 接下来,我们将初始化训练输出和测试输出的模型。值得注意的是,我们必须在创建训练模型后声明
scope.reuse_variables()
,这样,当我们为测试网络声明模型时,它将使用相同的模型参数:
with tf.variable_scope('model_definition') as scope:
# Declare the training network model
model_output = cifar_cnn_model(images, batch_size)
# Use same variables within scope
scope.reuse_variables()
# Declare test model output
test_output = cifar_cnn_model(test_images, batch_size)
- 我们现在可以初始化我们的损耗和测试精度函数。然后我们将声明
generation
变量。此变量需要声明为不可训练,并传递给我们的训练函数,该函数在学习率指数衰减计算中使用它:
loss = cifar_loss(model_output, targets)
accuracy = accuracy_of_batch(test_output, test_targets)
generation_num = tf.Variable(0, trainable=False)
train_op = train_step(loss, generation_num)
- 我们现在将初始化所有模型的变量,然后通过运行 TensorFlow 函数
start_queue_runners()
来启动图像管道。当我们开始训练或测试模型输出时,管道将输入一批图像来代替饲料字典:
init = tf.global_variables_initializer()
sess.run(init)
tf.train.start_queue_runners(sess=sess)
- 我们现在循环训练我们的训练,节省训练损失和测试准确率:
train_loss = []
test_accuracy = []
for i in range(generations):
_, loss_value = sess.run([train_op, loss])
if (i 1) % output_every == 0:
train_loss.append(loss_value)
output = 'Generation {}: Loss = {:.5f}'.format((i 1), loss_value)
print(output)
if (i 1) % eval_every == 0:
[temp_accuracy] = sess.run([accuracy])
test_accuracy.append(temp_accuracy)
acc_output = ' --- Test Accuracy= {:.2f}%.'.format(100. * temp_accuracy)
print(acc_output)
- 这产生以下输出:
...
Generation 19500: Loss = 0.04461
--- Test Accuracy = 80.47%.
Generation 19550: Loss = 0.01171
Generation 19600: Loss = 0.06911
Generation 19650: Loss = 0.08629
Generation 19700: Loss = 0.05296
Generation 19750: Loss = 0.03462
Generation 19800: Loss = 0.03182
Generation 19850: Loss = 0.07092
Generation 19900: Loss = 0.11342
Generation 19950: Loss = 0.08751
Generation 20000: Loss = 0.02228
--- Test Accuracy = 83.59%.
- 最后,这里有一些
matplotlib
代码将绘制在训练过程中的损失和测试准确率:
eval_indices = range(0, generations, eval_every)
output_indices = range(0, generations, output_every)
# Plot loss over time
plt.plot(output_indices, train_loss, 'k-')
plt.title('Softmax Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Softmax Loss')
plt.show()
# Plot accuracy over time
plt.plot(eval_indices, test_accuracy, 'k-')
plt.title('Test Accuracy')
plt.xlabel('Generation')
plt.ylabel('Accuracy')
plt.show()
我们得到以下秘籍的以下绘图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XmuZWlzO-1681566911071)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/8c38465a-ddc7-4389-9cc5-806b5a388769.png)]
图 5:训练损失在左侧,测试精度在右侧。对于 CIFAR-10 图像识别 CNN,我们能够实现在测试集上达到约 75% 准确率的模型
工作原理
在我们下载了 CIFAR-10 数据之后,我们建立了一个图像管道而不是使用源字典。有关图像管道的更多信息,请参阅 TensorFlow CIFAR-10 官方教程。我们使用此训练和测试管道来尝试预测图像的正确类别。最后,该模型在测试集上达到了约 75% 的准确率。
另见
- 有关 CIFAR-10 数据集的更多信息,请参阅学习 Tiny Images 的多个特征层,Alex Krizhevsky,2009
- 要查看原始的 TensorFlow 代码,请参阅此链接
- 有关局部响应归一化的更多信息,请参阅使用深度卷积神经网络的 ImageNet 分类,Krizhevsky,A. 等人,2012
重新训练现有的 CNN 模型
从头开始训练新的图像识别需要大量的时间和计算能力。如果我们可以采用先前训练的网络并使用我们的图像重新训练它,它可以节省我们的计算时间。对于此秘籍,我们将展示如何使用预先训练的 TensorFlow 图像识别模型并对其进行微调以处理不同的图像集。
准备
其思想是从卷积层重用先前模型的权重和结构,并重新训练网络顶部的完全连接层。
TensorFlow 在现有 CNN 模型的基础上创建了一个关于训练的教程(请参阅下一节中的第一个要点)。在本文中,我们将说明如何对 CIFAR-10 使用相同的方法。我们将采用的 CNN 网络使用一种非常流行的架构,称为 Inception。 Inception CNN 模型由 Google 创建,在许多图像识别基准测试中表现非常出色。有关详细信息,请参阅“另见”部分的第二个要点中的纸张参考。
我们将介绍的主要 Python 脚本显示如何下载 CIFAR-10 图像数据并自动分离,标记和保存图像到每个训练和测试文件夹中的十个类。之后,我们将重申如何在我们的图像上训练网络。
操作步骤
执行以下步骤:
- 我们首先加载必要的库来下载,解压缩和保存 CIFAR-10 图像:
import os
import tarfile
import _pickle as cPickle
import numpy as np
import urllib.request
import scipy.misc
from imageio import imwrite
- 我们现在声明 CIFAR-10 数据链接并创建我们将存储数据的临时目录。我们还将在以后保存图像时声明要引用的十个类别:
cifar_link = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
data_dir = 'temp'
if not os.path.isdir(data_dir):
os.makedirs(data_dir)
objects = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
- 现在我们将下载 CIFAR-10
.tar
数据文件,并解压该文件:
target_file = os.path.join(data_dir, 'cifar-10-python.tar.gz')
if not os.path.isfile(target_file):
print('CIFAR-10 file not found. Downloading CIFAR data (Size = 163MB)')
print('This may take a few minutes, please wait.')
filename, headers = urllib.request.urlretrieve(cifar_link, target_file)
# Extract into memory
tar = tarfile.open(target_file)
tar.extractall(path=data_dir)
tar.close()
- 我们现在为训练创建必要的文件夹结构。临时目录将有两个文件夹,
train_dir
和validation_dir
。在每个文件夹中,我们将为每个类别创建 10 个子文件夹:
# Create train image folders
train_folder = 'train_dir'
if not os.path.isdir(os.path.join(data_dir, train_folder)):
for i in range(10):
folder = os.path.join(data_dir, train_folder, objects[i])
os.makedirs(folder)
# Create test image folders
test_folder = 'validation_dir'
if not os.path.isdir(os.path.join(data_dir, test_folder)):
for i in range(10):
folder = os.path.join(data_dir, test_folder, objects[i])
os.makedirs(folder)
- 为了保存图像,我们将创建一个从内存加载它们并将它们存储在图像字典中的函数:
def load_batch_from_file(file):
file_conn = open(file, 'rb')
image_dictionary = cPickle.load(file_conn, encoding='latin1')
file_conn.close()
return(image_dictionary)
- 使用前面的字典,我们将使用以下函数将每个文件保存在正确的位置:
def save_images_from_dict(image_dict, folder='data_dir'):
for ix, label in enumerate(image_dict['labels']):
folder_path = os.path.join(data_dir, folder, objects[label])
filename = image_dict['filenames'][ix]
#Transform image data
image_array = image_dict['data'][ix]
image_array.resize([3, 32, 32])
# Save image
output_location = os.path.join(folder_path, filename)
imwrite(output_location,image_array.transpose())
- 使用上述函数,我们可以遍历下载的数据文件并将每个图像保存到正确的位置:
data_location = os.path.join(data_dir, 'cifar-10-batches-py')
train_names = ['data_batch_' str(x) for x in range(1,6)]
test_names = ['test_batch']
# Sort train images
for file in train_names:
print('Saving images from file: {}'.format(file))
file_location = os.path.join(data_dir, 'cifar-10-batches-py', file)
image_dict = load_batch_from_file(file_location)
save_images_from_dict(image_dict, folder=train_folder)
# Sort test images
for file in test_names:
print('Saving images from file: {}'.format(file))
file_location = os.path.join(data_dir, 'cifar-10-batches-py', file)
image_dict = load_batch_from_file(file_location)
save_images_from_dict(image_dict, folder=test_folder)
- 我们脚本的最后一部分创建了图像标签文件,这是我们需要的最后一条信息。这个文件让我们将输出解释为标签而不是数字索引:
cifar_labels_file = os.path.join(data_dir,'cifar10_labels.txt')
print('Writing labels file, {}'.format(cifar_labels_file))
with open(cifar_labels_file, 'w') as labels_file:
for item in objects:
labels_file.write("{}n".format(item))
- 当前面的脚本运行时,它将下载图像并将它们分类到 TensorFlow 再训练教程所期望的正确文件夹结构中。完成后,我们只需按照教程进行操作即可。首先,我们应该克隆教程仓库:
git clone https://github.com/tensorflow/models/tree/master/research/inception
- 为了使用先前训练的模型,我们必须下载网络权重并将其应用于我们的模型。为此,您必须访问该站点,并按照说明下载并安装 cifar10 模型架构和权重。您还将最终下载包含下面描述的构建,训练和测试脚本的数据目录。
对于此步骤,我们导航到
research/inception/inception
目录,然后执行以下命令,--train_directory
,--validation_directory
,--output_directory
和--labels_file
的路径指向相对路径或完整路径创建的目录结构。
- 现在我们将图像放在正确的文件夹结构中,我们必须将它们变成
TFRecords
对象。我们通过运行以下命令来完成此操作:
me@computer:~$ python3 data/build_image_data.py
--train_directory="temp/train_dir/"
--validation_directory="temp/validation_dir"
--output_directory="temp/" --labels_file="temp/cifar10_labels.txt"
- 现在我们将使用
bazel
训练模型,将参数设置为true
。该脚本每 10 代输出一次损失。我们可以随时终止此过程,模型输出将在temp/training_results
文件夹中。我们可以从此文件夹加载模型以进行评估:
me@computer:~$ bazel-bin/inception/flowers_train
--train_dir="temp/training_results" --data_dir="temp/data_dir"
--pretrained_model_checkpoint_path="model.ckpt-157585"
--fine_tune=True --initial_learning_rate=0.001
--input_queue_memory_factor=1
- 这应该使输出类似于以下内容:
2018-06-02 11:10:10.557012: step 1290, loss = 2.02 (1.2 examples/sec; 23.771 sec/batch)
...
工作原理
关于预训练 CNN 上的训练的 TensorFlow 官方教程需要设置一个文件夹;我们从 CIFAR-10 数据创建的设置。然后我们将数据转换为所需的TFRecords
格式并开始训练模型。请记住,我们正在微调模型并重新训练顶部的完全连接的层以适合我们的 10 类数据。
另见
- Tensorflow Inception-v3 官方教程
- Googlenet Inception-v3 文件
应用 StyleNet 和 NeuralStyle 项目
一旦我们对 CNN 进行了图像识别训练,我们就可以将网络本身用于一些有趣的数据和图像处理。 Stylenet 是一种尝试从一张图片中学习图像样式并将其应用于第二张图片同时保持第二图像结构(或内容)完整的过程。如果我们能够找到与样式强烈相关的中间 CNN 节点,这可能是可能的,与图像的内容分开。
准备
Stylenet 是一个过程,它接收两个图像并将一个图像的样式应用于第二个图像的内容。它基于 2015 年的着名论文“艺术风格的神经算法”(参见下一节的第一个要点)。作者在一些 CNN 中找到了一个属性,其中存在中间层,它们似乎编码图片的样式,有些编码图片的内容。为此,如果我们训练样式图片上的样式层和原始图像上的内容层,并反向传播那些计算的损失,我们可以将原始图像更改为更像样式图像。
为了实现这一目标,我们将下载本文推荐的网络;叫做 imagenet-vgg-19。还有一个 imagenet-vgg-16 网络也可以使用,但是本文推荐使用 imagenet-vgg-19。
操作步骤
执行以下步骤:
- 首先,我们将以
mat
格式下载预先训练好的网络。mat
格式是matlab
对象,Python 中的scipy
包有一个可以读取它的方法。下载mat
对象的链接在这里。我们将此模型保存在 Python 脚本所在的同一文件夹中,以供参考:
http://www.vlfeat.org/matconvnet/models/beta16/imagenet-vgg-verydeep-19.mat
- 我们将通过加载必要的库来启动我们的 Python 脚本:
import os
import scipy.io
import scipy.misc
import imageio
from skimage.transform import resize
from operator import mul
from functools import reduce
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import ops
ops.reset_default_graph()
- 然后我们可以声明两个图像的位置:原始图像和样式图像。出于我们的目的,我们将使用本书的封面图片作为原始图像;对于风格形象,我们将使用文森特·梵高的星夜。随意使用您想要的任何两张图片。如果您选择使用这些图片,可以在本书的 GitHub 网站上找到(导航到 Styelnet 部分):
original_image_file = 'temp/book_cover.jpg'
style_image_file = 'temp/starry_night.jpg'
- 我们将为我们的模型设置一些参数:
mat
文件的位置,权重,学习率,代数以及输出中间图像的频率。对于权重,有助于在原始图像上高度加权样式图像。应根据所需结果的变化调整这些超参数:
vgg_path = 'imagenet-vgg-verydeep-19.mat'
original_image_weight = 5.0
style_image_weight = 500.0
regularization_weight = 100
learning_rate = 10
generations = 100
output_generations = 25
beta1 = 0.9
beta2 = 0.999
- 现在我们将使用
scipy
加载两个图像并更改样式图像以适合原始图像大小:
original_image = imageio.imread(original_image_file)
style_image = imageio.imread(style_image_file)
# Get shape of target and make the style image the same
target_shape = original_image.shape
style_image = resize(style_image, target_shape)
- 从论文中,我们可以按照它们出现的顺序定义层。我们将使用作者的命名约定:
vgg_layers = ['conv1_1', 'relu1_1',
'conv1_2', 'relu1_2', 'pool1',
'conv2_1', 'relu2_1',
'conv2_2', 'relu2_2', 'pool2',
'conv3_1', 'relu3_1',
'conv3_2', 'relu3_2',
'conv3_3', 'relu3_3',
'conv3_4', 'relu3_4', 'pool3',
'conv4_1', 'relu4_1',
'conv4_2', 'relu4_2',
'conv4_3', 'relu4_3',
'conv4_4', 'relu4_4', 'pool4',
'conv5_1', 'relu5_1',
'conv5_2', 'relu5_2',
'conv5_3', 'relu5_3',
'conv5_4', 'relu5_4']
- 现在我们将定义一个从
mat
文件中提取参数的函数:
def extract_net_info(path_to_params):
vgg_data = scipy.io.loadmat(path_to_params)
normalization_matrix = vgg_data['normalization'][0][0][0]
mat_mean = np.mean(normalization_matrix, axis=(0,1))
network_weights = vgg_data['layers'][0]
return mat_mean, network_weights
- 根据加载的权重和
layer
定义,我们可以使用以下函数在 TensorFlow 中重新创建网络。我们将遍历每一层并使用适当的weights
和biases
分配相应的函数,如果适用:
def vgg_network(network_weights, init_image):
network = {}
image = init_image
for i, layer in enumerate(vgg_layers):
if layer[1] == 'c':
weights, bias = network_weights[i][0][0][0][0]
weights = np.transpose(weights, (1, 0, 2, 3))
bias = bias.reshape(-1)
conv_layer = tf.nn.conv2d(image, tf.constant(weights), (1, 1, 1, 1), 'SAME')
image = tf.nn.bias_add(conv_layer, bias)
elif layer[1] == 'r':
image = tf.nn.relu(image)
else:
image = tf.nn.max_pool(image, (1, 2, 2, 1), (1, 2, 2, 1), 'SAME')
network[layer] = image
return(network)
- 本文推荐了一些策略,用于将中间层分配给原始图像和样式图像。虽然我们应该为原始图像保留
relu4_2
,但我们可以为样式图像尝试其他reluX_1
层输出的不同组合:
original_layer = ['relu4_2']
style_layers = ['relu1_1', 'relu2_1', 'relu3_1', 'relu4_1', 'relu5_1']
- 接下来,我们将运行前面的函数来获取权重和均值。我们还需要均匀设置 VGG19 样式层权重。如果您愿意,可以通过更改权重进行实验。现在,我们假设它们对于两个层都是 0.5:
# Get network parameters
normalization_mean, network_weights = extract_net_info(vgg_path)
shape = (1,) original_image.shape
style_shape = (1,) style_image.shape
original_features = {}
style_features = {}
# Set style weights
style_weights = {l: 1./(len(style_layers)) for l in style_layers}
- 为了忠实于原始图片外观,我们希望添加一个损失值,将内容/原始特征与原始内容特征进行比较。为此,我们加载 VGG19 模型并计算原始内容特征的内容/原始特征:
g_original = tf.Graph()
with g_original.as_default(), tf.Session() as sess1:
image = tf.placeholder('float', shape=shape)
vgg_net = vgg_network(network_weights, image)
original_minus_mean = original_image - normalization_mean
original_norm = np.array([original_minus_mean])
for layer in original_layers:
original_features[layer] = vgg_net[layer].eval(feed_dict={image: original_norm})
- 与步骤 11 类似,我们希望将原始图像的样式特征更改为样式图片的样式特征。为此,我们将为损失函数添加样式损失值。此损失值需要查看我们预先确定的样式层中样式图像的值。我们还将通过单独的图运行此操作。我们按如下方式计算这些样式特征:
# Get style image network
g_style = tf.Graph()
with g_style.as_default(), tf.Session() as sess2:
image = tf.placeholder('float', shape=style_shape)
vgg_net = vgg_network(network_weights, image)
style_minus_mean = style_image - normalization_mean
style_norm = np.array([style_minus_mean])
for layer in style_layers:
features = vgg_net[layer].eval(feed_dict={image: style_norm})
features = np.reshape(features, (-1, features.shape[3]))
gram = np.matmul(features.T, features) / features.size
style_features[layer] = gram
- 我们启动默认图来计算损失和训练步骤。首先,我们首先将随机图像初始化为 TensorFlow 变量:
# Make Combined Image via loss function
with tf.Graph().as_default():
# Get network parameters
initial = tf.random_normal(shape) * 0.256
init_image = tf.Variable(initial)
vgg_net = vgg_network(network_weights, init_image)
- 接下来,我们计算原始内容损失(将其缩进到默认图下)。这个损失部分将尽可能保持原始图像的结构完整:
# Loss from Original Image
original_layers_w = {'relu4_2': 0.5, 'relu5_2': 0.5}
original_loss = 0
for o_layer in original_layers:
temp_original_loss = original_layers_w[o_layer] * original_image_weight *
(2 * tf.nn.l2_loss(vgg_net[o_layer] - original_features[o_layer]))
original_loss = (temp_original_loss / original_features[o_layer].size)
- 仍然在默认图缩进下,我们创建第二个损失项,即样式损失。此损失将比较我们预先计算的样式特征与输入图像的样式特征(随机初始化):
# Loss from Style Image
style_loss = 0
style_losses = []
for style_layer in style_layers:
layer = vgg_net[style_layer]
feats, height, width, channels = [x.value for x in layer.get_shape()]
size = height * width * channels
features = tf.reshape(layer, (-1, channels))
style_gram_matrix = tf.matmul(tf.transpose(features), features) / size
style_expected = style_features[style_layer]
style_losses.append(style_weights[style_layer] * 2 *
tf.nn.l2_loss(style_gram_matrix - style_expected) /
style_expected.size)
style_loss = style_image_weight * tf.reduce_sum(style_losses)
- 第三个也是最后一个损失条款将有助于平滑图像。我们在这里使用总变差损失来惩罚相邻像素的剧烈变化,如下所示:
total_var_x = reduce(mul, init_image[:, 1:, :, :].get_shape().as_list(), 1)
total_var_y = reduce(mul, init_image[:, :, 1:, :].get_shape().as_list(), 1)
first_term = regularization_weight * 2
second_term_numerator = tf.nn.l2_loss(init_image[:, 1:, :, :] - init_image[:, :shape[1]-1, :, :])
second_term = second_term_numerator / total_var_y
third_term = (tf.nn.l2_loss(init_image[:, :, 1:, :] - init_image[:, :, :shape[2]-1, :]) / total_var_x)
total_variation_loss = first_term * (second_term third_term)
- 接下来,我们结合损失项并创建优化函数和训练步骤,如下所示:
# Combined Loss
loss = original_loss style_loss total_variation_loss
# Declare Optimization Algorithm
optimizer = tf.train.AdamOptimizer(learning_rate, beta1, beta2)
train_step = optimizer.minimize(loss)
- 现在我们运行训练步骤,保存中间图像,并保存最终输出图像,如下所示:
# Initialize variables and start training
with tf.Session() as sess:
tf.global_variables_initializer().run()
for i in range(generations):
train_step.run()
# Print update and save temporary output
if (i 1) % output_generations == 0:
print('Generation {} out of {}, loss: {}'.format(i 1, generations, sess.run(loss)))
image_eval = init_image.eval()
best_image_add_mean = image_eval.reshape(shape[1:]) normalization_mean
output_file = 'temp_output_{}.jpg'.format(i)
imageio.imwrite(output_file, best_image_add_mean.astype(np.uint8))
# Save final image
image_eval = init_image.eval()
best_image_add_mean = image_eval.reshape(shape[1:]) normalization_mean
output_file = 'final_output.jpg'
scipy.misc.imsave(output_file, best_image_add_mean)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g3DnfZUm-1681566911072)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/f26567a0-ae26-409e-87ee-2ef19c33567d.png)]
图 6:使用 Stylenet 算法将书籍封面图像与星夜相结合。请注意,可以通过更改脚本开头的权重来使用不同的样式重点
工作原理
我们首先加载两个图像,然后将预先训练的网络权重和指定的层加载到原始图像和样式图像。我们计算了三种损失函数:原始图像损失,样式损失和总变差损失。然后我们训练随机噪声图片以使用样式图像的样式和原始图像的内容。
损失函数受 GitHub 神经风格项目的影响很大。我们还强烈建议读者查看这些项目中的代码以获得改进,更多细节,以及通常更强大的算法,可以提供更好的结果。
另见
- Gatys,Ecker,Bethge 的艺术风格神经算法,2015
- Leon Gatys 在 CVPR 2016(计算机视觉和模式识别)上的一个很好的推荐视频
实现 DeepDream
受过训练的 CNN 的另一个用途是利用一些中间节点检测标签特征(例如,猫的耳朵或鸟的羽毛)的事实。利用这一事实,我们可以找到转换任何图像的方法,以反映我们选择的任何节点的节点特征。对于这个秘籍,我们将在 TensorFlow 的网站上浏览 DeepDream 教程,但我们将更详细地介绍基本部分。希望我们可以让读者准备好使用 DeepDream 算法来探索 CNN 及其中创建的特征。
准备
TensorFlow 的官方教程展示了如何通过脚本实现 DeepDream(请参阅下一节中的第一个要点)。这个方法的目的是通过他们提供的脚本并解释每一行。虽然教程很棒,但有些部分可以跳过,有些部分可以使用更多解释。我们希望提供更详细的逐行说明。我们还将在必要时使代码符合 Python3 标准。
操作步骤
执行以下步骤:
- 为了开始使用 DeepDream,我们需要下载在 CIFAR-1000 上接受过 CNN 训练的 GoogleNet:
me@computer:~$ wget https://storage.googleapis.com/download.tensorflow.org/models/inception5h.zip
me@computer:~$ unzip inception5h.zip
- 我们首先加载必要的库并启动图会话:
import os
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import tensorflow as tf
from io import BytesIO
graph = tf.Graph()
sess = tf.InteractiveSession(graph=graph)
- 我们现在声明解压缩模型参数的位置(从步骤 1 开始)并将参数加载到 TensorFlow 图中:
# Model location
model_fn = 'tensorflow_inception_graph.pb'
# Load graph parameters
with tf.gfile.FastGFile(model_fn, 'rb') as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
- 我们为输入创建一个占位符,保存 imagenet 平均值 117.0,然后使用正则化占位符导入图定义:
# Create placeholder for input
t_input = tf.placeholder(np.float32, name='input')
# Imagenet average bias to subtract off images
imagenet_mean = 117.0
t_preprocessed = tf.expand_dims(t_input-imagenet_mean, 0)
tf.import_graph_def(graph_def, {'input':t_preprocessed})
- 接下来,我们将导入卷积层,以便在以后可视化并使用它们进行 DeepDream 处理:
# Create a list of layers that we can refer to later
layers = [op.name for op in graph.get_operations() if op.type=='Conv2D' and 'import/' in op.name]
# Count how many outputs for each layer
feature_nums = [int(graph.get_tensor_by_name(name ':0').get_shape()[-1]) for name in layers]
- 现在我们将选择一个可视化的层。我们也可以通过名字选择其他人。我们选择查看特征号
139
。图像以随机噪声开始:
layer = 'mixed4d_3x3_bottleneck_pre_relu'
channel = 139
img_noise = np.random.uniform(size=(224,224,3)) 100.0
- 我们声明了一个绘制图像数组的函数:
def showarray(a, fmt='jpeg'):
# First make sure everything is between 0 and 255
a = np.uint8(np.clip(a, 0, 1)*255)
# Pick an in-memory format for image display
f = BytesIO()
# Create the in memory image
PIL.Image.fromarray(a).save(f, fmt)
# Show image
plt.imshow(a)
- 我们将通过创建一个从图中按名称检索层的函数来缩短一些重复代码:
def T(layer): #Helper for getting layer output tensor return graph.get_tensor_by_name("import/%s:0"%layer)
- 我们将创建的下一个函数是一个包装函数,用于根据我们指定的参数创建占位符:
# The following function returns a function wrapper that will create the placeholder
# inputs of a specified dtype
def tffunc(*argtypes):
'''Helper that transforms TF-graph generating function into a regular one.
See "resize" function below.
'''
placeholders = list(map(tf.placeholder, argtypes))
def wrap(f):
out = f(*placeholders)
def wrapper(*args, **kw):
return out.eval(dict(zip(placeholders, args)), session=kw.get('session'))
return wrapper
return wrap
- 我们还需要一个将图像大小调整为大小规格的函数。我们使用 TensorFlow 的内置图像线性插值函数:
tf.image.resize.bilinear()
# Helper function that uses TF to resize an image
def resize(img, size):
img = tf.expand_dims(img, 0)
# Change 'img' size by linear interpolation
return tf.image.resize_bilinear(img, size)[0,:,:,:]
- 现在我们需要一种方法来更新源图像,使其更像我们使用的特征。我们通过指定如何计算图像上的梯度来完成此操作。我们定义了一个函数,用于计算图像上子区域(图块)的梯度,以加快计算速度。为了防止平铺输出,我们将在
x
和y
方向上随机移动或滚动图像,这将平滑平铺效果:
def calc_grad_tiled(img, t_grad, tile_size=512):
'''Compute the value of tensor t_grad over the image in a tiled way.
Random shifts are applied to the image to blur tile boundaries over
multiple iterations.'''
# Pick a subregion square size
sz = tile_size
# Get the image height and width
h, w = img.shape[:2]
# Get a random shift amount in the x and y direction
sx, sy = np.random.randint(sz, size=2)
# Randomly shift the image (roll image) in the x and y directions
img_shift = np.roll(np.roll(img, sx, 1), sy, 0)
# Initialize the while image gradient as zeros
grad = np.zeros_like(img)
# Now we loop through all the sub-tiles in the image
for y in range(0, max(h-sz//2, sz),sz):
for x in range(0, max(w-sz//2, sz),sz):
# Select the sub image tile
sub = img_shift[y:y sz,x:x sz]
# Calculate the gradient for the tile
g = sess.run(t_grad, {t_input:sub})
# Apply the gradient of the tile to the whole image gradient
grad[y:y sz,x:x sz] = g
# Return the gradient, undoing the roll operation
return np.roll(np.roll(grad, -sx, 1), -sy, 0)
- 现在我们可以声明 DeepDream 函数。我们算法的目标是我们选择的特征的平均值。损耗在梯度上运行,这取决于输入图像和所选特征之间的距离。策略是将图像分成高频和低频,并计算低频部分的梯度。将得到的高频图像再次分开并重复该过程。原始图像和低频图像的集合称为
octaves
。对于每次传递,我们计算梯度并将它们应用于图像:
def render_deepdream(t_obj, img0=img_noise,
iter_n=10, step=1.5, octave_n=4, octave_scale=1.4):
# defining the optimization objective, the objective is the mean of the feature
t_score = tf.reduce_mean(t_obj)
# Our gradients will be defined as changing the t_input to get closer to the values of t_score. Here, t_score is the mean of the feature we select.
# t_input will be the image octave (starting with the last)
t_grad = tf.gradients(t_score, t_input)[0] # behold the power of automatic differentiation!
# Store the image
img = img0
# Initialize the image octave list
octaves = []
# Since we stored the image, we need to only calculate n-1 octaves
for i in range(octave_n-1):
# Extract the image shape
hw = img.shape[:2]
# Resize the image, scale by the octave_scale (resize by linear interpolation)
lo = resize(img, np.int32(np.float32(hw)/octave_scale))
# Residual is hi. Where residual = image - (Resize lo to be hw-shape)
hi = img-resize(lo, hw)
# Save the lo image for re-iterating
img = lo
# Save the extracted hi-image
octaves.append(hi)
# generate details octave by octave
for octave in range(octave_n):
if octave>0:
# Start with the last octave
hi = octaves[-octave]
#
img = resize(img, hi.shape[:2]) hi
for i in range(iter_n):
# Calculate gradient of the image.
g = calc_grad_tiled(img, t_grad)
# Ideally, we would just add the gradient, g, but
# we want do a forward step size of it ('step'),
# and divide it by the avg. norm of the gradient, so
# we are adding a gradient of a certain size each step.
# Also, to make sure we aren't dividing by zero, we add 1e-7.
img = g*(step / (np.abs(g).mean() 1e-7))
print('.',end = ' ')
showarray(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/255.0)
- 通过我们所做的所有特征设置,我们现在可以运行 DeepDream 算法:
# Run Deep Dream
if __name__=="__main__":
# Create resize function that has a wrapper that creates specified placeholder types
resize = tffunc(np.float32, np.int32)(resize)
# Open image
img0 = PIL.Image.open('book_cover.jpg')
img0 = np.float32(img0)
# Show Original Image
showarray(img0/255.0)
# Create deep dream
render_deepdream(T(layer)[:,:,:,139], img0, iter_n=15)
sess.close()
输出如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0JlWV3VB-1681566911072)(https://gitcode.net/apachecn/apachecn-dl-zh/-/raw/master/docs/tf-ml-cookbook-2e-zh/img/07fcab05-6c73-4aeb-b2ed-a1330c65fa0d.png)]
图 7:本书的封面,贯穿 DeepDream 算法,其特征层编号为 50,110,100 和 139
更多
我们敦促读者使用 DeepDream 官方教程作为进一步信息的来源,并访问 DeepDream 上的原始 Google 研究博客文章(请参阅下面的第二个要点参见另见部分)。