vn.py量化社区
By Traders, For Traders.

置顶主题

VNPY模块间调用关系

画了一个周末,头晕了,简直就是个迷宫,放一放先...
description



Jupyter Notebook实现从IB接口历史数据获取,写入数据库,策略回测和实盘交易

刚好有个同学问怎么实现IB历史数据获取,和策略回测和实盘交易。想着熟悉vnpy2.0操作,就用Jupyter Notebook都是跑了一边。VNPY2.0的整体架构设计很有扩展性,而且调用也比起v1.0先进清晰很多,引擎加载调用非常方便。

讲讲注意点:

  1. IB接口历史数据大多是要收费的订阅的,如果收费会有报错信息提示,这里找个免费的作为使用。另外vnpy是按照最大6个月历史数据设计的。
  2. 数据库定义有个小坑,我是用mongodb的,在第一次填写 trader/setting.py中密码写错了,后面在trader/setting.py改发现怎么也改不好;原来当第一次维护后,配置会写入.vntrader/vt_setting,之后系统只会去.vntrader/vt_setting读取。去改vt_setting,而不是trader/setting.py。
  3. 使用CtaStrategyApp支持加入新策略,系统会自动输出json保持策略信息;所以第二次运行代码时候,会提示已经有了,不是问题。
  4. 我在代码里面把回测和实盘放在一次,如果直接跑下来可能会报错,建议跑实盘时候先注释的回测。
  5. 使用script_engine订阅历史数据是是默认从rqdata获取,vnpy v2.07 IB接口已经提供历史数据获取,这里创建HistoryRequest用main_engine来获取,

为了方便贴出来,改成.py代码格式,直接跑也没有问题。

from vnpy.app.script_trader import init_cli_trading
from vnpy.app.script_trader.cli import process_log_event
from vnpy.gateway.ib import IbGateway
from time import sleep
from datetime import datetime
import pandas as pd
# 连接到服务器
setting = {
    "TWS地址": "127.0.0.1",
    "TWS端口": 7497,
    "客户号":8 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接

# 查询资金 - 自动
sleep(10)
print("***查询资金和持仓***")
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))

# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅
engine.main_engine.subscribe(req1,"IB")


# 使用script_engine订阅历史数据是从rqdata获取,vnpy v2.07已经提供历史数据获取,这里创建HistoryRequest来获取,
# 查询如果没有endtime,默认当前。返回历史数据输出到数据库和csv文件
# 关于api更多信息可以参见 https://interactivebrokers.github.io/tws-api/historical_bars.html
print("***从IB读取历史数据, 返回历史数据输出到数据库和csv文件***")
from vnpy.trader.object import HistoryRequest
from vnpy.trader.object import Interval
start = datetime.strptime('20190901', "%Y%m%d")

historyreq = HistoryRequest(
    symbol="12087792",
    exchange=Exchange.IDEALPRO,
    start=start,
    interval=Interval.MINUTE
)
# # 读取历史数据,并把历史数据BarData放入数据库
bardatalist = engine.main_engine.query_history(historyreq,"IB")
from vnpy.trader.database import database_manager
database_manager.save_bar_data(bardatalist)

# 把历史数据BarData输出到csv
pd.DataFrame(bardatalist).to_csv("C:\Project\\"+ str(historyreq.symbol) + ".csv" , index=True, header=True)
print("History data export to CSV")

# # 参考backtesting.ipynb, 使用自带的双均线策略回测,10日上穿60日做多,否则反之
print("***从数据库读取历史数据, 进行回测***")
from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.double_ma_strategy import (
    DoubleMaStrategy,
)
btengine = BacktestingEngine() #新建回测引擎
btengine.set_parameters(
    vt_symbol="12087792.IDEALPRO",
    interval="1m",
    start=datetime(2019, 9, 1),
    end=datetime(2019, 10, 5),
    rate = 0,
    slippage=0.00005,
    size=1000,
    pricetick=0.00005,
    capital=1_000_000,
)
btengine.add_strategy(DoubleMaStrategy, {"fast_window":10, "slow_window": 60})

btengine.load_data()
btengine.run_backtesting()
df = btengine.calculate_result()
btengine.calculate_statistics()
btengine.show_chart()

# 给script_engine载入双均线策略,实盘运行
print("***从数据库读取准备数据, 实盘运行***")
# 使用cta交易引擎
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
engine.event_engine.register(EVENT_CTA_LOG, process_log_event)
cta_engine = engine.main_engine.add_app(CtaStrategyApp) #加入app
cta_engine.init_engine()
cta_engine.add_strategy("DoubleMaStrategy","DoubleMaStrategy_IB_12087792_v1", "12087792.IDEALPRO",{"fast_window":10, "slow_window": 50})
sleep(10)
cta_engine.init_strategy("DoubleMaStrategy_IB_12087792_v1")
sleep(10)
cta_engine.start_strategy("DoubleMaStrategy_IB_12087792_v1")


一张图解锁Python量化$发的知识要点!

发布于vn.py社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2021-01-09
 
基于《30天解锁Python量化开发》的课程内容,我们制作了这张【知识要点图】:

  • 对于已经购买课程的学员,可以更好的把课程中学到的内容要点,构建成为自己的Python量化开发知识体系
  • 对于目前还在自学的用户,可以有一套方向清晰的入门路径图,顺着循序渐进的目标一步一个脚印的学习前进

看完对课程感兴趣的话,请戳课程传送门】
 

description
 

最后,vn.py进驻【Gitee】(简单来说就是中国版的Github)差不多已经一个月的时间了,目前已经收获了218个Star和73个Fork,同时也拿到了【GVP】(Gitee最有价值开源项目)。

看来对于许多我们的用户,访问Github速度太慢真心是个需求痛点啊,好在现在有了一个更好的国内替代选择。同时也发现Fork/Star比例超过Github,可能因为同步方便所以更多人愿意尝试自己扩展开发?

最后再贴下仓库地址:https://gitee.com/vnpy/vnpy 。该Gitee仓库会每日和Github仓库同步,自动更新最新版本的代码,欢迎大家Star和Fork!
 



币安历史数据服务的下载流程及代码分享

币安提供历史数据服务下载,地址如下:https://www.binance.com/zh-CN/support/articles/360044357931
vn的gateway只能下载分钟粒度的数据,在这里申请可以下载到tick数据,对于有tick需求的朋友可以关注。
由于下载属于一次性服务,所以对代码没有做太多的优化,属于能用即可。
代码分为两个文件,第一个文件是获取文件下载地址的link_id。第二个文件是从link_id获取到对应的文件下载地址,然后下载到本地data文件夹。

第一个文件 download_id.py,默认将获取到的信息存储在/home/result_id.csv文件中,可以自行修改csv_file路径,由于下载权重过大,需要较长的延时进行下一条请求,所以直接设置sleep 1800秒后进行下一条请求

import hmac
import hashlib
from urllib import parse
import time
from datetime import datetime, timedelta
import requests
import pandas as pd
import numpy as np
import os

key = "自己的key"
secret = "自己的secret"
proxy_host = '127.0.0.1'
proxy_port = 1080
id_path = "https://api.binance.com/sapi/v1/futuresHistDataId"
link_path = "https://api.binance.com/sapi/v1/downloadLink"

query_symbols = ['BTCUSDT','ETHUSDT','LTCUSDT','EOSUSDT']

def sign(params:dict,secret:str):
    query = parse.urlencode(sorted(params.items()))
    secret = secret.encode()
    signature = hmac.new(secret, query.encode("utf-8"), hashlib.sha256).hexdigest()
    return signature

headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
            "X-MBX-APIKEY": '自己的key'
        }

def get_query_path(path, params):
    timestamp = int(time.time()*1000)
    params["timestamp"] = timestamp
    query = parse.urlencode(sorted(params.items()))
    signature = sign(params, secret)
    query += "&signature={}".format(signature)
    query_path = path + '?' + query
    return query_path

def request_post(path, headers):
    response = requests.request('POST', path, headers=headers)
    return response
def request_get(path, headers):
    response = requests.request('GET', path, headers=headers)
    return response

def geterate_periods(start='2020-01-01 00:00:00', end='2021-04-05 00:00:00', days=15):

    if end:
        endArray = time.strptime(end, "%Y-%m-%d %H:%M:%S")
        end = int(time.mktime(endArray)) * 1000
    else:
        end = int(time.time()) * 1000

    startArray = time.strptime(start, "%Y-%m-%d %H:%M:%S")
    start = int(time.mktime(startArray)) * 1000
    return range(start, end, days*24*60*60*1000)

def get_symbol_start(csv_file,symbol,data_type,days):
    if not os.path.exists(csv_file):
        time_start = int(time.mktime(datetime.strptime('2020-01-01','%Y-%m-%d'))) * 1000
    else:
        df = pd.read_csv(csv_file)
        time_s= df[(df['symbol'] == symbol)&(df['dataType']==data_type)]['start'].max()
        if time_s is np.nan:
            time_start = int(time.mktime(datetime.strptime('2020-01-01','%Y-%m-%d').timetuple())) * 1000
        else:
            time_p = datetime.strptime(time_s,'%Y-%m-%d') + timedelta(days=days)
            time_start = int(time.mktime(time_p.timetuple())) * 1000
    return time_start


def run_id(csv_file, days):
    for symbol in query_symbols:
        if symbol == 'BTCUSDT':
            data_type_list = ['T_TRADE','S_DEPTH']
        else:
            data_type_list = ['T_TRADE']
        for data_type in data_type_list:
            time_start = get_symbol_start(csv_file, symbol, data_type,days)
            while time_start < int(time.mktime(datetime.today().timetuple()))*1000:
                start = time_start
                end = start + days * 24 * 60 * 60 * 1000
                params = {
                'symbol': symbol,
                'startTime': str(start),
                'endTime': str(end),
                'dataType': data_type
                }
                query_path_ = get_query_path(id_path, params)
                ret = request_post(query_path_, headers)
                print(ret.text)
                if ret.status_code == 200:
                    ret_id = ret.json()['id']
                    result_dic = {
                        'symbol':symbol,
                        'start':datetime.fromtimestamp(start/1000),
                        'dataType':data_type,
                        'id':ret_id
                    }
                    df = pd.DataFrame(result_dic, index=[0])
                    if not os.path.exists(csv_file):
                        df.to_csv(csv_file, index=False, mode='a')
                    else:
                        df.to_csv(csv_file, index=False, mode='a', header=False)
                    time_start = end
                else:
                    time.sleep(1800)

if __name__ == '__main__':
    csv_file = '/home/result_id.csv'
    days = 7
    run_id(csv_file, days)


集合竞价时段最后1分钟的tick问题

1 集合竞价时段

股票、期货合约都有集合竞价时段,如国内期货合约为前一交易日的21:00前5分钟,股指期货合约为当前交易日的9:30前5分钟,股票合约的为当前交易日的9:30之前。
集合竞价完成之后通常在开市前1分钟提供CTP接口推送第一个tick,注意:这个tick的时间戳为开市前1分钟,而不在各个交易时间段内!
如:
国内期货合约如果第一个交易段为21:00,那么这个tick的时间戳为20:59
国内股指期货合约为当前交易日的9:30,那么这个tick的时间戳为9:29

2 vnpy当前的BarGenerator无法正确处理该tick

实盘中每个合约都会有集合竞价时段,这时候采用自带的BarGenerator来合成1分钟bar就会有问题,因为20:59:00的那个tick既不属于上一交易日的1分钟bar(15:59)的,也不属于21:00的那一个1分钟bar,当然就只能够孤独地自成一个1bar啦!由此带来的问题是21:00的那一个1分钟bar的开盘价和成交量(这个成交量可能是很大)都可能是错了。
当然由此导致的5分钟、10分钟、15分钟、30分钟乃至1日的bar都可能是有问题的,因为它们都是1分钟bar合成的。

3 解决方法:

  • 从合约信息中提前交易时间段信息入手,通常某个交易日的第一个交易时间段前1分钟收到的tick应该归入该交易日的第一个1分钟bar,
  • 如国内期货的为前一交易日的21:00的前1分钟收到的tick应该归入21:00的那一个1分钟bar,
  • 股指期货的为当前交易日的9:30的前1分钟收到的tick应该归入9:30的那一个1分钟bar。
  • 在正确处理了交易日首个1分钟bar合成的基础上,其他的n分钟bar的合成才可能是正确的。

4 顺便谈谈没有办法合成90分钟K线的问题

你现在用BarGenerator没有办法合成90分钟K线,不信你试试看!挺搞笑的一个问题!

在此呼吁vnpy官方,重视CTP接口中的合约信息的作用,重构BarGenerator,作出一个解决当前问题众多的bar合成器。

天下苦此BarGenerator久矣!



《vn.py 2.2.0源代码深入分析》

2019年写了《vn.py 2.0.7源代码深入分析》,感谢各位老师的认可。
vn.py的维护团队是一个非常有朝气的团队,版本更新很快,3月底发布了最新的2.2.0版。
使用vn.py做交易,因为需要做一些自己的定制,所以并不是每个版本都随着更新,前面使用的是2.1.4。
但vn.py的每个新版本我都会关注,并不断更新文档。vn.py从2.1.9开始对基本功能(主要是数据管理部分)做了比较大的优化,我认为2.2.0应该是比较稳定好用的版本,准备将日常使用的系统升级到2.2.0。3月26日终于等到了2.2.0版,立刻开始分析代码并完善文档,这几天夜以继日,终于形成了这个文档,发出来请各位老师批评指正。

《10-vn.py 2.2.0源代码深入分析》
百度网盘链接:https://pan.baidu.com/s/1X3WNoE27RKJgxx6leC5X0g
提取码:6vl3



【文档】,再也不是vn.py的吐槽点!

发布于vn.py社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2021-3-31
 

过去几年里,如果问大家vn.py最大的吐槽点是什么,可能大部分人的第一反应就是【文档】。

作为官方团队,之前我们找了很多理由安慰自己:

  • 每月一个版本的持续迭代开发任务太重,没时间写~
  • 都开源了,用户有问题可以直接看源代码~
  • 公众号和知乎专栏有大把详细的分享文章~
  • 或者,程序猿(包括Quant)就是不喜欢写文档~

但随着vn.py逐渐成熟,我们越来越多的被用户们提醒着文档的不足,尤其在对比zipline、RQAlpha、backtrader等项目的文档后,更是有一种丑小鸭的自卑感。

所以从2021年初就开始准备全面的文档更新计划,上周2.2.0版本发布后总算是拿出了一点成果。更新后的文档不再去过多介绍内部代码的实现(这算是过去vn.py文档最大的坑之一),而是全部从用户角度出发,来详细讲解各项功能的使用方法和注意事项。

截止本文,已更新完成的文档篇幅包括:

目前还在更新中,预计会在二季度全部完成的篇幅:

  • 功能介绍

  • 安装指南

  • 基本使用

  • 交易接口

  • 数据库配置

  • 价差交易模块

  • 期权波动率交易模块

  • 多合约组合策略模块

  • 算法交易

  • 脚本策略

  • RPC服务

  • Excel RTD模块

  • 贡献代码

所有文档均可以通过vn.py官网的文档版块直接查看:

https://www.vnpy.com/docs/cn/index.html

同时文档的Markdown源文件,也都包含在Github仓库的docs文件夹下:

https://github.com/vnpy/vnpy/tree/dev/docs

大家在阅读过程中发现内容错误或者改进点,都欢迎直接发起PR到dev-docs分支。

有任何文档相关的建议也欢迎在下方留言,希望今年二季度结束前,可以让我们的用户再也不必为vn.py的文档头疼!



全市场录制行情数据

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()


分析一下盘中启动CTA策略带来的第一根$K线错误

说明一下,本贴中的bar数据和K线数据其实是一回事,有时候是方便读代码就按照代码说,有时候为了尊崇人们的习惯来说,不必在意。

1 通常CTA策略都是读取和合成K线的

1.1 从一个有代表性的策略DemoStrategy谈起

代码是这样的:

from typing import Any

from vnpy.app.cta_strategy import (
    CtaTemplate,
    BarGenerator,
    ArrayManager
)

from vnpy.trader.object import (
    BarData,
    TickData
)

from vnpy.trader.constant import Interval

class DemoStrategy(CtaTemplate):
    """ 一个演示策略 """
    author = "hxxjava"

    fast_window = 10
    slow_window = 20

    fast_ma0 = 0
    fast_ma1 = 0
    slow_ma0 = 0
    slow_ma1 = 0

    parameters = [
        "fast_window",
        "slow_window"
    ]

    variables = [
        "fast_ma0",
        "fast_ma1",
        "slow_ma0",
        "slow_ma1",
    ]

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict 
    ):
        """构造函数"""
        super().__init__(cta_engine,strategy_name,vt_symbol,setting)

        self.bg = BarGenerator(
            on_bar=self.on_bar,
            window=7,
            on_window_bar=on_7min_bar,
            interval=Interval.Minute)

        self.am = ArrayManager()


    def on_init(self):
        """"""
        self.write_log("策略初始化")
        # account_data = self.cta_engine.get_account()
        self.load_bar(10)

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """ 策略停止 """
        self.write_log(" 策略停止 ")

    def on_tick(self,tick:TickData):
        """ Tick更新 """
        self.bg.update_tick(tick) 

    def on_bar(self, bar: BarData):
        """K线更新"""
        self.bg.update_bar(bar)

    def on_7min_bar(self, bar: BarData):
        """K线更新"""
        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]

        """ 定义金叉和死叉 """

        cross_over = (self.fast_ma0>= self.fast_ma1 and
                      self.slow_ma0<self.slow_ma1)  

        cross_below = (self.slow_ma0>self.slow_ma1 and 
                      self.slow_ma0<=self.slow_ma1)

        if cross_over:
            price = bar.close_price + 5

            if not self.pos:
                self.buy(price,1)
            elif self.pos < 0:
                self.cover(price,1)
                self.buy(price,1)
        elif cross_below:
            price = bar.close_price - 5

            if not self.pos:
                self.short(price,1)
            elif self.pos>0:
                self.sell(price,1)
                self.short(price,1)

        # 更新图形界面 
        self.put_event()

这个策略是演示如何利用1分钟K线合成7分钟K线,然后在on_7min_bar()里面利用7分钟K线计算快慢两根移动均线,
然后更加快慢移动均线的金叉和死叉信号来进行多空的开仓和平仓操作,如此实现一个自动策略买卖交易。

1.2 策略工作的过程是这样的:

1.2.1 首先执行构造函数init()

在构造函数init()中创建BarGenerator类型self.bg和管理bar的ArrayManager类型的self.am

1.2.2 然后执行on_init()函数

这里的重点是self.load_bar(10),该函数是策略的父类CtaTemplate的函数,代码是这样的:

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ):
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback = self.on_bar

        self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

self.cta_engine.load_bar()位于vnpy\app\cta_strategy.py中的CtaEngine类中,代码是这样的:

    def load_bar(
        self,
        vt_symbol: str,
        days: int,
        interval: Interval,
        callback: Callable[[BarData], None],load_bar
        use_database: bool
    ):
        """"""
        symbol, exchange = extract_vt_symbol(vt_symbol)
        end = datetime.now(get_localzone())
        start = end - timedelta(days)
        bars = []

        # Pass gateway and RQData if use_database set to True
        if not use_database:
            # Query bars from gateway if available
            contract = self.main_engine.get_contract(vt_symbol)

            if contract and contract.history_data:
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    start=start,
                    end=end
                )
                bars = self.main_engine.query_history(req, contract.gateway_name)

            # Try to query bars from RQData, if not found, load from database.
            else:
                bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

        if not bars:
            bars = database_manager.load_bar_data(
                symbol=symbol,
                exchange=exchange,
                interval=interval,
                start=start,
                end=end,
            )

        for bar in bars:
            callback(bar)

因为在策略中使用这样的语句self.load_bar(10),所以use_database参数为默认值False,可是我们知道目前CTP接口是不支持历史数据查询的,所以contract and contract.history_data的条件为假,导致bars 为空, 最终执行了:

bars = self.query_bar_from_rq(symbol, exchange, interval, start, end)

而self.query_bar_from_rq的代码是这样的:

    def query_bar_from_rq(
        self, symbol: str, exchange: Exchange, interval: Interval, start: datetime, end: datetime
    ):
        """
        Query bar data from RQData.
        """
        req = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            interval=interval,
            start=start,
            end=end
        )
        data = rqdata_client.query_history(req)
        return data

再看看rqdata_client.query_history(req)的代码,它把产生req的symbol,interval,start 和end各字段,转换成米筐接口可以接受的rq_symbol,rq_interval ,interval,start 和end等4个变量中,然后把end加上1天的时间【注意:这是非常重要的一个技巧,不然无法取出截止到当前交易时刻的1分钟bar!】,最后执行米筐接口函数rqdata_get_price()读取所有的10天多的bar数据,注意:是10天多的bar,而不是整10天的bar!

def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data from RQData.
        """
        if self.symbols is None:
            return None

        symbol = req.symbol
        exchange = req.exchange
        interval = req.interval
        start = req.start
        end = req.end

        rq_symbol = self.to_rq_symbol(symbol, exchange)
        if rq_symbol not in self.symbols:
            return None

        rq_interval = INTERVAL_VT2RQ.get(interval)
        if not rq_interval:
            return None

        # For adjust timestamp from bar close point (RQData) to open point (VN Trader)
        adjustment = INTERVAL_ADJUSTMENT_MAP[interval]

        # For querying night trading period data
        end += timedelta(1)

        # Only query open interest for futures contract
        fields = ["open", "high", "low", "close", "volume"]
        if not symbol.isdigit():
            fields.append("open_interest")

        df = rqdata_get_price(
            rq_symbol,
            frequency=rq_interval,
            fields=fields,
            start_date=start,
            end_date=end,
            adjust_type="none"
        )

        data: List[BarData] = []

        if df is not None:
            for ix, row in df.iterrows():
                dt = row.name.to_pydatetime() - adjustment
                dt = CHINA_TZ.localize(dt)

                bar = BarData(
                    symbol=symbol,
                    exchange=exchange,
                    interval=interval,
                    datetime=dt,
                    open_price=row["open"],
                    high_price=row["high"],
                    low_price=row["low"],
                    close_price=row["close"],
                    volume=row["volume"],
                    open_interest=row.get("open_interest", 0),
                    gateway_name="RQ"
                )

                data.append(bar)

        return data

同时可以知道interval的默认值为Interval.MINUTE。
至此我们可以看出,self.load_bar(10)其实就是从米筐接口获取的1分钟历史数据。

1.2.3 当策略启动后,接收到tick数据推送时执行on_tick()

这里执行了

self.bg.update_tick(tick)

这是在调用策略的K线合成器self.bg的update_tick() 函数,这个函数是用来把tick数据按照1分钟为间隔来产生1分钟bar的,当1分钟bar合成之时再次调用策略的on_bar()。
BarGenerator的update_tick()的函数代码如下:

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return

        if not self.bar:
            new_minute = True
        elif self.bar.datetime.minute != tick.datetime.minute:
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

分析得知它开始生成self.bar的条件是:

 if not self.bar:
    new_minute = True
   ....

 if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )

也就是说只要刚刚启动策略,就会立即生成一根新bar,而没有寻求对齐整分钟,这样会造成首个bar的合成非常可能是不完整的!

1.2.4 策略的on_bar()的执行:

self.bg.update_bar(bar)

这个函数是用1分钟bar来合成7分钟bar的,当7分钟bar合成完成后,它会以7分钟bar为参数调用策略的on_7min_bar()。

1.2.5 策略的on_7min_bar()的执行

        am = self.am
        am.update_bar(bar)
        if not am.inited:
            return

        """ 计算均线 """
        fast_ma = am.sma(self.fast_window,True)
        self.fast_ma0 = fast_ma[-1]
        self.fast_ma1 = fast_ma[-2]

        slow_ma = am.sma(self.slow_window,True)
        self.slow_ma0 = slow_ma[-1]
        self.slow_ma1 = slow_ma[-2]
        后面的代码就省略了

姑且不论策略是否可以赚钱,因为后面还要针对特定合约进行优化,这不是本帖讨论的重点!
从代码来看,一切都是那么自然,一个完美的例子!

2 如果你是在盘中启动将带来第一根K线错误

这里分析的重点是假如我们在盘中启动策略的话,会发生什么问题,请看图:

description

2.1 第一根合成1分钟K线的丢失部分

如上图中所示:

  1. 灰色的部分为策略利用self.load_bar(10)从米筐读取从启动之时起10日内历史1分钟bar,这就是1.2.2节中描述的那部分bar;
  2. 绿色的部分为策略利用接收到tick合成的1分钟bar,这就是1.2.3节和1.2.4节中描述的那部分bar;
  3. 黄色的部分为第一根合成1分钟K线的丢失部分,这是产生问题的主要原因!

我们知道从米筐接口读取的只有整分的K线数据,它不会提供没有走完的1分钟bar,所以如果你没有在整分钟结束的那一刻启动策略的话(做到这一点的概率太低了!),那么就一定会产生黄色的丢失部分。

2.2 第一根合成1分钟K线的丢失部分的影响

因为第一根合成1分钟K线出现丢失部分,导致第一根合成1分钟K线的开、高、收、低、成交量和开仓兴趣都可能是错误的,进而导致利用1分钟K线合成的7分钟K线也是错误的,这可以说是连锁反应,当然也就会导致利用7分钟K线进行信号计算和交易查询问题!
也许你会说,有那么夸张吗?我不知道!不过这个丢失部分的时间长度在0~59.99秒之间,再说了就算是只有3秒的丢失,也可能是这1分钟中几乎全部的成交量,创新高、创新低都是有可能的,它的缺失也可能是让7分钟K线严重失真的重要原因,谁知道呢!我们这里分析目前的代码就是这样的,从原理上讲它确实是会出错的!

3 怎么解决问题?

解决方法:

  1. 尽量不要在盘中启动策略,在盘前启动好要交易的策略,但这个方法仍然没有解决策略软件的问题。
  2. 在策略中增加是否盘中启动的判断,如果是盘中启动,则在第一根1分钟K线合成之时,抛弃不要,立即从米筐取得前1分钟的K线数据,这样就可以替换掉这个不完整的第一根合成1分钟K线,那么也就解决了第一根7分钟K线错误的问题,完美地解决问题。
  3. 那么解决该问题就需要知道启动策略的时刻是否在交易合约的交易时间段内,那么就需要知道合约的交易时间段信息。米筐接口时提供合约的交易时间段信息的,函数如下:
         get_trading_dates() # 合约的所有的交易日
         get_trading_hours() # 合约的所有的交易时间段
    如果策略启动后最后一个历史1分钟bar与第一个tick数据在一个交易时间段(如9:00-10:15)中, 那么就可以判断出第一个1分钟K线出现了数据丢失,在这个第一个1分钟K线走完之时,就应该从米筐接口立即读取这个刚刚生成的历史1分钟bar,替换掉策略合成的第一个1分钟K线,其他的处理逻辑继续执行就可以了。
  4. 另外一个简单解决方法是: 修改BarGenerator的update_tick(),当其返回合成第一个1分钟bar时,直接从米筐读取这个历史1分钟bar,以此替代之,后续的处理逻辑与目前的代码相同即可。这种方法的好处是不要根据合约的交易时间段来判断,简单,但是可能回因为读取米筐接口需要时间,会否影响tick数据的处理还有待编写代码来测试。
  5. 问题已经发现了,怎么实现代码还在思考中,应该不难。这个问题就难在你可能根本没有意识到它可能有问题!

4 一种解决第一根合成1分钟K线的方法:

按照第3节中的4的方法,修改BarGenerator,代码如下,可以解决问题:

class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generateing x minute bar/x hour bar data from 1 minute data

    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE
    ):
        """Constructor"""
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.window: int = window
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        self.last_tick: TickData = None
        self.last_bar: BarData = None
        self.is_first_bar = True            # hxxjava add

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        from vnpy.trader.rqdata import rqdata_client    # hxxjava add
        from vnpy.trader.object import HistoryRequest   # hxxjava add

        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return False

        # Filter tick data with less intraday trading volume (i.e. older timestamp)
        if self.last_tick and tick.volume and tick.volume < self.last_tick.volume:
            return False

        if not self.bar:
            new_minute = True
        elif self.bar.datetime.minute != tick.datetime.minute:
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )

            # hxxjava add start
            if self.is_first_bar:   
                self.is_first_bar = False

                symbol,exchange = extract_vt_symbol(self.bar.vt_symbol)
                bar_datetime = self.bar.datetime
                req = HistoryRequest(
                    symbol=symbol,
                    exchange=exchange,
                    start = bar_datetime,
                    end=bar_datetime,
                    interval=Interval.MINUTE
                )
                bars = rqdata_client.query_history(req)
                self.bar = bars[-1]
                print(f"【first bar time = {bar_datetime} history bar time = {self.bar.datetime},bars count={len(bars)}】")
            # hxxjava add end

            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            print(f"【tick.datetime = {tick.datetime} is_first_bar={self.is_first_bar}】")
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # If not inited, creaate window bar object
        if not self.window_bar:
            # Generate timestamp for bar data
            if self.interval == Interval.MINUTE:
                dt = bar.datetime.replace(second=0, microsecond=0)
            else:
                dt = bar.datetime.replace(minute=0, second=0, microsecond=0)

            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price, bar.high_price)
            self.window_bar.low_price = min(
                self.window_bar.low_price, bar.low_price)

        # Update close price/volume into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += int(bar.volume)
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        finished = False

        if self.interval == Interval.MINUTE:
            # x-minute bar
            if not (bar.datetime.minute + 1) % self.window:
                finished = True
        elif self.interval == Interval.HOUR:
            if self.last_bar and bar.datetime.hour != self.last_bar.datetime.hour:
                # 1-hour bar
                if self.window == 1:
                    finished = True
                # x-hour bar
                else:
                    self.interval_count += 1

                    if not self.interval_count % self.window:
                        finished = True
                        self.interval_count = 0

        if finished:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

        # Cache last bar object
        self.last_bar = bar

    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.
        """
        bar = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar


看完这篇,彻底搞定期货穿透式CTP API接入

操作流程

不多废话先上结论(操作流程图):

description

下面是步骤说明,只要照着做100%可以搞定!!!

 

第一步:申请穿透式接入

电话联系你的客户经理,向期货公司申请进行穿透式接入测试。如果期货公司服务质量较好,可能已经主动联系你邀请测试了。

 

第二步:填表提交AppID

填写申请表,每家期货公司有所区别,但整体上需要提供的信息可能包括:

  • CPU序列号
  • 硬盘序列号
  • 硬盘主分区盘符和大小
  • 网卡MAC地址
  • 内网IP和外网IP
  • 交易程序的AppID

以上多条信息的获取,就需要用到cmd中的系统命令工具了。

输入以下命令获取CPU序列号:

wmic cpu get processorid

逐条输入以下命令,获取硬盘序列号、主分区盘符和大小:

diskpart
select disk 0
detail disk

输入以下命令,“以太网适配器”下的“物理地址”就是MAC地址,“IPv4地址”就是内网IP:

ipconfig /all

访问www.ip138.com获取你的外网IP,或者直接百度搜索“IP”也行。

最后的AppID,是一个由用户提供的交易程序代码,以个人身份申请时,格式为:

client_xxxx_yyyy

其中xxxx是你的软件名称,yyyy是版本号,这两个字段都是客户自己填的信息(没有固定规则),以vn.py的v2.0版本为例,AppID可能为:

client_vnpy888_2.0

其中888的部分,是自定义的一个字符串,主要为了避免你的AppID和其他人重复,你可以选择随意选择:姓名拼音缩写、某个数字、幸运词......

 

第三步:拿到AuthCode

提交申请表后,一般当天或者第二天就能拿到期货公司针对你的这个AppID提供的测试账号信息,包括:

  • 用户名、密码
  • 经纪商代码、仿真测试服务器地址(交易、行情)
  • 产品名称(你填的AppID)、授权编码(AuthCode)

 

第四步:仿真测试

我们这里测试环节以最新版的VN Studio为例,如果没有的话请点击下载:VNStudio-2.0.3。同样你也可以选择使用任何其他的软件程序来操作,如果不幸掉坑后爬不出来的话再回到VN Station好了。

双击桌面的VN Station图标,启动后会弹出登录框。如果是第一次使用,请点击“微信登录”按钮,扫描二维码后注册VN Station账号(同样也是vn.py官方社区论坛www.vnpy.com的登录账号),如果已经有账号了可以直接输入后点击“登录”。

description

登录完成后会看到VN Station主界面,此时请点击底部的“VN Trader Pro”,并在弹出的目录选择对话框中直接点“选择文件夹”按钮(即在默认的Windows用户目录下启动VN Station):

description

随后会弹出配置VN Treader的对话框,注意此时请一定只勾选加载CTPTEST接口,千万不要同时勾选加载CTP接口,会因为dll冲突导致后续测试失败!!!!

description

在VN Trader主界面上,点击左上角的“系统”->“连接CTPTEST”,在弹出的登录配置对话框中输入期货公司提供的测试账号信息(产品名称就是AppID),点击连接按钮后登录CTP穿透式测试用服务器:

description

当VN Trader左下角的日志监控组件中,刷新出熟悉的日志信息,看到“合约信息获取成功”的时候,就意味着我们已经完成测试了!

 

第五步:期货公司校验

搞定上面的测试服务器连接登录后,就可以联系期货公司进行校验工作了,通常可以一次性直接通过,如果遇到不通过的情况请查看本文最后的常见问题来解决。

 

第六步:实盘接入

期货公司校验通过后,会将客户申请的AppID和AuthCode添加到实盘CTP的服务器上,此时只要把启动VN Trader Pro时,加载的接口由CTPTEST改为CTP,就可以连接上实盘交易环境,和以往一样进行量化交易了。

description

 
 

名词解释

穿透式监管

新的监管模式主要是明确了期货公司对于其客户交易行为的管理责任,因此需要对所有接入交易柜台系统的交易终端软件进行认证管理,防止坏人耍流氓后一走了之,难以追查。

穿透式API

穿透式监管的主要实现工具,支持对交易终端机器的信息采集功能(即采集之前提到的CPU序列号、MAC地址等信息),并在加密后直接上传期货市场监控中心。除了本文中用到的CTP穿透式API外,其他的柜台也都提供了对应的穿透式API版本:恒生、易盛、飞创等,操作方法基本类似。

执行日期

在6月14日当天,所有期货公司的柜台系统全部强制升级为穿透式监管版本,老的非穿透式柜台会全部下线,没有所谓的“过渡期”,现在就已经是“过渡期”了!!!还有不到两周的时间,所有通过API接入交易的用户请赶紧吧,不要到了那天没法交易才着急,而且普遍的拖拉习惯,目前在申请接入测试的客户与日俱增,也对期货公司每天繁忙的后台IT部门表示感谢。

6.3.13

CTP穿透式柜台的仿真测试版本,也是CTPTEST接口中使用的API版本,所有客户的仿真接入认证测试都必须使用该版本!主要因为该版API采集客户的信息是没有加密的,期货公司可以在后台查看来进行认证工作。

6.3.15

CTP穿透式柜台的实盘交易版本,也是CTP接口中使用的API版本,完成仿真接入测试后,必须使用该版本才能连接实盘交易的CTP柜台。该版本的采集信息是安全加密的,期货公司的IT用后台系统也看不到。

直连模式

指的是所有用交易程序直接使用CTP的API连接CTP柜台,进行行情获取和委托交易的情况,几乎所有自主开发或者使用开源框架的量化交易客户都属于这种情况,直接使用穿透式监管版本的API进行开发就行(带_se后缀的)。

中继模式

指的是:交易客户端->中继服务器->CTP柜台,采用这种连接模式主要包括商业量化交易软件(比如文华财经)以及机构量化资管系统(比如O32),只有中继模式才需要用到那个DataCollect.dll文件。

 
 

API内部工作流程

同样以CTP为例:

  1. 调用Init,开始连接
  2. 收到OnFrontConnected,确认连接成功
  3. 调用ReqAuthenticate,这一步填入AppID和AuthCode,进行认证
  4. 收到OnRspAuthenticate,确认认证成功
  5. 调用ReqUserLogin,这一步同样需要填入AppID,进行登录
  6. 收到OnRspUserLogin,确认登录成功

只有交易接口TD需要进行认证,MD直接登录就行。每一步出错的话都会有相应的报错输出提示,查看错误信息内容后照着修改就行。

 
 

常见问题

哪个版本的vn.py目前支持穿透式API?

最新的v2.0.3发布版本(Py3 64位),和v1.9.2-LTS版本(Py2 32位),都支持了穿透式API,推荐使用Windows进行相关测试工作(Linux上需要自己调整链接库做编译)。

更新后SimNow环境连不上了!

截止目前的2019年6月1日,SimNow上的交易测试环境(包括第一套和第二套)依旧为非穿透式的老版本,因此用穿透式版本的API都是连不上的。

SimNow的终端厂商测试环境连上后没有行情!

SimNow所提供的6.3.13测试环境,目前仅仅为了满足用户的穿透式版本测试需求(也就是能成功登录上来查询一下合约信息等),尚未提供第一套或者第二套环境中的仿真行情以及仿真交易功能,所以:就是没有行情的~

报错4097,cmd有输出Decrypt handshake data failed

这是因为你的API版本和服务器的版本不一致导致的,请按照以下流程排查:

  1. 是否同时import了CTP(CtpGateway)和CTPTEST(CtptestGateway)接口,如有请移除另一个(两个同时加载会冲突)
  2. 确保使用CtptestGateway来连接6.3.13穿透式测试环境,用CtpGateway来连接6.3.15穿透式实盘环境
  3. v2.0.3以及v1.9.2-LTS(最新Github代码)都已升级到穿透式API,因此无法用于连接SimNow的老版本环境

UserProductInfo字段是用来干嘛的?

该字段是之前非穿透式API时,用来进行客户认证的产品名称字段(配合AuthCode一起)。穿透式版本接入的方案文档并没有对该字段的强制要求,目前我们这边已经对接了的5家期货公司也均未要求使用,但听说某些公司需要:如中信建投等,如果有了解其他公司情况的请在评论中分享。

能否使用云服务器或者虚拟机进行测试认证?

虚拟机和云服务器,对于本文开头部分提到的CPU序列号、硬盘序列号等信息,有可能获取不全或者部分字段不符合规定。目前有些期货公司要求严格,必须全部能正确获取到,且和第二部申请表中填写的内容一致,才能算认证测试成功;另一些公司则是十分宽松,表也不用填,采集信息也不看,只要登录上来就算测试通过。

所以,能否使用云服务器和虚拟机,完全取决于你开户的这家期货公司了。

如果使用v1.9.2之前老版本的vn.py怎么办?

请将v1.9.2的以下内容复制到你的老版本对应的目录下:

  • vnpy/api/ctp
  • vnpy/trader/gateway/ctpGateway
  • vnpy/trader/gateway/ctptestGateway

并采用上文提到的方式去做认证。

6.3.15的穿透式实盘API,想进行下测试怎么办?

目前只发现中信期货提供了6.3.15的仿真测试环境(忍不住竖起大拇指,不愧是中信),但最近估计申请人数过多,新的申请处理非常缓慢,如果大家发现别家提供6.3.15的测试环境也欢迎在评论里告知。

为什么期货公司一定要用6.3.13和6.3.15两个版本,接下来是否会合并都使用6.3.15?

为了满足穿透式监管认证要求,期货公司认证时要看到客户机器采集的信息,就只能通过6.3.13版本的CTP API。而实盘交易的环境中,期货监控中心要求直接上报采集信息,禁止期货公司查看和修改,就必须通过6.3.15版本。

所以目前来看仿真和实盘使用两个版本的API,是监管中心比较放心得过的方案吧,也就意味着交易客户端必须要两套API都对接准备好了。


统计

主题
5470
帖子
20939
已注册用户
22199
最新用户
在线用户
374
在线来宾用户
371
© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号