说明一下,本贴中的bar数据和K线数据其实是一回事,有时候是方便读代码就按照代码说,有时候为了尊崇人们的习惯来说,不必在意。
1 通常CTA策略都是读取和合成K线的
1.1 从一个有代表性的策略DemoStrategy谈起
代码是这样的:
from typing import Any
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager
)
from vnpy.trader.object import (
BarData,
TickData
)
from vnpy.trader.constant import Interval
class DemoStrategy(CtaTemplate):
""" 一个演示策略 """
author = "hxxjava"
fast_window = 10
slow_window = 20
fast_ma0 = 0
fast_ma1 = 0
slow_ma0 = 0
slow_ma1 = 0
parameters = [
"fast_window",
"slow_window"
]
variables = [
"fast_ma0",
"fast_ma1",
"slow_ma0",
"slow_ma1",
]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict
):
"""构造函数"""
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = BarGenerator(
on_bar=self.on_bar,
window=7,
on_window_bar=on_7min_bar,
interval=Interval.Minute)
self.am = ArrayManager()
def on_init(self):
""""""
self.write_log("策略初始化")
# account_data = self.cta_engine.get_account()
self.load_bar(10)
def on_start(self):
"""策略启动"""
self.write_log("策略启动")
def on_stop(self):
""" 策略停止 """
self.write_log(" 策略停止 ")
def on_tick(self,tick:TickData):
""" Tick更新 """
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""K线更新"""
self.bg.update_bar(bar)
def on_7min_bar(self, bar: BarData):
"""K线更新"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
""" 定义金叉和死叉 """
cross_over = (self.fast_ma0>= self.fast_ma1 and
self.slow_ma0<self.slow_ma1)
cross_below = (self.slow_ma0>self.slow_ma1 and
self.slow_ma0<=self.slow_ma1)
if cross_over:
price = bar.close_price + 5
if not self.pos:
self.buy(price,1)
elif self.pos < 0:
self.cover(price,1)
self.buy(price,1)
elif cross_below:
price = bar.close_price - 5
if not self.pos:
self.short(price,1)
elif self.pos>0:
self.sell(price,1)
self.short(price,1)
# 更新图形界面
self.put_event()
这个策略是演示如何利用1分钟K线合成7分钟K线,然后在on_7min_bar()里面利用7分钟K线计算快慢两根移动均线,
然后更加快慢移动均线的金叉和死叉信号来进行多空的开仓和平仓操作,如此实现一个自动策略买卖交易。
1.2 策略工作的过程是这样的:
1.2.1 首先执行构造函数init()
在构造函数init()中创建BarGenerator类型self.bg和管理bar的ArrayManager类型的self.am
1.2.2 然后执行on_init()函数
这里的重点是self.load_bar(10),该函数是策略的父类CtaTemplate的函数,代码是这样的:
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
self.cta_engine.load_bar()位于vnpy\app\cta_strategy.py中的CtaEngine类中,代码是这样的:
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],load_bar
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
for bar in bars:
callback(bar)
因为在策略中使用这样的语句self.load_bar(10),所以use_database参数为默认值False,可是我们知道目前CTP接口是不支持历史数据查询的,所以contract and contract.history_data的条件为假,导致bars 为空, 最终执行了:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
而self.query_bar_from_rq的代码是这样的:
def query_bar_from_rq(
self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
再看看rqdata_client.query_history(req)的代码,它把产生req的symbol,interval,start 和end各字段,转换成米筐接口可以接受的rq_symbol,rq_interval ,interval,start 和end等4个变量中,然后把end加上1天的时间【注意:这是非常重要的一个技巧,不然无法取出截止到当前交易时刻的1分钟bar!】,最后执行米筐接口函数rqdata_get_price()读取所有的10天多的bar数据,注意:是10天多的bar,而不是整10天的bar!
def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""
Query history bar data from RQData.
"""
if self.symbols is None:
return None
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None
rq_interval = INTERVAL_VT2RQ.get(interval)
if not rq_interval:
return None
# For adjust timestamp from bar close point (RQData) to open point (VN Trader)
adjustment = INTERVAL_ADJUSTMENT_MAP[interval]
# For querying night trading period data
end += timedelta(1)
# Only query open interest for futures contract
fields = ["open", "high", "low", "close", "volume"]
if not symbol.isdigit():
fields.append("open_interest")
df = rqdata_get_price(
rq_symbol,
frequency=rq_interval,
fields=fields,
start_date=start,
end_date=end,
adjust_type="none"
)
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
dt = row.name.to_pydatetime() - adjustment
dt = CHINA_TZ.localize(dt)
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=dt,
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("open_interest", 0),
gateway_name="RQ"
)
data.append(bar)
return data
同时可以知道interval的默认值为Interval.MINUTE。
至此我们可以看出,self.load_bar(10)其实就是从米筐接口获取的1分钟历史数据。
1.2.3 当策略启动后,接收到tick数据推送时执行on_tick()
这里执行了
self.bg.update_tick(tick)
这是在调用策略的K线合成器self.bg的update_tick() 函数,这个函数是用来把tick数据按照1分钟为间隔来产生1分钟bar的,当1分钟bar合成之时再次调用策略的on_bar()。
BarGenerator的update_tick()的函数代码如下:
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
分析得知它开始生成self.bar的条件是:
if not self.bar:
new_minute = True
....
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
也就是说只要刚刚启动策略,就会立即生成一根新bar,而没有寻求对齐整分钟,这样会造成首个bar的合成非常可能是不完整的!
1.2.4 策略的on_bar()的执行:
self.bg.update_bar(bar)
这个函数是用1分钟bar来合成7分钟bar的,当7分钟bar合成完成后,它会以7分钟bar为参数调用策略的on_7min_bar()。
1.2.5 策略的on_7min_bar()的执行
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
后面的代码就省略了
姑且不论策略是否可以赚钱,因为后面还要针对特定合约进行优化,这不是本帖讨论的重点!
从代码来看,一切都是那么自然,一个完美的例子!
2 如果你是在盘中启动将带来第一根K线错误
这里分析的重点是假如我们在盘中启动策略的话,会发生什么问题,请看图:
2.1 第一根合成1分钟K线的丢失部分
如上图中所示:
- 灰色的部分为策略利用self.load_bar(10)从米筐读取从启动之时起10日内历史1分钟bar,这就是1.2.2节中描述的那部分bar;
- 绿色的部分为策略利用接收到tick合成的1分钟bar,这就是1.2.3节和1.2.4节中描述的那部分bar;
- 黄色的部分为第一根合成1分钟K线的丢失部分,这是产生问题的主要原因!
我们知道从米筐接口读取的只有整分的K线数据,它不会提供没有走完的1分钟bar,所以如果你没有在整分钟结束的那一刻启动策略的话(做到这一点的概率太低了!),那么就一定会产生黄色的丢失部分。
2.2 第一根合成1分钟K线的丢失部分的影响
因为第一根合成1分钟K线出现丢失部分,导致第一根合成1分钟K线的开、高、收、低、成交量和开仓兴趣都可能是错误的,进而导致利用1分钟K线合成的7分钟K线也是错误的,这可以说是连锁反应,当然也就会导致利用7分钟K线进行信号计算和交易查询问题!
也许你会说,有那么夸张吗?我不知道!不过这个丢失部分的时间长度在0~59.99秒之间,再说了就算是只有3秒的丢失,也可能是这1分钟中几乎全部的成交量,创新高、创新低都是有可能的,它的缺失也可能是让7分钟K线严重失真的重要原因,谁知道呢!我们这里分析目前的代码就是这样的,从原理上讲它确实是会出错的!
3 怎么解决问题?
解决方法:
- 尽量不要在盘中启动策略,在盘前启动好要交易的策略,但这个方法仍然没有解决策略软件的问题。
- 在策略中增加是否盘中启动的判断,如果是盘中启动,则在第一根1分钟K线合成之时,抛弃不要,立即从米筐取得前1分钟的K线数据,这样就可以替换掉这个不完整的第一根合成1分钟K线,那么也就解决了第一根7分钟K线错误的问题,完美地解决问题。
- 那么解决该问题就需要知道启动策略的时刻是否在交易合约的交易时间段内,那么就需要知道合约的交易时间段信息。米筐接口时提供合约的交易时间段信息的,函数如下:
如果策略启动后最后一个历史1分钟bar与第一个tick数据在一个交易时间段(如9:00-10:15)中, 那么就可以判断出第一个1分钟K线出现了数据丢失,在这个第一个1分钟K线走完之时,就应该从米筐接口立即读取这个刚刚生成的历史1分钟bar,替换掉策略合成的第一个1分钟K线,其他的处理逻辑继续执行就可以了。get_trading_dates() # 合约的所有的交易日 get_trading_hours() # 合约的所有的交易时间段
- 另外一个简单解决方法是: 修改BarGenerator的update_tick(),当其返回合成第一个1分钟bar时,直接从米筐读取这个历史1分钟bar,以此替代之,后续的处理逻辑与目前的代码相同即可。这种方法的好处是不要根据合约的交易时间段来判断,简单,但是可能回因为读取米筐接口需要时间,会否影响tick数据的处理还有待编写代码来测试。
- 问题已经发现了,怎么实现代码还在思考中,应该不难。这个问题就难在你可能根本没有意识到它可能有问题!
4 一种解决第一根合成1分钟K线的方法:
按照第3节中的4的方法,修改BarGenerator,代码如下,可以解决问题:
class BarGenerator:
"""
For:
1. generating 1 minute bar data from tick data
2. generateing x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
self.is_first_bar = True # hxxjava add
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
from vnpy.trader.rqdata import rqdata_client # hxxjava add
from vnpy.trader.object import HistoryRequest # hxxjava add
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return False
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return False
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
# hxxjava add start
if self.is_first_bar:
self.is_first_bar = False
symbol,exchange = extract_vt_symbol(self.bar.vt_symbol)
bar_datetime = self.bar.datetime
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
start = bar_datetime,
end=bar_datetime,
interval=Interval.MINUTE
)
bars = rqdata_client.query_history(req)
self.bar = bars[-1]
print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
# hxxjava add end
self.on_bar(self.bar)
new_minute = True
if new_minute:
print(f"【tick.datetime = {tick.datetime} is_first_bar={self.is_first_bar}】")
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# If not inited, creaate window bar object
if not self.window_bar:
# Generate timestamp for bar data
if self.interval == Interval.MINUTE:
dt = bar.datetime.replace(second=0, microsecond=0)
else:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
finished = False
if self.interval == Interval.MINUTE:
# x-minute bar
if not (bar.datetime.minute + 1) % self.window:
finished = True
elif self.interval == Interval.HOUR:
if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
# 1-hour bar
if self.window == 1:
finished = True
# x-hour bar
else:
self.interval_count += 1
if not self.interval_count % self.window:
finished = True
self.interval_count = 0
if finished:
self.on_window_bar(self.window_bar)
self.window_bar = None
# Cache last bar object
self.last_bar = bar
def generate(self) -> Optional[BarData]:
"""
Generate the bar data and call callback immediately.
"""
bar = self.bar
if self.bar:
bar.datetime = bar.datetime.replace(second=0, microsecond=0)
self.on_bar(bar)
self.bar = None
return bar