vn.py量化社区
By Traders, For Traders.
Member
avatar
加入于:
帖子: 19
声望: 8

全市场行情录制,1核512内存应该也足够了,建议2核1G以上服务器

description

直接上代码

import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK

from vnpy.app.data_recorder.engine import RecorderEngine

EXCHANGE_LIST = [
    Exchange.SHFE,
    Exchange.DCE,
    Exchange.CZCE,
    Exchange.CFFEX,
    Exchange.INE,
]

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
CTP_SETTING = load_json("connect_ctp.json")


def is_futures(vt_symbol: str) -> bool:
    """
    是否是期货
    """
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}.[A-Z]+$", vt_symbol))


class RecordMode(Enum):
    BAR = "bar"
    TICK = "tick"


class WholeMarketRecorder(RecorderEngine):
    def __init__(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
        super().__init__(main_engine, event_engine)
        self.record_modes = record_modes
        # 非交易时间
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # 大连、上海、郑州交易所,小节休息
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        交易时间,过滤校验Tick
        """
        symbol, exchange = extract_vt_symbol(vt_symbol)

        if current_time >= self.drop_start and current_time < self.drop_end:
            return False
        if exchange in [Exchange.DCE, Exchange.SHFE, Exchange.CZCE]:
            if current_time >= self.rest_start and current_time < self.rest_end:
                return False
        return True

    def load_setting(self):
        pass

    def record_tick(self, tick: TickData):
        """
        抛弃非交易时间校验数据
        """
        tick_time = tick.datetime.time()
        if not is_trading(tick.vt_symbol, tick_time):
            return
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """
        抛弃非交易时间校验数据
        """
        bar_time = bar.datetime.time()
        if not is_trading(bar.vt_symbol, bar_time):
            return
        task = ("bar", copy(bar))
        self.queue.put(task)

    def process_contract_event(self, event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol
        # 不录制期权
        if is_futures(vt_symbol):
            if RecordMode.BAR in self.record_modes:
                self.add_bar_recording(vt_symbol)
            if RecordMode.TICK in self.record_modes:
                self.add_tick_recording(vt_symbol)
            self.subscribe(contract)


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # 记录引擎

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)

    main_engine.write_log("开始录制数据")
    oms_engine = main_engine.get_engine("oms")
    while True:
        sleep(1)


def run_parent():
    """
    Running in the parent process.
    """
    print("启动CTA策略守护父进程")

    # Chinese futures market trading period (day/night)
    MORNING_START = time(8, 45)
    MORNING_END = time(12, 0)

    AFTERNOON_START = time(12, 45)
    AFTERNOON_END = time(15, 35)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(3, 5)

    child_process = None

    while True:
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= MORNING_START and current_time <= MORNING_END)
            or (current_time >= AFTERNOON_START and current_time <= AFTERNOON_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动数据录制子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("数据录制子进程启动成功")

        # 非记录时间则退出数据录制子进程
        if not trading and child_process is not None:
            print("关闭数据录制子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("数据录制子进程关闭成功")
        sys.stdout.flush()
        sleep(5)


if __name__ == "__main__":
    run_parent()
Administrator
avatar
加入于:
帖子: 4603
声望: 262

给你加个精华

Member
avatar
加入于:
帖子: 2
声望: 0

第87、97行代码报错。请问我把“if not is_trading(tick.vt_symbol, tick_time):” 改成“if not self.is_trading(tick.vt_symbol, tick_time):对不对?

Member
加入于:
帖子: 7
声望: 2

Member
avatar
加入于:
帖子: 13
声望: 0

description
代码都看不懂,错了也改不来的,怎么办,怎么办!!!

Member
avatar
加入于:
帖子: 154
声望: 0

@丘
1:老师您好,数据录入是进默认的sqlite数据库吗?
2:例如,我数据库数据截止2020年5月1日,从2020年6月9日录制全市场行情,期间2020年5月1日----2020年6月9日是不是也应该没有呢?

Member
加入于:
帖子: 30
声望: 3

土娃哥 wrote:

第87、97行代码报错。请问我把“if not is_trading(tick.vt_symbol, tick_time):” 改成“if not self.is_trading(tick.vt_symbol, tick_time):对不对?
对的,windows 下和Linux下有些部分好像不一样

Member
加入于:
帖子: 30
声望: 3

个人觉得应该在执行完main_engine.connect(CTP_SETTING, "CTP")之后,在主线程中添加阻塞,以等待connect方法完全执行, 如果不阻塞一下主线程的话, 有一定的概率有部分eContract事件会先"跑掉",做不到在执行完来自oms的process_contract_event之后紧接着执行来自RecorderEngine的process_contract_event方法, 这是有风险的,最保险的做法是在主线程中增加sleep, 阻塞主线程一些时间.

Member
avatar
加入于:
帖子: 47
声望: 1

这个好像是有些问题,我运行的时候,只能获得少数几个远期合约,把我原来的行情记录的配置也给我删除了

Member
avatar
加入于:
帖子: 71
声望: 0

哪位大神能讲讲这段代码呢,运行了后 输出完

2020-11-23 14:05:06,807 INFO: 主引擎创建成功
2020-11-23 14:05:06,809 INFO: 注册日志事件监听
2020-11-23 14:05:06,935 INFO: 交易服务器连接成功
2020-11-23 14:05:06,947 INFO: 行情服务器连接成功
2020-11-23 14:05:07,045 INFO: 交易服务器授权验证成功
2020-11-23 14:05:07,045 INFO: 行情服务器登录成功
2020-11-23 14:05:07,160 INFO: 交易服务器登录成功
2020-11-23 14:05:09,437 INFO: 结算信息确认成功
2020-11-23 14:05:19,790 INFO: 合约信息查询成功
2020-11-23 14:05:21,811 INFO: 连接CTP接口
2020-11-23 14:05:21,811 INFO: 开始录制数据

然后就没反应了,也没发现有数据录入数据库

诚心求指导!

Member
avatar
加入于:
帖子: 71
声望: 0

description

Member
avatar
加入于:
帖子: 71
声望: 0

我理解,这个行情录制代码里,新建了一个数据记录类WholeMarketRecorder,在子进程中实例了一次(但是实例的那些方法何时调用的 不清楚。。)

运行这段代码后输出如上,然后后续一直没反应,数据库中也没有数据

同时我在WholeMarketRecorder的process_contract_event方法中 写了打印合约名称,也没运行出来(说明没运行这段代码)

大佬们有空的话 方便指导下么

Member
avatar
加入于:
帖子: 71
声望: 0

守望长城2020-6-11-艾瑞巴蒂 wrote:

个人觉得应该在执行完main_engine.connect(CTP_SETTING, "CTP")之后,在主线程中添加阻塞,以等待connect方法完全执行, 如果不阻塞一下主线程的话, 有一定的概率有部分eContract事件会先"跑掉",做不到在执行完来自oms的process_contract_event之后紧接着执行来自RecorderEngine的process_contract_event方法, 这是有风险的,最保险的做法是在主线程中增加sleep, 阻塞主线程一些时间.

问题初步解决 不能加sleep 加了就运行不了 还没搞清楚原因

Member
avatar
加入于:
帖子: 3
声望: 0

请问有办法优化吗,我现在发现存到mysql会有大概20分钟多的延迟。全行情的数据太多了

Member
avatar
加入于:
帖子: 92
声望: 7

Bambi wrote:

请问有办法优化吗,我现在发现存到mysql会有大概20分钟多的延迟。全行情的数据太多了
https://www.vnpy.com/forum/topic/5238-you-mei-you-yi-chong-kao-pu-de-lu-zhi-quan-shi-chang-xing-qing-de-fang-fa-ni

Member
avatar
加入于:
帖子: 18
声望: 0

请问如果想启动bar 与tick数据同时录制,这个方法下可以如何实现呢,sqlite 只能单进程,那么切换recordmode同时录制不行。

请问如何不默认存储到 默认用户文件夹的.db,而可以存储到自定义的文件夹,如何在脚本部分更改呢?

Member
avatar
加入于:
帖子: 2
声望: 0

求教: 我改造了下,目前可以正常连接火币,提示行情API连接成功,但没有数据记录到数据库,是不是没有传入要记录那些交易品种的数据?

Member
avatar
加入于:
帖子: 20
声望: 1

mengrong wrote:

请问如果想启动bar 与tick数据同时录制,这个方法下可以如何实现呢,sqlite 只能单进程,那么切换recordmode同时录制不行。

请问如何不默认存储到 默认用户文件夹的.db,而可以存储到自定义的文件夹,如何在脚本部分更改呢?

我也正在找这个办法,你解决了吗?

Member
avatar
加入于:
帖子: 20
声望: 1

description

可以录制,但是会报个错误,这个要怎么去掉?

Member
avatar
加入于:
帖子: 20
声望: 1

mengrong wrote:

请问如果想启动bar 与tick数据同时录制,这个方法下可以如何实现呢,sqlite 只能单进程,那么切换recordmode同时录制不行。

请问如何不默认存储到 默认用户文件夹的.db,而可以存储到自定义的文件夹,如何在脚本部分更改呢?

我用的是mongodb,可以同时录制bar和tick
修改def init(self, main_engine, event_engine, record_modes=[RecordMode.BAR]):
为def init(self, main_engine, event_engine, record_modes=[RecordMode.BAR,RecordMode.TICK]):

© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3

沪公网安备 31011502017034号