VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 8
声望: 0

ScriptTrader - 脚本策略模块,订阅行情后,怎么可以获取到实时行情推送的tick?
example给的是每次调用get_tick去获取最新tick。
tick = engine.get_tick(vt_symbol="rb1910.SHFE",use_df=False)

如果不调用engine.get_tick的话,应该怎么调用拿到实时推送tick数据?
新手,期待大牛解答,万分感谢~

附上代码:

from time import sleep
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy_ctp import CtpGateway
from vnpy_scripttrader import ScriptEngine
from vnpy_scripttrader import ScriptTraderApp
from vnpy.trader.object import (
    SubscribeRequest,
)


def tick():
    ctp_setting = {
        "用户名": "XXX",
        "密码": "XXX",
        "经纪商代码": "9999",
        "交易服务器": "180.168.146.187:10130",
        "行情服务器": "180.168.146.187:10131",
        "产品名称": "simnow_client_test",
        "授权编码": "0000000000000000",
        "产品信息": ""
    }
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)

    main_engine.add_app(ScriptTraderApp)
    main_engine.write_log("主引擎创建成功")
    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)
    script_engine = ScriptEngine(main_engine, event_engine)
    vt_symbol = 'IF2201.CFFEX'
    # add_tick_recording('IF2201.CFFEX')
    contract = script_engine.get_contract(vt_symbol="IF2201.CFFEX", use_df=False)
    # contract = script_engine.get_contract(vt_symbol)
    if not contract:
        main_engine.write_log(f"找不到合约:{vt_symbol}")
        return
    req: SubscribeRequest = SubscribeRequest(
        symbol=contract.symbol,
        exchange=contract.exchange
    )
    main_engine.subscribe(req, contract.gateway_name)

if __name__ == '__main__':
    tick()

另外在ctp_gateway.py中的onRtnDepthMarketData方法中写了 print(tick)

    def onRtnDepthMarketData(self, data: dict) -> None:
        """行情数据推送"""
        # 过滤没有时间戳的异常行情数据
        if not data["UpdateTime"]:
            return

        # 过滤还没有收到合约数据前的行情推送
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # 对大商所的交易日字段取本地日期
        if contract.exchange == Exchange.DCE:
            date_str: str = self.current_date
        else:
            date_str: str = data["ActionDay"]

        timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = CHINA_TZ.localize(dt)

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]
        print(tick)
        self.gateway.on_tick(tick)
Member
avatar
加入于:
帖子: 3583
声望: 234

scripttrader模块就是通过get_tick/get_ticks函数获取main_engine的ticks字典里收到的tick数据的

Member
avatar
加入于:
帖子: 8
声望: 0

xiaohe wrote:

scripttrader模块就是通过get_tick/get_ticks函数获取main_engine的ticks字典里收到的tick数据的
您好,那这样一来,tick的实时性没法保证啊?该模块下有其他什么解决办法嘛?

Member
avatar
加入于:
帖子: 716
声望: 55

期货数据的tick是0.5秒一个,对于期货数据,可以写个while循环,每0.5秒查询一次。

Member
avatar
加入于:
帖子: 8
声望: 0

郭易燔 wrote:

期货数据的tick是0.5秒一个,对于期货数据,可以写个while循环,每0.5秒查询一次。

兄弟,查询数据和推送数据怎么能一样,推送数据都会有延迟,更别说查询了。另外再加上你的代码逻辑每次循环耗时也不一样,查询的实时性更无法保证。

Member
avatar
加入于:
帖子: 716
声望: 55

你可以在vnpy.trader.engine.py的process_tick_event函数下进行修改,每次tick推送都会调用这个函数。

ScriptTrader模块本身是没有监听tick推送事件的,如果要添加监听tick,需要自行修改。

Member
avatar
加入于:
帖子: 8
声望: 0

郭易燔 wrote:

你可以在vnpy.trader.engine.py的process_tick_event函数下进行修改,每次tick推送都会调用这个函数。

ScriptTrader模块本身是没有监听tick推送事件的,如果要添加监听tick,需要自行修改。

好的,我先看看,多谢

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】