混合密度网络-Mixture Density Networks

2022-04-28 19:18:17 浏览数 (1)

翻译自: https://blog.otoro.net/2015/11 /24/mixture-density-networks-with- tensorflow/?tdsourcetag=s_pctim_ai omsg Notebook-(Tensorflow实现): http://otoro.net/ml/ipynb/mixture/mixture.html Notebook-(PyTorch实现): https://github.com/hardmaru/pytorch_notebooks/blob/master/mixture_density_networks.ipynb

代码语言:javascript复制
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import math
NSAMPLE = 1000
x_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1)))
y_data = np.float32(np.sin(0.75*x_data)*7.0 x_data*0.5 r_data*1.0)

plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()

用于拟合的训练数据可视化效果如下:

我们定义如下一个20个节点的单隐层神经网络,网络结构非常简单:

Y = W_{out}tanh(WX b) b_{out}

Tensorflow实现该神经网络的代码如下:

代码语言:javascript复制
x = tf.placeholder(dtype=tf.float32, shape=[None,1])
y = tf.placeholder(dtype=tf.float32, shape=[None,1])

NHIDDEN = 20
W = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))
b = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))

W_out = tf.Variable(tf.random_normal([NHIDDEN,1], stddev=1.0, dtype=tf.float32))
b_out = tf.Variable(tf.random_normal([1,1], stddev=1.0, dtype=tf.float32))

hidden_layer = tf.nn.tanh(tf.matmul(x, W)   b)
y_out = tf.matmul(hidden_layer,W_out)   b_out

lossfunc = tf.nn.l2_loss(y_out-y);

train_op = tf.train.RMSPropOptimizer(learning_rate=0.1, decay=0.8).minimize(lossfunc)

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())

NEPOCH = 1000
for i in range(NEPOCH):
    sess.run(train_op,feed_dict={x: x_data, y: y_data})


x_test = np.float32(np.arange(-10.5,10.5,0.1))
x_test = x_test.reshape(x_test.size,1)
y_test = sess.run(y_out,feed_dict={x: x_test})

plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', x_test,y_test,'bo',alpha=0.3)
plt.show()
sess.close()

该神经网络的拟合效果如下,其中中红色是训练数据,蓝色是网络的输出值。

可以看到,如预期一样,单隐层神经网络可以很好地拟合正弦函数。然而,这种拟合方法只有当我们要用神经网络逼近的函数是一对一或多对一的函数时才有效。

假设我们将训练数据做如下反转,这样我们就有了一个多值函数的训练数据。

x=7.0 sin( 0.75 y) 0.5 y epsilon
代码语言:javascript复制
temp_data = x_data
x_data = y_data
y_data = temp_data

plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()

现在我们的x可能会对应多个y,如果我们用同样的方法来拟合翻转后的数据,会发现它的效果很差。

可以看到,我们原来的模型已经失效了,无论增加多少层,增加多少节点数,都不能拟合出正确的多值函数曲线。

为什么会这样呢? 这是因为我们的神经网络最后的输出是一个确定值,然而实际上我们需要的是多个『可能』的值。

怎么解决这个问题呢? 我们换一种思路——假如我们输出的不是一个值,而是目标值的一个概率分布。比如当 x=1 时,我们得到 y 有两个取值 { 1, -1 },并且每个取值的概率都是 0.5。这就像薛定谔的那只量子叠加态的猫一样,我们得到的结果是一个概率分布,只有当我们进行一次『观察』时,才会得到一个具体结果!

这就是混合密度网络(Mixture Density Network, MDN)的思想。

混合密度网络(Mixture Density Network)

混合密度网络(Mixture Density Networks,MDNs)是由Christopher Bishop在上世纪90年代提出的,通过这个方法神经网络的预测输出不再是单一值,而是一堆概率分布。这是一个非常Powerful的思路,并已经广泛应用于机器学习的各个领域。

Bishop的MDNs预测单个分类的混合高斯分布(Mixture Gaussian Distributions),其中混合高斯分布的输出是多个均值和方差都不相同的高斯分布的加权和。

P(Y = y | X = x) = sum_{k=0}^{K-1} Pi_{k}(x) phi(y, mu_{k}(x), sigma_{k}(x)), sum_{k=0}^{K-1} Pi_{k}(x) = 1
Pi_{k} = frac{exp(Z_{k})}{sum_{i=0}^{23} exp(Z_{i})}, sigma = exp(Z_{24rightarrow43}), mu = Z_{44rightarrow71}
代码语言:javascript复制
NHIDDEN = 24
STDEV = 0.5
KMIX = 24 # number of mixtures
NOUT = KMIX * 3 # pi, mu, stdev

x = tf.placeholder(dtype=tf.float32, shape=[None,1], name="x")
y = tf.placeholder(dtype=tf.float32, shape=[None,1], name="y")

Wh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))
bh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))

Wo = tf.Variable(tf.random_normal([NHIDDEN,NOUT], stddev=STDEV, dtype=tf.float32))
bo = tf.Variable(tf.random_normal([1,NOUT], stddev=STDEV, dtype=tf.float32))

hidden_layer = tf.nn.tanh(tf.matmul(x, Wh)   bh)
output = tf.matmul(hidden_layer,Wo)   bo

def get_mixture_coef(output):
    out_pi = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")
    out_sigma = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")
    out_mu = tf.placeholder(dtype=tf.float32, shape=[None,KMIX], name="mixparam")

    out_pi, out_sigma, out_mu = tf.split(output, 3, 1)

    max_pi = tf.reduce_max(out_pi, 1, keep_dims=True)
    out_pi = tf.subtract(out_pi, max_pi)

    out_pi = tf.exp(out_pi)

    normalize_pi = tf.reciprocal(tf.reduce_sum(out_pi, 1, keep_dims=True))
    out_pi = tf.multiply(normalize_pi, out_pi)

    out_sigma = tf.exp(out_sigma)

    return out_pi, out_sigma, out_mu

out_pi, out_sigma, out_mu = get_mixture_coef(output)
NSAMPLE = 2500

y_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1))) # random noise
x_data = np.float32(np.sin(0.75*y_data)*7.0 y_data*0.5 r_data*1.0)

plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', alpha=0.3)
plt.show()

这次我们不能简单的使用L2损失函数,我们希望最大化真实的y值在混合概率密度中的概率,将经过标准化的y_normal与概率权重相乘,并取对数相加后取相反数,这个结果就是概率联合分布的最大似然函数,所以更合适的Loss Function是联合概率分布的负的对数似然。

CostFunction(y | x) = -log[ sum_{k}^K Pi_{k}(x) phi(y, mu(x), sigma(x)) ]

对于训练集中每一个点(x, y),我们最小化神经网络输出的概率分布与真实点的损失函数。

代码语言:javascript复制
oneDivSqrtTwoPI = 1 / math.sqrt(2*math.pi) # normalisation factor for gaussian, not needed.
def tf_normal(y, mu, sigma):
    result = tf.subtract(y, mu)
    result = tf.multiply(result,tf.reciprocal(sigma))
    result = -tf.square(result)/2
    return tf.multiply(tf.exp(result),tf.reciprocal(sigma))*oneDivSqrtTwoPI

def get_lossfunc(out_pi, out_sigma, out_mu, y):
    result = tf_normal(y, out_mu, out_sigma)
    result = tf.multiply(result, out_pi)
    result = tf.reduce_sum(result, 1, keep_dims=True)
    result = -tf.log(result)
    return tf.reduce_mean(result)

lossfunc = get_lossfunc(out_pi, out_sigma, out_mu, y)
train_op = tf.train.AdamOptimizer().minimize(lossfunc)

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())

NEPOCH = 10000
loss = np.zeros(NEPOCH) # store the training progress here.
for i in range(NEPOCH):
    sess.run(train_op,feed_dict={x: x_data, y: y_data})
    loss[i] = sess.run(lossfunc, feed_dict={x: x_data, y: y_data})

plt.figure(figsize=(8, 8))
plt.plot(np.arange(100, NEPOCH,1), loss[100:], 'r-')
plt.show()

我们发现大约经过6000多次迭代之后Loss停止下降,我们得到了训练完成的神经网络。

至此我们得到了一个混合概率分布,下一步我们根据预测出的联合概率密度来随机生成具体的点。这里通过采样一些x坐标点,随机生成10个对应的y轴坐标,从而验证pdf生成的数据是否与训练数据匹配。

代码语言:javascript复制
x_test = np.float32(np.arange(-15,15,0.1))
NTEST = x_test.size
x_test = x_test.reshape(NTEST,1) # needs to be a matrix, not a vector

def get_pi_idx(x, pdf):
    N = pdf.size
    accumulate = 0
    for i in range(0, N):
        accumulate  = pdf[i]
        if (accumulate >= x):
            return i
    print ('error with sampling ensemble')
    return -1

def generate_ensemble(out_pi, out_mu, out_sigma, M = 10):
    NTEST = x_test.size
    result = np.random.rand(NTEST, M) # initially random [0, 1]
    rn = np.random.randn(NTEST, M) # normal random matrix (0.0, 1.0)
    mu = 0
    std = 0
    idx = 0

    # transforms result into random ensembles
    for j in range(0, M):
        for i in range(0, NTEST):
            idx = get_pi_idx(result[i, j], out_pi[i])
            mu = out_mu[i, idx]
            std = out_sigma[i, idx]
            result[i, j] = mu   rn[i, j]*std
    return result

out_pi_test, out_sigma_test, out_mu_test = sess.run(get_mixture_coef(output), feed_dict={x: x_test})

y_test = generate_ensemble(out_pi_test, out_mu_test, out_sigma_test)

plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', x_test,y_test,'bo',alpha=0.3)
plt.show()

如上图所示,我们把神经网络生成的数据点(蓝色)和训练数据(红色)一同显示出来,可以看到,除了少数的Outlier,我们的分布较好的拟合了这些数据。

代码语言:javascript复制
plt.figure(figsize=(8, 8))
plt.plot(x_test,out_mu_test,'go',alpha=0.5)
plt.plot(x_test,y_test,'bo',alpha=0.1)
plt.show()

最后,我们可以绘制出整个混合概率分布(Mixture Gaussian Distributions)的热力图(HeatMap)来观察神经网络的输出效果。

代码语言:javascript复制
x_heatmap_label = np.float32(np.arange(-15,15,0.1))
y_heatmap_label = np.float32(np.arange(-15,15,0.1))

def custom_gaussian(x, mu, std):
    x_norm = (x-mu)/std
    result = oneDivSqrtTwoPI*math.exp(-x_norm*x_norm/2)/std
    return result

def generate_heatmap(out_pi, out_mu, out_sigma, x_heatmap_label, y_heatmap_label):
    N = x_heatmap_label.size
    M = y_heatmap_label.size
    K = KMIX

    z = np.zeros((N, M)) # initially random [0, 1]

    mu = 0
    std = 0
    pi = 0

  # transforms result into random ensembles
    for k in range(0, K):
        for i in range(0, M):
            pi = out_pi[i, k]
            mu = out_mu[i, k]
            std = out_sigma[i, k]
            for j in range(0, N):
                z[N-j-1, i]  = pi * custom_gaussian(y_heatmap_label[j], mu, std)
    return z

def draw_heatmap(xedges, yedges, heatmap):
    extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]
    plt.figure(figsize=(8, 8))
    plt.imshow(heatmap, extent=extent)
    plt.show()

z = generate_heatmap(out_pi_test, out_mu_test, out_sigma_test, x_heatmap_label, y_heatmap_label)
draw_heatmap(x_heatmap_label, y_heatmap_label, z)

0 人点赞