虽然一个TFRecord文件中可以存储多个训练样例,但是当训练数据量较大时,可将数据分成多个TFRecord文件来提高处理效率。tensorflow提供了tf.train.match_filenames_once函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过tf.train.string_input_producer函数进行有效的管理。
tf.train.string_input_producer函数会使用初始化提供的文件列表创建一个输入队列,输入队列中原始的元素为文件列表中的所有文件。创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否已有打开的文件可读,如果没有或者打开的文件已经读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。
通过设置shuffle参数,tf.train.string_input_producer函数支持随机打乱文件列表中文件出队的顺序。当shuffle参数为True时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会跑在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀地分给不同的线程,不会出现有些文件被处理过多次而有些文件还没有被处理过的情况。
当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer函数可以设置num_epochs参数来限制加载初始文件列表中的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报OutOfRange的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将num_epochs参数设置为1。这样在计算完一轮后程序将自动停止。在展示tf.trian.match_filenames_once和tf.train_string_input_producer函数的使用方法之前,下面给出一个简单的程序来生成样例数据。
代码语言:javascript复制import tensorflow as tf
# 创建TFRecord文件的帮助函数。
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
# 模拟海量数据情况下将数据写入不同的文件。num_shares定义了总共写入多少个文件,
# instance_per_shard定义了每个文件中有多少个数据。
num_shards = 2
instance_per_shard = 2
for i in range(num_shards):
# 将数据分为多个文件时,可以将不同文件以类似0000n-of-0000m的后缀区分。其中m表示
# 了数据总共被存在了所少个文件中,n表示当前文件的编号。式样的方式既方便了通过正
# 则表达式获取文件列表,又在文件名中加入了更多的信息。
filename = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
write = tf.python_io.TFRecordWriter(filename)
# 将数据封装成Example结构并写入TFRecord文件。
for j in range(instance_per_shard):
# Example结构仅包含当前样例属于几个文件以及是当前文件的第几个样本。
example = tf.train.Example(features = tf.train.Features(feature={
'i': _int64_feature(i),
'j': _int64_feature(j)}))
writer.write(example.SerializeToString())
writer.close()
程序运行之后,在指定的目录下将生成两个文件: /path/to/data.tfrecords-00000-of-00002和/path/to/data.tfrecords-00001-of-00002。每一个文件中存储了两个样例。在生成了样例数据之后。以下代码展示了tf.train.match_flienames_once函数和tf.train.string_input_producer函数的使用方法。
代码语言:javascript复制import tensorflow as tf
# 使用tf.train.match_fliename_once函数获取文件列表
files = tf.train.match_flienames_once("/path/to/data.tfrecords-*")
# 通过tf.train.string_input_producer函数创建输入队列,输入队列中的文件列表为
# tf.train.match_filenames_once函数获取的文件列表。这里将shuffle参数设置为False
# 来避免随机打乱读文件的顺序。但一般在解决真实问题时,会将shuffle参数设置为True。
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# 读取并解析一个样本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features = {
'i': tf.FixedLenFeature([], tf.int64),
'j': tf.FixedLenFeature([], tf.int64),
})
with tf.Session() as sess:
# 虽然在本段程序中没有生命任何变量,但使用tf.train.match_fliename_once函数时
# 需要初始化一些变量。
tf.local_variables_initializer().run()
'''
打印文件列表将得到以下结果:
['/path/to/data.tfrecords-00000-of-00002'
'/path/to/data.tfrecords-00001-of-00002'
]
'''
print sess.run(files)
# 声明tf.train.Coordinator类来协同不同线程,并启动线程。
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 多次执行获取数据的操作
for i in range(6):
print sess.run([features['i'], features['j']])
coord.request_stop()
coord.join(threads)
以上打印将输出:
[0, 0] [0, 1] [1, 0] [1, 1] [0, 0] [0, 1]
在不打乱文件列表的情况下,会依次读出样例数据中的每一个样例。而且当前所有样例都被读完之后,程序会自动从头开始。如果限制num_epochs为1,那么程序将会报错:
代码语言:javascript复制tensorflow.python.framework.errors.OutOfRangeError: FIFOQueue
'_0_input_producer' is closed and has insufficient elements (requested 1, current size 0)
[[Node: ReaderRead = ReaderRead[_class=["loc:@TFRecordReader","loc: @input_producer"],
_device="/job:localhost/relica:0/task:0/cpu:0"]
(TFRecordReader, input_producer)
]]
三、组合训练数据(batching)
在得到单个样例的预处理之后,还需要将它们组成batch,然后再提供给神经网络的输入层。tensorflow提供了tf.train.batch和tf.train.shuffle_batch函数来将单个的样例组织成batch的形式输出。这两个函数都会生成一个队列,队列的入队操作时生成单个样例的方法,而每次得到的是一个batch的样例。它们唯一的区别在于是否会将数据顺序打乱。以下代码展示了这两个函数的使用方法。
代码语言:javascript复制import tensorflow as tf
# 假设example结构中i表示一个样例的example, label = feature['i'], feature['j']
example, label = features['i'], features['j']
# 一个batch中样例的个数
batch_size = 3
# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大,那么需要占用很大内存资源;
# 如果太小,那么出队操作可能会因为没有数据而被阻碍(block),从而导致训练效率很低。一般
# 来说这个队列的大小会和每一个batch的大小相关,下面一行代码给出了设置队列大小的一种方式。
capacity = 1000 3 * batch_size
# 使用tf.train.batch函数组合样例。[example, label]参数给出需要组合的元素,
# 一般example和label分别代表训练样本和这个样本对应的正确标签。batch_size参数
# 给出了每个batch中样例的个数。capaticity给出了队列的最大容量。当队列长度等于
# 容量时,tensorflow将暂停入队操作,而只是等待元素出队。当元素个数小于容量时,
# Tensorflow将暂停入队操作,而只是等待元素出队。当元素个数小于容量时,tensorflow
# 将自动重新启动入队操作。
example_batch, label_batch = tf.train.batch(
[example ,label], batch_size=batch_size, capacity = capacity)
with tf.Session() as sess:
tf.initialize_all_variables().run()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,coord=coord)
# 获取并打印组合之后的样例,在真实问题中这个输出一般会作为神将网络的输入。
for i in range(2):
cur_example_batch, cur_label_batch = sess.run(
[example_batch, label_batch])
print cur_example_batch, cur_label_batch
coord.request_stop()
coord.join(threads)
运行以上程序可以得到以下输出:
[0 0 1] [0 1 0] [1 0 0] [1 0 1]
从这个输出可以看到tf.train.batch函数可以将单个的数据组织成3个一组的batch。在example,label中读到的数据依次为:
example: 0, lable: 0 example: 0, lable: 1 example: 1, lable: 0 example: 1, lable: 1
这是因为tf.train.batch函数不会随机打乱顺序,所以组合之后得到的数据组合成了上面给出的输出。下面一段代码展示了tf.train.shuffle_batch函数的使用方法。
代码语言:javascript复制# 和tf.train.batch的样例代码一样产生example和label。
example, label = features['i'], features['j']
# 使用tf.train.shuffle_batch函数来组合样例。tf,train.shuffle_batch函数
# 的参数大部分都和tf.train.batch函数相似,但是min_after_dequeue参数是
# tf.train.shuffle_batch函数特有的。min_after_dequeue参数限制了出队时队列中
# 元素的最少个数。当队列中的元素太少时,随机打乱样例顺序的作用就不大了。所以
# tf.train.shuffle_batch函数提供了限制出队时最少元素的个数来保证随机打乱顺序的
# 作用。当出队函数被调用但是队列中元素不够时,出队操作将等待更多的元素入队才会完成。
# 如果min_after_dequeue参数被设定,capacity也应该响应调整类满足性能需求。
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size,
capacity=capacity, min_after_dequeue=30)
# 和tf.train.batch的样例代码一样打印example_batch, label_batch。
'''
运行以上代码可以得到以下输出:
[0 1 1] [0 1 0] [1 0 0] [0 0 1]
从输出可以看到,得到的样例顺序已经被打乱了。
tf.train.batch函数和tf.train.shuffle_batch含少数除了可以将单个训练数据整理输入batch,也提供了并行化处理数据的方法。tf.train.batch函数和tf.train.shuffle_batch函数并行化的方式一致。通过设置tf.train.shuffle函数中的num_threads参数,可以指定多个线程同时执行入队操作。tf.train.shuffle_batch函数的入队操作就是数据读取以及预处理的过程。当num_threads参数大于1时,多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时,可以使用tf.train.shuffle_batch_size函数。此函数会从输入文件队列中获取不同的文件分配给不同的线程。这个函数会平均分配文件以保证不同文件中的数据会被尽量平均地使用。
tf.train.shuffle_batch和tf.train.shuffle_batch_join函数都可以完成多线程并行的方式来进行数据预处理,但它们各有优劣。对于tf.train.shuffle_batch函数,不同线程会读取同一个文件。如果一个文件中的样例比较相似(比如都属于同一个类别),那么神经网络的训练效果有可能会受到影响。所以在使用tf.train.shuffle_batch_join函数时,不同线程会读取不同文件。如果读取数据的线程数比总文件数还大,那么多个线程可能会读取同一个文件中相近部分的数据。而且多个线程读取多个文件可能导致过多的硬盘寻址,从而使得读取效率降低。不同的并行化方式各有所长,具体采用哪一种方法需要根据具体情况来确定。
四、输入文件处理框架
下面代码给出了输入数据的完整程序。
代码语言:javascript复制import tensorflow as tf
# 创建文件列表,并通过文件列表创建输入文件队列。在调用输入数据处理流程前,需要
# 统一所有原始数据的格式并将它们存储到TFRecord文件中。下面给出的文件列表应该包含所有
# 提供训练数据的TFRecord文件。
files = tf.train.match_filename_once("/path/to/file_pattern-*")
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# 这里假设image中存储的是图像
# 的原始数据,label为该样例所对应的标签。height、width和channels给出了图片的维度。
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features={
'image': tf.FixedLenFeature([], tf.string),
'label': tf.FixedLenFeature([], tf.int64),
'height': tf.FixedLenFeature([], tf.int64),
'width': tf.FixedLenFeature([], tf.int64),
'channels': tf.FixedLenFeature([], tf.int64),
})
image, label = features['image'], features['label']
height, width = features['height'], features['width']
channels = features['channels']
# 从原始图像解析出像素矩阵,并根据图像尺寸还原图像
decoded_image = tf.decode_raw(image, tf.uint8)
decode_image.set_shape([height, width, channels])
# 定义神经网络输入层图片的大小
image_size = 299
# preprocess_for_train为图像预处理程序
distorted_image = preprocess_for_train(decoded_image, image_size, image_size, None)
# 将处理后的图像和标签数据通过tf.train.shuffle_batch整理成神经网络训练时
# 需要的batch。
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue 3 * batch_size
image_batch, label_batch = tf.train.shuffle_batch(
[distorted_image, label], batch_size=batch_size,
capacity=capacity, min_after_dequeue=min_after_dequeue
)
# 定义神经网络的结构及优化过程。image_batch可以作为输入提供给神将网络的输入层。
# label_batch则提供了输入batch中样例的正确答案。
learning_rate = 0.01
logit = inference(image_batch)
loss = calc_loss(logit, label_batch)
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
# 声明会话并运行神经网络的优化过程。
with tf.Session() as sess:
# 神将网络训练准备工作。这些工作包括变量初始化、线程启动。
sess.run((tf.global_variables_initializer(),tf.local_variables_initializer()))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 神经网络训练过程
TRAINING_ROUNDS = 5000
for i in range(TRAINING_ROUNDS):
sess.run(train_step)
# 停止所有线程。
coord.request_stop()
coord.join(threads)
下图展示了以上代码中输入数据处理的整个流程。从图中可以看出,输入数据处理的第一步为获取存储训练数据的文件列表,这个文件列表为{A、B、C},通过tf.train.string_input_producer函数,可以选择性地将文件列表中的顺序打乱,并加入输入队列。因为是否打乱文件的顺序可选的,所以在图中用虚线来表示,tf.train.string_input_producer函数会生成并维护一个输入文件队列,不同线程中的文件读取函数可以共享这个输入文件队列。在读取样例数据之后,需要将图像进行预处理。图像预处理的过程也会通过tf.train.shuffle_batch提供的机制并行地跑在多个线程中。输入数据处理流程的最后通过tf.train.shuffle_batch函数将处理好的单个输入样例整理成batch提供给神经网络的输入层。通过这种方式,可以有效地提高数据预处理的效率,避免数据预处理为神经网络模型训练过程中的性能瓶颈。