时间序列基于监督学习的LSTM模型为什么可以预测股票走势(附完整代码)

2020-03-02 07:46:00 浏览数 (1)

疫情期间,在家学习Python,调通了基于监督学习的LSTM神经网络预测模型代码,在一般代码的基础上,做了单步和多步通用版的改进。调通的代码附后,供各位大咖指正。

虽然代码调通了,但是发现输出的预测结果均滞后于实际值,更像是对原始数据的拟合而不是预测,这个文章主要是想请教一下:

1、代码问题在哪里?

2、如果代码没问题,预测功能是怎么体现的?

3、如果有类似的群,方便也请大咖告知,可以加群学习,谢谢。

代码语言:javascript复制
import pandas as pd 
# 设置显示的最大列、宽等参数,消掉打印不完全中间的省略号
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 1000)
import matplotlib.pyplot as plt
import tensorflow as tf
import os
from pandas import read_excel
import numpy as np
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential, load_model
from keras.layers import LSTM, Dense, Dropout, BatchNormalization
from numpy import concatenate
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from math import sqrt

# 定义参数
start_rate = 0.99
end_rate = 0.01
n_features = 21  # 特征值数
n_predictions = 1  # 预测值数
delay = 5  # 目标是未来第5个交易日
test_trade_date = []


# 定义字符串转换为浮点型
def str_to_float(s):
    s = s[:-1]
    s_float = float(s)
    return s_float


# 定义series_to_supervised()函数
# 将时间序列转换为监督学习问题
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """
    Frame a time series as a supervised learning dataset.
    Arguments:
        data: Sequence of observations as a list or NumPy array.
        n_in: Number of lag observations as input (X).
        n_out: Number of observations as output (y).
        dropnan: Boolean whether or not to drop rows with NaN values.
    Returns:
        Pandas DataFrame of series framed for supervised learning.
    """
    n_vars = 1 if type(data) is list else data.shape[1]
    df = DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)输入序列
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names  = [('var%d(t-%d)' % (j   1, i)) for j in range(n_vars)]
    # forecast sequence (t, t 1, ... t n)预测序列
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names  = [('var%d(t)' % (j   1)) for j in range(n_vars)]
        else:
            names  = [('var%d(t %d)' % (j   1, i)) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    nan_rows = agg[agg.isnull().T.any().T]
    if dropnan:
        agg.dropna(inplace=True)
    return agg


def generator(tsc='000001', delay=5):
    # 读取文件,删除不必要项
    stock_data = pd.read_csv(r"c:python日k线数据%s.csv" % tsc, index_col=0)  # 读入股票数据,防止第一列被当作数据,加入index_col=0
    stock_data['schange'] = stock_data['close']
    stock_data.reset_index(drop=True, inplace=True)
    stock_data.drop(['ts_code', 'trade_date', 'pre_close', 'change', 'pct_chg'], axis=1,
                    inplace=True)  # df=df.iloc[:,1:13]
    # 缺失值填充
    stock_data.fillna(method='bfill', inplace=True)
    stock_data.fillna(method='ffill', inplace=True)
    # 打印数据的后5行
    n_features = len(stock_data.columns)
    # 获取DataFrame中的数据,形式为数组array形式
    values = stock_data.values
    # 确保所有数据为float类型
    values = values.astype('float32')
    # 特征的归一化处理
    scaler = MinMaxScaler(feature_range=(0, 1))
    # 数据一起处理再分训练集、测试集
    scaled = scaler.fit_transform(values)
    row = len(scaled)
    reframed = series_to_supervised(scaled, delay, 1)
    # 把数据分为训练集和测试集
    values = reframed.values
    train_end = int(np.floor(start_rate * row))
    test_start = train_end
    train = values[:train_end, :]
    test = values[test_start:, :]
    # 把数据分为输入和输出
    n_obs = delay * n_features
    # 分离特征集和标签
    train_X, train_y = train[:, :n_obs], train[:, -n_predictions]
    test_X, test_y = test[:, :n_obs], test[:, -n_predictions]

    # 把输入重塑成3D格式 [样例, 时间步, 特征]
    train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
    test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
    # 转化为三维数据,reshape input to be 3D [samples, timesteps, features]
    train_X = train_X.reshape((train_X.shape[0], delay, n_features))
    test_X = test_X.reshape((test_X.shape[0], delay, n_features))
    return train_X, train_y, test_X, test_y, scaler


# 搭建LSTM模型
train_X, train_y, test_X, test_y, scaler = generator()
model = Sequential()
model.add(LSTM(20, input_shape=(train_X.shape[1], train_X.shape[2]), return_sequences=True))
model.add(LSTM(units=20))
model.add(Dropout(0.5))
model.add(Dense(1, activation='relu'))
model.compile(loss='mean_squared_error', optimizer='adam')  # fit network  'mean_squared_error''mae'
history = model.fit(train_X, train_y, epochs=500, batch_size=100, validation_data=(test_X, test_y), verbose=2,
                    shuffle=False)
model.save('c:pythonmodelmodel')
# 绘制损失图
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.title('LSTM_000001.SZ', fontsize='12')
plt.ylabel('loss', fontsize='10')
plt.xlabel('epoch', fontsize='10')
plt.legend()
plt.show()

print("训练完成,开始预测……")
model = tf.keras.models.load_model('c:pythonmodelmodel')
# 模型预测收益率
y_predict = model.predict(test_X)
n_features = test_X.shape[2]
# model.save(SAVE_PATH   'model')
test_X = test_X.reshape((test_X.shape[0], test_X.shape[1] * test_X.shape[2]))

# 将预测结果按比例反归一化
inv_y_test = concatenate((test_X[:, -n_features:-1], y_predict), axis=1)
inv_y_test = scaler.inverse_transform(inv_y_test)
inv_y_predict = inv_y_test[:, -1]

# invert scaling for actual
# #将真实结果按比例反归一化
test_y = test_y.reshape((len(test_y), 1))
inv_y_train = concatenate((test_X[:, -n_features:-1], test_y), axis=1)
inv_y_train = scaler.inverse_transform(inv_y_train)
inv_y = inv_y_train[:, -1]
print('反归一化后的预测结果:', inv_y_predict)
print('反归一化后的真实结果:', inv_y)

# 写入文件
df = pd.DataFrame()
df['e'] = inv_y
df['pe'] = inv_y_predict
df.to_csv("c:pythonpredict_result.csv")
# 绘图
'''
inv_y=inv_y[delay:,]
#inv_y=inv_y[:-delay,]
for i in range(delay):
    #inv_y=np.concatenate((inv_y,inv_y[-1:,]) , axis=0)
    inv_y = np.concatenate((inv_y[0:1, ],inv_y), axis=0)
'''
plt.plot(inv_y, color='red', label='Original')
plt.plot(inv_y_predict, color='green', label='Predict')
plt.xlabel('the number of test data')
plt.ylabel('5dayearn')
plt.title('predict')
plt.legend()
plt.show()

# 回归评价指标
# calculate MSE 均方误差
mse = mean_squared_error(inv_y, inv_y_predict)
# calculate RMSE 均方根误差
rmse = sqrt(mean_squared_error(inv_y, inv_y_predict))
# calculate MAE 平均绝对误差
mae = mean_absolute_error(inv_y, inv_y_predict)
# calculate R square
r_square = r2_score(inv_y, inv_y_predict)
print('均方误差(mse): %.6f' % mse)
print('均方根误差(rmse): %.6f' % rmse)
print('平均绝对误差(mae): %.6f' % mae)
print('R_square: %.6f' % r_square)

用代码生成5日数据预测和实际值对比图如下图所示:

5日数据预测值和真实值对比图5日数据预测值和真实值对比图

预测质量评价数据如下:

均方误差(mse): 0.673632

均方根误差(rmse): 0.820751

平均绝对误差(mae): 0.770078

R_square: 0.067422

调试时发现,如果在开始阶段将训练集和测试集分别进行归一化处理,预测数据质量更好,

图像的拟合程度更高,同样也能更明显的看出预测数据的滞后性:

在开始阶段将训练集和测试集分别进行归一化处理在开始阶段将训练集和测试集分别进行归一化处理

预测质量评价数据如下:

均方误差(mse): 0.149244

均方根误差(rmse): 0.386321

平均绝对误差(mae): 0.285039

R_square: 0.797429

我的QQ:652176219

0 人点赞