1. 本地价数据差合成的过程
首先说明,这里所合成出来的价差K线只是简易价差K线,它和交易所发表的套利品种的K线相比还是有所差别的,主要的差别在最高价、最低价和开盘价,但收盘价是准确的。
2. 实现过程:
2.1 在app\spread_trading\base.py添加下面query_tick_from_rq()函数
# hxxjava debug spread_trading
def query_tick_from_rq(
symbol: str, exchange: Exchange, start: datetime, end: datetime
):
"""
Query tick data from RQData.
"""
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.object import HistoryRequest
if not rqdata_client.inited:
rqdata_client.init()
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=Interval.TICK,
start=start,
end=end
)
data = rqdata_client.query_tick_history(req)
return data
2.2 在app\spread_trading\base.py修改load_tick_data()函数
有两种方式:一种是从米筐加载数据下载tick,另一种是从数据库中读取已经录制的该价差的tick数据。
@lru_cache(maxsize=999)
def load_tick_data(
spread: SpreadData,
start: datetime,
end: datetime,
pricetick: float = 0
):
""""""
# hxxjava debug spread_trading
# 目前没有考虑反向合约的情况,以后解决
spread_ticks: List[TickData] = []
try:
# 防止因为用户没有米筐tick数据权限而发生异常
# Load tick data of each spread leg
dt_legs: Dict[str, Dict] = {} # datetime string : Dict[vt_symbol:tick]
format_str = "%Y%m%d%H%M%S.%f"
for vt_symbol in spread.legs.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
# hxxjava debug spread_trading
tick_data = query_tick_from_rq(symbol=symbol, exchange=exchange,start=start,end=end)
if tick_data:
print(f"load from rqdatac {symbol}.{exchange} tick_data, len of = {len(tick_data)}")
# save all the spread's legs tick into a dictionary by tick's datetime
for tick in tick_data:
dt_str = tick.datetime.strftime(format_str)
if dt_str in dt_legs:
dt_legs[dt_str].update({vt_symbol:tick})
else:
dt_legs[dt_str] = {vt_symbol:tick}
# Calculate spread bar data
# snapshot of all legs's ticks
snapshot:Dict[str,TickData] = {}
spread_leg_count = len(spread.legs)
for dt_str in sorted(dt_legs.keys()):
dt = datetime.strptime(dt_str,format_str).astimezone(LOCAL_TZ)
# get each datetime
spread_price = 0
spread_value = 0
# get all legs's ticks dictionary at the datetime
leg_ticks = dt_legs.get(dt_str)
for vt_symbol,tick in leg_ticks.items():
# save each tick into the snapshot
snapshot.update({vt_symbol:tick})
if len(snapshot) < spread_leg_count:
# if not all legs tick saved in the snapshot
continue
# out_str = f"{dt_str} "
# format_str1 = "%Y-%m-%d %H:%M:%S.%f "
for vt_symbol,tick in snapshot.items():
price_multiplier = spread.price_multipliers[vt_symbol]
spread_price += price_multiplier * tick.last_price
spread_value += abs(price_multiplier) * tick.last_price
# out_str += f"[{vt_symbol} {tick.datetime.strftime(format_str1)} {tick.last_price}],"
# print(out_str)
if pricetick:
spread_price = round_to(spread_price, pricetick)
spread_tick = TickData(
symbol=spread.name,
exchange=exchange.LOCAL,
datetime=dt,
open_price=spread_price,
high_price=spread_price,
low_price=spread_price,
last_price=spread_price,
gateway_name="SPREAD")
spread_tick.value = spread_value
spread_ticks.append(spread_tick)
if spread_ticks:
print(f"load {symbol}.{exchange}' ticks from rqdatac, len of = {len(tick_data)}")
finally:
if not spread_ticks:
# 读取数据库中已经录制过的该价差的tick数据
spread_ticks = database_manager.load_tick_data(spread.name, Exchange.LOCAL, start, end)
return spread_ticks
3. 如何使用load_tick_data()?
3.1 修改价差策略的on_init()
只要在你的价差策略中on_init()中添加如下代码,就可以调用:
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_tick(days=3)
3.2 如何剔除节假日?
修改vnpy\app\spread_trading\engine.py
3.2.1 利用米筐接口函数剔除节假日
增加下面函数:
def get_previous_trading_date(dt:datetime,days:int): # hxxjava add
"""
得到某个日期dt的去除了节假日的前days个交易日
"""
from vnpy.trader.rqdata import rqdata_client
import rqdatac as rq
if not rqdata_client.inited:
rqdata_client.init()
prev_td = rq.get_previous_trading_date(date=dt.date(),n=days)
return prev_td
3.2.2 修改SpreadStrategyEngine的load_bar()和load_tick()
修改如下,修改后两个函数的days参数就代表交易日了。
def load_bar(
self, spread: SpreadData, days: int, interval: Interval, callback: Callable
):
""""""
end = datetime.now()
# start = end - timedelta(days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
bars = load_bar_data(spread, interval, start, end)
print(f"{spread.name} {start}-{end} len of bars = {len(bars)}") # hxxjava debug spead_trading
for bar in bars:
callback(bar)
def load_tick(self, spread: SpreadData, days: int, callback: Callable):
""""""
end = datetime.now()
# start = end - timedelta(days=days)
start = get_previous_trading_date(dt = end,days=days) # hxxjava change
ticks = load_tick_data(spread, start, end)
for tick in ticks:
callback(tick)