TensorFlow_Probability实现Normalizing flows实例

2021-03-14 18:55:22 浏览数 (1)

标准化流(Normalizing Flows)是算法工具包中的一种便捷技术,它将简单的密度(如高斯分布)转换为丰富的复杂分布,可用于生成模型,RL和变分推断。 TensorFlow具有一组不错的功能,可轻松构建流程并对其进行训练以适应实际数据。

现在流行的生成模型Flow三部曲:NICE(Nonlinear Independent Components Estimation 非线性独立分量估计),RealNVP(real-valued non-volume preserving 实值非体积保留),Glow(Generative Flow with Invertible 1x1 Convolutions)其基石就是Normalizing Flow原理。

1 理论基础:变量变换公式

回顾下连续随机变量的变量公式的变化。这些重要的公式对于Normorlizing Flows理论是至关重要的。

标准化流(Normalizing flows)是利用变量变换公式理论来估计未知目标数据密度的一类模型。简单的说,假定简单分布(例如高斯分布)通过一系列光滑可逆变量变换来匹配现有问题的分布(或称观测)。这个假定之所以成立关键是,需要把每一步变换都是可逆的,现有深度学习的模型通常不能满足。

为了学习最优参数θ,应用最大似然原理搜索。这也突出了设计标准化流模型的另外一个关键:设计有效简化Jacobian行列式的流模型。

Normalizing flowsNormalizing flows

2 双射器(Bijectors)

代码语言:txt复制
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
tfpl = tfp.layers
# Define base distribution
normal = tfd.Normal(loc=0., scale=1.)
# Sample from base distribution
n = 1000
z = normal.sample(n)

# Define scale and shift
scale = 4.5
shift = 7
# Define chain bijector
scale_and_shift = tfb.Chain([tfb.Shift(shift), tfb.Scale(scale)])

# Apply the forward transformation
x = scale_and_shift(z)

# Plot z - x density
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.hist(z, bins=60, density=True )
plt.gca().set_title('z density')
plt.subplot(1, 2, 2)
plt.hist(x, bins=60, density=True)
plt.gca().set_title('x density')
plt.show()

一个双射器类,由三部分构成

1)前向映射,f从d维实空间映射到d维实空间;

2) 反向映射;

3)雅各比矩阵的逆对数行列式(inverse log determinant of the Jacobian)

3 双射器实现标准化流的示例

3.1 Load dataset

代码语言:txt复制
# Load dataset
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
n_samples = 1000
noisy_moons = datasets.make_moons(n_samples=n_samples, noise=.05)
X, y = noisy_moons
X_data = StandardScaler().fit_transform(X)
xlim, ylim = [-2, 2], [-2, 2]

# Plot with labels
y_label = y.astype(np.bool)
X_train, Y_train = X_data[..., 0], X_data[..., 1]
plt.scatter(X_train[y_label], Y_train[y_label], s=10, color='blue')
plt.scatter(X_train[y_label == False], Y_train[y_label == False], s=10, color='red')
plt.legend(['label: 1', 'label: 0'])
plt.xlim(xlim)
plt.ylim(ylim)

3.2 utils functions

代码语言:txt复制
# Define base distribution
base_distribution = tfd.Normal(loc=0, scale=1)

# Define the trainable distribution
def make_masked_autoregressive_flow(hidden_units=[16, 16], activation='relu'):
    made = tfb.AutoregressiveNetwork(
        params=2, event_shape=[2], hidden_units=hidden_units, activation=activation)
    return tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=made)

trainble_distribution = tfd.TransformedDistribution(base_distribution, make_masked_autoregressive_flow(),event_shape=[2])

from mpl_toolkits.axes_grid1 import make_axes_locatable
from tensorflow.compat.v1 import logging

# Define a plot contour routine
def plot_contour_prob(dist, rows=1, title=[''], scale_fig=4):
    cols = int(len(dist) / rows)
    xx = np.linspace(-5.0, 5.0, 100)
    yy = np.linspace(-5.0, 5.0, 100)
    X, Y = np.meshgrid(xx, yy

    fig, ax = plt.subplots(rows, cols, figsize=(scale_fig * cols, scale_fig * rows))
    fig.tight_layout(pad=4.5)
    i = 0
    for r in range(rows):
        for c in range(cols):
            Z = dist[i].prob(np.dstack((X, Y)))
            if len(dist) == 1:
                axi = ax
            elif rows == 1:
                axi = ax[c]
            else:
                axi = ax[r, c]

            # Plot contour
            p = axi.contourf(X, Y, Z)

            # Add a colorbar
            divider = make_axes_locatable(axi)
            cax = divider.append_axes("right", size="5%", pad=0.1)
            cbar = fig.colorbar(p, cax=cax)

            # Set title and labels
            axi.set_title('Filled Contours Plot: '   str(title[i]))
            axi.set_xlabel('x')
            axi.set_ylabel('y')

            i  = 1
    plt.show()

# Define a scatter plot routine for the bijectors
def _plot(results, rows=1, legend=False):
    cols = int(len(results) / rows)
    f, arr = plt.subplots(rows, cols, figsize=(4 * cols, 4 * rows))
    i = 0
    for r in range(rows):
        for c in range(cols):
            res = results[i]
            X, Y = res[..., 0].numpy(), res[..., 1].numpy()
            if rows == 1:
                p = arr[c]
            else:
                p = arr[r, c]
            p.scatter(X, Y, s=10, color='red')
            p.set_xlim([-5, 5])
            p.set_ylim([-5, 5])
            p.set_title(names[i])
            
            i  = 1

利用TFP库定义base_distribution,使用make_masked_autoregressive_flow定义一个可训练的分布(tfd.TransformedDistribution)。这里是不是没有Bijector的事情?其实一个分布经过双射器就变成了变换后分布,因为双射器可进可退,分布也就可以双向变换,结合深度学习框架,就变成可训练的分布。

Bijector(dist) ==> TransformedDistribution(dist,bijector)

代码语言:txt复制
x = base_distribution.sample((1000,2))
names = [base_distribution.name, trainble_distribution.bijector.name]
samples = [x, trainble_distribution.bijector.forward(x)]
# Plot
_plot(samples)

从前面定义base_distribution采样一批数据点,通过前面的可训练分布trainble_distribution生成新的数据分布

3.3 训练单个MaskedAutoregressiveFlow bijector来拟合目标分布

代码语言:txt复制
from tensorflow.keras.callbacks import LambdaCallback
# Define a training routine

def train_dist_routine(trainable_distribution, n_epochs=200, batch_size=None, n_disp=100):
    x_ = Input(shape=(2,), dtype=tf.float32)
    log_prob_ = trainable_distribution.log_prob(x_)
    model = Model(x_, log_prob_)

    model.compile(optimizer=tf.optimizers.Adam(),
                  loss=lambda _, log_prob: -log_prob)

    ns = X_data.shape[0]
    if batch_size is None:
        batch_size = ns

    # Display the loss every n_disp epoch
    epoch_callback = LambdaCallback(
        on_epoch_end=lambda epoch, logs: 
                        print('n Epoch {}/{}'.format(epoch 1, n_epochs, logs),
                              'nt '   (': {:.4f}, '.join(logs.keys())   ': {:.4f}').format(*logs.values()))
                                       if epoch % n_disp == 0 else False 
    )


    history = model.fit(x=X_data,
                        y=np.zeros((ns, 0), dtype=np.float32),
                        batch_size=batch_size,
                        epochs=n_epochs,
                        validation_split=0.2,
                        shuffle=True,
                        verbose=False,
                        callbacks=[epoch_callback])
    return history

# Train the distribution
history = train_dist_routine(trainble_distribution, n_epochs=600, n_disp=50)    
# Get losses
train_losses = history.history['loss']
valid_losses = history.history['val_loss']
# Plot loss vs epoch
plt.plot(train_losses, label='train')
plt.plot(valid_losses, label='valid')
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Negative log likelihood")
plt.title("Training and validation loss curves")
plt.show()

可以看到,可训练分布()配合nll loss也可以在深度学习框架下训练。

代码语言:txt复制
# Define a plot routine
def visualize_training_data(samples):
    f, arr = plt.subplots(1, 2, figsize=(15, 6))
    names = ['Data', 'Trainable']
    samples = [tf.constant(X_data), samples[-1]]

    for i in range(2):
        res = samples[i]
        X, Y = res[..., 0].numpy(), res[..., 1].numpy()
        arr[i].scatter(X, Y, s=10, color='red')
        arr[i].set_xlim([-2, 2])
        arr[i].set_ylim([-2, 2])
        arr[i].set_title(names[i])
visualize_training_data(samples)

训练后,其输出数据点分布开始趋近与目标分布

3.4 训练一组的MaskedAutoregressiveFlow Bijectors来拟合目标分布

代码语言:txt复制
# Define a more expressive model
num_bijectors = 6
bijectors = []

for i in range(num_bijectors):
    masked_auto_i = make_masked_autoregressive_flow(hidden_units=[256,256], activation='relu')
    bijectors.append(masked_auto_i)
    bijectors.append(tfb.Permute(permutation=[1,0]))

flow_bijector = tfb.Chain(list(reversed(bijectors[:-1])))

# Define the trainable distribution
trainable_distribution = tfd.TransformedDistribution(distribution=base_distribution,
                                                   bijector=flow_bijector,
                                                   event_shape=[2])

# Make samples

def make_samples():
    x = base_distribution.sample((1000, 2))
    samples = [x]
    names = [base_distribution.name]
    for bijector in reversed(trainable_distribution.bijector.bijectors):
        x = bijector.forward(x)
        samples.append(x)
        names.append(bijector.name)
    return names, samples

names, samples = make_samples()
# Plot
_plot(samples, 3)
visualize_training_data(samples)
训练前:各个双射器输出训练前:各个双射器输出
训练前:目标和输出拟合训练前:目标和输出拟合

一个双射器的可训练分布表达能力有限,自然而然考虑级联多个双射器的可训练分布就能更好趋近目标分布。

定义6个双射器级联的可训练分布,其各个双射器与permute(用于调整rank)的初始情况如上图。

代码语言:txt复制
history = train_dist_routine(trainable_distribution, n_epochs=600, n_disp=50)
names, samples = make_samples()
_plot(samples, 3)
visualize_training_data(samples)
训练后:各个双射器输出训练后:各个双射器输出
训练后:目标和输出拟合训练后:目标和输出拟合

4 扩展阅读

4.1 变量变换公式相关

  • https://en.wikipedia.org/wiki/Probability_density_function
  • https://en.wikipedia.org/wiki/Cumulative_distribution_function
  • https://en.wikipedia.org/wiki/Monotonic_function

4.2 相关资源

Probabilistic Deep Learning with TensorFlow 2, by 伦敦帝国学院

Eric Jang - Normalizing Flows Tutorial

4.2 代码

Real NVP (pytorch): chrischute/real-nvp

Real NVP (tensorflow): tensorflow/models/tree/master/research/real_nvp

Glow (tensorflow): openai/glow

0 人点赞