tensorflow
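The script below implements a small BP (fully connected) regression network in TensorFlow 1.x: it reads 28-feature samples from a CSV file, stacks three ReLU hidden layers on a linear output, trains with Adam on a mean-squared-error loss, plots a moving average of the training loss, saves a checkpoint, and restores it to predict on a test set.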
Code language: python
#!/usr/bin/python
# -*- coding:utf-8 -*-
'''
Author: Yan Errol
Email: 2681506@gmail.com
Wechat: qq260187357
Date: 2019-04-23--08:12
Describe: Design of a BP (back-propagation) network
'''
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
# Load the dataset: one sample per CSV line, label in the last column
def load_datasets(data_path):
    x_input = []
    y_input = []
    with open(data_path, "r") as data:
        lines = data.readlines()
        for line in lines:
            one_line = line.strip().split(',')
            one_line = [float(item) for item in one_line]
            x_input.append(one_line[:-1])
            y_input.append(one_line[-1])
    return np.array(x_input), np.array(y_input)
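# A quick illustration (hypothetical file contents, not from the original
# post): a row "0.1,0.5,...,0.9,1.0" with 29 comma-separated floats yields
# one 28-dimensional sample in x_input and the trailing 1.0 as its label,
# so load_datasets returns x of shape (n, 28) and y of shape (n,).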
# Initialize the BP network weights
def init_weight(in_node, out_node, init_method):
    np.random.seed(1)
    weight = 0
    # Standard normal initialization (mean 0, variance 1)
    if init_method == "random_normal":
        weight = np.random.randn(in_node, out_node)
    # Xavier initialization
    elif init_method == "Xavier":
        weight = np.random.randn(in_node, out_node) / np.sqrt(in_node)
    # He initialization
    elif init_method == "he_init":
        weight = np.random.randn(in_node, out_node) / np.sqrt(in_node / 2)
    return np.array(weight, dtype=np.float32)
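# For reference: with in_node inputs of unit variance, "Xavier" scaling
# gives each weight variance 1/in_node and "he_init" gives 2/in_node,
# which keeps the variance of w*x roughly constant from layer to layer;
# He's variant compensates for ReLU zeroing half of its inputs.
# Hypothetical usage (this helper is never called by the original script):
# w = init_weight(28, 16, "he_init")   # -> float32 array of shape (28, 16)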
# Define one fully connected layer
def BP_set_layer(input, in_node, out_node, activation_fun=None):
    # Weight initialization (He initialization, matching the ReLU layers)
    weight = np.random.randn(in_node, out_node) / np.sqrt(in_node / 2)
    weight = tf.Variable(weight, dtype=tf.float32)
    # A [1, out_node] bias broadcasts over any batch size; the original
    # [num_sample, out_node] shape would fail at prediction time, when
    # the test batch has a different number of rows
    bias = tf.Variable(tf.zeros(shape=[1, out_node], dtype=tf.float32) + 0.1)
    # Model y = w*x + b
    y = tf.add(tf.matmul(input, weight), bias)
    if activation_fun:
        y = activation_fun(y)
    return y
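# Hypothetical stand-alone usage (not part of the original script):
# a single 28 -> 16 ReLU layer built on a fresh placeholder.
# h = BP_set_layer(tf.placeholder(tf.float32, [None, 28]), 28, 16, tf.nn.relu)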
# Moving average of the recorded losses
def moving_average(a, w=10):
    if len(a) < w:
        return a[:]
    return [val if idx < w else sum(a[(idx - w):idx]) / w
            for idx, val in enumerate(a)]
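# Worked example (values chosen here for illustration): with w=3,
# moving_average([1.0, 2.0, 3.0, 4.0, 5.0], w=3) keeps the first three
# values unchanged and then averages the previous three values, giving
# [1.0, 2.0, 3.0, 2.0, 3.0].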
# Train the BP network
def BP_training(file_path):
    x, y = load_datasets(file_path)
    y = y[:, np.newaxis]
    print("Dataset shapes:", x.shape, y.shape)
    # Define the placeholders
    x_node = tf.placeholder(tf.float32, shape=[None, 28])
    y_node = tf.placeholder(tf.float32, shape=[None, 1])
    # Define the network: three ReLU hidden layers and a linear output
    layer1 = BP_set_layer(x_node, 28, 16, tf.nn.relu)
    layer2 = BP_set_layer(layer1, 16, 16, tf.nn.relu)
    layer3 = BP_set_layer(layer2, 16, 16, tf.nn.relu)
    y_pred = BP_set_layer(layer3, 16, 1)
    # Define the loss function and the optimization algorithm
    train_loss = tf.losses.mean_squared_error(
        labels=y_node, predictions=y_pred)
    # Define the learning rate
    learning_rate = 0.001
    train_opt = tf.train.AdamOptimizer(learning_rate).minimize(loss=train_loss)
    # Training parameters
    display_step = 2
    training_epoch = 10
    # Saver for the trained weights
    saver = tf.train.Saver()
    # Create the session
    with tf.Session() as sess:
        # Initialize the global variables
        plotdata = {"batchsize": [], "loss": []}  # step indices and loss values
        sess.run(tf.global_variables_initializer())
        for step in range(training_epoch):
            feed_data = {x_node: x, y_node: y}
            # Run one optimization step every epoch; the original only
            # trained on the epochs it happened to print on
            _, temp_loss = sess.run(
                [train_opt, train_loss], feed_dict=feed_data)
            if step % display_step == 0:
                print("Epoch:", step, "loss =", temp_loss)
                # Skip non-finite losses when recording the curve
                if not np.isnan(temp_loss):
                    plotdata["batchsize"].append(step)
                    plotdata["loss"].append(temp_loss)
        print("Training is over!")
        print("cost =", sess.run(train_loss, feed_dict=feed_data))
        # Plot the training curve
        plotdata["avgloss"] = moving_average(plotdata["loss"])
        plt.figure(1)
        plt.plot(plotdata["batchsize"], plotdata["avgloss"], 'b--')
        plt.xlabel("Minibatch number")
        plt.ylabel("Loss")
        plt.title("Minibatch run vs. Training loss")
        plt.show()
        # Save the weights once training ends
        saver.save(sess, "../checkpoint/checkpoint.cpkt")
    # Prediction
    # Load the test set
    x_input = []
    test_data_path = "../nomal/test_data_nor.csv"
    with open(test_data_path, "r") as data:
        lines = data.readlines()
        for line in lines:
            one_line = line.strip().split(',')
            one_line = [float(item) for item in one_line]
            x_input.append(one_line)
    x_input = np.array(x_input)
    # Restore the trained weights. The graph already exists in this
    # process, so a plain restore with the training saver is enough;
    # re-importing the meta graph would create duplicate ops, and running
    # the initializer here would overwrite the values being restored.
    with tf.Session() as sess2:
        saver.restore(sess2, "../checkpoint/checkpoint.cpkt")
        print("Restore is over!")
        print(type(x_input))
        y = sess2.run(y_pred, feed_dict={x_node: x_input})
        print(y, type(y))
    return y_pred
if __name__ == "__main__":
    file_path = "../nomal/train_data_nor.csv"
    BP_training(file_path)
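If the checkpoint is later consumed in a fresh process, where BP_training has not built the graph, the graph itself has to be imported before restoring. Below is a minimal sketch of that pattern, assuming the same ../checkpoint/checkpoint.cpkt files; the tensor names used to look up the placeholder and the output are assumptions, not something the original post specifies, so list the graph's operations to find the real ones.

Code language: python
# Restore the saved graph and weights in a fresh process (sketch).
import numpy as np
import tensorflow as tf

with tf.Session() as sess:
    # import_meta_graph rebuilds the graph stored next to the checkpoint
    saver = tf.train.import_meta_graph("../checkpoint/checkpoint.cpkt.meta")
    saver.restore(sess, "../checkpoint/checkpoint.cpkt")
    graph = tf.get_default_graph()
    # The names below are assumptions; inspect
    # [op.name for op in graph.get_operations()] to find the actual ones.
    x_node = graph.get_tensor_by_name("Placeholder:0")
    y_pred = graph.get_tensor_by_name("Add_3:0")
    dummy_batch = np.random.rand(5, 28).astype(np.float32)  # illustrative input
    print(sess.run(y_pred, feed_dict={x_node: dummy_batch}))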