VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 5
声望: 0

在海通CTPTest环境下。使用VNPY2.1.2 可以订阅获取合约信息,但是却无法获得行情数据。
在前辈的指点下。使用2.0.7.可以成功订阅合约信息,并且获取行情数据。
我想问下在2.0.7 -》 2.1.2 是否做了相关改动导致海通证券无法获取行情数据。
相关代码如下

import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import INFO
from vnpy.app.script_trader import ScriptEngine
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine

from vnpy.gateway.ctptest import CtptestGateway
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True

ctp_setting = {
    # 资金账号
    "用户名": "",
    # 密码
    "密码": "",
    "经纪商代码": "",
    "交易服务器": "",
    "行情服务器": "",
    # APPID
    "产品名称": "client_vnpy_2.1.2",
    # 授权码
    "授权编码": "",
    # APPID
    "产品信息": "client_vnpy_2.1.2"
}


def run(engine: ScriptEngine):
    # 合约信息
    vt_symbols = ["IF2007.CFFEX"]
    # 订阅行情
    engine.subscribe(vt_symbols)
    # 获取合约信息
    for vt_symbol in vt_symbols:
        msg = f"合约入参,{vt_symbol}"
        engine.write_log(msg)
        contract = engine.get_contract(vt_symbol, True)
        msg = f"合约信息,{contract}"
        engine.write_log(msg)
    # 持续运行,使用strategy_active来判断是否要退出程序
    while engine.strategy_active:
        # 轮询获取行情
        for vt_symbol in vt_symbols:
            tick = engine.get_tick(vt_symbol, True)
            msg = f"最新行情, {tick}"
            engine.write_log(msg)

        # 等待3秒进入下一轮
        sleep(3)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtptestGateway)
    # cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTPTEST")
    main_engine.write_log("连接CTPTEST接口")

    sleep(10)
    #
    # cta_engine.init_engine()
    # main_engine.write_log("CTA策略初始化完成")
    #
    # cta_engine.init_all_strategies()
    # sleep(60)  # Leave enough time to complete strategy initialization
    # main_engine.write_log("CTA策略全部初始化")
    #
    # cta_engine.start_all_strategies()
    # main_engine.write_log("CTA策略全部启动")

    script_engine = ScriptEngine(main_engine, event_engine)
    script_engine.strategy_active = "true"
    run(script_engine)
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    DAY_START = time(8, 45)
    DAY_END = time(15, 30)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if ((DAY_START <= current_time <= DAY_END)
                or (current_time >= NIGHT_START)
                or (current_time <= NIGHT_END)):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("子进程启动成功")

        # 非记录时间则退出子进程
        if not trading and child_process is not None:
            print("关闭子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("子进程关闭成功")

        sleep(5)


if __name__ == "__main__":
    run_parent()
Member
avatar
加入于:
帖子: 5
声望: 0

description

description

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】