【云+社区年度征文】tensorflow2 tfrecorddataset+estimator 训练预测加载全流程概述

2020-12-20 15:18:53 浏览数 (1)

toc

0. 背景

image.pngimage.png

本文主要记录切换项目至TF2.0 后使用TFRecordDataset保存训练数据与使用estimator建模及后续的模型或者checkpoint加载使用预测的一些基本方法及踩过的坑。

1. 训练数据准备

首先需要将处理好的训练数据保存为TFRecord格式,方便TF框架读取。

 TensorFlow提供了一种统一的格式来存储数据,这个格式就是TFRecords。为了高效的读取数据,可以将数据进行序列化存储,这样也便于网络流式读取数据,TFRecord就是一种保存记录的方法可以允许你讲任意的数据转换为TensorFlow所支持的格式,这种方法可以使TensorFlow的数据集更容易与网络应用架构相匹配。TFRecord是谷歌推荐的一种常用的存储二进制序列数据的文件格式,理论上它可以保存任何格式的信息。下面是Tensorflow的官网给出的文档结构,整个文件由文件长度信息,长度校验码,数据,数据校验码组成。

使用TFRecord代替之前的Pandas读取数据原因与TFrecord文件格式与Dataset API优点主要有一下几点:

  1. 节省内存,不需要将所有数据读取至内存,所以可以使用更多的数据集进行训练不再受内存限制;
  2. 简洁性: 常规方式:用python代码来进行batch,shuffle,padding等numpy类型的数据处理,再用placeholder feed_dict来将其导入到graph中变成tensor类型。因此在网络的训练过程中,不得不在tensorflow的代码中穿插python代码来实现控制。 Dataset API:将数据直接放在graph中进行处理,整体对数据集进行上述数据操作,使代码更加简洁;
  3. 对接性: TensorFlow中也加入了高级API (Estimator、Experiment,Dataset)帮助建立网络,和Keras等库不一样的是:这些API并不注重网络结构的搭建,而是将不同类型的操作分开,帮助周边操作。可以在保证网络结构控制权的基础上,节省工作量。若使用Dataset API导入数据,后续还可选择与Estimator对接。

tfrecord_save.py

代码语言:txt复制
MULTI_COLUMNS = []
SINGLE_COLUMNS = []
NUMBER_COLUMNS = []

writer = tf.io.TFRecordWriter(write_url)

for row in dataframe.itertuples():
    feature_map = {}
    feature_map['label'] = tf.train.Feature(int64_list=tf.train.Int64List(value=[getattr(row,'label')]))
    
    for i in MULTI_COLUMNS:
        feature_map[i] = tf.train.Feature(bytes_list=tf.train.BytesList(value=split(getattr(row,i))))
    for j in SINGLE_COLUMNS:
        feature_map[j] = tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(getattr(row,j) , encoding = 'utf-8')]))
    for l in NUMBER_COLUMNS:
        feature_map[l] = tf.train.Feature(float_list=tf.train.FloatList(value=[getattr(row,l)]))

    example = tf.train.Example(
        features=tf.train.Features(
            feature = feature_map
        )
    )
    writer.write(record=example.SerializeToString())
writer.close()

2. 训练

2.1 划分与读取训练集与测试集

推荐在产生TFRECORD时就划分好测试集与训练集,在input_fn读取时读取

代码语言:txt复制
def train_input_fn():
    train_ds = tf.data.TFRecordDataset(filenames=train_data_dir,num_parallel_reads = 104).shuffle(SUFFLE_SIZE, reshuffle_each_iteration=True).batch(65536).prefetch(4).repeat()
    return train_ds

def eval_input_fn():
    test_ds = tf.data.TFRecordDataset(filenames=eval_data_dir,num_parallel_reads = 104).shuffle(SUFFLE_SIZE, reshuffle_each_iteration=True).batch(65536).prefetch(4).repeat()
    return test_ds

或者使用TFRecordDataset API filter, 不过实测速度较慢

代码语言:txt复制
all = tf.data.Dataset.from_tensor_slices(list(range(1, 21))) 
        .shuffle(10, reshuffle_each_iteration=False)

test_dataset = all.enumerate() 
                    .filter(lambda x,y: x % 4 == 0) 
                    .map(lambda x,y: y)

train_dataset = all.enumerate() 
                    .filter(lambda x,y: x % 4 != 0) 
                    .map(lambda x,y: y)

2.2 特征工程与构造特征输入Feature cloumns

Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式。

深度神经网络只能处理数值数据,网络中的每个神经元节点执行一些针对输入数据和网络权重的乘法和加法运算。然而,现实中的有很多非数值的类别数据,比如产品的品牌、类目等,这些数据如果不加转换,神经网络是无法处理的。另一方面,即使是数值数据,在仍给网络进行训练之前有时也需要做一些处理,比如标准化、离散化等。

这里简单介绍一下一些常用的feature columns与用法。在Tensorflow中,通过调用tf.feature_column模块来创建feature columns。

有两大类feature column,一类是生成dense tensor的Dense Column;另一类是生成sparse tensor的Categorical Column。具体地,目前tensorflow提供的feature columns如下图所示。

image.pngimage.png
2.2.1 Numeric column
代码语言:txt复制
tf.feature_column.numeric_column(
    key,
    shape=(1,),
    default_value=None,
    dtype=tf.float32,
    normalizer_fn=None
)
  • key: 特征的名字。也就是对应的列名称。
  • shape: 该key所对应的特征的shape. 默认是1,但是比如one-hot类型的,shape就不是1,而是实际的维度。总之,这里是key所对应的维度,不一定是1.
  • default_value: 如果不存在使用的默认值
  • normalizer_fn: 对该特征下的所有数据进行转换。如果需要进行normalize,那么就是使用normalize的函数.这里不仅仅局限于normalize,也可以是任何的转换方法,比如取对数,取指数,这仅仅是一种变换方法.
2.2.2 Bucketized column

Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值。离散化是特征工程常用的一种方法。例如,把年份离散化为4个阶段。

代码语言:txt复制
tf.feature_column.bucketized_column(
    source_column,
    boundaries
)
  • source_column: 必须是numeric_column
  • boundaries: 不同的桶。boundaries=0., 1., 2.,产生的bucket就是, (-inf, 0.), [0., 1.), [1., 2.), and [2., inf), 每一个区间分别表示0, 1, 2, 3,所以相当于分桶分了4个.
代码语言:txt复制
# First, convert the raw input to a numeric column.
numeric_feature_column = tf.feature_column.numeric_column("Year")

# Then, bucketize the numeric column on the years 1960, 1980, and 2000.
bucketized_feature_column = tf.feature_column.bucketized_column(
    source_column = numeric_feature_column,
    boundaries = [1960, 1980, 2000])
2.2.3 Categorical identity column

与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。

代码语言:txt复制
identity_feature_column = tf.feature_column.categorical_column_with_identity(
    key='my_feature_b',
    num_buckets=4) # Values [0, 4)
2.2.4 Categorical vocabulary column

顾名思义,Categorical vocabulary column把一个vocabulary中的string映射为数值型的类别特征,是做one-hot编码的很好的方法。在tensorflow中有两种提供词汇表的方法,一种是用list,另一种是用file,对应的feature column分别为:

tf.feature_column.categorical_column_with_vocabulary_list

tf.feature_column.categorical_column_with_vocabulary_file

2.2.5 Hashed Column

为类别特征提供词汇表有时候会过于繁琐,特别是在词汇表非常大的时候,词汇表会非常消耗内存。tf.feature_column.categorical_column_with_hash_bucket允许用户指定类别的总数,通过hash的方式来得到最终的类别ID。

2.2.6 Crossed column

交叉组合特征也是一种很常用的特征工程手段,尤其是在使用LR模型时。Crossed column仅仅适用于sparser特征,产生的依然是sparsor特征。

代码语言:txt复制
tf.feature_column.crossed_column(
    keys,
    hash_bucket_size,
    hash_key=None
)

Crossed特征对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果:Hash(cartesian product of features) % hash_bucket_size

2.2.7 Indicator and embedding columns

Indicator columns 和 embedding columns 不能直接作用在原始特征上,而是作用在categorical columns上。

当某些特征的类别数量非常大时,使用indicator_column来把原始数据转换为神经网络的输入就变得非常不灵活,这时通常使用embedding column把原始特征映射为一个低维稠密的实数向量。同一类别的embedding向量间的距离通常可以用来度量类别直接的相似性。

image.pngimage.png
代码语言:txt复制
tf.feature_column.embedding_column(
    categorical_column,
    dimension,
    combiner='mean',
    initializer=None,
    ckpt_to_load_from=None,
    tensor_name_in_ckpt=None,
    max_norm=None,
    trainable=True
)
categorical_column: 使用categoryical_column产生的sparsor column
dimension: 定义embedding的维数
combiner: 对于多个entries进行的推导。默认是meam, 但是 sqrtn在词袋模型中,有更好的准确度。
initializer: 初始化方法,默认使用高斯分布来初始化。
tensor_name_in_ckpt: 可以从check point中恢复
ckpt_to_load_from: check point file,这是在 tensor_name_in_ckpt 不为空的情况下设置的.
max_norm: 默认是l2
2.2.8 Weighted categorical column

当需要给一个类别特征赋予一定的权重,比如给用户行为序列按照行为发生的时间到某个特定时间的差来计算不同的权重,这是可以用到weighted_categorical_column。

代码语言:txt复制
tf.feature_column.weighted_categorical_column(
    categorical_column,
    weight_feature_key,
    dtype=tf.float32
)

2.3 建立estimator并开始训练

如建立一个wide & deep 模型可以使用estimator自带的模型DNNLinearCombinedClassifier

代码语言:txt复制
hidden_units = [256, 64, 32]
dropout = 0.3
my_dnn_optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)
config = tf.estimator.RunConfig(log_step_count_steps=20, save_summary_steps=20)

est = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=wide_columns,
    dnn_feature_columns=deep_columns,
    weight_column='weight',
    dnn_hidden_units=hidden_units,
    dnn_optimizer=my_dnn_optimizer,
    dnn_dropout=dropout,
    model_dir=model_dir,
    config=config,
    n_classes=4)

如需要自定义模型,需要重写model_fn,这里不展开讲述。

之后就可以开始训练,使用train_and_evaluate方法可以训练同时进行验证, 训练过程将会保存checkpoint, 使用RunConfig可以设置log打印频率保存频率以及分布式训练等设置。

代码语言:txt复制
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=2000)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(est, train_spec, eval_spec)

训练完成后保存模型saved model pb格式

代码语言:txt复制
feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
export_dir = est.export_saved_model(model_dir, serving_input_receiver_fn)

3. 加载模型并预测

3.1 加载checkpoint

模型训练到一半可以中途暂停,estimator可以通过model_dir读取训练到一半的模型并进行预测、继续训练或者直接保存模型。

代码语言:txt复制
## 加载
est = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=wide_columns,
    dnn_feature_columns=deep_columns,
    weight_column='weight',
    dnn_hidden_units=hidden_units,
    dnn_optimizer=my_dnn_optimizer,
    dnn_dropout=dropout,
    model_dir=model_dir,
    n_classes=4)
## 预测
result = est.predict(eval_input_fn)
output = []
for prediction in zip(result):
    output.append(int(prediction[0]['classes']))
## 保存  
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
export_dir = est.export_saved_model(model_dir, serving_input_receiver_fn)

3.2 加载pb saved model

代码语言:txt复制
imported = tf.saved_model.load(export_dir)
f = imported.signatures["predict"]
def predict(dataframe):
    for row in dataframe.itertuples():
        feature_map = {}
        feature_map[l] = tf.train.Feature(float_list=tf.train.FloatList(value=[getattr(row,l)]))
        example = tf.train.Example(
            features=tf.train.Features(
                feature = feature_map
            )
        )
        ex = tf.constant([example.SerializeToString()])
        result = f(examples=ex)
    return result

Ref

  1. https://www.tensorflow.org/tutorials/load_data/tfrecord
  2. https://zhuanlan.zhihu.com/p/33223782
  3. https://zhuanlan.zhihu.com/p/41663141
  4. http://d0evi1.com/tensorflow/custom_estimators/

0 人点赞