In the previous article we learned how to work with real minute-level BTC and ETH market data: loading it, plotting candlesticks, filling missing values, and analysing correlations.
Picking up where we left off, in this article we will use a LightGBM model to predict real market returns. Note that the code comes from a third-place solution of a Kaggle competition; I have added explanations for parts of it.
Data processing
First, the imports, file paths and a few basic settings. Note that lag is a look-back window: averaging with a window of 60, for example, means a moving average over the past 60 time steps. The windows used in this code are [60, 300, 900].
import os
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import lightgbm as lgb
# import gresearch_crypto
import time
import datetime
import pickle
import gc
from tqdm import tqdm
n_fold = 7
seed0 = 8586
use_supple_for_train = True
# If True, the period used to evaluate Public LB will not be used for training.
# Set to False on final submission.
not_use_overlap_to_train = False
TRAIN_CSV = '量化数据/train.csv'
SUPPLE_TRAIN_CSV = '量化数据/supplemental_train.csv'
ASSET_DETAILS_CSV = '量化数据/asset_details.csv'
pd.set_option('display.max_rows', 6)
pd.set_option('display.max_columns', 350)
lags = [60,300,900]
The LightGBM model parameters. See the reference linked earlier for what each parameter means.
params = {
'early_stopping_rounds': 50,
'objective': 'regression',
'metric': 'rmse',
# 'metric': 'None',
'boosting_type': 'gbdt',
'max_depth': 5,
'verbose': -1,
'max_bin':600,
'min_data_in_leaf':50,
'learning_rate': 0.03,
'subsample': 0.7,
'subsample_freq': 1,
'feature_fraction': 1,
'lambda_l1': 0.5,
'lambda_l2': 2,
'seed':seed0,
'feature_fraction_seed': seed0,
'bagging_fraction_seed': seed0,
'drop_seed': seed0,
'data_random_seed': seed0,
'extra_trees': True,
'extra_seed': seed0,
'zero_as_missing': True,
"first_metric_only": True
}
A memory-optimization helper. This is a very practical general-purpose function: Kaggle competitions constrain both memory and runtime, and passing a DataFrame through it can cut its memory footprint dramatically. Worth keeping in your toolbox.
def reduce_mem_usage(df):
""" iterate through all the columns of a dataframe and modify the data type
to reduce memory usage.
"""
start_mem = df.memory_usage().sum() / 1024**2
print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
else:
df[col] = df[col].astype('category')
end_mem = df.memory_usage().sum() / 1024**2
print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
return df
Next we use pandas to read three files: train.csv, supplemental_train.csv and asset_details.csv. The first two share the same schema and only differ in the period they cover, while asset_details.csv holds each asset's name and weight. Their structure was explained in the previous article; you can also load and inspect them yourself.
About the target
The data columns themselves should all be familiar by now, but how is Target computed? It is a residualized 15-minute log return: Target(t) = R(t) - beta(t)·M(t), where R(t) is the asset's 15-minute log return. As our analysis in the previous article showed, the different assets move in a correlated way, and the term subtracted from R(t) is there precisely to strip out that common market component. M(t) is the weighted average of the 15-minute log returns of all 14 assets (using the weights from asset_details.csv), and in the beta formula, beta = ⟨M·R⟩ / ⟨M²⟩, the angle brackets ⟨·⟩ denote a rolling mean over a 3750-minute window. The Target column in the data is already computed for us, but it is still worth understanding where it comes from; a sketch follows.
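To make this concrete, here is a minimal pandas sketch of how such a target could be derived from a wide table of close prices, assuming the competition's published definition. The function name make_target and its arguments are illustrative only, and it ignores some edge cases (missing assets, partial windows); in practice you never need to run this, since train.csv already contains the Target column.

import numpy as np
import pandas as pd

def make_target(close_wide, weights, window=3750):
    # close_wide: one column of close prices per Asset_ID, one row per minute
    # weights: per-asset weights from asset_details.csv, indexed like close_wide's columns
    # 15-minute log return: R(t) = log(P(t+16) / P(t+1))
    R = np.log(close_wide.shift(-16) / close_wide.shift(-1))
    # market return M(t): weighted average of the per-asset returns
    M = (R * weights).sum(axis=1) / weights.sum()
    # beta(t) = <M*R> / <M^2>, with <.> a rolling mean over 'window' minutes
    beta = R.mul(M, axis=0).rolling(window).mean().div((M ** 2).rolling(window).mean(), axis=0)
    # residualized return: remove the common market component
    return R - beta.mul(M, axis=0)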
The code below reads the datasets, stacks train and the supplemental file vertically with concat, and then runs the reduce_mem_usage() function defined above. The effect is clearly visible: memory usage drops by more than 50%.
df_train = pd.read_csv(TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
if use_supple_for_train:
    df_supple = pd.read_csv(SUPPLE_TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
    # display(df_supple)
    df_train = pd.concat([df_train, df_supple])
    del df_supple
df_train = reduce_mem_usage(df_train)
# df_train
# asset names and weights (used later in the training loop)
df_asset_details = pd.read_csv(ASSET_DETAILS_CSV)
The raw data interleaves the 14 assets: one row might be BTC at time a and the next ETH at time b. We want to reshape it so that each row (one timestamp) carries the data of all 14 assets. The code below creates an empty train_merged and then, one asset at a time, uses merge to join in that asset's ['timestamp', 'Close', 'Target'] columns. The suffixes argument keeps the columns of different assets apart. If you are not familiar with merge, see the link; a toy illustration of the suffixes behaviour follows, before the real code.
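A toy example (made-up numbers, not part of the solution) of how an outer merge with suffixes builds the wide table:

import pandas as pd

a = pd.DataFrame({'timestamp': [1, 2, 3], 'Close': [10.0, 11.0, 12.0]})
b = pd.DataFrame({'timestamp': [2, 3, 4], 'Close': [0.50, 0.52, 0.55]})

wide = a.merge(b, on='timestamp', how='outer', suffixes=['', '_1'])
print(wide)
#    timestamp  Close  Close_1
# 0          1   10.0      NaN
# 1          2   11.0     0.50
# 2          3   12.0     0.52
# 3          4    NaN     0.55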
%%time
train_merged = pd.DataFrame()
train_merged[df_train.columns] = 0
for id in tqdm( range(14) ):
    train_merged = train_merged.merge(df_train.loc[df_train["Asset_ID"] == id, ['timestamp', 'Close','Target']].copy(), on="timestamp", how='outer', suffixes=['', "_" + str(id)])
train_merged = train_merged.drop(df_train.columns.drop("timestamp"), axis=1)
train_merged = train_merged.sort_values('timestamp', ascending=True)
display(train_merged.head())
Then, after sorting by timestamp in ascending order, we get data with the structure shown: each row (one timestamp) contains the close prices and Target values of all 14 assets.
As you can see, though, some assets are missing at some timestamps. This is very common, and we looked at it statistically in the previous article. To fill these gaps we use fillna(); note that the limit argument caps the number of consecutive fills, so some NaNs will remain. In practice, filling too aggressively actually hurts the predictions. A tiny illustration of the limit behaviour follows, then the real code.
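A quick toy check (my own, not from the solution) of how limit caps forward filling:

import pandas as pd
s = pd.Series([1.0, None, None, None, 5.0])
# at most 2 consecutive NaNs are filled, so the third one stays NaN
print(s.fillna(method='ffill', limit=2).tolist())  # [1.0, 1.0, 1.0, nan, 5.0]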
# forward fill
# Set an upper limit on the number of fills, since there may be long term gaps.
for id in range(14):
    # print(id, train_merged[f'Close_{id}'].isnull().sum())  # Number of missing before forward fill
    train_merged[f'Close_{id}'] = train_merged[f'Close_{id}'].fillna(method='ffill', limit=100)
    # print(id, train_merged[f'Close_{id}'].isnull().sum())  # Number of missing after forward fill
Feature engineering
Tuning a deep learning model is mostly about improving the architecture; with LightGBM, the heart of the work is feature engineering. Below is the core feature-engineering code of this solution.
def get_features(df, train=True):
if train == True:
totimestamp = lambda s: np.int32(time.mktime(datetime.datetime.strptime(s, "%d/%m/%Y").timetuple()))
valid_window = [totimestamp("12/03/2021")]
        # valid_window = [totimestamp("15/08/2021")]  # for validation
df['train_flg'] = np.where(df['timestamp']>=valid_window[0], 0,1)
supple_start_window = [totimestamp("22/09/2021")]
if use_supple_for_train:
df['train_flg'] = np.where(df['timestamp']>=supple_start_window[0], 1 ,df['train_flg'] )
for id in range(14):
for lag in lags:
df[f'log_close/mean_{lag}_id{id}'] = np.log( np.array(df[f'Close_{id}']) / np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1) )
df[f'log_return_{lag}_id{id}'] = np.log( np.array(df[f'Close_{id}']) / np.roll(np.array(df[f'Close_{id}']), lag) )
for lag in lags:
df[f'mean_close/mean_{lag}'] = np.mean(df.iloc[:,df.columns.str.startswith(f'log_close/mean_{lag}_id')], axis=1)
df[f'mean_log_returns_{lag}'] = np.mean(df.iloc[:,df.columns.str.startswith(f'log_return_{lag}_id')] , axis=1)
for id in range(14):
df[f'log_close/mean_{lag}-mean_close/mean_{lag}_id{id}'] = np.array( df[f'log_close/mean_{lag}_id{id}']) - np.array( df[f'mean_close/mean_{lag}'] )
df[f'log_return_{lag}-mean_log_returns_{lag}_id{id}'] = np.array( df[f'log_return_{lag}_id{id}']) - np.array( df[f'mean_log_returns_{lag}'] )
if train == True:
for id in range(14):
df = df.drop([f'Close_{id}'], axis=1)
oldest_use_window = [totimestamp("12/01/2019")]
df = df[ df['timestamp'] >= oldest_use_window[0] ]
return df
The solution builds the following features:
- F1: the log ratio of the close to its average close over the past N minutes
- F2: the log ratio of the close to the close N minutes earlier
- F1_mean, F2_mean: the averages of F1 and F2 across all 14 assets
- F1_residual, F2_residual: F1 minus F1_mean, and F2 minus F2_mean
np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1)
This expression can be confusing, so let's unpack it. np.convolve() computes the moving average with window lag. The first lag-1 points don't have enough history for a full window, so np.append() pads lag-1 placeholder '1's onto the end of the (shorter) result, and np.roll() then shifts the whole array right by lag-1 positions, so that each position lines up with the moving average of the window ending there (the padding wraps around to the front). Dividing np.array(df[f'Close_{id}']) by this array therefore gives the ratio of the current close to the average close of the past N minutes. Once you see this, the rest of the features are easy to follow. A small sanity check against pandas' rolling mean is sketched below.
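A quick verification (my own, not from the solution): the convolve/roll trick should match a plain pandas trailing rolling mean wherever enough history exists.

import numpy as np
import pandas as pd

close = np.random.default_rng(0).lognormal(size=2000)
lag = 60

rolled = np.roll(
    np.append(np.convolve(close, np.ones(lag) / lag, mode="valid"), np.ones(lag - 1)),
    lag - 1)
feature_convolve = np.log(close / rolled)

feature_pandas = np.log(close / pd.Series(close).rolling(lag).mean().values)

# the first lag-1 values differ (placeholder '1's vs NaN); the rest agree
print(np.allclose(feature_convolve[lag - 1:], feature_pandas[lag - 1:]))  # True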
Computing the features
We pass the merged train_merged into the get_features() function defined above and call the result feat.
%%time
feat = get_features(train_merged)
feat
The features we will train on are exactly the ones produced by get_features(); Target, timestamp and the rest are excluded, which is what the code below marks.
One more very important point: we usually think of prediction as "use the previous time step to predict the current one", but in this solution the "previous step" is really the F1, F2, F1_mean, F2_mean, F1_residual and F2_residual computed over the previous windows (60, 300, 900), across multiple assets. In other words, the past features of many assets are used to predict each asset's target.
# define features for LGBM
not_use_features_train = ['timestamp', 'train_flg']
for id in range(14):
not_use_features_train.append(f'Target_{id}')
features = feat.columns
features = features.drop(not_use_features_train)
features = list(features)
# display(features)
print(len(features))
del train_merged
del df_train
gc.collect()
Model training
Below are the evaluation metric functions and a plotting helper for feature importance. Knowing how to call them is enough; the implementation details are not the focus here.
# define the evaluation metric
def correlation(a, train_data):
b = train_data.get_label()
a = np.ravel(a)
b = np.ravel(b)
len_data = len(a)
mean_a = np.sum(a) / len_data
mean_b = np.sum(b) / len_data
var_a = np.sum(np.square(a - mean_a)) / len_data
var_b = np.sum(np.square(b - mean_b)) / len_data
cov = np.sum((a * b))/len_data - mean_a*mean_b
corr = cov / np.sqrt(var_a * var_b)
return 'corr', corr, True
# For CV score calculation
def corr_score(pred, valid):
len_data = len(pred)
mean_pred = np.sum(pred) / len_data
mean_valid = np.sum(valid) / len_data
var_pred = np.sum(np.square(pred - mean_pred)) / len_data
var_valid = np.sum(np.square(valid - mean_valid)) / len_data
cov = np.sum((pred * valid))/len_data - mean_pred*mean_valid
corr = cov / np.sqrt(var_pred * var_valid)
return corr
# For CV score calculation
def wcorr_score(pred, valid, weight):
len_data = len(pred)
sum_w = np.sum(weight)
mean_pred = np.sum(pred * weight) / sum_w
mean_valid = np.sum(valid * weight) / sum_w
var_pred = np.sum(weight * np.square(pred - mean_pred)) / sum_w
var_valid = np.sum(weight * np.square(valid - mean_valid)) / sum_w
cov = np.sum((pred * valid * weight)) / sum_w - mean_pred*mean_valid
corr = cov / np.sqrt(var_pred * var_valid)
return corr
# from: https://blog.amedama.jp/entry/lightgbm-cv-feature-importance
# (used in nyanp's Optiver solution)
def plot_importance(importances, features_names = features, PLOT_TOP_N = 20, figsize=(10, 10)):
importance_df = pd.DataFrame(data=importances, columns=features)
sorted_indices = importance_df.median(axis=0).sort_values(ascending=False).index
sorted_importance_df = importance_df.loc[:, sorted_indices]
plot_cols = sorted_importance_df.columns[:PLOT_TOP_N]
_, ax = plt.subplots(figsize=figsize)
ax.grid()
ax.set_xscale('log')
ax.set_ylabel('Feature')
ax.set_xlabel('Importance')
sns.boxplot(data=sorted_importance_df[plot_cols],
orient='h',
ax=ax)
plt.show()
This is the data-splitting function: 7-fold cross-validation. The code is mostly self-explanatory, but note embargo = 3750: the target is computed with a 3750-minute rolling average, so to avoid leaking future information, train and test timestamps must be separated by at least 3750 minutes when the data is split.
# from: https://www.kaggle.com/code/nrcjea001/lgbm-embargocv-weightedpearson-lagtarget/
def get_time_series_cross_val_splits(data, cv = n_fold, embargo = 3750):
all_train_timestamps = data['timestamp'].unique()
len_split = len(all_train_timestamps) // cv
    test_splits = [all_train_timestamps[i * len_split:(i + 1) * len_split] for i in range(cv)]
# fix the last test split to have all the last timestamps, in case the number of timestamps wasn't divisible by cv
rem = len(all_train_timestamps) - len_split*cv
if rem>0:
test_splits[-1] = np.append(test_splits[-1], all_train_timestamps[-rem:])
train_splits = []
for test_split in test_splits:
test_split_max = int(np.max(test_split))
test_split_min = int(np.min(test_split))
# get all of the timestamps that aren't in the test split
train_split_not_embargoed = [e for e in all_train_timestamps if not (test_split_min <= int(e) <= test_split_max)]
# embargo the train split so we have no leakage. Note timestamps are expressed in seconds, so multiply by 60
embargo_sec = 60*embargo
train_split = [e for e in train_split_not_embargoed if
abs(int(e) - test_split_max) > embargo_sec and abs(int(e) - test_split_min) > embargo_sec]
train_splits.append(train_split)
# convenient way to iterate over train and test splits
train_test_zip = zip(train_splits, test_splits)
return train_test_zip
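A quick, hypothetical way to inspect the embargoed folds (my own check, not part of the solution), once feat is available:

for fold, (train_ts, test_ts) in enumerate(get_time_series_cross_val_splits(feat, cv=n_fold, embargo=3750)):
    # each fold's train timestamps stay at least 3750 minutes away from its test block
    print(f"fold {fold}: {len(train_ts)} train timestamps, {len(test_ts)} test timestamps, "
          f"test range {int(np.min(test_ts))} - {int(np.max(test_ts))}")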
Hang in there, the training function is next. As you can see, once the features are defined, using LightGBM itself is very simple; get_Xy_and_model_for_asset() ultimately returns the true targets and the corresponding predicted targets. One reminder: train_split and test_split both hold timestamps, so df_proc.loc[], together with the features list defined earlier, is all we need to slice out the datasets.
def get_Xy_and_model_for_asset(df_proc, asset_id):
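    # keep only the rows where this asset's target is not NaN (NaN != NaN)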
df_proc = df_proc.loc[ (df_proc[f'Target_{asset_id}'] == df_proc[f'Target_{asset_id}']) ]
if not_use_overlap_to_train:
df_proc = df_proc.loc[ (df_proc['train_flg'] == 1) ]
# EmbargoCV
train_test_zip = get_time_series_cross_val_splits(df_proc, cv = n_fold, embargo = 3750)
print("entering time series cross validation loop")
importances = []
oof_pred = []
oof_valid = []
for split, train_test_split in enumerate(train_test_zip):
gc.collect()
print(f"doing split {split 1} out of {n_fold}")
train_split, test_split = train_test_split
train_split_index = df_proc['timestamp'].isin(train_split)
test_split_index = df_proc['timestamp'].isin(test_split)
train_dataset = lgb.Dataset(df_proc.loc[train_split_index, features],
df_proc.loc[train_split_index, f'Target_{asset_id}'].values,
feature_name = features,
)
val_dataset = lgb.Dataset(df_proc.loc[test_split_index, features],
df_proc.loc[test_split_index, f'Target_{asset_id}'].values,
feature_name = features,
)
print(f"number of train data: {len(df_proc.loc[train_split_index])}")
print(f"number of val data: {len(df_proc.loc[test_split_index])}")
model = lgb.train(params = params,
train_set = train_dataset,
valid_sets=[train_dataset, val_dataset],
valid_names=['tr', 'vl'],
num_boost_round = 5000,
verbose_eval = 100,
feval = correlation,
)
importances.append(model.feature_importance(importance_type='gain'))
file = f'trained_model_id{asset_id}_fold{split}.pkl'
pickle.dump(model, open(file, 'wb'))
print(f"Trained model was saved to 'trained_model_id{asset_id}_fold{split}.pkl'")
print("")
        oof_pred += list( model.predict(df_proc.loc[test_split_index, features]) )
        oof_valid += list( df_proc.loc[test_split_index, f'Target_{asset_id}'].values )
plot_importance(np.array(importances),features, PLOT_TOP_N = 20, figsize=(10, 5))
return oof_pred, oof_valid
Congratulations: if you run the code below, training should now be under way.
oof = [ [] for id in range(14) ]
all_oof_pred = []
all_oof_valid = []
all_oof_weight = []
for asset_id, asset_name in zip(df_asset_details['Asset_ID'], df_asset_details['Asset_Name']):
print(f"Training model for {asset_name:<16} (ID={asset_id:<2})")
oof_pred, oof_valid = get_Xy_and_model_for_asset(feat, asset_id)
weight_temp = float( df_asset_details.loc[ df_asset_details['Asset_ID'] == asset_id , 'Weight' ] )
    all_oof_pred += oof_pred
    all_oof_valid += oof_valid
    all_oof_weight += [weight_temp] * len(oof_pred)
oof[asset_id] = corr_score( np.array(oof_pred) , np.array(oof_valid) )
print(f'OOF corr score of {asset_name} (ID={asset_id}) is {oof[asset_id]:.5f}. (Weight: {float(weight_temp):.5f})')
print('')
print('')
You should see training output similar to the following:
When training finishes, a series of model files will have been saved. You can load them back with pickle.load() and call model.predict() to use the models you just trained. Go give it a try! A minimal sketch follows.
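A minimal inference sketch (my own, not from the solution): load the saved fold models for one asset and average their predictions. Here new_feat is assumed to be a DataFrame produced by get_features(..., train=False) on freshly merged data, with the same features columns used for training.

import pickle
import numpy as np

asset_id = 1  # pick any Asset_ID from asset_details.csv
fold_models = [
    pickle.load(open(f'trained_model_id{asset_id}_fold{fold}.pkl', 'rb'))
    for fold in range(n_fold)
]

# average the per-fold predictions on the engineered features
preds = np.mean([m.predict(new_feat[features]) for m in fold_models], axis=0)
print(preds[:5])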