前言
本文是BackTrader数据源系列的中篇。
文内会省略大量上篇文章的代码内容,直接阅读会产生轻微困惑。阅读前请务必完整理解并完成了Quickstart Guide - Backtrader 并阅读和实操过本系列第一篇:
【PA交易】BackTrader(一): 如何使用实时tick数据和蜡烛图-CSDN博客
回顾
上一篇文章介绍了背景和需求,同时展示了如何自定义数据源并将tick数据读取到策略中使用。
现在为了看起来更贴近实盘交易,我们将上一篇中的MyDataFeedStatic简单修改一下,修改为MyDataFeedDynamic,该类的整体实现如下:
python">
TICK_DATA_COLUMNS = ('price', 'vol', 'amount', 'ccl', 'bid', 'bidVol', 'ask', 'askVol')
class MyDataFeedDynamic(bt.feed.DataBase):
lines = TICK_DATA_COLUMNS
def __init__(self, data_reader: ABCDataReader):
super(MyDataFeedDynamic, self).__init__()
self._data_reader = data_reader
def islive(self):
return True
def _load(self):
tick = self._data_reader.read_tick()
if tick is None:
# exhausted all rows
return False
for datafield in self.getlinealiases():
if datafield in TICK_DATA_COLUMNS:
line = getattr(self.lines, datafield)
line[0] = tick[datafield]
# -------------------------------------------
# 添加日期时间
self.lines.datetime[0] = date2num(tick['tickdt'])
return True
对于这个类的实现有任何困惑之处可以参考上一篇文章。相比于上篇文章中的MyDataFeedStatic,这个新的MyDataFeedDynamic有一个最大不同是使用了一个MyDataReader接口。这个接口定义了一个read_tick方法:
python">from abc import ABC, abstractmethod
class ABCDataReader(ABC):
@abstractmethod
def read_tick(self):
raise NotImplemented()
这可以模拟我们分段从CSV、数据库、甚至CTP读取数据的连续过程。相信对于数据量巨大的tick数据源尤其有用。
处于演示目的,ABCDataReader的实现可以很简单,我们直接将上一篇的测试CSV读入内存,之后使用一个行指针顺序读取即可,这和MyDataFeedStatic的实现完全相同。
为方便下文叙述,将这个ABCDataReader的实现命名为MyDataReader。
合并tick为bar
下面我们开始这篇文章的主要内容。
BackTrader将tick转换为K线实际提供了Data Feeds - Resample - Backtrader,关于这个API,在本系列的下篇会主要介绍。但是考虑到CTP的tick不稳定性,以及我们希望实现更加灵活,我们的选择是自行合并tick为最基础的分钟线。之后在需要更大周期数据时,再去使用Resample API(见同系列下一篇)。
首先,这里给出一个简单的合并K线函数:
python">def merge_bar_from_tick(self, tick, cur_bar, period) -> {}:
"""
简化的合并 tick 为 K bar 的逻辑
这个方法忽略了绝大多数的数据异常, 仅仅是为了演示目的
"""
def make_k_bar(tick, cur_bar):
if cur_bar is None:
return {
"datetime": tick['tickdt'],
'close': tick['price'],
'low': tick['price'],
'high': tick['price'],
'open': tick['price'],
'volume': tick['vol'], # 仅仅是为了演示目的, 实际示例数据中可能并非如此
'openinterest': tick['ccl'],
}
else:
return {
"datetime": cur_bar['datetime'],
'close': tick['price'],
'low': min(cur_bar['low'], tick['price']),
'high': max(cur_bar['high'], tick['price']),
'open': cur_bar['open'],
'volume': tick['vol'], # 仅仅是为了演示目的, 实际示例数据中可能并非如此
'openinterest': tick['ccl'],
}
cur_bar = make_k_bar(tick, cur_bar)
# 根据周期检查当前bar是否已经构成了一个完整的bar
tick_ts = tick['tickdt'].timestamp()
bar_ts = cur_bar["datetime"].timestamp()
if tick_ts - bar_ts >= period * 60:
# 当前bar已经完成,
return {
"ready_bar": cur_bar,
"cur_bar": None
}
else:
# 当前bar尚未完成,
return {
"ready_bar": cur_bar,
"cur_bar": cur_bar
}
之后我们为MyDataReader添加一些实现,首先是构造函数,改动后接收一个额外的bars_period,用于指定要合并的K bar分钟周期,
python">
class MyDataReader(ABCDataReader):
def __init__(self, df, bar_period = 1):
# ...
# 当前tick所述的Kbar
self._cur_bar = None
# 要合并到的bar周期
self._bar_period = bar_period
# 已经合并完成的bar
self._ready_bar = None
接下来,在每次读取tick的时候都额外调用一次merge_bar_from_tick方法。
python">def read_tick(self):
# ...
tick = # 读取 tick, 略
# ...
result = self.merge_bar_from_tick(tick, self._cur_bar, self._bar_period)
self._cur_bar = result['cur_bar']
self._ready_bar = result['ready_bar']
当self._cur_bar为空时, 表明已经合成了一个新的bar。否则表示没有合成新的bar。
MyDataReader还需要提供一个获取bar的接口,从前面的代码可以看出,self._ready_bar是真正要返回的K bar。所以我们可以直接将其返回:
python">def read_bar(self, index):
if self._ready_bars is None:
return None
return self._ready_bars[index]
实时bar VS 完整Bar
方法merge_bar_from_tick中每次都会将tick无条件的更新到传入的cur_bar中,如果没有传入,则新建,否则将更新他。而对于返回值:
- cur_bar: 可能为空或者当前K线
- ready_bar: 总是返回一个有效的K线
我们根据交易策略算法不同可能需要不同的类型,当需要实时bar的形态时,可以总是将最新的bar进行更新并且返回;如果每次都要求根据一个完整的bar去做决策,则可以返回最终已经完成的bar。
这也是之所以选择自己实现这段合bar逻辑的原因之一:更加灵活可控。
关于产品化
示例的MyDataReader逻辑极其简单,这对于初学者可能有帮助,但是距离产品化有很大距离。在实践中,作为分享,我的一些操作包括:
- 使用队列或者一些漏斗算法进行流控
- 对空数据(无交易或者丢包)的情况进行额外向前填充ffill
- 对于乱序tick进行一些基于hash的自动填充等等。
同时从编程角度,协程间的协作也是很重要的点。(没有接触过协程的朋友可以观看我这篇文章:Python:浅谈迭代器、生成器与协程的演化路径-CSDN博客)
数据源MyDataFeedWithBar
前面我们给出了一个只有tick的数据源,现在我们考虑同时携带tick和分钟线的数据源。
因为数据模块MyDataReader已经承担了大多数工作,所以数据源类的工作并不需要太多。回顾系列文章第一篇中我提到的架构图:
基础设计架构https://blog.csdn.net/josephus_mu/article/details/139833207MyDataFeed更多仅仅是充当一个adapter的功能,而多数数据实际来源工作应该在数据模块中完成
大多数功能的改动集中于_load方法的内部:
python">
class MyDataFeedWithBar(bt.feed.DataBase):
lines = (('tickdt', 'price', 'vol', 'amount', 'ccl', 'bid', 'bidVol', 'ask', 'askVol'))
#...
def _load(self):
tick = self._data_reader.read_tick()
if tick is None:
# exhausted all rows
return False
bar = self._data_reader.read_bar(self.__bar_index)
for datafield in self.getlinealiases():
if datafield == 'tickdt':
continue
if datafield in TICK_DATA_COLUMNS:
line = getattr(self.lines, datafield)
line[0] = tick[datafield]
if datafield in BAR_DATA_COLUMNS:
line = getattr(self.lines, datafield)
line[0] = bar[datafield]
# -------------------------------------------
# 添加日期时间
self.lines.tickdt[0] = date2num(tick['tickdt'])
self.lines.datetime[0] = date2num(bar['datetime'])
return True
这里依然是遍历数据源的Lines,根据Lines的别名找到tick或bars中的名称,之后根据这些名词将其绑定到Lines中。如上篇所述, OHLC类中已经默认定义了一些基础管线:
python">class OHLC(DataSeries):
lines = ('close', 'low', 'high', 'open', 'volume', 'openinterest',)
class OHLCDateTime(OHLC):
lines = (('datetime'),)
前文中merge_bar_from_tick中对于每个bar的字段定义也是来源于此,新的MyDataFeedWithBar不再需要重复定义这些管线。
另外,可以看到在处理时间字段时,对于tick的时间,我们使用了一个新的管线tickdt。这样做的原因是为了能够更加明确的和Kbar进行区分。而定义于OHLC类的datetime管线我们则直接绑定为分钟bar的时间。这样做的好处是指标的计算会更容易(参见Indicators - Usage - Backtrader)。
使用自定义数据源
首先需要将我们自定义的数据源设定为cerebro的数据,对于本文讨论的实现:
python">
df = pd.read_csv('./datas/DCE.m2501.tick.202402.csv')
df['tickdt'] = pd.to_datetime(df['tickdt'])
data_reader = dr.MyDataReader(df, 1)
tick_feed = bfeed.MyDataFeedWithBar(data_reader, 0)
cerebro.adddata(tick_feed)
cerebro.run()
在策略中,我们依然依照一般习惯去指定一些别名:
python">class TestStrategy(bt.Strategy):
def __init__(self):
# Ticks 字段
self.tickdt = self.datas[0].tickdt
self.price = self.datas[0].price
self.vol = self.datas[0].close
self.amount = self.datas[0].amount
self.ccl = self.datas[0].ccl
self.bid = self.datas[0].bid
self.bidVol = self.datas[0].bidVol
self.ask = self.datas[0].ask
self.askVol = self.datas[0].askVol
self.local_tz = get_localzone()
# 1Min bar 字段: 使用框架标准字段
self.datetime = self.datas[0].datetime
self.close = self.datas[0].close
self.low = self.datas[0].low
self.high = self.datas[0].high
self.open = self.datas[0].open
之后在运行策略时,next方法就可以很容易的同时使用tick和K线数据了:
python">def next(self):
self.log('Tick price, %.2f' % self.price[0], self.tickdt)
self.log(f'bar OHLC: ({self.open[0]}, {self.high[0]}, {self.low[0]}, {self.close[0]})', self.datetime)
总结
本篇总结了在BackTrader运行策略中同时使用tick和K线数据的方法。这里我们没有使用框架的Data Feeds - Resample - Backtrader方法。而是采用了将tick和最小的分钟周期绑定在同一个数据源的方式,这增加了灵活性,而且一点也不复杂,同时bar数据使用标准管线仍然支持所有框架指标的使用。
此外本文讨论了如何使用动态数据源的方式,并且处于演示目的给出了一个非常简化的示例。基于Tick的回测基本可以确定全是大数据回测,几G的数据丝毫不奇怪。所以动态读取数据几乎是必须得操作。
实际操作中,我们通常不仅仅需要看一个周期,还希望策略可以同时检测更大的周期。本系列下一篇文章将会讨论这个话题。