画了一个周末,头晕了,简直就是个迷宫,放一放先...
刚好有个同学问怎么实现IB历史数据获取,和策略回测和实盘交易。想着熟悉vnpy2.0操作,就用Jupyter Notebook都是跑了一边。VNPY2.0的整体架构设计很有扩展性,而且调用也比起v1.0先进清晰很多,引擎加载调用非常方便。
讲讲注意点:
为了方便贴出来,改成.py代码格式,直接跑也没有问题。
from vnpy.app.script_trader import init_cli_trading
from vnpy.app.script_trader.cli import process_log_event
from vnpy.gateway.ib import IbGateway
from time import sleep
from datetime import datetime
import pandas as pd
# 连接到服务器
setting = {
"TWS地址": "127.0.0.1",
"TWS端口": 7497,
"客户号":8 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接
# 查询资金 - 自动
sleep(10)
print("***查询资金和持仓***")
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))
# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅
engine.main_engine.subscribe(req1,"IB")
# 使用script_engine订阅历史数据是从rqdata获取,vnpy v2.07已经提供历史数据获取,这里创建HistoryRequest来获取,
# 查询如果没有endtime,默认当前。返回历史数据输出到数据库和csv文件
# 关于api更多信息可以参见 https://interactivebrokers.github.io/tws-api/historical_bars.html
print("***从IB读取历史数据, 返回历史数据输出到数据库和csv文件***")
from vnpy.trader.object import HistoryRequest
from vnpy.trader.object import Interval
start = datetime.strptime('20190901', "%Y%m%d")
historyreq = HistoryRequest(
symbol="12087792",
exchange=Exchange.IDEALPRO,
start=start,
interval=Interval.MINUTE
)
# # 读取历史数据,并把历史数据BarData放入数据库
bardatalist = engine.main_engine.query_history(historyreq,"IB")
from vnpy.trader.database import database_manager
database_manager.save_bar_data(bardatalist)
# 把历史数据BarData输出到csv
pd.DataFrame(bardatalist).to_csv("C:\Project\\"+ str(historyreq.symbol) + ".csv" , index=True, header=True)
print("History data export to CSV")
# # 参考backtesting.ipynb, 使用自带的双均线策略回测,10日上穿60日做多,否则反之
print("***从数据库读取历史数据, 进行回测***")
from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.double_ma_strategy import (
DoubleMaStrategy,
)
btengine = BacktestingEngine() #新建回测引擎
btengine.set_parameters(
vt_symbol="12087792.IDEALPRO",
interval="1m",
start=datetime(2019, 9, 1),
end=datetime(2019, 10, 5),
rate = 0,
slippage=0.00005,
size=1000,
pricetick=0.00005,
capital=1_000_000,
)
btengine.add_strategy(DoubleMaStrategy, {"fast_window":10, "slow_window": 60})
btengine.load_data()
btengine.run_backtesting()
df = btengine.calculate_result()
btengine.calculate_statistics()
btengine.show_chart()
# 给script_engine载入双均线策略,实盘运行
print("***从数据库读取准备数据, 实盘运行***")
# 使用cta交易引擎
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
engine.event_engine.register(EVENT_CTA_LOG, process_log_event)
cta_engine = engine.main_engine.add_app(CtaStrategyApp) #加入app
cta_engine.init_engine()
cta_engine.add_strategy("DoubleMaStrategy","DoubleMaStrategy_IB_12087792_v1", "12087792.IDEALPRO",{"fast_window":10, "slow_window": 50})
sleep(10)
cta_engine.init_strategy("DoubleMaStrategy_IB_12087792_v1")
sleep(10)
cta_engine.start_strategy("DoubleMaStrategy_IB_12087792_v1")
发布于vn.py社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2021-01-09
基于《30天解锁Python量化开发》的课程内容,我们制作了这张【知识要点图】:
看完对课程感兴趣的话,请戳【课程传送门】。
最后,vn.py进驻【Gitee】(简单来说就是中国版的Github)差不多已经一个月的时间了,目前已经收获了218个Star和73个Fork,同时也拿到了【GVP】(Gitee最有价值开源项目)。
看来对于许多我们的用户,访问Github速度太慢真心是个需求痛点啊,好在现在有了一个更好的国内替代选择。同时也发现Fork/Star比例超过Github,可能因为同步方便所以更多人愿意尝试自己扩展开发?
最后再贴下仓库地址:https://gitee.com/vnpy/vnpy 。该Gitee仓库会每日和Github仓库同步,自动更新最新版本的代码,欢迎大家Star和Fork!
币安提供历史数据服务下载,地址如下:https://www.binance.com/zh-CN/support/articles/360044357931
vn的gateway只能下载分钟粒度的数据,在这里申请可以下载到tick数据,对于有tick需求的朋友可以关注。
由于下载属于一次性服务,所以对代码没有做太多的优化,属于能用即可。
代码分为两个文件,第一个文件是获取文件下载地址的link_id。第二个文件是从link_id获取到对应的文件下载地址,然后下载到本地data文件夹。
第一个文件 download_id.py,默认将获取到的信息存储在/home/result_id.csv文件中,可以自行修改csv_file路径,由于下载权重过大,需要较长的延时进行下一条请求,所以直接设置sleep 1800秒后进行下一条请求
import hmac
import hashlib
from urllib import parse
import time
from datetime import datetime, timedelta
import requests
import pandas as pd
import numpy as np
import os
key = "自己的key"
secret = "自己的secret"
proxy_host = '127.0.0.1'
proxy_port = 1080
id_path = "https://api.binance.com/sapi/v1/futuresHistDataId"
link_path = "https://api.binance.com/sapi/v1/downloadLink"
query_symbols = ['BTCUSDT','ETHUSDT','LTCUSDT','EOSUSDT']
def sign(params:dict,secret:str):
query = parse.urlencode(sorted(params.items()))
secret = secret.encode()
signature = hmac.new(secret, query.encode("utf-8"), hashlib.sha256).hexdigest()
return signature
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"X-MBX-APIKEY": '自己的key'
}
def get_query_path(path, params):
timestamp = int(time.time()*1000)
params["timestamp"] = timestamp
query = parse.urlencode(sorted(params.items()))
signature = sign(params, secret)
query += "&signature={}".format(signature)
query_path = path + '?' + query
return query_path
def request_post(path, headers):
response = requests.request('POST', path, headers=headers)
return response
def request_get(path, headers):
response = requests.request('GET', path, headers=headers)
return response
def geterate_periods(start='2020-01-01 00:00:00', end='2021-04-05 00:00:00', days=15):
if end:
endArray = time.strptime(end, "%Y-%m-%d %H:%M:%S")
end = int(time.mktime(endArray)) * 1000
else:
end = int(time.time()) * 1000
startArray = time.strptime(start, "%Y-%m-%d %H:%M:%S")
start = int(time.mktime(startArray)) * 1000
return range(start, end, days*24*60*60*1000)
def get_symbol_start(csv_file,symbol,data_type,days):
if not os.path.exists(csv_file):
time_start = int(time.mktime(datetime.strptime('2020-01-01','%Y-%m-%d'))) * 1000
else:
df = pd.read_csv(csv_file)
time_s= df[(df['symbol'] == symbol)&(df['dataType']==data_type)]['start'].max()
if time_s is np.nan:
time_start = int(time.mktime(datetime.strptime('2020-01-01','%Y-%m-%d').timetuple())) * 1000
else:
time_p = datetime.strptime(time_s,'%Y-%m-%d') + timedelta(days=days)
time_start = int(time.mktime(time_p.timetuple())) * 1000
return time_start
def run_id(csv_file, days):
for symbol in query_symbols:
if symbol == 'BTCUSDT':
data_type_list = ['T_TRADE','S_DEPTH']
else:
data_type_list = ['T_TRADE']
for data_type in data_type_list:
time_start = get_symbol_start(csv_file, symbol, data_type,days)
while time_start < int(time.mktime(datetime.today().timetuple()))*1000:
start = time_start
end = start + days * 24 * 60 * 60 * 1000
params = {
'symbol': symbol,
'startTime': str(start),
'endTime': str(end),
'dataType': data_type
}
query_path_ = get_query_path(id_path, params)
ret = request_post(query_path_, headers)
print(ret.text)
if ret.status_code == 200:
ret_id = ret.json()['id']
result_dic = {
'symbol':symbol,
'start':datetime.fromtimestamp(start/1000),
'dataType':data_type,
'id':ret_id
}
df = pd.DataFrame(result_dic, index=[0])
if not os.path.exists(csv_file):
df.to_csv(csv_file, index=False, mode='a')
else:
df.to_csv(csv_file, index=False, mode='a', header=False)
time_start = end
else:
time.sleep(1800)
if __name__ == '__main__':
csv_file = '/home/result_id.csv'
days = 7
run_id(csv_file, days)
股票、期货合约都有集合竞价时段,如国内期货合约为前一交易日的21:00前5分钟,股指期货合约为当前交易日的9:30前5分钟,股票合约的为当前交易日的9:30之前。
集合竞价完成之后通常在开市前1分钟提供CTP接口推送第一个tick,注意:这个tick的时间戳为开市前1分钟,而不在各个交易时间段内!
如:
国内期货合约如果第一个交易段为21:00,那么这个tick的时间戳为20:59
国内股指期货合约为当前交易日的9:30,那么这个tick的时间戳为9:29
实盘中每个合约都会有集合竞价时段,这时候采用自带的BarGenerator来合成1分钟bar就会有问题,因为20:59:00的那个tick既不属于上一交易日的1分钟bar(15:59)的,也不属于21:00的那一个1分钟bar,当然就只能够孤独地自成一个1bar啦!由此带来的问题是21:00的那一个1分钟bar的开盘价和成交量(这个成交量可能是很大)都可能是错了。
当然由此导致的5分钟、10分钟、15分钟、30分钟乃至1日的bar都可能是有问题的,因为它们都是1分钟bar合成的。
你现在用BarGenerator没有办法合成90分钟K线,不信你试试看!挺搞笑的一个问题!
天下苦此BarGenerator久矣!
2019年写了《vn.py 2.0.7源代码深入分析》,感谢各位老师的认可。
vn.py的维护团队是一个非常有朝气的团队,版本更新很快,3月底发布了最新的2.2.0版。
使用vn.py做交易,因为需要做一些自己的定制,所以并不是每个版本都随着更新,前面使用的是2.1.4。
但vn.py的每个新版本我都会关注,并不断更新文档。vn.py从2.1.9开始对基本功能(主要是数据管理部分)做了比较大的优化,我认为2.2.0应该是比较稳定好用的版本,准备将日常使用的系统升级到2.2.0。3月26日终于等到了2.2.0版,立刻开始分析代码并完善文档,这几天夜以继日,终于形成了这个文档,发出来请各位老师批评指正。
《10-vn.py 2.2.0源代码深入分析》
百度网盘链接:https://pan.baidu.com/s/1X3WNoE27RKJgxx6leC5X0g
提取码:6vl3
发布于vn.py社区公众号【vnpy-community】
原文作者:用Python的交易员 | 发布时间:2021-3-31
过去几年里,如果问大家vn.py最大的吐槽点是什么,可能大部分人的第一反应就是【文档】。
作为官方团队,之前我们找了很多理由安慰自己:
但随着vn.py逐渐成熟,我们越来越多的被用户们提醒着文档的不足,尤其在对比zipline、RQAlpha、backtrader等项目的文档后,更是有一种丑小鸭的自卑感。
所以从2021年初就开始准备全面的文档更新计划,上周2.2.0版本发布后总算是拿出了一点成果。更新后的文档不再去过多介绍内部代码的实现(这算是过去vn.py文档最大的坑之一),而是全部从用户角度出发,来详细讲解各项功能的使用方法和注意事项。
截止本文,已更新完成的文档篇幅包括:
目前还在更新中,预计会在二季度全部完成的篇幅:
功能介绍
安装指南
基本使用
交易接口
数据库配置
价差交易模块
期权波动率交易模块
多合约组合策略模块
算法交易
脚本策略
RPC服务
Excel RTD模块
贡献代码
所有文档均可以通过vn.py官网的文档版块直接查看:
https://www.vnpy.com/docs/cn/index.html
同时文档的Markdown源文件,也都包含在Github仓库的docs文件夹下:
https://github.com/vnpy/vnpy/tree/dev/docs
大家在阅读过程中发现内容错误或者改进点,都欢迎直接发起PR到dev-docs分支。
有任何文档相关的建议也欢迎在下方留言,希望今年二季度结束前,可以让我们的用户再也不必为vn.py的文档头疼!
全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器
直接上代码
import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK
from vnpy.app.data_recorder.engine import RecorderEngine
EXCHANGE_LIST = [
Exchange.SHFE,
Exchange.DCE,
Exchange.CZCE,
Exchange.CFFEX,
Exchange.INE,
]
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")
def is_futures(vt_symbol: str) -> bool:
"""
是否是期货
"""
return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))
class RecordMode(Enum):
BAR = "bar"
TICK = "tick"
class WholeMarketRecorder(RecorderEngine):
def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
super().__init__(main_engine, event_engine)
self.record_modes = record_modes
# 非交易时间
self.drop_start = time(3, 15)
self.drop_end = time(8, 45)
# 大连、上海、郑州交易所,小节休息
self.rest_start = time(10, 15)
self.rest_end = time(10, 30)
def is_trading(self, vt_symbol, current_time) -> bool:
"""
交易时间,过滤校验Tick
"""
symbol, exchange = extract_vt_symbol(vt_symbol)
if current_time >= self.drop_start and current_time < self.drop_end:
return False
if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
if current_time >= self.rest_start and current_time < self.rest_end:
return False
return True
def load_setting(self):
pass
def record_tick(self, tick: TickData):
"""
抛弃非交易时间校验数据
"""
tick_time = tick.datetime.time()
if not is_trading(tick.vt_symbol, tick_time):
return
task = ("tick", copy(tick))
self.queue.put(task)
def record_bar(self, bar: BarData):
"""
抛弃非交易时间校验数据
"""
bar_time = bar.datetime.time()
if not is_trading(bar.vt_symbol, bar_time):
return
task = ("bar", copy(bar))
self.queue.put(task)
def process_contract_event(self, event):
""""""
contract = event.data
vt_symbol = contract.vt_symbol
# 不录制期权
if is_futures(vt_symbol):
if RecordMode.BAR in self.record_modes:
self.add_bar_recording(vt_symbol)
if RecordMode.TICK in self.record_modes:
self.add_tick_recording(vt_symbol)
self.subscribe(contract)
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.write_log("主引擎创建成功")
# 记录引擎
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(CTP_SETTING, "CTP")
main_engine.write_log("连接CTP接口")
whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)
main_engine.write_log("开始录制数据")
oms_engine = main_engine.get_engine("oms")
while True:
sleep(1)
def run_parent():
"""
Running in the parent process.
"""
print("启动CTA策略守护父进程")
# Chinese futures market trading period (day/night)
MORNING_START = time(8, 45)
MORNING_END = time(12, 0)
AFTERNOON_START = time(12, 45)
AFTERNOON_END = time(15, 35)
NIGHT_START = time(20, 45)
NIGHT_END = time(3, 5)
child_process = None
while True:
current_time = datetime.now().time()
trading = False
# Check whether in trading period
if (
(current_time >= MORNING_START and current_time <= MORNING_END)
or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
or (current_time >= NIGHT_START)
or (current_time <= NIGHT_END)
):
trading = True
# Start child process in trading period
if trading and child_process is None:
print("启动数据录制子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("数据录制子进程启动成功")
# 非记录时间则退出数据录制子进程
if not trading and child_process is not None:
print("关闭数据录制子进程")
child_process.terminate()
child_process.join()
child_process = None
print("数据录制子进程关闭成功")
sys.stdout.flush()
sleep(5)
if __name__ == "__main__":
run_parent()
说明一下,本贴中的bar数据和K线数据其实是一回事,有时候是方便读代码就按照代码说,有时候为了尊崇人们的习惯来说,不必在意。
代码是这样的:
from typing import Any
from vnpy.app.cta_strategy import (
CtaTemplate,
BarGenerator,
ArrayManager
)
from vnpy.trader.object import (
BarData,
TickData
)
from vnpy.trader.constant import Interval
class DemoStrategy(CtaTemplate):
""" 一个演示策略 """
author = "hxxjava"
fast_window = 10
slow_window = 20
fast_ma0 = 0
fast_ma1 = 0
slow_ma0 = 0
slow_ma1 = 0
parameters = [
"fast_window",
"slow_window"
]
variables = [
"fast_ma0",
"fast_ma1",
"slow_ma0",
"slow_ma1",
]
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict
):
"""构造函数"""
super().__init__(cta_engine,strategy_name,vt_symbol,setting)
self.bg = BarGenerator(
on_bar=self.on_bar,
window=7,
on_window_bar=on_7min_bar,
interval=Interval.Minute)
self.am = ArrayManager()
def on_init(self):
""""""
self.write_log("策略初始化")
# account_data = self.cta_engine.get_account()
self.load_bar(10)
def on_start(self):
"""策略启动"""
self.write_log("策略启动")
def on_stop(self):
""" 策略停止 """
self.write_log(" 策略停止 ")
def on_tick(self,tick:TickData):
""" Tick更新 """
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""K线更新"""
self.bg.update_bar(bar)
def on_7min_bar(self, bar: BarData):
"""K线更新"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
""" 定义金叉和死叉 """
cross_over = (self.fast_ma0>= self.fast_ma1 and
self.slow_ma0<self.slow_ma1)
cross_below = (self.slow_ma0>self.slow_ma1 and
self.slow_ma0<=self.slow_ma1)
if cross_over:
price = bar.close_price + 5
if not self.pos:
self.buy(price,1)
elif self.pos < 0:
self.cover(price,1)
self.buy(price,1)
elif cross_below:
price = bar.close_price - 5
if not self.pos:
self.short(price,1)
elif self.pos>0:
self.sell(price,1)
self.short(price,1)
# 更新图形界面
self.put_event()
这个策略是演示如何利用1分钟K线合成7分钟K线,然后在on_7min_bar()里面利用7分钟K线计算快慢两根移动均线,
然后更加快慢移动均线的金叉和死叉信号来进行多空的开仓和平仓操作,如此实现一个自动策略买卖交易。
在构造函数init()中创建BarGenerator类型self.bg和管理bar的ArrayManager类型的self.am
这里的重点是self.load_bar(10),该函数是策略的父类CtaTemplate的函数,代码是这样的:
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
use_database: bool = False
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(
self.vt_symbol,
days,
interval,
callback,
use_database
)
self.cta_engine.load_bar()位于vnpy\app\cta_strategy.py中的CtaEngine类中,代码是这样的:
def load_bar(
self,
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None],load_bar
use_database: bool
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
end = datetime.now(get_localzone())
start = end - timedelta(days)
bars = []
# Pass gateway and RQData if use_database set to True
if not use_database:
# Query bars from gateway if available
contract = self.main_engine.get_contract(vt_symbol)
if contract and contract.history_data:
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
bars = self.main_engine.query_history(req, contract.gateway_name)
# Try to query bars from RQData, if not found, load from database.
else:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
if not bars:
bars = database_manager.load_bar_data(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end,
)
for bar in bars:
callback(bar)
因为在策略中使用这样的语句self.load_bar(10),所以use_database参数为默认值False,可是我们知道目前CTP接口是不支持历史数据查询的,所以contract and contract.history_data的条件为假,导致bars 为空, 最终执行了:
bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)
而self.query_bar_from_rq的代码是这样的:
def query_bar_from_rq(
self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
再看看rqdata_client.query_history(req)的代码,它把产生req的symbol,interval,start 和end各字段,转换成米筐接口可以接受的rq_symbol,rq_interval ,interval,start 和end等4个变量中,然后把end加上1天的时间【注意:这是非常重要的一个技巧,不然无法取出截止到当前交易时刻的1分钟bar!】,最后执行米筐接口函数rqdata_get_price()读取所有的10天多的bar数据,注意:是10天多的bar,而不是整10天的bar!
def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""
Query history bar data from RQData.
"""
if self.symbols is None:
return None
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None
rq_interval = INTERVAL_VT2RQ.get(interval)
if not rq_interval:
return None
# For adjust timestamp from bar close point (RQData) to open point (VN Trader)
adjustment = INTERVAL_ADJUSTMENT_MAP[interval]
# For querying night trading period data
end += timedelta(1)
# Only query open interest for futures contract
fields = ["open", "high", "low", "close", "volume"]
if not symbol.isdigit():
fields.append("open_interest")
df = rqdata_get_price(
rq_symbol,
frequency=rq_interval,
fields=fields,
start_date=start,
end_date=end,
adjust_type="none"
)
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
dt = row.name.to_pydatetime() - adjustment
dt = CHINA_TZ.localize(dt)
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=dt,
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("open_interest", 0),
gateway_name="RQ"
)
data.append(bar)
return data
同时可以知道interval的默认值为Interval.MINUTE。
至此我们可以看出,self.load_bar(10)其实就是从米筐接口获取的1分钟历史数据。
这里执行了
self.bg.update_tick(tick)
这是在调用策略的K线合成器self.bg的update_tick() 函数,这个函数是用来把tick数据按照1分钟为间隔来产生1分钟bar的,当1分钟bar合成之时再次调用策略的on_bar()。
BarGenerator的update_tick()的函数代码如下:
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
分析得知它开始生成self.bar的条件是:
if not self.bar:
new_minute = True
....
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
也就是说只要刚刚启动策略,就会立即生成一根新bar,而没有寻求对齐整分钟,这样会造成首个bar的合成非常可能是不完整的!
self.bg.update_bar(bar)
这个函数是用1分钟bar来合成7分钟bar的,当7分钟bar合成完成后,它会以7分钟bar为参数调用策略的on_7min_bar()。
am = self.am
am.update_bar(bar)
if not am.inited:
return
""" 计算均线 """
fast_ma = am.sma(self.fast_window,True)
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window,True)
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
后面的代码就省略了
姑且不论策略是否可以赚钱,因为后面还要针对特定合约进行优化,这不是本帖讨论的重点!
从代码来看,一切都是那么自然,一个完美的例子!
这里分析的重点是假如我们在盘中启动策略的话,会发生什么问题,请看图:
如上图中所示:
我们知道从米筐接口读取的只有整分的K线数据,它不会提供没有走完的1分钟bar,所以如果你没有在整分钟结束的那一刻启动策略的话(做到这一点的概率太低了!),那么就一定会产生黄色的丢失部分。
因为第一根合成1分钟K线出现丢失部分,导致第一根合成1分钟K线的开、高、收、低、成交量和开仓兴趣都可能是错误的,进而导致利用1分钟K线合成的7分钟K线也是错误的,这可以说是连锁反应,当然也就会导致利用7分钟K线进行信号计算和交易查询问题!
也许你会说,有那么夸张吗?我不知道!不过这个丢失部分的时间长度在0~59.99秒之间,再说了就算是只有3秒的丢失,也可能是这1分钟中几乎全部的成交量,创新高、创新低都是有可能的,它的缺失也可能是让7分钟K线严重失真的重要原因,谁知道呢!我们这里分析目前的代码就是这样的,从原理上讲它确实是会出错的!
解决方法:
get_trading_dates() # 合约的所有的交易日
get_trading_hours() # 合约的所有的交易时间段
如果策略启动后最后一个历史1分钟bar与第一个tick数据在一个交易时间段(如9:00-10:15)中, 那么就可以判断出第一个1分钟K线出现了数据丢失,在这个第一个1分钟K线走完之时,就应该从米筐接口立即读取这个刚刚生成的历史1分钟bar,替换掉策略合成的第一个1分钟K线,其他的处理逻辑继续执行就可以了。按照第3节中的4的方法,修改BarGenerator,代码如下,可以解决问题:
class BarGenerator:
"""
For:
1. generating 1 minute bar data from tick data
2. generateing x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
self.is_first_bar = True # hxxjava add
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
from vnpy.trader.rqdata import rqdata_client # hxxjava add
from vnpy.trader.object import HistoryRequest # hxxjava add
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return False
# Filter tick data with less intraday trading volume (i.e. older timestamp)
if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
return False
if not self.bar:
new_minute = True
elif self.bar.datetime.minute != tick.datetime.minute:
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
# hxxjava add start
if self.is_first_bar:
self.is_first_bar = False
symbol,exchange = extract_vt_symbol(self.bar.vt_symbol)
bar_datetime = self.bar.datetime
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
start = bar_datetime,
end=bar_datetime,
interval=Interval.MINUTE
)
bars = rqdata_client.query_history(req)
self.bar = bars[-1]
print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
# hxxjava add end
self.on_bar(self.bar)
new_minute = True
if new_minute:
print(f"【tick.datetime = {tick.datetime} is_first_bar={self.is_first_bar}】")
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# If not inited, creaate window bar object
if not self.window_bar:
# Generate timestamp for bar data
if self.interval == Interval.MINUTE:
dt = bar.datetime.replace(second=0, microsecond=0)
else:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price, bar.high_price)
self.window_bar.low_price = min(
self.window_bar.low_price, bar.low_price)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
finished = False
if self.interval == Interval.MINUTE:
# x-minute bar
if not (bar.datetime.minute + 1) % self.window:
finished = True
elif self.interval == Interval.HOUR:
if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
# 1-hour bar
if self.window == 1:
finished = True
# x-hour bar
else:
self.interval_count += 1
if not self.interval_count % self.window:
finished = True
self.interval_count = 0
if finished:
self.on_window_bar(self.window_bar)
self.window_bar = None
# Cache last bar object
self.last_bar = bar
def generate(self) -> Optional[BarData]:
"""
Generate the bar data and call callback immediately.
"""
bar = self.bar
if self.bar:
bar.datetime = bar.datetime.replace(second=0, microsecond=0)
self.on_bar(bar)
self.bar = None
return bar
不多废话先上结论(操作流程图):
下面是步骤说明,只要照着做100%可以搞定!!!
电话联系你的客户经理,向期货公司申请进行穿透式接入测试。如果期货公司服务质量较好,可能已经主动联系你邀请测试了。
填写申请表,每家期货公司有所区别,但整体上需要提供的信息可能包括:
以上多条信息的获取,就需要用到cmd中的系统命令工具了。
输入以下命令获取CPU序列号:
wmic cpu get processorid
逐条输入以下命令,获取硬盘序列号、主分区盘符和大小:
diskpart
select disk 0
detail disk
输入以下命令,“以太网适配器”下的“物理地址”就是MAC地址,“IPv4地址”就是内网IP:
ipconfig /all
访问www.ip138.com获取你的外网IP,或者直接百度搜索“IP”也行。
最后的AppID,是一个由用户提供的交易程序代码,以个人身份申请时,格式为:
client_xxxx_yyyy
其中xxxx是你的软件名称,yyyy是版本号,这两个字段都是客户自己填的信息(没有固定规则),以vn.py的v2.0版本为例,AppID可能为:
client_vnpy888_2.0
其中888的部分,是自定义的一个字符串,主要为了避免你的AppID和其他人重复,你可以选择随意选择:姓名拼音缩写、某个数字、幸运词......
提交申请表后,一般当天或者第二天就能拿到期货公司针对你的这个AppID提供的测试账号信息,包括:
我们这里测试环节以最新版的VN Studio为例,如果没有的话请点击下载:VNStudio-2.0.3。同样你也可以选择使用任何其他的软件程序来操作,如果不幸掉坑后爬不出来的话再回到VN Station好了。
双击桌面的VN Station图标,启动后会弹出登录框。如果是第一次使用,请点击“微信登录”按钮,扫描二维码后注册VN Station账号(同样也是vn.py官方社区论坛www.vnpy.com的登录账号),如果已经有账号了可以直接输入后点击“登录”。
登录完成后会看到VN Station主界面,此时请点击底部的“VN Trader Pro”,并在弹出的目录选择对话框中直接点“选择文件夹”按钮(即在默认的Windows用户目录下启动VN Station):
随后会弹出配置VN Treader的对话框,注意此时请一定只勾选加载CTPTEST接口,千万不要同时勾选加载CTP接口,会因为dll冲突导致后续测试失败!!!!
在VN Trader主界面上,点击左上角的“系统”->“连接CTPTEST”,在弹出的登录配置对话框中输入期货公司提供的测试账号信息(产品名称就是AppID),点击连接按钮后登录CTP穿透式测试用服务器:
当VN Trader左下角的日志监控组件中,刷新出熟悉的日志信息,看到“合约信息获取成功”的时候,就意味着我们已经完成测试了!
搞定上面的测试服务器连接登录后,就可以联系期货公司进行校验工作了,通常可以一次性直接通过,如果遇到不通过的情况请查看本文最后的常见问题来解决。
期货公司校验通过后,会将客户申请的AppID和AuthCode添加到实盘CTP的服务器上,此时只要把启动VN Trader Pro时,加载的接口由CTPTEST改为CTP,就可以连接上实盘交易环境,和以往一样进行量化交易了。
穿透式监管
新的监管模式主要是明确了期货公司对于其客户交易行为的管理责任,因此需要对所有接入交易柜台系统的交易终端软件进行认证管理,防止坏人耍流氓后一走了之,难以追查。
穿透式API
穿透式监管的主要实现工具,支持对交易终端机器的信息采集功能(即采集之前提到的CPU序列号、MAC地址等信息),并在加密后直接上传期货市场监控中心。除了本文中用到的CTP穿透式API外,其他的柜台也都提供了对应的穿透式API版本:恒生、易盛、飞创等,操作方法基本类似。
执行日期
在6月14日当天,所有期货公司的柜台系统全部强制升级为穿透式监管版本,老的非穿透式柜台会全部下线,没有所谓的“过渡期”,现在就已经是“过渡期”了!!!还有不到两周的时间,所有通过API接入交易的用户请赶紧吧,不要到了那天没法交易才着急,而且普遍的拖拉习惯,目前在申请接入测试的客户与日俱增,也对期货公司每天繁忙的后台IT部门表示感谢。
6.3.13
CTP穿透式柜台的仿真测试版本,也是CTPTEST接口中使用的API版本,所有客户的仿真接入认证测试都必须使用该版本!主要因为该版API采集客户的信息是没有加密的,期货公司可以在后台查看来进行认证工作。
6.3.15
CTP穿透式柜台的实盘交易版本,也是CTP接口中使用的API版本,完成仿真接入测试后,必须使用该版本才能连接实盘交易的CTP柜台。该版本的采集信息是安全加密的,期货公司的IT用后台系统也看不到。
直连模式
指的是所有用交易程序直接使用CTP的API连接CTP柜台,进行行情获取和委托交易的情况,几乎所有自主开发或者使用开源框架的量化交易客户都属于这种情况,直接使用穿透式监管版本的API进行开发就行(带_se后缀的)。
中继模式
指的是:交易客户端->中继服务器->CTP柜台,采用这种连接模式主要包括商业量化交易软件(比如文华财经)以及机构量化资管系统(比如O32),只有中继模式才需要用到那个DataCollect.dll文件。
同样以CTP为例:
只有交易接口TD需要进行认证,MD直接登录就行。每一步出错的话都会有相应的报错输出提示,查看错误信息内容后照着修改就行。
哪个版本的vn.py目前支持穿透式API?
最新的v2.0.3发布版本(Py3 64位),和v1.9.2-LTS版本(Py2 32位),都支持了穿透式API,推荐使用Windows进行相关测试工作(Linux上需要自己调整链接库做编译)。
更新后SimNow环境连不上了!
截止目前的2019年6月1日,SimNow上的交易测试环境(包括第一套和第二套)依旧为非穿透式的老版本,因此用穿透式版本的API都是连不上的。
SimNow的终端厂商测试环境连上后没有行情!
SimNow所提供的6.3.13测试环境,目前仅仅为了满足用户的穿透式版本测试需求(也就是能成功登录上来查询一下合约信息等),尚未提供第一套或者第二套环境中的仿真行情以及仿真交易功能,所以:就是没有行情的~
报错4097,cmd有输出Decrypt handshake data failed
这是因为你的API版本和服务器的版本不一致导致的,请按照以下流程排查:
UserProductInfo字段是用来干嘛的?
该字段是之前非穿透式API时,用来进行客户认证的产品名称字段(配合AuthCode一起)。穿透式版本接入的方案文档并没有对该字段的强制要求,目前我们这边已经对接了的5家期货公司也均未要求使用,但听说某些公司需要:如中信建投等,如果有了解其他公司情况的请在评论中分享。
能否使用云服务器或者虚拟机进行测试认证?
虚拟机和云服务器,对于本文开头部分提到的CPU序列号、硬盘序列号等信息,有可能获取不全或者部分字段不符合规定。目前有些期货公司要求严格,必须全部能正确获取到,且和第二部申请表中填写的内容一致,才能算认证测试成功;另一些公司则是十分宽松,表也不用填,采集信息也不看,只要登录上来就算测试通过。
所以,能否使用云服务器和虚拟机,完全取决于你开户的这家期货公司了。
如果使用v1.9.2之前老版本的vn.py怎么办?
请将v1.9.2的以下内容复制到你的老版本对应的目录下:
并采用上文提到的方式去做认证。
6.3.15的穿透式实盘API,想进行下测试怎么办?
目前只发现中信期货提供了6.3.15的仿真测试环境(忍不住竖起大拇指,不愧是中信),但最近估计申请人数过多,新的申请处理非常缓慢,如果大家发现别家提供6.3.15的测试环境也欢迎在评论里告知。
为什么期货公司一定要用6.3.13和6.3.15两个版本,接下来是否会合并都使用6.3.15?
为了满足穿透式监管认证要求,期货公司认证时要看到客户机器采集的信息,就只能通过6.3.13版本的CTP API。而实盘交易的环境中,期货监控中心要求直接上报采集信息,禁止期货公司查看和修改,就必须通过6.3.15版本。
所以目前来看仿真和实盘使用两个版本的API,是监管中心比较放心得过的方案吧,也就意味着交易客户端必须要两套API都对接准备好了。