02 | Time Series: Predicting Cryptocurrency Returns with LightGBM


In the previous article we learned how to read real-market minute-level BTC and ETH data and perform basic operations on it: candlestick visualization, filling missing values, correlation analysis, and so on.

Picking up where we left off, in this article we use a LightGBM model to predict real market returns. Note that the code comes from the third-place solution of the Kaggle competition; I have annotated and explained parts of it.

Data Processing

First, the imports, file paths and a few basic settings. Note that lag denotes a time window: for example, averaging with a window of 60 means a moving average over the past 60 minutes. The windows used in the code are [60, 300, 900].

Code (Python):
import os
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import lightgbm as lgb
# import gresearch_crypto
import time
import datetime
import pickle
import gc
from tqdm import tqdm


n_fold = 7
seed0 = 8586
use_supple_for_train = True

# If True, the period used to evaluate Public LB will not be used for training.
# Set to False on final submission.
not_use_overlap_to_train = False

TRAIN_CSV = '量化数据/train.csv'
SUPPLE_TRAIN_CSV = '量化数据/supplemental_train.csv'
ASSET_DETAILS_CSV = '量化数据/asset_details.csv'

pd.set_option('display.max_rows', 6)
pd.set_option('display.max_columns', 350)
lags = [60,300,900]

Parameters of the LightGBM model; refer to the material linked earlier for the meaning of each parameter.

Code (Python):
params = {
    'early_stopping_rounds': 50,
    'objective': 'regression',
    'metric': 'rmse',
#     'metric': 'None',
    'boosting_type': 'gbdt',
    'max_depth': 5,
    'verbose': -1,
    'max_bin':600,
    'min_data_in_leaf':50,
    'learning_rate': 0.03,
    'subsample': 0.7,
    'subsample_freq': 1,
    'feature_fraction': 1,
    'lambda_l1': 0.5,
    'lambda_l2': 2,
    'seed':seed0,
    'feature_fraction_seed': seed0,
    'bagging_fraction_seed': seed0,
    'drop_seed': seed0,
    'data_random_seed': seed0,
    'extra_trees': True,
    'extra_seed': seed0,
    'zero_as_missing': True,
    "first_metric_only": True
         }

Next, a memory-optimization helper. This is a very practical general-purpose function: Kaggle competitions limit both memory and runtime, and passing a dataframe through it can cut memory usage dramatically. Worth keeping in your toolbox.

Code (Python):
def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    return df

Next we use pandas to read the three files train.csv, supplemental_train.csv and asset_details.csv. The first two share the same schema and only differ in the time period they cover, while asset_details.csv contains the name and weight of each currency. Both table layouts were explained in the previous article, and you can also load and inspect them yourself.

About the Target

All of the columns shown above should already be familiar, but how is the target computed? It is defined by the two formulas below. R(t) is the 15-minute log return. As the analysis in the previous article showed, the different currencies actually move in a correlated way, and the term subtracted in Target(t) exists precisely to account for that correlation. M(t) is the weighted average of the 14 currency pairs' returns using the asset weights, and the angle brackets ⟨.⟩ in the beta formula denote a rolling mean over a 3750-minute window. The target values are of course already provided, but it is still worth understanding how they are computed.
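The original post showed the formulas as an image; based on the description above and the competition's data documentation, they take roughly the following form, where $P^a(t)$ is the close price of asset $a$ and $w_a$ its weight:

$$R^a(t) = \log\frac{P^a(t+16)}{P^a(t+1)}, \qquad M(t) = \frac{\sum_a w_a\, R^a(t)}{\sum_a w_a}$$

$$\beta^a(t) = \frac{\langle M \cdot R^a \rangle}{\langle M^2 \rangle}, \qquad \mathrm{Target}^a(t) = R^a(t) - \beta^a(t)\, M(t)$$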

The code below reads the datasets and stacks the train and supplemental files vertically with concat, then applies the reduce_mem_usage() function defined above. The effect is clear: memory usage drops by more than 50%.

Code (Python):
df_train = pd.read_csv(TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
if use_supple_for_train:
    df_supple = pd.read_csv(SUPPLE_TRAIN_CSV, usecols=['timestamp','Asset_ID', 'Close', 'Target'])
#     display(df_supple)
    df_train = pd.concat([df_train, df_supple])
    del df_supple
df_train = reduce_mem_usage(df_train)
# df_train

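The per-asset training loop further below also needs the asset metadata from ASSET_DETAILS_CSV (the Asset_ID, Asset_Name and Weight columns). That read is not shown in the original snippet; a minimal sketch:

Code (Python):
# Read asset metadata used later for the per-asset loop and the weights.
df_asset_details = pd.read_csv(ASSET_DETAILS_CSV).sort_values('Asset_ID')
display(df_asset_details)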
The 14 currency pairs in the raw data are interleaved: one row may be BTC at time a and the next row ETH at time b. We want to reshape the data so that each row (one time point) contains the feature data of all 14 currency pairs. The code below therefore creates an empty train_merged and then, with merge, joins in one currency pair's ['timestamp', 'Close', 'Target'] columns at a time. The suffixes argument distinguishes the columns of the different currency pairs. If you are not familiar with merge, see the link.
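Here is a toy illustration (not from the original post) of how the suffixes argument labels the overlapping columns:

Code (Python):
import pandas as pd

# Two assets observed at the same timestamps (hypothetical values).
a = pd.DataFrame({'timestamp': [1, 2], 'Close': [10.0, 11.0]})
b = pd.DataFrame({'timestamp': [1, 2], 'Close': [20.0, 21.0]})

# Outer merge on timestamp; suffixes distinguish the overlapping 'Close' columns.
print(a.merge(b, on='timestamp', how='outer', suffixes=['', '_1']))
#    timestamp  Close  Close_1
# 0          1   10.0     20.0
# 1          2   11.0     21.0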

Code (Python):
%%time
train_merged = pd.DataFrame()
train_merged[df_train.columns] = 0
for id in tqdm( range(14) ):
    train_merged = train_merged.merge(df_train.loc[df_train["Asset_ID"] == id, ['timestamp', 'Close','Target']].copy(), on="timestamp", how='outer', suffixes=['', "_" + str(id)])
        
train_merged = train_merged.drop(df_train.columns.drop("timestamp"), axis=1)
train_merged = train_merged.sort_values('timestamp', ascending=True)
display(train_merged.head())

Then, sorting by timestamp in ascending order, we obtain data with the following structure: each row (one time point) contains the closing prices and Target values of all 14 currency pairs.

As you can see, however, some currency pairs are empty at certain times. This is very common, and we analysed it statistically in the previous article as well. To fill these gaps we use fillna(); note that the limit argument caps the number of consecutive fills, so some gaps will remain. Filling in too many values would actually hurt the predictions.

Code (Python):
# forward fill
# Set an upper limit on the number of fills, since there may be long term gaps.
for id in range(14):
#     print(id, train_merged[f'Close_{id}'].isnull().sum())   # Number of missing before forward fill
    train_merged[f'Close_{id}'] = train_merged[f'Close_{id}'].fillna(method='ffill', limit=100)
#     print(id, train_merged[f'Close_{id}'].isnull().sum())   # Number of missing after forward fill
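A small illustration (not from the original post) of how the limit argument caps forward filling:

Code (Python):
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, np.nan, np.nan, 5.0])

# Only one NaN after each valid value is filled; the rest of a long gap stays empty.
print(s.fillna(method='ffill', limit=1).tolist())
# [1.0, 1.0, nan, nan, 5.0]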

Feature Engineering

With deep learning, tuning mostly means improving the model architecture; with LightGBM, the heart of the work is feature engineering. Below is the core feature-engineering code of this article.

Code (Python):
def get_features(df, train=True):   
    if train == True:
        totimestamp = lambda s: np.int32(time.mktime(datetime.datetime.strptime(s, "%d/%m/%Y").timetuple()))
        valid_window = [totimestamp("12/03/2021")]
#         valid_window = [totimestamp("15/08/2021")]  # for validation
        df['train_flg'] = np.where(df['timestamp']>=valid_window[0], 0,1)

        supple_start_window = [totimestamp("22/09/2021")]
        if use_supple_for_train:
            df['train_flg'] = np.where(df['timestamp']>=supple_start_window[0], 1 ,df['train_flg']  )

   
    for id in range(14):    
        for lag in lags:
            df[f'log_close/mean_{lag}_id{id}'] = np.log( np.array(df[f'Close_{id}']) /  np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1)  )
            df[f'log_return_{lag}_id{id}']     = np.log( np.array(df[f'Close_{id}']) /  np.roll(np.array(df[f'Close_{id}']), lag)  )
    for lag in lags:
        df[f'mean_close/mean_{lag}'] =  np.mean(df.iloc[:,df.columns.str.startswith(f'log_close/mean_{lag}_id')], axis=1)
        df[f'mean_log_returns_{lag}'] = np.mean(df.iloc[:,df.columns.str.startswith(f'log_return_{lag}_id')] ,    axis=1)
        for id in range(14):
            df[f'log_close/mean_{lag}-mean_close/mean_{lag}_id{id}'] = np.array( df[f'log_close/mean_{lag}_id{id}']) - np.array( df[f'mean_close/mean_{lag}']  )
            df[f'log_return_{lag}-mean_log_returns_{lag}_id{id}']    = np.array( df[f'log_return_{lag}_id{id}'])     - np.array( df[f'mean_log_returns_{lag}'] )

    if train == True:
        for id in range(14):
            df = df.drop([f'Close_{id}'], axis=1)
        oldest_use_window = [totimestamp("12/01/2019")]
        df = df[  df['timestamp'] >= oldest_use_window[0]   ]

    return df

The solution constructs the following features:

  • F1: the ratio of the close price to its average over the past N minutes
  • F2: the ratio of the close price to the close price N minutes earlier
  • F1_mean, F2_mean: the averages of F1 and F2 across all 14 currencies
  • F1_residual, F2_residual: the differences F1 − F1_mean and F2 − F2_mean
Code (Python):
np.roll(np.append(np.convolve( np.array(df[f'Close_{id}']), np.ones(lag)/lag, mode="valid"), np.ones(lag-1)), lag-1)

This expression is a bit convoluted, so let's unpack it. np.convolve() computes the moving average with window lag; obviously the first lag-1 points do not have enough history for a full window, so np.append() pads lag-1 ones onto the end of the sequence and np.roll() then shifts everything right by lag-1 positions, which moves the padding to the front. Dividing np.array(df[f'Close_{id}']) by this result gives the ratio of the current close to the average close over the past N minutes. Once this is clear, the rest of the features are easy to follow.
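Here is a small numerical check (not from the original post) confirming that this construction matches an ordinary trailing moving average, apart from the padded leading entries:

Code (Python):
import numpy as np
import pandas as pd

close = np.array([10., 11., 12., 13., 14., 15.])
lag = 3

# Same construction as in get_features(): valid convolution, pad with ones, roll right by lag-1.
ma = np.roll(np.append(np.convolve(close, np.ones(lag) / lag, mode="valid"),
                       np.ones(lag - 1)), lag - 1)

print(ma)                                           # [ 1.  1. 11. 12. 13. 14.]
print(pd.Series(close).rolling(lag).mean().values)  # [nan nan 11. 12. 13. 14.]

The first lag-1 entries are just the padding value 1 (so the ratio is meaningless there); everywhere else the two sequences agree, and np.log(close / ma) is exactly the F1 feature described above.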

Extracting the Features

We pass the merged train_merged into the get_features() function defined above and store the resulting features as feat.

Code (Python):
%%time
feat = get_features(train_merged)
feat

The features used for training later are exactly the ones produced by get_features(); the target, timestamp and similar columns are not needed as inputs, so they are flagged here and excluded.

Another very important point: we usually think of prediction as using the previous time step to predict the current one, but in this solution the "previous step" is really the past windows (60, 300, 900 minutes) summarised as F1, F2, F1_mean, F2_mean, F1_residual and F2_residual, and it spans multiple currency pairs. In other words, the past features of several currencies are used to predict the target of the current currency pair.

Code (Python):
# define features for LGBM
not_use_features_train = ['timestamp', 'train_flg']
for id in range(14):
    not_use_features_train.append(f'Target_{id}')

features = feat.columns 
features = features.drop(not_use_features_train)
features = list(features)
# display(features)  
print(len(features))

del train_merged
del df_train
gc.collect()
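As a quick sanity check (illustrative, not from the original post): get_features() creates 4 features per asset per lag plus 2 cross-asset means per lag, so the print above should report 14 × 3 × 4 + 3 × 2 = 174 features, with names such as the following:

Code (Python):
# Peek at a few of the engineered feature names for asset 0 (names follow get_features()).
print([c for c in features if c.endswith('_id0')][:2])
# ['log_close/mean_60_id0', 'log_return_60_id0']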

Model Training

Below are the evaluation functions and the plotting function for feature importances. Knowing how to use them is enough; the implementation details are not essential.

Code (Python):
# define the evaluation metric
def correlation(a, train_data):
    
    b = train_data.get_label()
    
    a = np.ravel(a)
    b = np.ravel(b)

    len_data = len(a)
    mean_a = np.sum(a) / len_data
    mean_b = np.sum(b) / len_data
    var_a = np.sum(np.square(a - mean_a)) / len_data
    var_b = np.sum(np.square(b - mean_b)) / len_data

    cov = np.sum((a * b))/len_data - mean_a*mean_b
    corr = cov / np.sqrt(var_a * var_b)

    return 'corr', corr, True

# For CV score calculation
def corr_score(pred, valid):
    len_data = len(pred)
    mean_pred = np.sum(pred) / len_data
    mean_valid = np.sum(valid) / len_data
    var_pred = np.sum(np.square(pred - mean_pred)) / len_data
    var_valid = np.sum(np.square(valid - mean_valid)) / len_data

    cov = np.sum((pred * valid))/len_data - mean_pred*mean_valid
    corr = cov / np.sqrt(var_pred * var_valid)

    return corr

# For CV score calculation
def wcorr_score(pred, valid, weight):
    len_data = len(pred)
    sum_w = np.sum(weight)
    mean_pred = np.sum(pred * weight) / sum_w
    mean_valid = np.sum(valid * weight) / sum_w
    var_pred = np.sum(weight * np.square(pred - mean_pred)) / sum_w
    var_valid = np.sum(weight * np.square(valid - mean_valid)) / sum_w

    cov = np.sum((pred * valid * weight)) / sum_w - mean_pred*mean_valid
    corr = cov / np.sqrt(var_pred * var_valid)

    return corr

# from: https://blog.amedama.jp/entry/lightgbm-cv-feature-importance
# (used in nyanp's Optiver solution)
def plot_importance(importances, features_names = features, PLOT_TOP_N = 20, figsize=(10, 10)):
    importance_df = pd.DataFrame(data=importances, columns=features)
    sorted_indices = importance_df.median(axis=0).sort_values(ascending=False).index
    sorted_importance_df = importance_df.loc[:, sorted_indices]
    plot_cols = sorted_importance_df.columns[:PLOT_TOP_N]
    _, ax = plt.subplots(figsize=figsize)
    ax.grid()
    ax.set_xscale('log')
    ax.set_ylabel('Feature')
    ax.set_xlabel('Importance')
    sns.boxplot(data=sorted_importance_df[plot_cols],
                orient='h',
                ax=ax)
    plt.show()
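As a quick sanity check (not part of the original solution), corr_score is simply the Pearson correlation coefficient, so it should agree with np.corrcoef on any pair of arrays:

Code (Python):
import numpy as np

rng = np.random.default_rng(0)
pred = rng.normal(size=1000)
valid = 0.3 * pred + rng.normal(size=1000)

# corr_score uses population variances and E[xy] - E[x]E[y]; the normalisation cancels,
# so the result is the ordinary Pearson correlation.
print(corr_score(pred, valid))
print(np.corrcoef(pred, valid)[0, 1])   # same value up to floating-point error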

This is the function that builds the 7-fold cross-validation splits. Most readers should be able to follow the code, but one thing deserves a reminder: embargo = 3750. Because the target computation uses a rolling mean over the past 3750 minutes, the splits are separated by at least 3750 minutes when the dataset is cut, to avoid leaking future information.

Code (Python):
# from: https://www.kaggle.com/code/nrcjea001/lgbm-embargocv-weightedpearson-lagtarget/
def get_time_series_cross_val_splits(data, cv = n_fold, embargo = 3750):
    all_train_timestamps = data['timestamp'].unique()
    len_split = len(all_train_timestamps) // cv
    test_splits = [all_train_timestamps[i * len_split:(i + 1) * len_split] for i in range(cv)]
    # fix the last test split to have all the last timestamps, in case the number of timestamps wasn't divisible by cv
    rem = len(all_train_timestamps) - len_split*cv
    if rem>0:
        test_splits[-1] = np.append(test_splits[-1], all_train_timestamps[-rem:])

    train_splits = []
    for test_split in test_splits:
        test_split_max = int(np.max(test_split))
        test_split_min = int(np.min(test_split))
        # get all of the timestamps that aren't in the test split
        train_split_not_embargoed = [e for e in all_train_timestamps if not (test_split_min <= int(e) <= test_split_max)]
        # embargo the train split so we have no leakage. Note timestamps are expressed in seconds, so multiply by 60
        embargo_sec = 60*embargo
        train_split = [e for e in train_split_not_embargoed if
                       abs(int(e) - test_split_max) > embargo_sec and abs(int(e) - test_split_min) > embargo_sec]
        train_splits.append(train_split)

    # convenient way to iterate over train and test splits
    train_test_zip = zip(train_splits, test_splits)
    return train_test_zip

Hang in there, the training function is next. As you can see, once the features are defined, using LightGBM is straightforward: get_Xy_and_model_for_asset() ultimately returns the true targets and the corresponding predicted targets. A reminder: train_split and test_split both hold timestamps, so slicing the dataset only takes df_proc.loc[] together with the features list defined earlier.

Code (Python):
def get_Xy_and_model_for_asset(df_proc, asset_id):
    # keep only rows where the target is not NaN (a value equals itself only if it is not NaN)
    df_proc = df_proc.loc[  (df_proc[f'Target_{asset_id}'] == df_proc[f'Target_{asset_id}'])  ]
    if not_use_overlap_to_train:
        df_proc = df_proc.loc[  (df_proc['train_flg'] == 1)  ]
    
# EmbargoCV
    train_test_zip = get_time_series_cross_val_splits(df_proc, cv = n_fold, embargo = 3750)
    print("entering time series cross validation loop")
    importances = []
    oof_pred = []
    oof_valid = []
    
    for split, train_test_split in enumerate(train_test_zip):
        gc.collect()
        
        print(f"doing split {split 1} out of {n_fold}")
        train_split, test_split = train_test_split
        train_split_index = df_proc['timestamp'].isin(train_split)
        test_split_index = df_proc['timestamp'].isin(test_split)
    
        train_dataset = lgb.Dataset(df_proc.loc[train_split_index, features],
                                    df_proc.loc[train_split_index, f'Target_{asset_id}'].values, 
                                    feature_name = features, 
                                   )
        val_dataset = lgb.Dataset(df_proc.loc[test_split_index, features], 
                                  df_proc.loc[test_split_index, f'Target_{asset_id}'].values, 
                                  feature_name = features, 
                                 )

        print(f"number of train data: {len(df_proc.loc[train_split_index])}")
        print(f"number of val data:   {len(df_proc.loc[test_split_index])}")

        model = lgb.train(params = params,
                          train_set = train_dataset, 
                          valid_sets=[train_dataset, val_dataset],
                          valid_names=['tr', 'vl'],
                          num_boost_round = 5000,
                          verbose_eval = 100,     
                          feval = correlation,
                         )
        importances.append(model.feature_importance(importance_type='gain'))
        
        file = f'trained_model_id{asset_id}_fold{split}.pkl'
        pickle.dump(model, open(file, 'wb'))
        print(f"Trained model was saved to 'trained_model_id{asset_id}_fold{split}.pkl'")
        print("")
            
        # accumulate out-of-fold predictions and the corresponding true targets across folds
        oof_pred += list(  model.predict(df_proc.loc[test_split_index, features])        )
        oof_valid += list(   df_proc.loc[test_split_index, f'Target_{asset_id}'].values    )
    
    
    plot_importance(np.array(importances),features, PLOT_TOP_N = 20, figsize=(10, 5))

    return oof_pred, oof_valid

Congratulations: if you have reached the code below, training should now be under way, barring surprises.

Code (Python):
oof = [ [] for id in range(14)   ]

all_oof_pred = []
all_oof_valid = []
all_oof_weight = []

for asset_id, asset_name in zip(df_asset_details['Asset_ID'], df_asset_details['Asset_Name']):
    print(f"Training model for {asset_name:<16} (ID={asset_id:<2})")
    
    oof_pred, oof_valid = get_Xy_and_model_for_asset(feat, asset_id)
    
    weight_temp = float( df_asset_details.loc[  df_asset_details['Asset_ID'] == asset_id  , 'Weight'   ]  )
    
    all_oof_pred += oof_pred
    all_oof_valid += oof_valid
    all_oof_weight += [weight_temp] * len(oof_pred)
    
    oof[asset_id] = corr_score(     np.array(oof_pred)   ,    np.array(oof_valid)    )
    
    print(f'OOF corr score of {asset_name} (ID={asset_id}) is {oof[asset_id]:.5f}. (Weight: {float(weight_temp):.5f})')
    print('')
    print('')

You should see per-fold training logs and, at the end, an OOF correlation score printed for each asset.

After training you will have a series of saved model files; pickle.load() loads them back, and model.predict() applies the trained model. Give it a try!
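A minimal inference sketch (assuming, for illustration, that the model for asset_id 1, fold 0 was saved by the loop above and that feat still holds the engineered feature columns):

Code (Python):
import pickle

# Load one of the saved boosters and predict on the most recent rows.
with open('trained_model_id1_fold0.pkl', 'rb') as f:
    model = pickle.load(f)

preds = model.predict(feat.loc[:, features].tail(100))
print(preds[:5])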
