先厘清大思路,后面逐步完成。
vnpy系统自带了一个BarGenerator,它可以帮助我们生成1分钟,n分钟,n小时,日周期的K线,也叫bar。可是除了1分钟比较完美之外,有很多问题。它在读取历史数据、回测的时候多K线的处理和实盘却有不一样的效果。具体的问题我已经在解决vnpy 2.9.0版本的BarGenerator产生30分钟Bar的错误!这个帖子中做过尝试,但也不是很成功。因为系统的BarGenerator靠时间窗口与1分钟bar的时间分钟关系来决定是否该新建和结束一个bar,这个有问题。于是我改用对1分钟bar进行计数来决定是否该新建和结束一个bar,这也是有不可靠的问题,遇到行情比较清淡的时候,可能有的分钟就没有1分钟bar产生,这是完全有可能的!
K线几乎是绝大部分交易策略分析的基础,除非你从事的是极高频交易,否则你就得用它。可是如果你连生成一个稳健可靠的K线都不能够保证,那么运行在K线基础上的指标及由此产生的交易信号就无从谈起,K线错了,它们就是错误的,以此为依据做出点交易指令有可能是南辕北辙,所以必须解决它!
K线不是交易所发布的,它有很多种产生机制。其对齐方式、表现形式多种多样。关于K线的分类本人在以往的帖子中做出过比较详细的说明,有兴趣的读者可以去我以往的帖子中查看,这里就不再赘述。
市面上的绝大部分软件如通达信、大智慧、文华财经等软件,除非用户特别设定,他们最常提供给用户的K线多是日内对齐等交易时长K线。常用是一定是有道理的,因为它们已经为广大用户和投资者所接受。
1)什么是日内对齐等交易时长K线?
它具有这些特点:以每日开盘为起点,每根K线都包含相同交易时间的数据,跳过中间的休市时间,直至当前交易日的收盘,收盘不足n分钟也就是K线。实盘中,每日的第一个n分钟K线含集合竞价的那个tick数据。
2)为什么这种K线能够被普遍接受?
为它尽可能地保证一个交易日内的所有K线所表达的意义内容上是一致的,它们包含相等的交易时长。这非常重要,因为你把一个5分钟时长的K线与一个30分钟时长的K线放在一起谈论是没有意义的。但是如果为了保证K线在交易时长上的一致性,让n分钟K线跨日的话也是不太合理,因为这跨日,跨周末时间太长,这中间会发生什么意外事情,可能会产生出非常巨大的幅度大K线,掩盖了隔日跳空的行情变化,这对解读行情是不利的。当然n日的K线日跨日的,但是它是n个交易日的K线融合而成的,不过其融合的每个日K线也是对齐各自的日开盘的。
另外日内对齐等交易时长K线还有一个好处,那就是你以任何之前的时间为起点,在读取历史数据重新生成该日的n分钟K线的时候,得到的改日的K线是一致的。举个例子,如果我们的CTA策略在init()中常常是这么一句:
self.load_bar(20) # 先加载20日历史1分钟数据
这么简单的一句,包含着很多你意识不到的变化——你今天运行策略和明天运行你的策略,其中的历史数据的范围发生了变化,也就是说加载数据的起点变了。如果我们合成的K线的对齐方式不采用日内对齐的话,而采用对齐加载时间起点的话,你今天、明天加载出来之前的某日的K线就可能完全是不同的。而采用日内对齐等交易时长的K线则不存在这个问题。
3)需要知道合约的交易时间段
既然要对齐每日开盘,还有跳过各个休市时间,还要知道收市时间,那么我们就知道生成这种K线必须知道其所表达合约或对象的交易时间段,交易时间段中包含了这些信息,不知道这些信息,BarGenerator就不知道如何生成这种bar。这是必须的!
目前vnpy系统中的是没有合约的交易时间段的。到哪里获取合约的交易时间段的呢?
1) 它与合约相关,应该到保存合约的数据类ContractData中去找,没有找到。
2) 是否可以提供接口,从交易所获得,这个也是比较基础的数据。于是到CTP接口中(我使用的是CTP接口,您也许不一样) ,在最新版本的CTP接口文档中也没有找到任何与交易时间段相关的信息,绝望!
解决方法:
打开vnpy.trader.datafeed.py文件为Datafeed的基类BaseDatafeed扩展下面的接口
class BaseDatafeed(ABC):
"""
Abstract datafeed class for connecting to different datafeed.
"""
def init(self) -> bool:
"""
Initialize datafeed service connection.
"""
pass
def update_all_trading_hours(self) -> bool: # hxxjava add
""" 更新所有合约的交易时间段到trading_hours.json文件中 """
pass
def load_all_trading_hours(self) -> dict: # hxxjava add
""" 从trading_hours.json文件中读取所有合约的交易时间段 """
pass
def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""
Query history bar data.
"""
pass
def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
"""
Query history tick data.
"""
pass
其中的trading_hours.json文件我会在后面的文章中做详细的介绍。有了它我们才能展开其他的设计。
在vnpy_rqdata\rqdata_datafeed.py中增加下面的代码
from datetime import timedelta,date # hxxjava add
def update_all_trading_hours(self) -> bool: # hxxjava add
""" 更新所有合约的交易时间段到trading_hours.json文件中 """
if not self.inited:
self.init()
if not self.inited:
return False
ths_dict = load_json(self.trading_hours_file)
# instruments = all_instruments(type=['Future','Stock','Index','Spot'])
trade_hours = {}
for stype in ['Future','Stock','Index','Fund','Spot']:
instruments = all_instruments(type=[stype])
# print(f"{stype} instruments count={len(instruments)}")
for idx,inst in instruments.iterrows():
# 获取每个最新发布的合约的建议时间段
if ('trading_hours' not in inst) or not(isinstance(inst.trading_hours,str)):
# 跳过没有交易时间段或者交易时间段无效的合约
continue
inst_name = inst.trading_code if stype == 'Future' else inst.order_book_id
inst_name = inst_name.upper()
if inst_name.find('.') < 0:
inst_name += '.' + inst.exchange
if inst_name not in ths_dict:
str_trading_hours = inst.trading_hours
# 把'01-'或'31-'者替换成'00-'或'30-'
suffix_pair = [('1-','0-'),('6-','5-')]
for s1,s2 in suffix_pair:
str_trading_hours = str_trading_hours.replace(s1,s2)
# 如果原来没有,提取出来
trade_hours[inst_name] = {"name": inst.symbol,"trading_hours": str_trading_hours}
# print(f"trade_hours old count {len(ths_dict)},append count={len(trade_hours)}")
if trade_hours:
ths_dict.update(trade_hours)
save_json(self.trading_hours_file,ths_dict)
return True
def load_all_trading_hours(self) -> dict: # hxxjava add
""" 从trading_hours.json文件中读取所有合约的交易时间段 """
json_file = get_file_path(self.trading_hours_file)
if not json_file.exists():
return {}
else:
return load_json(self.trading_hours_file)
在vnpy\trader\engine.py中:
from .datafeed import get_datafeed # hxxjava add
def get_trading_hours(self,vt_symbol:str) -> str: # hxxjava add
""" get vt_symbol's trading hours """
ths = self.all_trading_hours.get(vt_symbol.upper(),"")
return ths["trading_hours"] if ths else ""
因为无论你运行vnpy中的哪个app,你都会启动main_engine,无需绕弯子就可以得到这些信息,而我们的用户策略中都包含各自策略的引擎,这样就方便获取交易时间段信息。
如CTA策略中包含cta_engine,而cta_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取您交易品种的交易时间段信息:
trading_hours = self.cta_engine.main_engine.get_trading_hours(selt.vt_symbol)
如PortFolioStrategy策略中包含strategy_engine,而strategy_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取多个交易品种的交易时间段信息:
trading_hours_list = [self.cta_engine.main_engine.get_trading_hours(vt_symbol) for vt_symbol in self.vt_symbols]
是不是很方便呢?
vnpy 3.0的启动界面中已经集成了一个叫“投研”的功能,其实它是jupyter lab,启动之后输入下面的代码:
# 测试update_all_trading_hours()函数和load_all_trading_hours()
from vnpy.trader.datafeed import get_datafeed
df = get_datafeed()
df.init()
df.update_all_trading_hours() # 更新所有合约的交易时间段到本地文件中
ths = df.load_all_trading_hours() # 从本地文件中读取所有合约的交易时间段
当然您可以在vnpy的trader中主界面的菜单中增加一项,方便您在需要的时候执行下面语句。不过这个更新交易时间段的功能并不需要频繁执行,手动也就够了,记得就好。
经过上面步骤3.4.4,您就在本地得到了一个trading_hours.json文件,该文件在您的用户目录下的.vntrader\中,其内容如下:
{
"A0303.DCE": {
"name": "豆一0303",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
"A0305.DCE": {
"name": "豆一0305",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
"A0307.DCE": {
"name": "豆一0307",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
"A0309.DCE": {
"name": "豆一0309",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
"A0311.DCE": {
"name": "豆一0311",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
"A0401.DCE": {
"name": "豆一0401",
"trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
},
... ...
}
观察其格式,在你没有米筐数据接口或者这里没有的合约,您也可以手动输入合约交易时间段信息。
按照程序中算法,这个文件文件中一共包含约16500多个合约的交易时间段信息。可以覆盖国内金融市场几乎全部都产品,但是不包括金融二次衍生品期权。
为什么没有期权交易时间段信息,因为不需要。期权合约有其对应的标的物,从其名称和编号就可以解析出来。期权合约的交易时间段其和标的物的交易时间段是完全相同的,因此不需要保存到该文件中。
发布于vn.py社区公众号【vnpy-community】
原文作者:Bili | 发布时间:2021-09-07
在前面的文章当中,我们对RSJ指标有了初步的了解,并且在RSJ指标样本内外实验中也取得一个不错的回测结果。接下来,我们将继续根据陶勤英博士的财通证券研究所的研报中所指出的将RSJ指标与其他技术指标组合使用来进行进一步的实验研究观察回测后的效果。
首先,我们根据文章选取了另外三个技术指标,分别是DMI指标、RSI指标和ROC指标分别与RSJ指标进行叠加使用进行回测。所以接下来我们需要了解各个指标的基本原理以及他们的信号逻辑:
基本原理:DMI指标基本原理是在于寻找股票价格涨跌过程中,股价藉以创新高价或新低价的功能,研判多空力量,进而寻求买卖双方的均衡点及股价在双方互动下波动的循环过程。与大多数技术指标相比DMI指标把每日的高低波动的幅度因素计算在内,从而更加准确的反应行情的走势及更好的预测行情未来的发展变化。
信号逻辑:趋向技术指标 DMI 包括多空指标 PDI、MDI 以及趋向指标 ADX。我们针对 DMI 指标建立相应的择时策略,即若昨日的 ADX 大于前一日的 ADX,则今日在收盘的时候看多;若昨日的 ADX、PDI、MDI 均小于前一日的相对应的指标值,则今日在收盘的时候看空(其中,ADX 值若不在[20,60]之内,则不进行交易)。
基本原理:RSI的原理简单来说是以数字计算的方法求出买卖双方的力量对比,譬如有100个人面对一-件商品, 如果50个人以上要买,竞相抬价,商品价格必涨。相反,如果50个人以上争着卖出,价格自然下跌。
信号逻辑:对于相对强弱指标 RSI,两种力量的对比决定了个股及大盘所处的状态:强势或弱势。从 RSI 值的变动范围来看:当 20 < RSI <50 时,弱势,卖出,空仓;当 50 < RSI <80 时,强势,买入,持仓。因此,若昨日的 RSI 值在(50,80]内,则今日在收盘的时候看多;若昨日的RSI 值在[20,50)内,则今日在收盘的时候看空。
基本原理:变动率指标(ROC),是以当日的收盘价和N天前的收盘价比较,通过计算股价某一段时间内收盘价变动的比例,应用价格的移动比较来测量价位动量,达到事先探测股价买卖供需力量的强弱,进而分析股价的趋势及其是否有转势的意愿,属于反趋势指标之一。
信号逻辑:ROC指标测量股价动量,可以用来监视常态性和极端性两种行情,对买卖股票提供强有力的参考。ROC 变大,代表未来市场仍会在短时间内上涨,反之则代表市场可能下跌。因此,若昨日的 ROC 大于前一日的 ROC,则今日在收盘的时候看多;反之,发出看空信号。
接下来,我们将正式进入我们的实验阶段。为了避免文章内容的重复,我们仅以RSJ+DMI为例,为大家进行代码分析:
在本策略当中,我们需要使用到两种K线——5分钟K线和日K线,所以我们还需创建日K线生成器DailyBarGenerator:
class DailyBarGenerator:
def __init__(self, on_daily_bar: Callable) -> None:
"""日K线合成器"""
self.on_daily_bar: Callable = on_daily_bar
self.daily_bar: BarData = None
self.last_bar: BarData = None
接下来构造update_bar函数用于更新K线到容器中,缓存K线本身的OHLCV数据:
def update_bar(self, bar: BarData) -> None:
"""更新分钟K线"""
if not self.daily_bar:
dt = bar.datetime.replace(
hour=0,
minute=0,
second=0,
microsecond=0
)
self.daily_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.daily_bar.high_price = max(
self.daily_bar.high_price,
bar.high_price
)
self.daily_bar.low_price = min(
self.daily_bar.low_price,
bar.low_price
)
# Update close price/volume/turnover into window bar
self.daily_bar.close_price = bar.close_price
self.daily_bar.volume += bar.volume
self.daily_bar.open_interest = bar.open_interest
# Check if window bar completed
if (
self.last_bar
and self.last_bar.datetime.date() != bar.datetime.date()
):
self.on_daily_bar(self.daily_bar)
self.daily_bar = None
# Cache last bar object
self.last_bar = bar
交易信号执行:
5分钟K线用于计算RSJ指标,我们需要先使用BarGenerator将1分钟K线合成为5分钟K线,日K线用于计算DMI指标:
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 5, self.on_5min_bar)
self.am = NewArrayManager()
self.daily_bg = DailyBarGenerator(self.on_daily_bar)
self.daily_am = NewArrayManager()
计算技术指标:
分别将RSJ、ADX、PDI以及MDI指标计算出来:
# 计算技术指标
self.rsj_value = self.am.rsj(self.rsj_window)
adx_array = self.daily_am.adx(self.dmi_window, array=True)
pdi_array = self.daily_am.plus_di(self.dmi_window, array=True)
mdi_array = self.daily_am.minus_di(self.dmi_window, array=True)
adx_value = adx_array[-1]
判断交易信号:
当 RSJ 和 DMI指标同时发出看多信号,则在当日收盘的时候发出看多信号;若两个指标同时发出看空信号,则在当日收盘的时候发出看空信号;否则,不发出任何信号。
# 判断交易信号
target_pos = 0
if bar.datetime.time() == time(14, 50):
# 满足开仓交易的ADX要求
if 20 <= adx_value <= 60:
# 做多条件
if (
self.rsj_value < 0
and adx_array[-1] > adx_array[-2]
):
target_pos = 1
# 做空条件
elif (
self.rsj_value > 0
and adx_array[-1] < adx_array[-2]
and pdi_array[-1] < pdi_array[-2]
and mdi_array[-1] < mdi_array[-2]
):
target_pos = -1
交易逻辑:
基于目标仓位和实际仓位,决定如何交易:
trade_volume = target_pos - self.pos
if trade_volume > 0:
if self.pos >= 0:
self.buy(bar.close_price + 5, trade_volume)
else:
self.cover(bar.close_price + 5, trade_volume)
elif trade_volume < 0:
if self.pos <= 0:
self.short(bar.close_price - 5, abs(trade_volume))
else:
self.sell(bar.close_price - 5, abs(trade_volume))
tips:
由于策略中涉及到多个技术指标,代码变得较为复杂。所以在这时将计算技术指标、判断交易信号和交易逻辑三个部分分开来写,会让我们的代码变得更加清晰从而增加代码的可读性。
我们用 2017 年 7 月 23 日-2020 年 7 月 22 日的数据,对该日内指标进行回测。指标当日发出信号后,分别于当日收盘分别对上证 50、沪深 300、中证 500指数进行操作。
其中 DMI 组合指标时间参数为14。
资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:
其中 RSI 指标的时间参数为 6。
本地代码:IF888.CFFEX、IH888.CFFEX、IC888.CFFEX
K线周期:1分钟
开始日期:2017-7-23
结束日期:2020-7-22
手续费率:0.00003
交易滑点:0.4
合约乘数:300
价格跳动:0.2
回测资金:100W
资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:
其中 ROC 指标参数为 12。
本地代码:IF888.CFFEX、IH888.CFFEX、IC888.CFFEX
K线周期:1分钟
开始日期:2017-7-23
结束日期:2020-7-22
手续费率:0.00003
交易滑点:0.4
合约乘数:300
价格跳动:0.2
回测资金:100W
资金曲线的形状和财通研报中的结果基本一致 ,可以认为比较正确的复现了策略逻辑:
RSJ指标分别与DMI、RSI和ROC指标组合使用的回测效果整体上资金曲线都是呈上扬趋势,但是对于部分品种来说资金曲线不太稳键,还有较大的改进提升空间。
本文中提到的三个策略代码较长,就不在这里贴出了。请关注本公众号(vnpy-community)后,回复内容【RSJ】获取源代码下载链接。
在行情接口与策略和应用之间建起一个tick过滤器——TickFilter,对tick数据进行过滤。
声明:本文基于【CTP接口规范6.3.15_API接口说明】做出的修改。
4.1 定义相关的常量和数据类
在vnpy\trader\constant.py中增加下面的合约交易状态InstrumentStatus常量类型定义:
class InstrumentStatus(Enum):
"""
合约交易状态类型 hxxjava debug
"""
BEFORE_TRADING = "开盘前"
NO_TRADING = "非交易"
CONTINOUS = "连续交易"
AUCTION_ORDERING = "集合竞价报单"
AUCTION_BALANCE = "集合竞价价格平衡"
AUCTION_MATCH = "集合竞价撮合"
CLOSE = "收盘"
# 有效交易状态
VALID_TRADE_STATUSES = [
InstrumentStatus.CONTINOUS,
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
# 集合竞价交易状态
AUCTION_STATUS = [
InstrumentStatus.AUCTION_ORDERING,
InstrumentStatus.AUCTION_BALANCE,
InstrumentStatus.AUCTION_MATCH
]
class StatusEnterReason(Enum):
"""
品种进入交易状态原因类型 hxxjava debug
"""
AUTOMATIC = "自动切换"
MANUAL = "手动切换"
FUSE = "熔断"
在vnpy\trader\object.py中增加下面的交易状态数据类StatusData:
@dataclass
class StatusData(BaseData):
"""
hxxjava debug
"""
symbol:str
exchange : Exchange
settlement_group_id : str = ""
instrument_status : InstrumentStatus = None
trading_segment_sn : int = None
enter_time : str = ""
enter_reason : str = ""
exchange_inst_id : str = ""
def __post_init__(self):
""" """
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def belongs_to(self,vt_symbol:str):
symbol,exchange_str = vt_symbol.split(".")
instrument = left_alphas(symbol).upper()
return (self.symbol.upper() == instrument) and (self.exchange.value == exchange_str)
在vnpy\trader\event.py中增加交易状态消息类型
EVENT_STATUS = "eStatus" # hxxjava debug
EVENT_ORIGIN_TICK = "eOriginTick." # hxxjava debug
EVENT_AUCTION_TICK = "eAuctionTick." # hxxjava debug
在vnpy\trader\gateway.py中合约状态接口,修改tick推送接口:
from .event import EVENT_ORIGIN_TICK,EVENT_STATUS # hxxjava add
from .object import StatusData # hxxjava add
def on_tick(self, tick: TickData) -> None:
"""
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.on_event(EVENT_ORIGIN_TICK, tick) # hxxjava add
# self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)
def on_status(self, status: StatusData) -> None: # hxxjava debug
"""
Instrument Status event push.
"""
self.on_event(EVENT_STATUS, status)
self.on_event(EVENT_STATUS + status.vt_symbol, status)
修改vnpy_cpt\ctp_gateway.py:
from vnpy.trader.constant import InstrumentStatus,StatusEnterReason # hxxjava debug
rom vnpy.trader.object import StatusData, # hxxjava debug
# 品种状态进入原因映射 hxxjava debug
INSTRUMENTSTATUS_CTP2VT: Dict[str, InstrumentStatus] = {
"0": InstrumentStatus.BEFORE_TRADING,
"1": InstrumentStatus.NO_TRADING,
"2": InstrumentStatus.CONTINOUS,
"3": InstrumentStatus.AUCTION_ORDERING,
"4": InstrumentStatus.AUCTION_BALANCE,
"5": InstrumentStatus.AUCTION_MATCH,
"6": InstrumentStatus.CLOSE,
"7": InstrumentStatus.CLOSE
}
# 品种状态进入原因映射 hxxjava debug
ENTERREASON_CTP2VT: Dict[str, StatusEnterReason] = {
"1": StatusEnterReason.AUTOMATIC,
"2": StatusEnterReason.MANUAL,
"3": StatusEnterReason.FUSE
}
def onRtnInstrumentStatus(self,data:dict):
"""
当接收到合约品种状态信息 # hxxjava debug
"""
if data:
# print(f"【data={data}】")
status = StatusData(
symbol = data["InstrumentID"],
exchange = EXCHANGE_CTP2VT[data["ExchangeID"]],
settlement_group_id = data["SettlementGroupID"],
instrument_status = INSTRUMENTSTATUS_CTP2VT[data["InstrumentStatus"]],
trading_segment_sn = data["TradingSegmentSN"],
enter_time = data["EnterTime"],
enter_reason = ENTERREASON_CTP2VT[data["EnterReason"]],
exchange_inst_id = data["ExchangeInstID"],
gateway_name=self.gateway_name
)
# print(f"status={status}")
self.gateway.on_status(status)
from vnpy.trader.event import EVENT_AUCTION_TICK # hxxjava add
class MyCtaEngine(CtaEngine):
""" """
condition_filename = "condition_order.json" # 历史条件单存储文件
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine,event_engine)
self.condition_orders:Dict[str,ConditionOrder] = {} # strategy_name: ConditionOrder
self.triggered_condition_orders:List[ConditionOrder] = [] # 已经触发点条件单,为流控设计
def load_active_condtion_orders(self):
""" """
return {}
def register_event(self):
""""""
super().register_event()
self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
def process_auction_tick_event(self,event:Event):
""" 集合竞价消息处理 """
tick:TickData = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
for strategy in strategies:
if strategy.inited:
# 执行策略的集合竞价消息处理
self.call_strategy_func(strategy, strategy.on_auction_tick, tick)
def process_tick_event(self,event:Event):
""" 用tick的价格检查条件单 """
super().process_tick_event(event)
tick:TickData = event.data
all_condition_orders = [order for order in self.condition_orders.values() \
if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
for order in all_condition_orders:
# 检查条件单是否满足条件
self.check_condition_order(order,tick)
def check_condition_order(self,order:ConditionOrder,tick:TickData):
""" 检查条件单是否满足条件 """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
price = tick.last_price
is_be = order.condition == Condition.BE and price >= order.price
is_le = order.condition == Condition.LE and price <= order.price
is_bt = order.condition == Condition.BT and price > order.price
is_lt = order.condition == Condition.LT and price < order.price
if is_be or is_le or is_bt or is_lt:
# 满足触发条件
if order.execute_price == ExecutePrice.MARKET:
# 取市场价
price = tick.last_price
elif order.execute_price == ExecutePrice.EXTREME:
# 取极限价
price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
else:
# 取设定价
price = order.price
# 执行委托
order_ids = strategy.send_order(
direction = order.direction,
offset=order.offset,
price=price,
volume=order.volume
)
if order_ids:
order.trigger_time = tick.datetime
order.status = CondOrderStatus.TRIGGERED
order.vt_orderids = order_ids
self.call_strategy_func(strategy,strategy.on_condition_order,order)
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
def find_condition_order(self,vt_orderid:str):
""" 根据委托单号查询所属条件单 """
corder:ConditionOrder = None
for order in self.condition_orders.values():
if vt_orderid in order.vt_orderids:
corder = order
break
return corder
def process_trade_event(self, event: Event):
""" 委托单推送处理 """
super().process_trade_event(event)
trade:TradeData = event.data
vt_orderid = trade.vt_orderid
corder = self.find_condition_order(vt_orderid)
if corder:
# 该成交单属于某个条件单
strategy = self.strategies.get(corder.strategy_name,None)
if strategy and strategy.trading:
# 找到了该条件单属实策略实例且正在交易中
# 累计条件单的成交量
corder.traded += trade.volume
# 推送该条件单给策略
self.call_strategy_func(strategy,strategy.on_condition_order,corder)
# 刷新条件单列表控件
self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))
def send_condition_order(self,order:ConditionOrder):
""" """
strategy = self.strategies.get(order.strategy_name,None)
if not strategy or not strategy.trading:
return False
if order.cond_orderid not in self.condition_orders:
self.condition_orders[order.cond_orderid] = order
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
return False
def cancel_condition_order(self,cond_orderid:str):
""" """
order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
if not order:
return False
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
def cancel_all_condition_orders(self,strategy_name:str):
""" """
for order in self.condition_orders.values():
if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
order.status = CondOrderStatus.CANCELLED
self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
return True
__init__.py
中的CtaStrategyApp做如下修改class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
# engine_class = CtaEngine
engine_class = MyCtaEngine # hxxjava add
widget_name = "CtaManager"
icon_name = str(app_path.joinpath("ui", "cta.ico"))
修改vnpy_ctastrategy\CtaTemplate.py如下,为CtaTemplate增加on_auction_tick():
@virtual
def on_auction_tick(self, tick: TickData):
"""
Callback of new tick data update. # hxxjava add for auction tick
"""
pass
为class BaseDatabase增加下面两个接口函数:
@abstractmethod
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
pass
@abstractmethod
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
pass
class MyDateTimeField(DateTimeField):
def get_modifiers(self):
return [6]
class DbLastTick(Model): # hxxjava add
""" 最新TICK数据表映射对象 """
id = AutoField()
gateway_name: str = CharField()
symbol: str = CharField()
exchange: str = CharField()
datetime: datetime = MyDateTimeField()
name: str = CharField()
volume: float = FloatField()
turnover: float = FloatField()
open_interest: float = FloatField()
last_price: float = FloatField()
last_volume: float = FloatField()
limit_up: float = FloatField()
limit_down: float = FloatField()
open_price: float = FloatField()
high_price: float = FloatField()
low_price: float = FloatField()
pre_close: float = FloatField()
bid_price_1: float = FloatField()
bid_price_2: float = FloatField(null=True)
bid_price_3: float = FloatField(null=True)
bid_price_4: float = FloatField(null=True)
bid_price_5: float = FloatField(null=True)
ask_price_1: float = FloatField()
ask_price_2: float = FloatField(null=True)
ask_price_3: float = FloatField(null=True)
ask_price_4: float = FloatField(null=True)
ask_price_5: float = FloatField(null=True)
bid_volume_1: float = FloatField()
bid_volume_2: float = FloatField(null=True)
bid_volume_3: float = FloatField(null=True)
bid_volume_4: float = FloatField(null=True)
bid_volume_5: float = FloatField(null=True)
ask_volume_1: float = FloatField()
ask_volume_2: float = FloatField(null=True)
ask_volume_3: float = FloatField(null=True)
ask_volume_4: float = FloatField(null=True)
ask_volume_5: float = FloatField(null=True)
localtime: datetime = DateTimeField(null=True)
class Meta:
database = db
indexes = ((("gateway_name","symbol", "exchange", "datetime"), True),)
class MysqlDatabase的初始化做如下修改:
def __init__(self) -> None:
""""""
self.db = db
self.db.connect()
self.db.create_tables([DbContractData, DbBarData, DbTickData, DbLastTick, DbBarOverview]) # hxxjava add DbLastTick,DbContractData
再为class MysqlDatabase添加下面两个函数:
def save_last_tick(self, ticks: List[TickData]) -> bool:
"""
Save last tick data into database. # hxxjava add
"""
vt_symbols = [t.vt_symbol for t in ticks]
# 删除ticks列表中包含合约的旧的tick记录
d: ModelDelete = DbLastTick.delete().where(
(DbLastTick.symbol+'.'+DbLastTick.exchange in vt_symbols)
)
count = d.execute()
# print(f"delete {count} last ticks")
# 构造最新的ticks列表数据
data = []
for t in ticks:
tick:TickData = deepcopy(t) # hxxjava change
tick.datetime = tick.datetime
d = tick.__dict__
d["exchange"] = d["exchange"].value
d.pop("vt_symbol")
data.append(d)
# print(tick.symbol,tick.exchange,tick.datetime.strftime('%Y-%m-%d %H:%M:%S %f'))
# 使用upsert操作将数据更新到数据库中
with self.db.atomic():
for c in chunked(data, 50):
DbLastTick.insert_many(c).on_conflict_replace().execute()
return True
def load_last_tick(
self,
gateway_name : str,
exchange: Exchange = None,
symbol: str = None
) -> List[TickData]:
"""
Load last tick data from database. # hxxjava add
"""
try:
# 从DbLastTick查询符合条件的最新tick记录
s: ModelSelect = (
DbLastTick.select().where(
(DbLastTick.gateway_name == gateway_name)
& (exchange is None or DbLastTick.exchange == exchange.value)
& (symbol is None or DbLastTick.symbol == symbol)
).order_by(DbLastTick.gateway_name,DbLastTick.datetime)
)
# 利用最新tick记录构造ticks列表
ticks: List[TickData] = []
for db_tick in s:
tick:TickData = TickData(
symbol=db_tick.symbol,
exchange=Exchange(db_tick.exchange),
datetime=to_china_tz(db_tick.datetime),
name=db_tick.name,
volume=db_tick.volume,
turnover=db_tick.turnover,
open_interest=db_tick.open_interest,
last_price=db_tick.last_price,
last_volume=db_tick.last_volume,
limit_up=db_tick.limit_up,
limit_down=db_tick.limit_down,
open_price=db_tick.open_price,
high_price=db_tick.high_price,
low_price=db_tick.low_price,
pre_close=db_tick.pre_close,
bid_price_1=db_tick.bid_price_1,
bid_price_2=db_tick.bid_price_2,
bid_price_3=db_tick.bid_price_3,
bid_price_4=db_tick.bid_price_4,
bid_price_5=db_tick.bid_price_5,
ask_price_1=db_tick.ask_price_1,
ask_price_2=db_tick.ask_price_2,
ask_price_3=db_tick.ask_price_3,
ask_price_4=db_tick.ask_price_4,
ask_price_5=db_tick.ask_price_5,
bid_volume_1=db_tick.bid_volume_1,
bid_volume_2=db_tick.bid_volume_2,
bid_volume_3=db_tick.bid_volume_3,
bid_volume_4=db_tick.bid_volume_4,
bid_volume_5=db_tick.bid_volume_5,
ask_volume_1=db_tick.ask_volume_1,
ask_volume_2=db_tick.ask_volume_2,
ask_volume_3=db_tick.ask_volume_3,
ask_volume_4=db_tick.ask_volume_4,
ask_volume_5=db_tick.ask_volume_5,
localtime=db_tick.localtime,
gateway_name=db_tick.gateway_name
)
ticks.append(tick)
return ticks
except:
# 当DbLastTick表不存在的时候,会发生错误
return []
在vnpy.usertools下创建tickfilter.py文件,其内容如下:
"""
本文件主要实现tick数据过滤器——TickFilter。
tick数据过滤器的功能:
1. 过滤重复tick,保证已经参与K线合成的tick不会再次被系统使用
2. 过滤无效tick,抛弃不在交易状态下的tick
3. 识别集合竞价tick,为使用tick的应用或用户策略处理集合竞价tick提供支持
作者:hxxjava
日期:2022-06-16
修改日期: 修改原因:
"""
from typing import Dict,List,Tuple
from threading import Thread
from vnpy.event import Event,EVENT_TIMER,EventEngine
from vnpy.trader.constant import InstrumentStatus,VALID_TRADE_STATUSES
from vnpy.trader.object import TickData,StatusData
from vnpy.trader.event import (
EVENT_ORIGIN_TICK,
EVENT_AUCTION_TICK,
EVENT_TICK,
EVENT_STATUS
)
from vnpy.trader.database import get_database
from vnpy.trader.utility import extract_vt_symbol
def left_alphas(instr:str):
"""
得到字符串左边的字符部分
"""
ret_str = ''
for s in instr:
if s.isalpha():
ret_str += s
else:
break
return ret_str
def get_vt_instrument(vt_symbol:str):
"""
从完整合约代码转换到完整品种代码
"""
symbol,exchange = extract_vt_symbol(vt_symbol)
instrument = left_alphas(symbol)
return f"{instrument}.{exchange.value}"
class TickFilter():
""" tick数据过滤器 """
CHECK_INTERVAL:int = 5 # 更新到数据库间隔
def __init__(self,event_engine:EventEngine,gateway_name:str):
""" tick数据过滤器初始化 """
self.event_engine = event_engine
self.gateway_name = gateway_name
self.db = get_database()
# 最新tick字典 {(gateway_name,vt_symbol),(update,tick)}
self.last_ticks:Dict[Tuple[str,str],Tuple[bool,TickData]] = {}
# 品种及合约状态字典 { vt_symbol : StatusData }
self.statuses:Dict[str,StatusData] = {}
self.second_cnt = 0
self.load_last_ticks()
self.register_event()
# print(f"TickFilter {gateway_name}")
def load_last_ticks(self):
"""
加载属于网关名称为self.gateway_name的最新tick列表
"""
last_ticks:List[TickData] = self.db.load_last_tick(gateway_name=self.gateway_name)
for tick in last_ticks:
self.last_ticks[(tick.gateway_name,tick.vt_symbol)] = (False,tick)
# print(f"load {len(last_ticks)} last ticks")
def register_event(self):
""" 注册消息 """
self.event_engine.register(EVENT_ORIGIN_TICK,self.process_tick_event)
self.event_engine.register(EVENT_STATUS,self.process_status_event)
self.event_engine.register(EVENT_TIMER,self.check_last_ticks)
def process_tick_event(self,event:Event):
""" 对原始tick进行过滤 """
tick:TickData = event.data
# 检查tick合约的经验状态是否位有效交易状态
status:StatusData = self.statuses.get(tick.vt_symbol,None)
if not status:
vt_instrument = get_vt_instrument(tick.vt_symbol)
status = self.statuses.get(vt_instrument,None)
if not status:
# 未收到交易状态,返回
return
if status.instrument_status not in VALID_TRADE_STATUSES:
# 不在有效交易状态,返回
return
key = (tick.gateway_name,tick.vt_symbol)
_,oldtick = self.last_ticks.get(key,(None,None))
valid_tick = False
if not oldtick:
# 没有该合约的历史tick
self.last_ticks[key] = (True,tick)
valid_tick = True
elif tick.datetime > oldtick.datetime:
#
self.last_ticks[key] = (True,tick)
valid_tick = True
else:
print(f"【特别tick = {tick}】")
if valid_tick == True:
# 如果是有效的tick
if status.instrument_status != InstrumentStatus.CONTINOUS:
# 发送集合竞价tic消息到系统中
self.event_engine.put(Event(EVENT_AUCTION_TICK,tick))
self.event_engine.put(Event(EVENT_AUCTION_TICK + tick.vt_symbol, tick))
else:
# 发送连续竞价tic消息到系统中
self.event_engine.put(Event(EVENT_TICK,tick))
self.event_engine.put(Event(EVENT_TICK + tick.vt_symbol, tick))
def process_status_event(self, event: Event):
""" 交易状态通知消息处理 """
status:StatusData = event.data
self.statuses[status.vt_symbol] = status
# print(f"【{status.gateway_name} {status}】")
def check_last_ticks(self,event:Event) -> None:
""" 原始tick过滤器 """
self.second_cnt += 1
if self.second_cnt % self.CHECK_INTERVAL == 0:
# 如果到了定时间隔
# 查询所有更新的tick
changed_ticks = []
for key,(update,tick) in self.last_ticks.items():
if update:
changed_ticks.append(tick)
self.last_ticks[key] = (False,tick)
if changed_ticks:
# 如果存在更新的tick,保存到数据库
t = Thread(target=self.db.save_last_tick,kwargs=({"ticks":changed_ticks}),daemon=True)
t.start()
# print(f"{self.second_cnt}: status count={len(self.statuses)} save {len(changed_ticks)} ticks")
修改vnpy\trader\engine.py
from vnpy.usertools.tickfilter import TickFilter # hxxjava add
在MainEngine的初始化函数def init(self, event_engine: EventEngine = None)中增加如下内容:
self.tick_filters:Dict[str,TickFilter] = {} # hxxjava add
修改其add_gateway(),内容如下:
def add_gateway(self, gateway_class: Type[BaseGateway], gateway_name: str = "") -> BaseGateway:
"""
Add gateway.
"""
# Use default name if gateway_name not passed
if not gateway_name:
gateway_name = gateway_class.default_name
gateway = gateway_class(self.event_engine, gateway_name)
self.gateways[gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
# add a tick data filter for the gateway # hxxjava add
if gateway_name not in self.tick_filters:
self.tick_filters[gateway_name] = TickFilter(self.event_engine,gateway_name)
return gateway
只要你能够从网关行情接口实时得到合约的交易状态推送,把网关的行情接口做出类似的修改,这套方法同样是可用的。tickfilter的代码可以不用修改直接使用。
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
""" Constructor """
... ... # 其他代码省略
self.auction_tick:TickData = None
self.last_tick: TickData = None
def update_auction_tick(self,tick:TickData):
""" 更新集合竞价tick """
self.auction_tick = tick
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
if self.auction_tick:
# 合约集合竞价tick到当前tick
tick.high_price = max(tick.high_price,self.auction_tick.high_price)
tick.low_price = min(tick.low_price,self.auction_tick.low_price)
# 构造最新tick,以便把集合竞价的成交量和成交额合成到1分钟bar中
self.last_tick = deepcopy(self.auction_tick)
# 成交量和成交额每天从0开始单调递增
self.last_tick.volume = 0.0
self.last_tick.turnover = 0.0
# 用完集合竞价tick就丢弃
self.auction_tick = None
... ... # 其他代码省略
def on_auction_tick(self, tick: TickData):
"""
集合竞价tick处理
"""
self.bg.update_auction_tick(tick) # 假设self.bg是已经创建过的bar生成器
1.首先完善converter.py
class PositionHolding:
""""""
def __init__(self, contract: ContractData = None):
""""""
if contract:
self.vt_symbol = contract.vt_symbol
self.exchange = contract.exchange
self.active_orders = {}
self.order_id = ""
self.long_pos = 0
self.long_pnl = 0
self.long_price = 0
self.long_yd = 0
self.long_td = 0
self.short_pos = 0
self.short_pnl = 0
self.short_price = 0
self.short_yd = 0
self.short_td = 0
self.long_pos_frozen = 0
self.long_yd_frozen = 0
self.long_td_frozen = 0
self.short_pos_frozen = 0
self.short_yd_frozen = 0
self.short_td_frozen = 0
def update_position(self, position: PositionData):
""""""
if position.direction == Direction.LONG:
self.long_pos = position.volume
self.long_pnl = position.pnl
self.long_yd = position.yd_volume
self.long_td = self.long_pos - self.long_yd
self.long_price = position.price
self.long_pos_frozen = position.frozen
else:
self.short_pos = position.volume
self.short_pnl = position.pnl
self.short_yd = position.yd_volume
self.short_td = self.short_pos - self.short_yd
self.short_price = position.price
self.short_pos_frozen = position.frozen
def update_order(self, order: OrderData):
""""""
#active_orders只记录未成交和部分成交委托单
if order.status in [Status.NOTTRADED, Status.PARTTRADED]:
self.active_orders[order.vt_orderid] = order
else:
if order.vt_orderid in self.active_orders:
self.active_orders.pop(order.vt_orderid)
self.calculate_frozen()
def update_order_request(self, req: OrderRequest, vt_orderid: str):
""""""
#分离gateway_name和orderid
gateway_name,*split_orderid = vt_orderid.split("_")
if len(split_orderid) == 1:
self.order_id = split_orderid[0]
elif len(split_orderid) == 2:
self.order_id = "_".join([split_orderid[0],split_orderid[1]])
elif len(split_orderid) == 3:
self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2]])
elif len(split_orderid) == 4:
self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2],split_orderid[3]])
if self.order_id:
order = req.create_order_data(self.order_id, gateway_name)
self.update_order(order)
def update_trade(self, trade: TradeData):
""""""
if trade.direction == Direction.LONG:
if trade.offset == Offset.OPEN:
self.long_td += trade.volume
elif trade.offset == Offset.CLOSETODAY:
self.short_td -= trade.volume
elif trade.offset == Offset.CLOSEYESTERDAY:
self.short_yd -= trade.volume
elif trade.offset == Offset.CLOSE:
if trade.exchange in [Exchange.SHFE, Exchange.INE]:
self.short_yd -= trade.volume
else:
self.short_td -= trade.volume
if self.short_td < 0:
self.short_yd += self.short_td
self.short_td = 0
else:
if trade.offset == Offset.OPEN:
self.short_td += trade.volume
elif trade.offset == Offset.CLOSETODAY:
self.long_td -= trade.volume
elif trade.offset == Offset.CLOSEYESTERDAY:
self.long_yd -= trade.volume
elif trade.offset == Offset.CLOSE:
if trade.exchange in [Exchange.SHFE, Exchange.INE]:
self.long_yd -= trade.volume
else:
self.long_td -= trade.volume
if self.long_td < 0:
self.long_yd += self.long_td
self.long_td = 0
self.long_pos = self.long_td + self.long_yd
self.short_pos = self.short_td + self.short_yd
def calculate_frozen(self):
""""""
self.long_pos_frozen = 0
self.long_yd_frozen = 0
self.long_td_frozen = 0
self.short_pos_frozen = 0
self.short_yd_frozen = 0
self.short_td_frozen = 0
for order in self.active_orders.values():
# Ignore position open orders
if order.offset == Offset.OPEN:
continue
frozen = order.volume - order.traded
if order.direction == Direction.LONG:
if order.offset == Offset.CLOSETODAY:
self.short_td_frozen += frozen
elif order.offset == Offset.CLOSEYESTERDAY:
self.short_yd_frozen += frozen
elif order.offset == Offset.CLOSE:
self.short_td_frozen += frozen
if self.short_td_frozen > self.short_td:
self.short_yd_frozen += (
self.short_td_frozen - self.short_td)
self.short_td_frozen = self.short_td
elif order.direction == Direction.SHORT:
if order.offset == Offset.CLOSETODAY:
self.long_td_frozen += frozen
elif order.offset == Offset.CLOSEYESTERDAY:
self.long_yd_frozen += frozen
elif order.offset == Offset.CLOSE:
self.long_td_frozen += frozen
if self.long_td_frozen > self.long_td:
self.long_yd_frozen += (
self.long_td_frozen - self.long_td)
self.long_td_frozen = self.long_td
self.long_pos_frozen = self.long_td_frozen + self.long_yd_frozen
self.short_pos_frozen = self.short_td_frozen + self.short_yd_frozen
def convert_order_request_shfe(self, req: OrderRequest):
""""""
if req.offset == Offset.OPEN:
return [req]
if req.direction == Direction.LONG:
pos_available = self.short_pos - self.short_pos_frozen
td_available = self.short_td - self.short_td_frozen
else:
pos_available = self.long_pos - self.long_pos_frozen
td_available = self.long_td - self.long_td_frozen
if req.volume > pos_available:
return []
elif req.volume <= td_available:
req_td = copy(req)
req_td.offset = Offset.CLOSETODAY
return [req_td]
else:
req_list = []
if td_available > 0:
req_td = copy(req)
req_td.offset = Offset.CLOSETODAY
req_td.volume = td_available
req_list.append(req_td)
req_yd = copy(req)
req_yd.offset = Offset.CLOSEYESTERDAY
req_yd.volume = req.volume - td_available
req_list.append(req_yd)
return req_list
def convert_order_request_lock(self, req: OrderRequest):
""""""
if req.direction == Direction.LONG:
td_volume = self.short_td
yd_available = self.short_yd - self.short_yd_frozen
else:
td_volume = self.long_td
yd_available = self.long_yd - self.long_yd_frozen
# If there is td_volume, we can only lock position
if td_volume:
req_open = copy(req)
req_open.offset = Offset.OPEN
return [req_open]
# If no td_volume, we close opposite yd position first
# then open new position
else:
open_volume = max(0, req.volume - yd_available)
req_list = []
if yd_available:
req_yd = copy(req)
if self.exchange in [Exchange.SHFE, Exchange.INE]:
req_yd.offset = Offset.CLOSEYESTERDAY
else:
req_yd.offset = Offset.CLOSE
req_list.append(req_yd)
if open_volume:
req_open = copy(req)
req_open.offset = Offset.OPEN
req_open.volume = open_volume
req_list.append(req_open)
return req_list
单台行情服务器,有出现Tick丢失的情况,这个策略有一定的 影响,可以通过增加多台行情服务器的方法,解决问题
from collections import deque
from vnpy.gateway.ctp.ctp_gateway import *
class CtpGatewayDouble(CtpGateway):
default_setting = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"主行情服务器": "",
"次行情服务器": "",
"产品名称": "",
"授权编码": "",
"产品信息": "",
}
def __init__(self, event_engine):
super(CtpGateway, self).__init__(event_engine, "Double")
self.td_api = CtpTdApi(self)
# 主行情
self.md_api = CtpMdApi(self)
# 备用行情
self.sub_md_api = CtpMdApi(self)
# 缓存
self.tick_buffer = deque(maxlen=100_000)
def connect(self, setting: dict):
""""""
userid = setting["用户名"]
password = setting["密码"]
brokerid = setting["经纪商代码"]
td_address = setting["交易服务器"]
md_address = setting["主行情服务器"]
sub_md_address = setting["次行情服务器"]
appid = setting["产品名称"]
auth_code = setting["授权编码"]
product_info = setting["产品信息"]
if (not td_address.startswith("tcp://")) and (
not td_address.startswith("ssl://")
):
td_address = "tcp://" + td_address
if (not md_address.startswith("tcp://")) and (
not md_address.startswith("ssl://")
):
md_address = "tcp://" + md_address
if (not sub_md_address.startswith("tcp://")) and (
not sub_md_address.startswith("ssl://")
):
sub_md_address = "tcp://" + sub_md_address
self.td_api.connect(
td_address, userid, password, brokerid, auth_code, appid, product_info
)
self.md_api.connect(md_address, userid, password, brokerid)
self.sub_md_api.connect(sub_md_address, userid, password, brokerid)
self.init_query()
def subscribe(self, req: SubscribeRequest):
""""""
self.md_api.subscribe(req)
self.sub_md_api.subscribe(req)
def close(self):
""""""
self.td_api.close()
self.md_api.close()
self.sub_md_api.close()
def on_tick(self, tick):
tick_hash = hash(tick)
if tick_hash not in self.tick_buffer:
self.tick_buffer.append(tick_hash)
super().on_tick(tick)
我学Python的目的很明确,就是量化交易。从一开始就有关注vn.py,但我学的是Python3,那时vn.py还处于版本1.x时期,所以只能望vn.py兴叹。
vn.py 2.0出来之后我并没有及时注意,等反应过来已经是2.0.7版。很兴奋,认真研究,并将心得写成《vn.py 2.0.7源代码深入分析》,分享在vn.py社区的经验分享板块。
出于对量化交易的爱好,出于对Python在量化交易中作用的认同,一定程度受vn.py强大功能的鼓舞,我与同事合写了《Python量化交易从入门到实战》一书,对vn.py的讨论是其中很重要的一部分内容。
后续又写了《vn.py 2.1.4源代码深入分析》和《vn.py 2.2.0源代码深入分析》两个文档,感谢各位老师的认可。
vn.py 3.0.0版发布于2022-03-23,这是我一直期待的一个版本,所以它刚一推出,我就立刻开始试用,并着手整理《vn.py 3.0.0源代码深入分析》。夜以继日,终于在前天完成。先发到了书籍的资源群中,接受了两天批评,现分享到此处。
写作本文档的一个主要目的是对vn.py的开源精神做出一点支持,希望本文档能够对大家学习使用vn.py有所帮助。
百度网盘链接:https://pan.baidu.com/s/1cl2MA9hNFhHlxfHM0gGe2A
提取码:s7u6
刚接触PYTHON,经过测试可用,如有不妥欢迎指正。
新增加了刷新策略按钮,按下后刷新策略文件并重新加载。
修改以下文件:
1:添加按钮和方法
vnpy\app\cta_strategy\ui\widget.py
在add_button.clicked.connect(self.add_strategy) 下方添加以下两行
reload_button = QtWidgets.QPushButton("刷新策略")
reload_button.clicked.connect(self.reload_class)
在hbox1.addWidget(add_button)下方添加以下一行
hbox1.addWidget(reload_button)
在 def update_class_combo(self): 上方添加
def reload_class(self):
self.cta_engine.load_strategy_class()
self.class_combo.clear()
self.update_class_combo()
2:修改加载逻辑
vnpy\app\cta_strategy\engine.py
修改load_strategy_class方法
在path1 = Path(file).parent.joinpath("strategies")上方添加
for loadClass in self.classes:
del loadClass
self.classes.clear()
修改load_strategy_class_from_module方法
在module = importlib.import_module(module_name)的下方添加
module = importlib.reload(module)
这样可以在不重启vntrader的情况下,修改策略文件,并重新加载,方便调试。
就像2020年总结提到,最近想在中高频策略搞搞。对于高频交易,如果有深度行情当然是必须的,看到说上期所五档行情免费提供,而且我用其他行情软件可以查到,就想在VNPY保存到数据库先研究下。有点发现,写下来记录下。
首先看了下代码,VNPY的ctp_gateway已经支持五档行情接收,也不需要什么配置,但是没有发现五档行情数据,无论是界面端还是数据库。
然后研究下,因为ctp_gateway行情接收是采用继承接口被调用的模式,是由CTP API工作线程驱动;没法被直接debug,倒是可以直接插入print方法输出。发现是有返回值,但是是1.79769313486232e+308这样的溢出值,被一个VNPY专门写静态方法adjust_price给过滤掉了。顺便说下,1.9.2版本也实现ctp五档行情读取,不过要改改ctp_gateway代码,把上期所加入。
我想是不是VNPY封装的CTP接口太老呢,不支持呢,因为看Github历史记录是2年前更新的。就研究了下CTP-API,发现原来是由下面几个点要注意,稍微说下。
首先按照CTP-API接口文档说的支持的通讯模式有三种:对话通讯模式,私有通讯模式,广播通讯模式,
对话通讯模式
对话通讯模式是指由会员端主动发起的通讯请求。该请求被交易所端接收和处理,并给予响应。例如报单、查询等。这种通讯模式与普通的客户/服务器模式相同。CTP-API中的命名Req------或者ReqQry------这样都是发起API
私有通讯模式
私有通讯模式是指交易所端主动,向某个特定的会员发出的信息。例如成交回报等,一般是On-----这样名字,也是上面提到继承接口被CTP调用。
广播通讯模式(公有流)
广播通讯模式又称公有流,是指交易所端主动,向市场中的所有会员都发出相同的信息。例如公告、市场公共信息等。另外,广播模式是不能在公网搞得,ctp所说的广播就是针对在对应所有会员也就是期货公司的内网广播。
但是,在CTP-API文档里面很明确的说明,五档行情是使用组播模式,针对同一组的机器进行广播。
二代组播行情(下文简称二代行情):交易所以组播方式提供的实时五档行情。因为是组播,所以接收端必须在内部网络并且要加入组播组。
目前交易所不允许投资者直接连接交易所报盘网去接收组播行情,如果期货公司将行情转发出来给投资者使用,投资者便能享受到快速的组播行情。我和我期货公司沟通,必须要把机器放在期货公司委托机房才可以接收转发组播的五档行情。这边补充下,查了些文档,组播也可以在公网搞,但是因为UDP没有确认,容易掉包,CTP-API提供增量方法来保证针对这个情况加以弥补,但是很少期货公司提供公网组播,比较吃了不讨好,如果那个提供我就去开户了,哈。
就算机器放在托管机房了,还有一个地方要设置,就是设置订阅组播行情前置,CTP-API方法如下
static CThostFtdcMdApi CreateFtdcMdApi(const char pszFlowPath = "", const bool bIsUsingUdp=false, const bool bIsMulticast=false);
各类型行情字段组合如下:
bIsUsingUdp bIsMulticast
TCP行情前置 false false
UDP行情前置 true false
组播行情前置 true true
那么说,五档行情时候必须参数bIsUsingUdp和bIsMulticast都为True才可以。
我看了下VNPY 2.1.8的ctp-api,vnctpmd.h头文件的接口是createFtdcMdApi(string pszFlowPath = ""),没有提供bIsUsingUdp和bIsMulticast参数录入。可能是vnpy封装的CTP-API的版本太老,因为6.3.15api中的好像最早版本的行情部分并不支持二代,必须自己拼接。
总结下,就是要拿到免费五档行情,需要机器放在托管机房,另外ctp行情订阅必须把bIsUsingUdp和bIsMulticast都为True,对于VNPY需要重新封装以下CTP-API,更新下ctp_gateway。
参考了这篇文章 https://zhuanlan.zhihu.com/p/103178845
R-Breaker是一种中高频的日内交易策略,这个策略也长期被Future Truth杂志评为最赚钱的策略之一。R-Breaker策略结合了趋势和反转两种交易方式,所以交易机会相对较多,比较适合日内1分钟K线或者5分钟K线级别的数据。
R-Breaker的策略逻辑由以下4部分构成:
1)计算6个目标价位
根据昨日的开高低收价位计算出今日的6个目标价位,按照价格高低依次是:
他们的计算方法如下:(其中a、b、c、d为策略参数)
2)设计委托逻辑
趋势策略情况:
反转策略情况:
3)设定相应的止盈止损。
4)日内策略要求收盘前平仓。
上面是原版R-Breaker策略逻辑,但是使用RQData从2010年至今(即2019年10月)的1分钟沪深300股指期货主力连续合约(IF88)测试,效果并不理想。
实际上R-Breaker策略可以拆分成趋势策略和反转策略。下面分别对这对2种策略逻辑进行优化:
1)趋势策略:
2)反转策略:
其代码实现逻辑如下:
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
# Trend Condition
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
同样使用10年的1分钟IF88数据进行回测。不过,在展示强化版R-Breaker策略效果前,先分别展示一下拆分后的趋势策略和反转策略。
1)趋势策略:
2)反转策略
综合对比2种策略的日均成交笔数和资金曲线,我们可以知道:
由于趋势策略和反转策略是互斥的,在某些方面呈现出此消彼长的特点。那么,根据投资组合理论,可以把反转策略看作是看跌期权,买入一定规模的看跌期权来对消非系统性风险,那么组合的收益会更加稳健,即夏普比率更高。
由于趋势策略和反转策略日均成交手数比是2.6:0.4,若它们都只委托1手的话,反转策略的对冲效果微乎其微。
为了方便演示,我们设置趋势策略每次交易1手;反转策略则是3手。然后合成R-Breaker策略。发现夏普比率提高到2,资金曲线整体上扬,而且没有较大且持续时间较长的回撤。
R-Breaker策略成功之处在于它并不是纯粹的趋势类策略,它属于复合型策略,它的alpha由2部分构成:趋势策略alpha;反转策略alpha。
这类复合型策略可以看作是轻量级的投资组合,因为它的交易标的只有一个:沪深300股指期货的主力合约。
更复杂的话,可以交易多个标的,如在商品期货做虚拟钢厂套利(同时交易螺纹钢、铁矿石、焦炭),在IF股指期货上做日内CTA策略。考虑到市场容量不同,价差套利能分配更多的资金。这样在价差套利提供稳定收益率基础上,CTA策略能在行情好的时候贡献更多alpha(高盈亏比特征导致的)。
从上面例子可以看出,一个合理的投资组合,往往比单个策略具有更高的夏普比率。因为夏普比率=超额收益/风险。夏普比率高意味着资金曲线非常平滑;这也意味着我们可以有效控制使用杠杆的风险。
当某个投资组合策略夏普足够高,而且策略资金容量允许,交易成本能有效控制等情况下,就可以通过杠杆来提升组合收益了。例如向银行贷款或者发放债券,这时候交易团队是债务人角色,即在承担更大风险同时,追求更到收益。债权人享受利息收益(类无风险收益)。
向公众发产品是另一种增加杠杆的方式,但此时投资组合风险已经转移到了客户这方,交易团队可以享受着管理费收益(类无风险收益)。根据目标客户的不同:
最后附上策略源代码:
from datetime import time
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager
)
class RBreakStrategy(CtaTemplate):
""""""
author = "KeKe"
setup_coef = 0.25
break_coef = 0.2
enter_coef_1 = 1.07
enter_coef_2 = 0.07
fixed_size = 1
donchian_window = 30
trailing_long = 0.4
trailing_short = 0.4
multiplier = 3
buy_break = 0 # 突破买入价
sell_setup = 0 # 观察卖出价
sell_enter = 0 # 反转卖出价
buy_enter = 0 # 反转买入价
buy_setup = 0 # 观察买入价
sell_break = 0 # 突破卖出价
intra_trade_high = 0
intra_trade_low = 0
day_high = 0
day_open = 0
day_close = 0
day_low = 0
tend_high = 0
tend_low = 0
exit_time = time(hour=14, minute=55)
parameters = ["setup_coef", "break_coef", "enter_coef_1", "enter_coef_2", "fixed_size", "donchian_window"]
variables = ["buy_break", "sell_setup", "sell_enter", "buy_enter", "buy_setup", "sell_break"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
super(RBreakStrategy, self).__init__(
cta_engine, strategy_name, vt_symbol, setting
)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
self.bars = []
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.bars.append(bar)
if len(self.bars) <= 2:
return
else:
self.bars.pop(0)
last_bar = self.bars[-2]
# New Day
if last_bar.datetime.date() != bar.datetime.date():
if self.day_open:
self.buy_setup = self.day_low - self.setup_coef * (self.day_high - self.day_close) # 观察买入价
self.sell_setup = self.day_high + self.setup_coef * (self.day_close - self.day_low) # 观察卖出价
self.buy_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_high # 反转买入价
self.sell_enter = (self.enter_coef_1 / 2) * (self.day_high + self.day_low) - self.enter_coef_2 * self.day_low # 反转卖出价
self.buy_break = self.buy_setup + self.break_coef * (self.sell_setup - self.buy_setup) # 突破买入价
self.sell_break = self.sell_setup - self.break_coef * (self.sell_setup - self.buy_setup) # 突破卖出价
self.day_open = bar.open_price
self.day_high = bar.high_price
self.day_close = bar.close_price
self.day_low = bar.low_price
# Today
else:
self.day_high = max(self.day_high, bar.high_price)
self.day_low = min(self.day_low, bar.low_price)
self.day_close = bar.close_price
if not self.sell_setup:
return
self.tend_high, self.tend_low = am.donchian(self.donchian_window)
if bar.datetime.time() < self.exit_time:
if self.pos == 0:
self.intra_trade_low = bar.low_price
self.intra_trade_high = bar.high_price
if self.tend_high > self.sell_setup:
long_entry = max(self.buy_break, self.day_high)
self.buy(long_entry, self.fixed_size, stop=True)
self.short(self.sell_enter, self.multiplier * self.fixed_size, stop=True)
elif self.tend_low < self.buy_setup:
short_entry = min(self.sell_break, self.day_low)
self.short(short_entry, self.fixed_size, stop=True)
self.buy(self.buy_enter, self.multiplier * self.fixed_size, stop=True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
long_stop = self.intra_trade_high * (1 - self.trailing_long / 100)
self.sell(long_stop, abs(self.pos), stop=True)
elif self.pos < 0:
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
short_stop = self.intra_trade_low * (1 + self.trailing_short / 100)
self.cover(short_stop, abs(self.pos), stop=True)
# Close existing position
else:
if self.pos > 0:
self.sell(bar.close_price * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close_price * 1.01, abs(self.pos))
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
发布于veighna社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2022-06-06
2022年以来陆续已经有不少同学咨询小班特训营的开始时间,但是由于上海疫情的影响加上开发任务的增加,今年的小班特训营一直拖着。
最近终于迎来了上海解封,2022年的VeighNa小班特训营也正式开始报名!!计划今年推出的场次不会太多,感兴趣的同学请抓紧机会~~
目前先开放两场名额(有些老铁们去年就已经报名锁定了今年的名额,感谢对我们课程的认可与支持),老规矩还是放几张之前课程的照片:
准备完毕,静候同学们到达
学习量化,先从掌握核心框架
深入代码,分析策略逻辑细节
所有小班特训营时间定在周末两天,一共包含周六周日两个下午共计10+小时的课程,设立特训营专属答疑群,包括后续三个月的助教跟踪辅导,提供VeighNa小班特训营专属内部核心资料。
线下课程的地点在上海浦东,考虑到新冠以来大家对于坐火车飞机的健康风险顾虑,不想来上海的同学我们也提供远程线上听课(直播+录播)。对于所有参加小班特训营的学员,在课程结束后都会拿到课程的完整录播视频,可永久回看。
日期:2022年7月16日(周六)和7月17日(周日)
时间:两天下午1点-6点,共计10小时
大纲:
初识套利价差交易
a. 套利类策略的特点:靠高胜率获得核心优势
b. 如何寻找好的套利价差,从数据面和基本面着手
c. 价差组合时间序列建模:相关性分析、协整算法
价差数据结构设计
a. 价差腿LegData和价差组合SpreadData
b. 价差盘口计算原理:价格、数量、统计算法
c. 基于动态解析的灵活价差数据计算
d. 实盘数据流驱动,底层接口到上层算法
价差交易算法实现
a. 价差执行算法和价差量化策略的异同
b. 基于SpreadAlgoTemplate实现狙击算法
c. 价差做市算法实现,盘口细粒度委托控制
价差量化策略开发
a. 半自动固定范围买卖策略
b. 全自动统计套利模型策略
c. 网格区间价差交易策略
价差交易实战进阶:
a. 价差策略回测:TICK模式和K线模式
b. 实盘策略运维原则,安全、稳定
c. 主动腿挂单做市算法的实现
价格:11999元(老学员和Elite会员折扣不变)
日期:2022年8月27日(周六)和8月28日(周日)
时间:两天下午1点-6点,共计10小时
大纲:
交易接口封装
a. 从0开发CtpGateway:掌握交易API的工作原理
b. VeighNa数据结构:贯穿整个系统的数据对象定义
c. 统一交易接口功能:标准化对接各类交易API接口
核心引擎设计
a. 事件驱动引擎原理:如何实现Event-Driven架构
b. 高性能数据缓存:O(1)查询复杂度容器应用
c. CTA交易引擎构架:开发量化策略实盘交易引擎
d. CTA回测引擎详解:保证实盘一致性的回测体系
图形界面开发
a. 上手PySide6开发:新一代的Qt UI开发工具库
b. 掌握图形控件使用:实现和策略引擎的数据交互
c. 数据实时监控刷新:QTableWidget的高性能用法
d. 窗口和对话框管理:开发满足实盘交易需求的界面布局
项目代码管理
a. 开通Github仓库:大型项目代码开发和维护的神器
b.项目代码管理流程:开发->测试->提交->合并->发布
c. Type Hinting实践:Python语言中的类型提示管理
d. 自动化代码检查工具:大幅减少代码中的各种类型Bug
应用开发实战
a. 全自动任务运行:无人值守模式下的策略交易管理
b. 守护父进程实现:实现交易子进程的监控和定时启停
c. 异常监控报警:捕捉到策略或者系统异常后实时通知
d. 应用模块开发:构造属于你的专属交易应用app模块
价格:11999元(老学员和Elite会员折扣不变)
报名方式和之前一样,请发送邮件到vn.py@foxmail.com,注明想参加的课程、姓名、手机、公司、职位即可。或者也可以扫描下方二维码添加小助手咨询报名:
课程对于之前参加过小班特训营的学员优先开放,同时提供9折的价格优惠。