VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 16
声望: 0

wrote:

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()

大佬,这个太老了,最新2.9版本用不了,不知道怎么改,能不能贴一下最新的代码???

Member
avatar
加入于:
帖子: 16
声望: 0

libiya2000 wrote:

青青子荆 wrote:

libiya2000 wrote:

shunyuzhiqian wrote:

description

hi 我也遇到相同的问题了,请问你是如何处理这块,而解决的?
请参考一下13楼

也是不好用,我在windows下完全可以用,可是换到Ubuntu20.04 就不好用了。
在def record_tick(self, tick: TickData):里面加打印函数也没反应。
后来通过main_engine.add_engine(WholeMarketRecorder)的方式让def record_tick(self, tick: TickData):里面有了打印。
但是实际上仍然没有在sql里写入数据。
而且全程连个报错也没有出现。

大佬,你的 datarecoder.py 能不能分享一下?

Member
avatar
加入于:
帖子: 16
声望: 0

全市场录制会卡死把。我实测全录制的,只能收到很少的数据,服务器不发送了。只有录制一小部分才能收到

Member
avatar
加入于:
帖子: 11
声望: 5

我本地装的是2.7版本,修改了部分代码可以跑起来了,代码在下面。
可以录制data_recorder_setting.json里配置的品种,如果想录制全市场,就把注释的那一大段代码打开就可以了。

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")

def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # def load_setting(self):
    #     # 不读取原数据记录设置
    #     pass
    # def process_contract_event(self, event):
    #     """
    #     设置记录所有期货合约
    #     """
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #     
    #     # 不录制期权
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)

if __name__ == "__main__":
    run_parent()
Member
avatar
加入于:
帖子: 227
声望: 0

请问数据保存在哪里

Member
avatar
加入于:
帖子: 227
声望: 0

代码中没有设定订阅 期货代码 ,请问只订阅单只怎么办

Member
avatar
加入于:
帖子: 716
声望: 62

保存在veighna使用的数据库中,没有设置的话默认是sqlite。录制的品种在data_recorder_setting.json填写,具体填写方法在ui下先配置需要订阅的合约,然后将ui环境中的data_recorder_setting.json复制到no_ui中,并进行修改即可。

Member
avatar
加入于:
帖子: 227
声望: 0

谢谢
郭易燔 wrote:

保存在veighna使用的数据库中,没有设置的话默认是sqlite。录制的品种在data_recorder_setting.json填写,具体填写方法在ui下先配置需要订阅的合约,然后将ui环境中的data_recorder_setting.json复制到no_ui中,并进行修改即可。

Member
avatar
加入于:
帖子: 12
声望: 0

你好,全场景录制里的 while循环是什么作用,会影响录制的时间吗

description

如果增加一个心跳记录函数,和全局对象,记录每分钟tick个数,在while里每分钟打印一次,会影响到整体性能以及耽误到接收的消息吗

#!/usr/bin/python
# -*- coding:utf-8 -*-
import logging
import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

from logger import loglog

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = logging.INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")
global HIS_DATE
HIS_DATE = datetime.now()
global g_tick_cnt
g_tick_cnt = 0


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if self.drop_start <= current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if self.rest_start <= current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)
        global g_tick_cnt
        g_tick_cnt = g_tick_cnt + 1

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    def load_setting(self):
        # 不读取原数据记录设置
        pass

    def process_contract_event(self, event):
        """
        设置记录所有期货合约
        """
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 选择is_futures()=TRUE则录制期货,FALSE则录制期权
        if not is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                print(f"record bar {vt_symbol}")
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                print(f"record tick {vt_symbol}")
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def change_print(frequency):
    global HIS_DATE
    global g_tick_cnt
    delta = 0
    if frequency == 'h':
        nn_hour = datetime.now().hour
        his_hour = HIS_DATE.hour
        delta = nn_hour - his_hour
    elif frequency == 'm':
        nn_minute = datetime.now().minute
        his_minute = HIS_DATE.minute
        delta = nn_minute - his_minute

    if delta != 0:
        loglog.info(f'frequency={frequency} cnt={g_tick_cnt}')
        g_tick_cnt = 0
        HIS_DATE = datetime.now()


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine,
                                                record_modes=[RecordMode.TICK])

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)
        change_print(frequency='m')


def run_parent():
    """
    Running in the parent process.
    """
    loglog.info("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = time(10, 0)
        # current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (MORNING_START <= current_time <= MORNING_END)
            or (AFTERNOON_START <= current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            loglog.info("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            loglog.info("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            loglog.info("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            loglog.info("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()
Member
avatar
加入于:
帖子: 1
声望: 0

脚本运行是依赖vn的配置文件的,可以使用studio的录制功能,把gate,存储,还有录制品种都添加一遍,确认能录到数据之后,再使用脚本自动录制。
配置文件是用户目录的.vntrader内:
connect_xxx.json 行情及交易服务器地址,账户等
vt_setting.json 行情数据存储位置等
data_recorder_setting.json 订阅及录制的合约名称

我修改了一个用来录中金期的期货合约行情,过滤掉了期权合约。使用的是tts的接口,替换dll之后接收期货公司的实盘行情。
运行是没问题,就是订阅到的只有主力和次主力合约,缺其他月份的合约,不知道问题在哪里。麻烦大家帮忙看一下??

description

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

# from vnpy_ctp import CtpGateway
from vnpy_tts import TtsGateway

from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
# CTP_SETTING = load_json("connect_ctp.json")
CTP_SETTING = load_json("connect_tts.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR, RecordMode.TICK]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 5)
        self.drop_end = time(8, 55)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, tick_time) -> bool:
        """
        交易时间 过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        # if tick_time >= self.drop_start and tick_time < self.drop_end:
        #     return False
        # if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
        #     if tick_time >= self.rest_start and tick_time < self.rest_end:
        #         return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return

        print(tick_time.strftime("%H:%M:%S.%f"), tick.vt_symbol,'tick pushed')
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return

        print(bar_time.strftime("%H:%M:%S.%f"), bar.vt_symbol,'bar pushed')
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # 如果需要录制全部品种,则注释掉以下代码
    def load_setting(self):
        # 不读取原数据记录setting
        pass

    def save_setting(self):
        # 不更新原数据记录setting
        pass

    def process_contract_event(self, event):
        """
        仅记录中金期的合约
        """
        if event.data.exchange == Exchange.CFFEX:
            contract = event.data
            vt_symbol = contract.vt_symbol

            # 过滤掉非期货合约
            if is_futures(vt_symbol):
                if RecordMode.BAR in self.record_modes:
                    self.add_bar_recording(vt_symbol)
                if RecordMode.TICK in self.record_modes:
                    self.add_tick_recording(vt_symbol)
                self.subscribe(contract)
                # print(contract)
                print("订阅合约:", vt_symbol)
    # 如果需要录制全部品种,则注释掉以上代码


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(TtsGateway)
    main_engine.write_log("创建主引擎")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    # 连接行情服务器
    main_engine.connect(CTP_SETTING, "TTS")
    main_engine.write_log("连接TTS接口")

    #启动扩展的行情记录模块
    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)
    main_engine.write_log("开始录制")


    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print(f"{datetime.now().time()}---启动守护父进程,交易时段将启动行情录制子进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(9, 25)
    MORNING_END = time(11, 35)

    # 股指期货合约从下午1点开始
    AFTERNOON_START = time(12, 55)
    AFTERNOON_END = time(15, 5)

    #夜盘品种 部分到次日凌晨2点半
    # NIGHT_START = time(20, 55)
    # NIGHT_END = time(2, 35)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            # or (current_time >= NIGHT_START)
            # or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print(f"{datetime.now().time()}---启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print(f"{datetime.now().time()}---数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print(f"{datetime.now().time()}---关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print(f"{datetime.now().time()}---数据录制子进程关闭成功")
        sys.stdout.flush()

        sleep(10)

if __name__ == "__main__":
    run_parent()
Member
avatar
加入于:
帖子: 4
声望: 0

xzhangef wrote:

我本地装的是2.7版本,修改了部分代码可以跑起来了,代码在下面。
可以录制data_recorder_setting.json里配置的品种,如果想录制全市场,就把注释的那一大段代码打开就可以了。

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")

def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # def load_setting(self):
    #     # 不读取原数据记录设置
    #     pass
    # def process_contract_event(self, event):
    #     """
    #     设置记录所有期货合约
    #     """
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #     
    #     # 不录制期权
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)

if __name__ == "__main__":
    run_parent()

感谢大佬,这个可以用,2023.6.5

Member
avatar
加入于:
帖子: 257
声望: 3

juchao1023 wrote:

xzhangef wrote:

我本地装的是2.7版本,修改了部分代码可以跑起来了,代码在下面。
可以录制data_recorder_setting.json里配置的品种,如果想录制全市场,就把注释的那一大段代码打开就可以了。

import sys
import multiprocessing
import re
from copy import copy
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData, TickData
from enum import Enum
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")

def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))

class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"

class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not self.is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", [copy(tick)])
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not self.is_trading(bar.vt_symbol, bar_time):
            return
        print("push")
        print(bar)
        task = ("bar", [copy(bar)])
        self.queue.put(task)

    # def load_setting(self):
    #     # 不读取原数据记录设置
    #     pass
    # def process_contract_event(self, event):
    #     """
    #     设置记录所有期货合约
    #     """
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #     
    #     # 不录制期权
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)

def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    # oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)

def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(11, 45)

    AFTERNOON_START = time(13, 15)
    AFTERNOON_END = time(15, 15)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)

if __name__ == "__main__":
    run_parent()

感谢大佬,这个可以用,2023.6.5

经反复测试,当日可以正常录制全市场行情数据,但是次日发现,后面几天仅部分合约有录制进数据库,原因具体是?

Member
avatar
加入于:
帖子: 4684
声望: 285

可以打印排查一下是否是收到非交易时段tick导致的

Member
avatar
加入于:
帖子: 257
声望: 3

xiaohe wrote:

可以打印排查一下是否是收到非交易时段tick导致的

经反复测试排查发现,当日可以正常录制全市场行情数据,但是次日发现,后面几天仅部分合约有录制进数据库,是因为这个全市场录制行情数据代码没有收到DCE与CZCE交易所的数据推送。

Member
avatar
加入于:
帖子: 84
声望: 1

ranjianlin wrote:

xiaohe wrote:

可以打印排查一下是否是收到非交易时段tick导致的

经反复测试排查发现,当日可以正常录制全市场行情数据,但是次日发现,后面几天仅部分合约有录制进数据库,是因为这个全市场录制行情数据代码没有收到DCE与CZCE交易所的数据推送。

怎么解决的呢?

Member
avatar
加入于:
帖子: 84
声望: 1

有个很大的疑问,
交易所会主动推送所有数据吗?
如果不是,怎么得到全市场所有行情的呢?
如果是,那为什么需要订阅呢?是本地过滤吗?看起来又不像

Member
avatar
加入于:
帖子: 84
声望: 1

请教:
行情录制含期权,加了一个打印self.main_engine.write_log(f"process_contract_event,{vt_symbol}"),
2023-11-01 15:00:13,195 INFO: process_contract_event,l2404-C-8600.DCE
Send Heartbeat 1698822018
2023-11-01 15:00:25,488 INFO: process_contract_event,l2312-P-8500.DCE
Send Heartbeat 1698822033
2023-11-01 15:00:39,827 INFO: process_contract_event,l2410-P-8500.DCE
Send Heartbeat 1698822052
2023-11-01 15:00:53,916 INFO: process_contract_event,l2409-C-7200.DCE
Send Heartbeat 1698822067
2023-11-01 15:01:12,988 INFO: process_contract_event,l2402-C-7200.DCE
Send Heartbeat 1698822082
2023-11-01 15:01:27,593 INFO: process_contract_event,l2403-C-8000.DCE
Send Heartbeat 1698822097
2023-11-01 15:01:51,378 INFO: process_contract_event,l2403-P-7100.DCE
Send Heartbeat 1698822112
2023-11-01 15:02:04,811 INFO: process_contract_event,l2403-C-7000.DCE
Send Heartbeat 1698822127
Send Heartbeat 1698822142
2023-11-01 15:02:27,426 INFO: process_contract_event,l2312-C-9300.DCE
Send Heartbeat 1698822157
2023-11-01 15:02:43,506 INFO: process_contract_event,l2405-P-7200.DCE
Send Heartbeat 1698822172
2023-11-01 15:02:57,646 INFO: process_contract_event,l2403-C-9300.DCE
Send Heartbeat 1698822187
2023-11-01 15:03:11,458 INFO: process_contract_event,l2403-C-7800.DCE
Send Heartbeat 1698822202
2023-11-01 15:03:28,129 INFO: process_contract_event,l2405-C-9200.DCE

看起来每次间隔时间还挺长的,而且在盘后还在执行,应该是队列里的任务还没有执行完,
这是因为电脑(J4125+8G)垃圾吗?或者有别的原因?

Member
avatar
加入于:
帖子: 4684
声望: 285

lazysnake2003 wrote:

有个很大的疑问,
交易所会主动推送所有数据吗?
如果不是,怎么得到全市场所有行情的呢?
如果是,那为什么需要订阅呢?是本地过滤吗?看起来又不像
订阅哪个合约就推送哪个合约的行情。如果是收到交易时段外tick导致录制失败,可以自己在process_tick_event里进行过滤(对交易时段进行限制)

Member
avatar
加入于:
帖子: 4684
声望: 285

lazysnake2003 wrote:

请教:
行情录制含期权,加了一个打印self.main_engine.write_log(f"process_contract_event,{vt_symbol}"),
2023-11-01 15:00:13,195 INFO: process_contract_event,l2404-C-8600.DCE
Send Heartbeat 1698822018
2023-11-01 15:00:25,488 INFO: process_contract_event,l2312-P-8500.DCE
Send Heartbeat 1698822033
2023-11-01 15:00:39,827 INFO: process_contract_event,l2410-P-8500.DCE
Send Heartbeat 1698822052
2023-11-01 15:00:53,916 INFO: process_contract_event,l2409-C-7200.DCE
Send Heartbeat 1698822067
2023-11-01 15:01:12,988 INFO: process_contract_event,l2402-C-7200.DCE
Send Heartbeat 1698822082
2023-11-01 15:01:27,593 INFO: process_contract_event,l2403-C-8000.DCE
Send Heartbeat 1698822097
2023-11-01 15:01:51,378 INFO: process_contract_event,l2403-P-7100.DCE
Send Heartbeat 1698822112
2023-11-01 15:02:04,811 INFO: process_contract_event,l2403-C-7000.DCE
Send Heartbeat 1698822127
Send Heartbeat 1698822142
2023-11-01 15:02:27,426 INFO: process_contract_event,l2312-C-9300.DCE
Send Heartbeat 1698822157
2023-11-01 15:02:43,506 INFO: process_contract_event,l2405-P-7200.DCE
Send Heartbeat 1698822172
2023-11-01 15:02:57,646 INFO: process_contract_event,l2403-C-9300.DCE
Send Heartbeat 1698822187
2023-11-01 15:03:11,458 INFO: process_contract_event,l2403-C-7800.DCE
Send Heartbeat 1698822202
2023-11-01 15:03:28,129 INFO: process_contract_event,l2405-C-9200.DCE

看起来每次间隔时间还挺长的,而且在盘后还在执行,应该是队列里的任务还没有执行完,
这是因为电脑(J4125+8G)垃圾吗?或者有别的原因?
这是process_contract_event不是process_tick_event

Member
avatar
加入于:
帖子: 84
声望: 1

xiaohe wrote:

lazysnake2003 wrote:

请教:
行情录制含期权,加了一个打印self.main_engine.write_log(f"process_contract_event,{vt_symbol}"),
2023-11-01 15:00:13,195 INFO: process_contract_event,l2404-C-8600.DCE
Send Heartbeat 1698822018
2023-11-01 15:00:25,488 INFO: process_contract_event,l2312-P-8500.DCE
Send Heartbeat 1698822033
2023-11-01 15:00:39,827 INFO: process_contract_event,l2410-P-8500.DCE
Send Heartbeat 1698822052
2023-11-01 15:00:53,916 INFO: process_contract_event,l2409-C-7200.DCE
Send Heartbeat 1698822067
2023-11-01 15:01:12,988 INFO: process_contract_event,l2402-C-7200.DCE
Send Heartbeat 1698822082
2023-11-01 15:01:27,593 INFO: process_contract_event,l2403-C-8000.DCE
Send Heartbeat 1698822097
2023-11-01 15:01:51,378 INFO: process_contract_event,l2403-P-7100.DCE
Send Heartbeat 1698822112
2023-11-01 15:02:04,811 INFO: process_contract_event,l2403-C-7000.DCE
Send Heartbeat 1698822127
Send Heartbeat 1698822142
2023-11-01 15:02:27,426 INFO: process_contract_event,l2312-C-9300.DCE
Send Heartbeat 1698822157
2023-11-01 15:02:43,506 INFO: process_contract_event,l2405-P-7200.DCE
Send Heartbeat 1698822172
2023-11-01 15:02:57,646 INFO: process_contract_event,l2403-C-9300.DCE
Send Heartbeat 1698822187
2023-11-01 15:03:11,458 INFO: process_contract_event,l2403-C-7800.DCE
Send Heartbeat 1698822202
2023-11-01 15:03:28,129 INFO: process_contract_event,l2405-C-9200.DCE

看起来每次间隔时间还挺长的,而且在盘后还在执行,应该是队列里的任务还没有执行完,
这是因为电脑(J4125+8G)垃圾吗?或者有别的原因?
这是process_contract_event不是process_tick_event

那么这个process_contract_event应该由事件EVENT_CONTRACT触发的对吧?那么queue里面的这个事件由谁写进去的呢?

如《行情记录过程中MainEngine都$了什么?行情记录模块源码部分分析》这篇所说,“这个connect方法其实是为主引擎注入灵魂的操作.CtpGateway类继承了Base_gateway类, 同时绑定了CtpMdApi和CTPTdApi两个类,里面有个self.init()方法, 具体逻辑看不到, 但是可以推测其功能, 如何推测呢, 看执行完connect之后oms引擎里头变量的变化,里面原本的部分空字典已经有数据了, 挑一个最复杂的self.contracts字典. 可以发现oms的self.contracts字典里已经充满了ctp接口内包含的所有合约代码的基本信息,”,

这是connect之后交易所会把所有的合约都推送出来吗?由OnRspQryMulticastInstrument回调传进来的吗?这个回调函数在哪里继承的啊?

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】