量化投资与机器学习微信公众号,是业内垂直于量化投资、对冲基金、Fintech、人工智能、大数据等领域的主流自媒体。公众号拥有来自公募、私募、券商、期货、银行、保险、高校等行业30W 关注者,荣获2021年度AMMA优秀品牌力、优秀洞察力大奖,连续2年被腾讯云 社区评选为“年度最佳作者”。 量化投资与机器学习公众号 独家撰写 公众号为全网读者带来Backtrader系列自推出第一期以来,受到了众多读者的喜爱与点赞,QIML也会继续把这个系列做好。 让那些割韭菜的课程都随风而去吧!!! 公众号将为大家多维度、多策略、多场景来讲述Backtrader在量化投资领域的实践应用。同时,我们对每段代码都做了解读说明,愿你在Quant的道路上学有所获!
预定系列
- Backtrader 来了
- Backtrader 数据篇
- Backtrader 指标篇
- Backtrader 交易篇(上)
- Backtrader 交易篇(下)
- Backtrader 策略篇
- Backtrader 可视化篇(重构)
- Backtrader 常见案例汇总
- Backtrader 常见问题汇总
- ······
QIML公众号官方Github已上线!
希望大家多Follow,多给星 ★ 我们会把相关推文的数据、代码一并进行同步。同时,我们会在今后的日子里,在全网发布一系列好用、实用、你绝对爱不释手的量化开源工具包! 公众号希望给国内量化投资圈贡献一份自己的力量。希望影响更多人了解量化、学习量化、找到属于一条属于自己的路! 前言 本期案例主要涉及了“选股”、“择时”、“套利” 3 个方面,其中“选股”列举的是多因子选股策略;“择时”列举了均线策略和海龟交易法则;“套利”列举了配对交易策略;共计 6 个案例。 案例1:多因子选股策略 - 基于调仓表 对于逻辑复杂的多因子选股策略,建议将选股过程和回测过程分离开来,在 Backtrader 回测框架外,进行选股,选股结果存成调仓表,然后再将调仓表传给 Backtrader ,让 Backtrader 读取调仓表上的信息,进行策略回测。调仓表上存的选股结果,其实就是每个调仓日应该持有哪些股票以及对应的持仓权重。 import backtrader as bt import backtrader as bt import pandas as pd import datetime # 回测策略 class StockSelectStrategy(bt.Strategy): '''多因子选股 - 基于调仓表''' def __init__(self): # 读取调仓表,表结构如下所示: # trade_date sec_code weight # 0 2019-01-31 000006.SZ 0.007282 # 1 2019-01-31 000008.SZ 0.009783 # ... ... ... ... # 2494 2021-01-28 688088.SH 0.007600 self.buy_stock = pd.read_csv("./Backtrader/data/trade_info.csv", parse_dates=['trade_date']) # 读取调仓日期,即每月的最后一个交易日,回测时,会在这一天下单,然后在下一个交易日,以开盘价买入 self.trade_dates = pd.to_datetime(self.buy_stock['trade_date'].unique()).tolist() self.order_list = [] # 记录以往订单,方便调仓日对未完成订单做处理 self.buy_stocks_pre = [] # 记录上一期持仓 def log(self, txt, dt=None): ''' 策略日志打印函数''' dt = dt or self.datas[0].datetime.date(0) print('%s, %s' % (dt.isoformat(), txt)) def next(self): dt = self.datas[0].datetime.date(0) # 获取当前的回测时间点 # 如果是调仓日,则进行调仓操作 if dt in self.trade_dates: print("--------------{} 为调仓日----------".format(dt)) # 在调仓之前,取消之前所下的没成交也未到期的订单 if len(self.order_list) > 0: for od in self.order_list: self.cancel(od) # 如果订单未完成,则撤销订单 self.order_list = [] #重置订单列表 # 提取当前调仓日的持仓列表 buy_stocks_data = self.buy_stock.query(f"trade_date=='{dt}'") long_list = buy_stocks_data['sec_code'].tolist() print('long_list', long_list) # 打印持仓列表 # 对现有持仓中,调仓后不再继续持有的股票进行卖出平仓 sell_stock = [i for i in self.buy_stocks_pre if i not in long_list] print('sell_stock', sell_stock) # 打印平仓列表 if len(sell_stock) > 0: print("-----------对不再持有的股票进行平仓--------------") for stock in sell_stock: data = self.getdatabyname(stock) if self.getposition(data).size > 0 : od = self.close(data=data) self.order_list.append(od) # 记录卖出订单 # 买入此次调仓的股票:多退少补原则 print("-----------买入此次调仓期的股票--------------") for stock in long_list: w = buy_stocks_data.query(f"sec_code=='{stock}'")['weight'].iloc[0] # 提取持仓权重 data = self.getdatabyname(stock) order = self.order_target_percent(data=data, target=w*0.95) # 为减少可用资金不足的情况,留 5% 的现金做备用 self.order_list.append(order) self.buy_stocks_pre = long_list # 保存此次调仓的股票列表 def notify_order(self, order): # 未被处理的订单 if order.status in [order.Submitted, order.Accepted]: return # 已经处理的订单 if order.status in [order.Completed, order.Canceled, order.Margin]: if order.isbuy(): self.log( 'BUY EXECUTED, ref:%.0f, Price: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' % (order.ref, # 订单编号 order.executed.price, # 成交价 order.executed.value, # 成交额 order.executed.comm, # 佣金 order.executed.size, # 成交量 order.data._name)) # 股票名称 else: # Sell self.log('SELL EXECUTED, ref:%.0f, Price: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' % (order.ref, order.executed.price, order.executed.value, order.executed.comm, order.executed.size, order.data._name)) # 实例化 cerebro cerebro = bt.Cerebro() # 读取行情数据 daily_price = pd.read_csv("./Backtrader/data/daily_price.csv", parse_dates=['datetime']) daily_price = daily_price.set_index(['datetime']) # 将datetime设置成index # 按股票代码,依次循环传入数据 for stock in daily_price['sec_code'].unique(): # 日期对齐 data = pd.DataFrame(index=daily_price.index.unique()) # 获取回测区间内所有交易日 df = daily_price.query(f"sec_code=='{stock}'")[['open','high','low','close','volume','openinterest']] data_ = pd.merge(data, df, left_index=True, right_index=True, how='left') # 缺失值处理:日期对齐时会使得有些交易日的数据为空,所以需要对缺失数据进行填充 data_.loc[:,['volume','openinterest']] = data_.loc[:,['volume','openinterest']].fillna(0) data_.loc[:,['open','high','low','close']] = data_.loc[:,['open','high','low','close']].fillna(method='pad') data_.loc[:,['open','high','low','close']] = data_.loc[:,['open','high','low','close']].fillna(0) # 导入数据 datafeed = bt.feeds.PandasData(dataname=data_, fromdate=datetime.datetime(2019,1,2), todate=datetime.datetime(2021,1,28)) cerebro.adddata(datafeed, name=stock) # 通过 name 实现数据集与股票的一一对应 print(f"{stock} Done !") # 初始资金 100,000,000 cerebro.broker.setcash(100000000.0) # 佣金,双边各 0.0003 cerebro.broker.setcommission(commission=0.0003) # 滑点:双边各 0.0001 cerebro.broker.set_slippage_perc(perc=0.0001) # 将编写的策略添加给大脑,别忘了 ! cerebro.addstrategy(StockSelectStrategy) # 回测时需要添加 PyFolio 分析器 cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio') result = cerebro.run() # 借助 pyfolio 进一步做回测结果分析 pyfolio = result[0].analyzers.pyfolio # 注意:后面不要调用 .get_analysis() 方法 # 或者是 result[0].analyzers.getbyname('pyfolio') returns, positions, transactions, gross_lev = pyfolio.get_pf_items() import pyfolio as pf pf.create_full_tear_sheet(returns) 上述案例 StockSelectStrategy 类策略部分的核心是:
- 在 __init__() 中一次性读入调仓表,从调仓表中提取出调仓日期;
- 在 next() 中不断的判断当前回测时间点是否为调仓日:如果是调仓日,对被剔除的标的进行平仓,买入新增的标的;如果是非调仓日,不触发下单操作。
案例2:多因子选股策略 - 直接指标选股 直接指标选股就是在将选股逻辑写在 Strategy 策略里,然后一边选股一边交易,包括选股指标的计算也可能是在 __init__() 或 next() 中完成的。多因子选股的常规逻辑是基于多个因子对股票进行排序,然后选出表现好的股票和表现差的股票,而且分为顺序筛选和同时筛选 2 种方式:
- 顺序筛选:在顺序筛选中,投资组合经理会按照选股标准的优先级顺序一步步做出筛选。首先根据最重要的选股标准剔除股票池中不符合条件的股票,然后根据第二最重要的条件对第一次选出的股票再次进行股票筛选,依此类推,直到将投资范围缩小为满足所有选股条件的股票清单为止。
- 同时筛选:在同时筛选中,投资组合经理将所有选股标准(选股因子)同时应用于股票筛选,并计算待筛选股票在整个选股标准集上的综合得分,再基于综合得分的排名筛选股票。
本案例对应的是同时筛选这种方式,在横截面上,计算每只股票在所有因子上的rank,然后将所有 rank 进行求和,得到综合 rank,再基于综合 rank 进行选股和调仓。 import backtrader as bt import datetime import pandas as pd class PandasData_more(bt.feeds.PandasData): lines = ('ROE', 'EP', ) # 要添加的线 # 设置 line 在数据源上的列位置 params = dict( ROE=-1, # 设置新增指标的位置,-1表示自动按列明匹配数据 EP=-1 ) class StockSelectStrategy(bt.Strategy): params = dict( selnum=30, # 设置持仓股数在总的股票池中的占比,如买入表现最好的前30只股票 rperiod=1, # 计算收益率的周期 vperiod=6, # 计算波动率的周期,过去6个月的波动率 mperiod=2, # 计算动量的周期,如过去2个月的收益 reserve=0.05 # 5% 为了避免出现资金不足的情况,每次调仓都预留 5% 的资金不用于交易 ) def log(self, arg): print('{} {}'.format(self.datetime.date(), arg)) def __init__(self): # 计算持仓权重,等权 self.perctarget = (1.0 - self.p.reserve) / self.p.selnum # 循环计算每只股票的收益波动率因子 self.rs = {d:bt.ind.PctChange(d, period=self.p.rperiod) for d in self.datas} self.vs = {d:1/(bt.ind.StdDev(ret, period=self.p.vperiod) 0.000001) for d,ret in self.rs.items()} # 循环计算每只股票的动量因子 self.ms = {d:bt.ind.ROC(d, period=self.p.mperiod) for d in self.datas} # 将 ep 和 roe 因子进行匹配 self.EP = {d:d.lines.EP for d in self.datas} self.ROE = {d:d.lines.ROE for d in self.datas} self.all_factors = [self.rs, self.vs, self.ms, self.EP, self.ROE] def next(self): # 在每个横截面上计算所有因子的综合排名 stocks = list(self.datas) ranks = {d:0 for d in stocks} # 计算每个因子的rank,并进行求和 for factor in self.all_factors: stocks.sort(key=lambda x: factor[x][0], reverse=True) # print({x._name:factor[x][0] for x in stocks}) ranks = {d:i ranks[d] for d,i in zip(stocks, range(1,len(stocks) 1))} # print({d._name:rank for d,rank in ranks.items()}) # 对各因子rank求和后的综合值进行最后的排序,最大综合值排最前面 # 买入 动量、ep、roe 高;波动率低的股票 ranks = sorted(ranks.items(), key=lambda x: x[1], reverse=False) # print({i._name:rank for (i,rank) in ranks}) # 选取前 self.p.selnum 只股票作为持仓股 rtop = dict(ranks[:self.p.selnum]) # 剩余股票将从持仓中剔除(如果在持仓里的话) rbot = dict(ranks[self.p.selnum:]) # 提取有仓位的股票 posdata = [d for d, pos in self.getpositions().items() if pos] # 删除不在继续持有的股票,进而释放资金用于买入新的股票 for d in (d for d in posdata if d not in rtop): self.log('Leave {} - Rank {:.2f}'.format(d._name, rbot[d])) self.order_target_percent(d, target=0.0) # 对下一期继续持有的股票,进行仓位调整 for d in (d for d in posdata if d in rtop): self.log('Rebal {} - Rank {:.2f}'.format(d._name, rtop[d])) self.order_target_percent(d, target=self.perctarget) del rtop[d] # 买入当前持仓中没有的股票 for d in rtop: self.log('Enter {} - Rank {:.2f}'.format(d._name, rtop[d])) self.order_target_percent(d, target=self.perctarget) # 实例化 cerebro cerebro = bt.Cerebro() # 读取行情数据 month_price = pd.read_csv("./data/month_price.csv", parse_dates=['datetime']) month_price = month_price.set_index(['datetime']).sort_index() # 将datetime设置成index # 按股票代码,依次循环传入数据 for stock in month_price['sec_code'].unique(): # 日期对齐 data = pd.DataFrame(index=month_price.index.unique()) # 获取回测区间内所有交易日 df = month_price.query(f"sec_code=='{stock}'")[['open','high','low','close','volume','openinterest']] data_ = pd.merge(data, df, left_index=True, right_index=True, how='left') # 缺失值处理:日期对齐时会使得有些交易日的数据为空,所以需要对缺失数据进行填充 data_.loc[:,['volume','openinterest']] = data_.loc[:,['volume','openinterest']].fillna(0) data_.loc[:,['open','high','low','close','EP','ROE']] = data_.loc[:,['open','high','low','close']].fillna(method='pad') data_.loc[:,['open','high','low','close','EP','ROE']] = data_.loc[:,['open','high','low','close']].fillna(0.0000001) # 导入数据 datafeed = PandasData_more(dataname=data_, fromdate=datetime.datetime(2019,1,31), todate=datetime.datetime(2021,8,31), timeframe=bt.TimeFrame.Months) # 将数据的时间周期设置为月度 cerebro.adddata(datafeed, name=stock) # 通过 name 实现数据集与股票的一一对应 print(f"{stock} Done !") # 初始资金 100,000,000 cerebro.broker.setcash(100000000.0) # 佣金,双边各 0.0003 cerebro.broker.setcommission(commission=0.0003) # 滑点:双边各 0.0001 cerebro.broker.set_slippage_perc(perc=0.0001) # 将编写的策略添加给大脑,别忘了 ! cerebro.addstrategy(StockSelectStrategy) # 返回收益率时序 cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='_TimeReturn') result = cerebro.run() # 得到收益率时序 ret = pd.Series(result[0].analyzers._TimeReturn.get_analysis()) ######### 注意 ######### # PyFolio 分析器返回的收益也是月度收益,但是绘制的各种收益分析图形会有问题,有些图绘制不出来 上述案例 StockSelectStrategy 类策略部分的核心是:
- 简单的选用了波动率、动量、EP、ROE 4个因子,其中波动率、动量是在__init__() 中临时计算的,EP、ROE 是与行情数据一起直接导入的;
- 在 next() 中,借助 sort(适用于 list 的排序)和 sorted(可对所有可迭代的对象进行排序操作,案例中用于对 dict 进行排序)对每一只股票的 self.data 进行排序,然后再以 self.data 为 key,给每个 key 赋一个 rank 值;循环每个因子都进行排序,然后累加上之前因子的排序值;最后再基于总的 rank 累加值,筛选出 rank 排名最靠前的 N 只股票作为持仓股;
- 本案例用的是月度数据,每月末进行调仓操作;
- 策略参考来源:Rebalancing - Conservative Formula - Backtrader(https://www.backtrader.com/blog/2019-07-19-rebalancing-conservative/rebalancing-conservative/);
- 在上面策略的 next() 中,“# step2: 对下一期继续持有的股票,进行仓位调整”这一步是可以省略的,后面的“# step3: 买入当前持仓中没有的股票”中的 order_target_percent() 下单函数就能一步到位的实现 step2 和 step3 的仓位调整操作。
案例3:均线策略 - 双均线 均线策略中最常见的一种方法是根据长期均线和短期均线的交叉情况来确定交易信号,即:当短期均线从下往上穿越长期均线时,形成金叉,做多;反之,当长期均线从上往下穿越短期均线时,形成死叉,做空或平仓。下面是常见的双均线策略如下:
- 均线:以 5 日均线为短期均线、以 20 日均线为长期均线;
- 买入开仓:当前无持仓,当日 5 日均线上穿 20 日均线,第二天以市价单买入,开仓;
- 卖出平仓:当前持有多单,当日 5 日均线下穿 20 日均线,第二天以市价单卖出,平仓。
import backtrader as bt # 自定义信号指标 class MySignal(bt.Indicator): lines = ('signal',) # 声明 signal 线,交易信号放在 signal line 上 params = dict( short_period=5, long_period=20) def __init__(self): self.s_ma = bt.ind.SMA(period=self.p.short_period) self.l_ma = bt.ind.SMA(period=self.p.long_period) # 短期均线上穿长期均线,取值为1;反之,短期均线下穿长期均线,取值为-1 self.lines.signal = bt.ind.CrossOver(self.s_ma, self.l_ma) # 实例化大脑 cerebro = bt.Cerebro() # 加载数据 # 读取行情数据 daily_price = pd.read_csv("./data/daily_price.csv", parse_dates=['datetime']) stock_name = '000006.SZ' stock_price = daily_price.query(f"sec_code=='{stock_name}'").set_index('datetime') datafeed = bt.feeds.PandasData(dataname=stock_price, fromdate=pd.to_datetime('2019-01-02'), todate=pd.to_datetime('2021-01-28')) cerebro.adddata(datafeed, name=stock_name) # 初始资金 1,000,000 cerebro.broker.setcash(1000000.0) # 佣金,双边各 0.0003 cerebro.broker.setcommission(commission=0.0003) # 滑点:双边各 0.0001 cerebro.broker.set_slippage_perc(perc=0.0001) #每次固定交易100股 cerebro.addsizer(bt.sizers.FixedSize, stake=100) # 添加交易信号 cerebro.add_signal(bt.SIGNAL_LONG, MySignal) # 回测时需要添加 PyFolio 分析器 cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio') result = cerebro.run() cerebro.plot(iplot=False) # 借助 pyfolio 进一步做回测结果分析 案例4:均线策略 - 三均线 三均线策略与双均线策略类似,只不过交易信号是由短期均线、中期均线、长期均线这 3 条均线共同确定的。如果只考虑做多的情况,一般是短期均线>中期均线>长期均线,呈多头排列时,买入开仓;出现短期均线下穿中期均线时,卖出平仓。下面是案例具体的策略逻辑:
- 均线:5 日均线为短期均线、20 日均线为中期均线、60 日均线为长期均线;
- 买入开仓:当前无持仓,当开始出现 5 日均线>20 日均线>60 日均线多头排列时,第二天以市价单买入,开仓;
- 卖出平仓:当前持有多单,当日 5 日均线下穿 20 日均线,第二天以市价单卖出,平仓。
import backtrader as bt # 自定义信号指标 class MySignal(bt.Indicator): lines = ('signal',) # 声明 signal 线,交易信号放在 signal line 上 params = dict( short_period=5, median_period=20, long_period=60) def __init__(self): self.s_ma = bt.ind.SMA(period=self.p.short_period) self.m_ma = bt.ind.SMA(period=self.p.median_period) self.l_ma = bt.ind.SMA(period=self.p.long_period) # 短期均线在中期均线上方,且中期均取也在长期均线上方,三线多头排列,取值为1;反之,取值为0 self.signal1 = bt.And(self.m_ma>self.l_ma, self.s_ma>self.m_ma) # 求上面 self.signal1 的环比增量,可以判断得到第一次同时满足上述条件的时间,第一次满足条件为1,其余条件为0 self.buy_signal = bt.If((self.signal1-self.signal1(-1))>0, 1, 0) # 短期均线下穿长期均线时,取值为1;反之取值为0 self.sell_signal = bt.ind.CrossDown(self.s_ma, self.m_ma) # 将买卖信号合并成一个信号 self.lines.signal = bt.Sum(self.buy_signal, self.sell_signal*(-1)) # 实例化大脑 cerebro = bt.Cerebro() # 加载数据 # 读取行情数据 daily_price = pd.read_csv("./data/daily_price.csv", parse_dates=['datetime']) stock_name = '000006.SZ' stock_price = daily_price.query(f"sec_code=='{stock_name}'").set_index('datetime') datafeed = bt.feeds.PandasData(dataname=stock_price, fromdate=pd.to_datetime('2019-01-02'), todate=pd.to_datetime('2021-01-28')) cerebro.adddata(datafeed, name=stock_name) # 初始资金 1,000,000 cerebro.broker.setcash(1000000.0) # 佣金,双边各 0.0003 cerebro.broker.setcommission(commission=0.0003) # 滑点:双边各 0.0001 cerebro.broker.set_slippage_perc(perc=0.0001) #每次固定交易100股 cerebro.addsizer(bt.sizers.FixedSize, stake=100) # 添加交易信号 cerebro.add_signal(bt.SIGNAL_LONG, MySignal) # 回测时需要添加 PyFolio 分析器 cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio') result = cerebro.run() cerebro.plot(iplot=False) # 借助 pyfolio 进一步做回测结果分析 上述案例的特点是:
- 在双均线策略的基础上,加入了中期均线;
- 3 条均线的多头排列是通过 bt.And 来判断的,但还需要捕捉第一次出现多头排列的时间点,借助了环比增量的计算逻辑,使得第一次出现的那个时间点取值为 1,其它剩余的时间点都不是产生多头信号的时间点(取值等于0或-1);
- 因为买入信号和卖出信号是各算各的,所以最后还需要对两个信号进行整合;由于两个信号形成条件不存在冲突,所以直接求和即可,-1 对应卖出信号、1对应买入信号、0 对应不做调仓操作;
- 如果想借助多个技术指标生成一个综合信号,也可以借鉴上面的多信号合成逻辑,将信号合成问题转换成数学指标的计算问题。
案例5:海龟交易策略 海龟交易法是一套非常经典的交易系统,因为它涵盖了交易品种的选择、头寸规模、单位头寸的限制、入场、逐步加仓、止损、离场(止盈)这一整套相对完备的交易体系,特别是其中的头寸管理或资金管理的思想,无论是股票交易,还是期货交易......都非常有借鉴意义。下面案例实现的策略细节如下:
- 策略标的:沪深 300 主力合约(行情数据为前复权数据)
- 指标计算:
- 用 20 日的最高、最低、收盘价计算平均真实波幅 ATR;
- 计算出近 20 日的最高与 20 日最低价,构建唐奇安通道。
- 交易信号:
- 入场:价格突破 20 日价格高点时,入场;
- 加仓:价格继续上涨至 0.5 倍 ATR ,再次加仓,加仓次数不超过 3 次;
- 止损:价格回落 2 倍 ATR 时止损离场;
- 止盈:价格突破 10 日最低点时止盈离场;
- 做空与做多的逻辑相反。
class TurtleTradingStrategy(bt.Strategy): params = dict( N1= 20, # 唐奇安通道上轨的t N2=10, # 唐奇安通道下轨的t ) def log(self, txt, dt=None): dt = dt or self.datas[0].datetime.date(0) print('%s, %s' % (dt.isoformat(), txt)) def __init__(self): self.order = None self.buy_count = 0 # 记录买入次数 self.last_price = 0 # 记录买入价格 # 准备第一个标的沪深300主力合约的close、high、low 行情数据 self.close = self.datas[0].close self.high = self.datas[0].high self.low = self.datas[0].low # 计算唐奇安通道上轨:过去20日的最高价 self.DonchianH = bt.ind.Highest(self.high(-1), period=self.p.N1, subplot=True) # 计算唐奇安通道下轨:过去10日的最低价 self.DonchianL = bt.ind.Lowest(self.low(-1), period=self.p.N2, subplot=True) # 生成唐奇安通道上轨突破:close>DonchianH,取值为1.0;反之为 -1.0 self.CrossoverH = bt.ind.CrossOver(self.close(0), self.DonchianH, subplot=False) # 生成唐奇安通道下轨突破: self.CrossoverL = bt.ind.CrossOver(self.close(0), self.DonchianL, subplot=False) # 计算 ATR self.TR = bt.ind.Max((self.high(0)-self.low(0)), # 当日最高价-当日最低价 abs(self.high(0)-self.close(-1)), # abs(当日最高价−前一日收盘价) abs(self.low(0)-self.close(-1))) # abs(当日最低价-前一日收盘价) self.ATR = bt.ind.SimpleMovingAverage(self.TR, period=self.p.N1, subplot=False) # 计算 ATR,直接调用 talib ,使用前需要安装 python3 -m pip install TA-Lib # self.ATR = bt.talib.ATR(self.high, self.low, self.close, timeperiod=self.p.N1, subplot=True) def next(self): # 如果还有订单在执行中,就不做新的仓位调整 if self.order: return # 如果当前持有多单 if self.position.size > 0 : # 多单加仓:价格上涨了买入价的0.5的ATR且加仓次数少于等于3次 if self.datas[0].close >self.last_price 0.5*self.ATR[0] and self.buy_count <= 4: print('if self.datas[0].close >self.last_price 0.5*self.ATR[0] and self.buy_count <= 4:') print('self.buy_count',self.buy_count) # 计算建仓单位:self.ATR*期货合约乘数300*保证金比例0.1 self.buy_unit = max((self.broker.getvalue()*0.005)/(self.ATR*300*0.1),1) self.buy_unit = int(self.buy_unit) # 交易单位为手 # self.sizer.p.stake = self.buy_unit self.order = self.buy(size=self.buy_unit) self.last_price = self.position.price # 获取买入价格 self.buy_count = self.buy_count 1 #多单止损:当价格回落2倍ATR时止损平仓 elif self.datas[0].close < (self.last_price - 2*self.ATR[0]): print('elif self.datas[0].close < (self.last_price - 2*self.ATR[0]):') self.order = self.sell(size=abs(self.position.size)) self.buy_count = 0 #多单止盈:当价格突破10日最低点时止盈离场 平仓 elif self.CrossoverL < 0: print('self.CrossoverL < 0') self.order = self.sell(size=abs(self.position.size)) self.buy_count = 0 # 如果当前持有空单 elif self.position.size < 0 : # 空单加仓:价格小于买入价的0.5的ATR且加仓次数少于等于3次 if self.datas[0].close<self.last_price-0.5*self.ATR[0] and self.buy_count <= 4: print('self.datas[0].close<self.last_price-0.5*self.ATR[0] and self.buy_count <= 4') # 计算建仓单位:self.ATR*期货合约乘数300*保证金比例0.1 self.buy_unit = max((self.broker.getvalue()*0.005)/(self.ATR*300*0.1),1) self.buy_unit = int(self.buy_unit) # 交易单位为手 # self.sizer.p.stake = self.buy_unit self.order = self.sell(size=self.buy_unit) self.last_price = self.position.price # 获取买入价格 self.buy_count = self.buy_count 1 #空单止损:当价格上涨至2倍ATR时止损平仓 elif self.datas[0].close < (self.last_price 2*self.ATR[0]): print('self.datas[0].close < (self.last_price 2*self.ATR[0])') self.order = self.buy(size=abs(self.position.size)) self.buy_count = 0 #多单止盈:当价格突破20日最高点时止盈平仓 elif self.CrossoverH>0: print('self.CrossoverH>0') self.order = self.buy(size=abs(self.position.size)) self.buy_count = 0 else: # 如果没有持仓,等待入场时机 #入场: 价格突破上轨线且空仓时,做多 if self.CrossoverH > 0 and self.buy_count == 0: print('if self.CrossoverH > 0 and self.buy_count == 0:') # 计算建仓单位:self.ATR*期货合约乘数300*保证金比例0.1 self.buy_unit = max((self.broker.getvalue()*0.005)/(self.ATR*300*0.1),1) self.buy_unit = int(self.buy_unit) # 交易单位为手 self.order = self.buy(size=self.buy_unit) self.last_price = self.position.price # 记录买入价格 self.buy_count = 1 # 记录本次交易价格 #入场: 价格跌破下轨线且空仓时,做空 elif self.CrossoverL < 0 and self.buy_count == 0: print('self.CrossoverL < 0 and self.buy_count == 0') # 计算建仓单位:self.ATR*期货合约乘数300*保证金比例0.1 self.buy_unit = max((self.broker.getvalue()*0.005)/(self.ATR*300*0.1),1) self.buy_unit = int(self.buy_unit) # 交易单位为手 self.order = self.sell(size=self.buy_unit) self.last_price = self.position.price # 记录买入价格 self.buy_count = 1 # 记录本次交易价格 # 打印订单日志 def notify_order(self, order): order_status = ['Created','Submitted','Accepted','Partial', 'Completed','Canceled','Expired','Margin','Rejected'] # 未被处理的订单 if order.status in [order.Submitted, order.Accepted]: self.log('ref:%.0f, name: %s, Order: %s'% (order.ref, order.data._name, order_status[order.status])) return # 已经处理的订单 if order.status in [order.Partial, order.Completed]: if order.isbuy(): self.log( 'BUY EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' % (order_status[order.status], # 订单状态 order.ref, # 订单编号 order.data._name, # 股票名称 order.executed.size, # 成交量 order.executed.price, # 成交价 order.executed.value, # 成交额 order.executed.comm)) # 佣金 else: # Sell self.log('SELL EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' % (order_status[order.status], order.ref, order.data._name, order.executed.size, order.executed.price, order.executed.value, order.executed.comm)) elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]: # 订单未完成 self.log('ref:%.0f, name: %s, status: %s'% ( order.ref, order.data._name, order_status[order.status])) self.order = None def notify_trade(self, trade): # 交易刚打开时 if trade.justopened: self.log('Trade Opened, name: %s, Size: %.2f,Price: %.2f' % ( trade.getdataname(), trade.size, trade.price)) # 交易结束 elif trade.isclosed: self.log('Trade Closed, name: %s, GROSS %.2f, NET %.2f, Comm %.2f' %( trade.getdataname(), trade.pnl, trade.pnlcomm, trade.commission)) # 更新交易状态 else: self.log('Trade Updated, name: %s, Size: %.2f,Price: %.2f' % ( trade.getdataname(), trade.size, trade.price)) # 创建主控制器 cerebro = bt.Cerebro() # 准备股票日线数据,输入到backtrader IF_price = pd.read_csv('./data/IF_20200101_20220430.csv', parse_dates=['datetime'], index_col=0) datafeed = bt.feeds.PandasData(dataname=IF_price, fromdate=pd.to_datetime('2020-01-01'), todate=pd.to_datetime('2022-04-30')) cerebro.adddata(datafeed, name='IF') # 初始资金 100,000,000 cerebro.broker.setcash(1000000.0) cerebro.broker.setcommission(commission=0.1, # 按 0.1% 来收取手续费 mult=300, # 合约乘数 margin=0.1, # 保证金比例 percabs=False, # 表示 commission 以 % 为单位 commtype=bt.CommInfoBase.COMM_FIXED, stocklike=False) # 加入策略 cerebro.addstrategy(TurtleTradingStrategy) # 回测时需要添加 PyFolio 分析器 cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio') result = cerebro.run() # 借助 pyfolio 进一步做回测结果分析 pyfolio = result[0].analyzers.pyfolio # 注意:后面不要调用 .get_analysis() 方法 # 或者是 result[0].analyzers.getbyname('pyfolio') returns, positions, transactions, gross_lev = pyfolio.get_pf_items() import pyfolio as pf pf.create_full_tear_sheet(returns) 其他说明:
- 如果想将海龟交易法则用在股票上,由于国内股票不存在做空机制,大家直接将上面做空部分的操作逻辑删除即可;
- 唐奇安通道和布林带是非常相似的,所以布林带策略也可以借鉴上面案例中买卖条件的设定逻辑来实现。
案例6:配对交易策略 配对交易策略属于统计套利范畴。本案例要介绍的配对交易策略是基于价格序列之间的协整关系而构建的,投资标的是股票,主要思想是:寻找具有协整关系(表明双方存在长期均衡关系,价格走势相似)的一对股票,然后利用双方价差进行套利。在使用 Backtrader 实现配对交易策略前,需要先寻找出存在协整关系的一对股票,再交易这对股票,整个配对交易策略的流程可以分为如下几步:
- step1:在股票池中,基于收盘价序列,筛选出存在协整关系的股票对:
from statsmodels.tsa.stattools import coint #协整分析的模块 def find_cointergrated_stocks(data): '''查找股票列表中有协整关系的股票''' cols = data.columns pairs = [] for i,j in zip(cols[:-1], cols[1:]): data_ = data.loc[:,[i,j]].dropna() result = coint(data_.loc[:,i],data_.loc[:,j]) # 对两只股票进行协整检验 if result[1]<0.01: # 当 pvalue 小于 0.01 时,拒绝原假设,股票间存在显著的协整关系 pairs.append((i,j,result[1])) #记录存在协整关系的股票 pairs = sorted(pairs, key=la
- step2:选择一对满足协整关系的股票,检验它们是否是同阶单整(协整关系的前提条件):先检验原序列是否是平稳序列,再检验一阶差分后的序列是否都为平稳序列,如果前者都为非平稳序列,后者都为平稳序列,则可以认为双方都是 1 阶单整的;
from statsmodels.tsa.stattools import adfuller # adf检验模块 def test_stationarity(data): adftest = adfuller(data) result = pd.Series(adftest[0:4],index=['adf','pvalue','lags used','number of pbservations']) for key,value in adftest[4].items(): result['Critical value(%s)'%key] = value return result
- step3:一般可以通过 OLS 线性回归来估计双方的长期均衡关系,并得到价差序列,这个价差序列就是回归方程的残差:spear = y - (a bx),同时还需要检验这个价差序列是否平稳;
import statsmodels.api as sm X = sm.add_constant(daily_close.loc[:,'600718.SH']) # 添加常数项 y = daily_close.loc[:,'600728.SH'] res = (sm.OLS(y,X)).fit() residual = res.resid #得到估计的残差序列 print(res.summary()) #查看OLS估计结果 print(test_stationarity(residual)) # 查看残差的平稳性
- step4: 对价差序列进行 zscore 标准化处理,并用 zscore 值构建配对交易策略(zscore衡量的是价差序列偏离了其均值多少倍的标准差):
- 当 zscore>1 则认为 y 取值相对过高,x 取值相对过低,即 y 的价格相对高估,x 的价格相对低估,此时应该做多x,做空 y ;
- 当 zscore<-1 则认为 y 取值相对过低,x 取值相对过高,即 y 的价格相对低估,x 的价格相对高估,此时应该做空 x,做多 y ;
- 当-0.5<=zscore<=0.5,认为价差相对均衡,不存在套利空间,平仓了结。
上述的 step3 和 step4,可以通过 Backtrader 中的 OLS_TransformationN 直接计算完成,bt.ind.OLS_TransformationN 的源码如下: 地址:backtrader/ols.py at e2674b1690f6366e08646d8cfd44af7bb71b3970 · mementum/backtrader (github.com) class OLS_Slope_InterceptN(PeriodN): ''' Calculates a linear regression using ``statsmodel.OLS`` (Ordinary least squares) of data1 on data0 Uses ``pandas`` and ``statsmodels`` ''' _mindatas = 2 # ensure at least 2 data feeds are passed packages = ( ('pandas', 'pd'), ('statsmodels.api', 'sm'), ) lines = ('slope', 'intercept',) params = ( ('period', 10), ) def next(self): p0 = pd.Series(self.data0.get(size=self.p.period)) p1 = pd.Series(self.data1.get(size=self.p.period)) p1 = sm.add_constant(p1) intercept, slope = sm.OLS(p0, p1).fit().params self.lines.slope[0] = slope self.lines.intercept[0] = intercept class OLS_TransformationN(PeriodN): ''' Calculates the ``zscore`` for data0 and data1. Although it doesn't directly uses any external package it relies on ``OLS_SlopeInterceptN`` which uses ``pandas`` and ``statsmodels`` ''' _mindatas = 2 # ensure at least 2 data feeds are passed lines = ('spread', 'spread_mean', 'spread_std', 'zscore',) params = (('period', 10),) def __init__(self): slint = OLS_Slope_InterceptN(*self.datas) spread = self.data0 - (slint.slope * self.data1 slint.intercept) self.l.spread = spread self.l.spread_mean = bt.ind.SMA(spread, period=self.p.period) self.l.spread_std = bt.ind.StdDev(spread, period=self.p.period) self.l.zscore = (spread - self.l.spread_mean) / self.l.spread_std
- OLS_TransformationN 返回的 zscore 就是 step4 中标准化后的价差序列。参数 period 对应的是标准化时使用的是过去某段时间的价差序列;
- OLS_TransformationN 的 OLS 估计是直接调用的OLS_Slope_InterceptN,也是采用的过去一段时间的价格序列做的 OLS 估计;
- 从上面可知,在回测的过程中,股票对的协整关系是以滚动固定窗口长度的形式进行动态更新的。
下面是以 ['600718.SH','600728.SH'] 这个存在协整关系的股票对为例,编写的回测案例: import backtrader as bt class PairTradingStrategy(bt.Strategy): params = dict( period=10, # z-score标准化处理时对应的时间窗口 upper=1, lower=-1, up_medium=0.5, low_medium=-0.5, ) def log(self, txt, dt=None): dt = dt or self.datas[0].datetime.date(0) print('%s, %s' % (dt.isoformat(), txt)) def __init__(self): # To control operation entries self.order = None self.upper_limit = self.p.upper self.lower_limit = self.p.lower self.up_medium = self.p.up_medium self.low_medium = self.p.low_medium self.status = 0 # 通过ols拟合协整关系,并对协整序列进行z-score标准化处理,返回z-score值 # self.data0 为 y ; self.data1 为 X self.transform = bt.ind.OLS_TransformationN(self.data0, self.data1, period=self.p.period) self.zscore = self.transform.zscore def next(self): # 如果订单还未完成,则不生成新订单 if self.order: return # 如果 zscore 超过上线,说明 y=data0 的价格相对高估,x=data1 的价格相对低估,此时应该做多x,做空Y; if (self.zscore[0] > self.upper_limit) and (self.status != 1): print(self.zscore[0], self.upper_limit, self.status) print('self.zscore[0] > self.upper_limit) and (self.status != 1)') # 做空data0 self.sell(data=self.data0) # 做多data1 self.buy(data=self.data1) self.status = 1 # 处于 zscore 超过上线 的状态,标记为 1 # 如果 zscore 跌破下线,说明 y=data0 的价格相对低估,x=data1 的价格相对高估,此时应该做空x,做多Y; elif (self.zscore[0] < self.lower_limit) and (self.status != 2): print(self.zscore[0], self.lower_limit, self.status) print('(self.zscore[0] < self.lower_limit) and (self.status != 2)') # 做多data0 self.buy(data=self.data0) # 做空data1 self.sell(data=self.data1) self.status = 2 # 处于 zscore 跌破下线的状况,标记为 2 # 如果 zscore 位于中间区域,认为已经不存在套利空间,则退出所有头寸 elif (self.zscore[0] <= self.up_medium) and (self.zscore[0] >= self.low_medium): print(self.zscore[0], self.up_medium, self.low_medium) print('(self.zscore[0] <= self.up_medium) and (self.zscore[0] >= self.low_medium)') self.close(self.data0) self.close(self.data1) def stop(self): print('==================================================') print('Starting Value - %.2f' % self.broker.startingcash) print('Ending Value - %.2f' % self.broker.getvalue()) print('==================================================') # 打印订单日志 def notify_order(self, order): order_status = ['Created','Submitted','Accepted','Partial', 'Completed','Canceled','Expired','Margin','Rejected'] # 未被处理的订单 if order.status in [order.Submitted, order.Accepted]: self.log('ref:%.0f, name: %s, Order: %s'% (order.ref, order.data._name, order_status[order.status])) return # 已经处理的订单 if order.status in [order.Partial, order.Completed]: if order.isbuy(): self.log( 'BUY EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' % (order_status[order.status], # 订单状态 order.ref, # 订单编号 order.data._name, # 股票名称 order.executed.size, # 成交量 order.executed.price, # 成交价 order.executed.value, # 成交额 order.executed.comm)) # 佣金 else: # Sell self.log('SELL EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' % (order_status[order.status], order.ref, order.data._name, order.executed.size, order.executed.price, order.executed.value, order.executed.comm)) elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]: # 订单未完成 self.log('ref:%.0f, name: %s, status: %s'% ( order.ref, order.data._name, order_status[order.status])) self.order = None def notify_trade(self, trade): # 交易刚打开时 if trade.justopened: self.log('Trade Opened, name: %s, Size: %.2f,Price: %.2f' % ( trade.getdataname(), trade.size, trade.price)) # 交易结束 elif trade.isclosed: self.log('Trade Closed, name: %s, GROSS %.2f, NET %.2f, Comm %.2f' %( trade.getdataname(), trade.pnl, trade.pnlcomm, trade.commission)) # 更新交易状态 else: self.log('Trade Updated, name: %s, Size: %.2f,Price: %.2f' % ( trade.getdataname(), trade.size, trade.price)) # 实例化大脑 cerebro = bt.Cerebro() # 加载数据 # 读取行情数据 daily_price = pd.read_csv("./data/daily_price.csv", parse_dates=['datetime']) for stock_name in ['600718.SH','600728.SH']: stock_price = daily_price.query(f"sec_code=='{stock_name}'").set_index('datetime') datafeed = bt.feeds.PandasData(dataname=stock_price, fromdate=pd.to_datetime('2019-01-02'), todate=pd.to_datetime('2021-01-28')) cerebro.adddata(datafeed, name=stock_name) # 初始资金 1,000,000 cerebro.broker.setcash(1000000.0) # 佣金,双边各 0.0003 cerebro.broker.setcommission(commission=0.0003) # 滑点:双边各 0.0001 cerebro.broker.set_slippage_perc(perc=0.0001) #每次固定交易100股 cerebro.addsizer(bt.sizers.FixedSize, stake=100) # 添加交易信号 cerebro.addstrategy(PairTradingStrategy) # 回测时需要添加 PyFolio 分析器 cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio') result = cerebro.run() cerebro.plot() # 借助 pyfolio 进一步做回测结果分析
- 上述案例的参考资为:https://github.com/mementum/backtrader/blob/master/contrib/samples/pair-trading/pair-trading.py
- 对于商品期货的配对交易,由于商品间的相似关系是比较明确的,有时双方的价差序列并不会通过 OLS 等方式进行拟合,而是直接求双方的价格差(spear=Y_close-X_close 或者 spear=Y_close/X_close),并对价格差进行标准化处理得到 zscore,然后进行价差套利:
class PairTradingStrategy(bt.Strategy): params = dict( window1=10, # 价差的短期移动均线 window2=60, # 价差的长期移动均线 upper=1, lower=-1, up_medium=0.5, low_medium=-0.5, ) def __init__(self): self.status = 0 # 计算价差 spread = self.data0.close / self.data1.close # 计算价差的短期均线 self.spread_ma1 = bt.ind.SMA(spread, period=self.p.window1) # 计算价差的长期均线 self.spread_ma2 = bt.ind.SMA(spread, period=self.p.window2) # 计算价差的标准差 self.spread_std = bt.ind.StdDev(spread, period=self.p.window2) # 对价差进行"标准化"处理 self.zscore = (self.spread_ma1 - self.spread_ma2) / self.spread_std def next(self): # 如果 z-score>1 时,做空价格高的data0,做多价格低的data1; if (self.zscore[0] > self.upper_limit) and (self.status != 1): # 做空data0 self.sell(data=self.data0) # 做多data1 self.buy(data=self.data1) self.status = 1 # 处于 zscore 超过上线 的状态,标记为 1 # 如果 z-score<-1 时,做多价格低的data0,做空价格高的data1; elif (self.zscore[0] < self.lower_limit) and (self.status != 2): # 做多data0 self.buy(data=self.data0) # 做空data1 self.sell(data=self.data1) self.status = 2 # 处于 zscore 跌破下线的状况,标记为 2 # 如果 zscore 位于中间区域,认为已经不存在套利空间,则退出所有头寸 elif (self.zscore[0] <= self.up_medium) and (self.zscore[0] >= self.low_medium): self.close(self.data0) self.close(self.data1) 总结 上文虽然只具体介绍了 6 个案例,但这 6 个案例的一些操作技巧却是可以复用到很多别的策略上。重要的不是某个具体的交易策略,而是各种策略交易信号的生成方式和买卖操作逻辑的实现方式。 获取完整代码,点击阅读原文