TensorFlow 2.0 - Keras Pipeline、自定义Layer、Loss、Metric

2021-02-19 12:53:06 浏览数 (1)

文章目录

    • 1. Keras Sequential / Functional API
    • 2. 自定义 layer
    • 3. 自定义 loss
    • 4. 自定义 评估方法

学习于:简单粗暴 TensorFlow 2

1. Keras Sequential / Functional API

  • tf.keras.models.Sequential([layers...]),但是它不能表示更复杂的模型
代码语言:javascript复制
mymodel = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(100, activation='relu'),
    tf.keras.layers.Dense(10),
    tf.keras.layers.Softmax()
])
  • Functional API 可以表示更复杂的模型
代码语言:javascript复制
inp = tf.keras.Input(shape=(28, 28, 1))
x = tf.keras.layers.Flatten()(inp)
x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
x = tf.keras.layers.Dense(units=10)(x)
out = tf.keras.layers.Softmax()(x)
mymodel = tf.keras.Model(inputs=inp, outputs=out)
代码语言:javascript复制
# 配置模型:优化器,损失函数,评估方法
mymodel.compile(
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate),
    loss=tf.keras.losses.sparse_categorical_crossentropy,
    metrics=[tf.keras.metrics.sparse_categorical_accuracy]
)

# 训练:X,Y,batch_size, epochs
mymodel.fit(data_loader.train_data, data_loader.train_label,
            batch_size=batch_size,epochs=num_epochs)
代码语言:javascript复制
# 测试
res = mymodel.evaluate(data_loader.test_data, data_loader.test_label)
print(res) # [loss, acc]

2. 自定义 layer

  • 继承 tf.keras.layers.Layer,重写 __init__buildcall 三个方法
代码语言:javascript复制
import tensorflow as tf

# 实现一个 线性layer
class myLayer(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units

    def build(self, input_shape):  # input_shape 是一个tensor
        # input_shape 是第一次运行 call() 时参数inputs的形状
        # 第一次使用该层的时候,调用build
        self.w = self.add_weight(name='w',
                                   shape=[input_shape[-1], self.units],
                                   initializer=tf.zeros_initializer())
        self.b = self.add_weight(name='b',
                                   shape=[self.units],
                                   initializer=tf.zeros_initializer())

    def call(self, inputs):
        y_pred = tf.matmul(inputs, self.w)   self.b
        return y_pred
  • 使用自定义的 layer
代码语言:javascript复制
class LinearModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = myLayer(units=1) # 使用

    def call(self, inputs):
        output = self.dense(inputs)
        return output
  • 简单的线性回归
代码语言:javascript复制
import numpy as np

# 原始数据
X_raw = np.array([0.0, 1., 2., 3., 4.], dtype=np.float32)
y_raw = np.array([0.01, 2., 4., 5.98, 8.], dtype=np.float32)

X = np.expand_dims(X_raw, axis=-1)
y = np.expand_dims(y_raw, axis=-1)

# 转成张量
X = tf.constant(X)
y = tf.constant(y)

model = LinearModel()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.MeanSquaredError()
)
model.fit(X, y, batch_size=6, epochs=10000)
print(model.variables)

X_test = tf.constant([[5.1], [6.1]])
res = model.predict(X_test)
print(res)

输出:

代码语言:javascript复制
[<tf.Variable 'linear_model/my_layer/w:0' shape=(1, 1) dtype=float32, 
	numpy=array([[1.9959974]], dtype=float32)>, 
<tf.Variable 'linear_model/my_layer/b:0' shape=(1,) dtype=float32, 
	numpy=array([0.00600523], dtype=float32)>]
[[10.185592]
 [12.181589]]

3. 自定义 loss

  • 继承 tf.keras.losses.Loss,重写 call 方法
代码语言:javascript复制
class myError(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_true - y_pred))

model = LinearModel()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=myError() # 使用 自定义的loss
)

4. 自定义 评估方法

  • 继承 tf.keras.metrics.Metric ,重写 __init__update_stateresult 三个方法
代码语言:javascript复制
class myMetric(tf.keras.metrics.Metric):
    def __init__(self):
        super().__init__()
        self.total = self.add_weight(name='total',
                                     dtype=tf.int32,
                                     initializer=tf.zeros_initializer())
        self.count = self.add_weight(name='count',
                                     dtype=tf.int32,
                                     initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        values = tf.cast(tf.abs(y_true - y_pred) < 0.1, tf.int32)
        # 这里简单的判断误差 < 0.1, 算 true
        self.total.assign_add(tf.shape(y_true)[0])
        self.count.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.count / self.total


model = LinearModel()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=myError(),
    metrics=[myMetric()] # 调用自定义的 metric
)

0 人点赞