2021 年,有位大佬在论坛里发布了一个全市场行情录制代码的帖子,链接是 https://www.vnpy.com/forum/topic/3046-quan-shi-chang-lu-zhi-xing-qing-shu-ju?page=3
由于时间久远,原帖代码已经无法直接运行。最近我根据最新版本的代码对其进行了更新,在 3.0 及以上的版本中都可以正常运行,在此把结果和大家分享一下。代码如下:
import sys
import multiprocessing
import re
from contextlib import closing
from copy import copy
from copy import deepcopy
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, HistoryRequest, Product, TickData
#from vnpy.trader.database import init
from vnpy.trader.setting import get_settings
from enum import Enum
from time import sleep
from datetime import datetime, time, timedelta
from logging import INFO
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json, extract_vt_symbol
#from vnpy.gateway.ctp import CtpGateway
from vnpy_ctp import CtpGateway
#from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy_ctastrategy import CtaStrategyApp
#from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy.trader.event import EVENT_CONTRACT, EVENT_TICK
#from vnpy_datarecorder.engine import RecorderEngine
#from vnpy.app.data_recorder.engine import RecorderEngine
from vnpy.myTrade.engine_datamanager import RecorderEngine
# Exchanges listed for recording (not referenced elsewhere in the visible code).
EXCHANGE_LIST = [
    Exchange.SHFE, Exchange.DCE, Exchange.CZCE, Exchange.CFFEX, Exchange.INE,
]

# Logging configuration: enabled, INFO level, echoed to the console.
SETTINGS.update(
    {
        "log.active": True,
        "log.level": INFO,
        "log.console": True,
    }
)

# CTP connection parameters, read from connect_ctp.json via load_json.
CTP_SETTING = load_json("connect_ctp.json")
def is_futures(vt_symbol: str) -> bool:
    """
    Return True when vt_symbol looks like a futures contract.

    The accepted shape is 1-3 letters, 2-4 digits, a literal dot, then an
    upper-case exchange code (e.g. "rb2105.SHFE"). Anything else, such as
    an option symbol with strike/side suffixes, returns False.
    """
    # The dot is escaped so that only "." separates symbol and exchange;
    # unescaped it matched any character.
    return bool(re.match(r"^[a-zA-Z]{1,3}\d{2,4}\.[A-Z]+$", vt_symbol))
class RecordMode(Enum):
    """Kinds of market data the recorder can be asked to store."""

    # Bar data
    BAR = "bar"

    # Tick data
    TICK = "tick"
class WholeMarketRecorder(RecorderEngine):
    """
    Recorder engine that discards ticks and bars stamped outside trading hours.

    record_tick / record_bar are overridden so that data whose time of day
    falls inside the configured non-trading windows is dropped instead of
    being put on the engine's queue.
    """

    def __init__(self, main_engine, event_engine, record_modes=None):
        """
        Create the recorder.

        :param main_engine: passed through to RecorderEngine.
        :param event_engine: passed through to RecorderEngine.
        :param record_modes: list of RecordMode values; defaults to
            [RecordMode.BAR]. It is stored on the instance but not read by
            any enabled method of this class.
        """
        # A fresh list per instance; a list literal as the default would be
        # shared by every instance created without the argument.
        if record_modes is None:
            record_modes = [RecordMode.BAR]

        # Assigned before the base initialiser runs, as in the original.
        self.record_modes = record_modes
        super().__init__(main_engine, event_engine)

        # Non-trading window: data stamped in [03:15, 08:45) is dropped
        self.drop_start = time(3, 15)
        self.drop_end = time(8, 45)

        # Mid-morning break for DCE, SHFE and CZCE: [10:15, 10:30)
        self.rest_start = time(10, 15)
        self.rest_end = time(10, 30)

    # Disabled overrides, kept for reference:
    #
    # def process_tick_event(self, event) -> None:
    #     tick: TickData = event.data
    #     self.update_tick(tick)
    #
    # def update_tick(self, tick: TickData) -> None:
    #     # Drop ticks whose timestamp deviates too far from local time
    #     tick_delta: timedelta = abs(tick.datetime - self.filter_dt)
    #     if abs(tick_delta) >= self.filter_delta:
    #         return
    #
    #     if tick.vt_symbol in self.tick_recordings:
    #         self.record_tick(copy(tick))
    #
    #     # if tick.vt_symbol in self.bar_recordings:
    #     #     bg: BarGenerator = self.get_bar_generator(tick.vt_symbol)
    #     #     bg.update_tick(copy(tick))

    def is_trading(self, vt_symbol, current_time) -> bool:
        """
        Tell whether current_time is inside trading hours for vt_symbol.

        :param vt_symbol: symbol string accepted by extract_vt_symbol.
        :param current_time: datetime.time to check.
        :return: False inside the non-trading window, or inside the
            mid-morning break for DCE/SHFE/CZCE symbols; True otherwise.
        """
        _, exchange = extract_vt_symbol(vt_symbol)

        if self.drop_start <= current_time < self.drop_end:
            return False

        # NOTE(review): only these three exchanges get the break filter;
        # confirm whether INE should be treated the same way.
        if exchange in (Exchange.DCE, Exchange.SHFE, Exchange.CZCE):
            if self.rest_start <= current_time < self.rest_end:
                return False

        return True

    # Disabled override, kept for reference:
    #
    # def load_setting(self):
    #     self.filter_delta = timedelta(seconds=self.filter_window)
    #     pass

    def record_tick(self, tick: TickData):
        """
        Queue a copy of the tick unless it is stamped outside trading hours.
        """
        if not self.is_trading(tick.vt_symbol, tick.datetime.time()):
            return

        self.queue.put(("tick", copy(tick)))

    def record_bar(self, bar: BarData):
        """
        Queue a copy of the bar unless it is stamped outside trading hours.
        """
        if not self.is_trading(bar.vt_symbol, bar.datetime.time()):
            return

        self.queue.put(("bar", copy(bar)))

    # Disabled override, kept for reference:
    #
    # def process_contract_event(self, event):
    #     contract = event.data
    #     vt_symbol = contract.vt_symbol
    #
    #     # Options are not recorded
    #     if is_futures(vt_symbol):
    #         if RecordMode.BAR in self.record_modes:
    #             self.add_bar_recording(vt_symbol)
    #         if RecordMode.TICK in self.record_modes:
    #             self.add_tick_recording(vt_symbol)
    #         self.subscribe(contract)
def run_child():
    """
    Running in the child process.

    Builds the event and main engines, connects the CTP gateway, creates
    the whole-market recorder and then idles forever; the function never
    returns on its own.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()

    # Per the original author's note, creating the MainEngine starts the
    # event engine (event dispatching and timer events).
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    main_engine.write_log("主引擎创建成功")

    # Route CTA log events to the log engine
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    # NOTE(review): the recorder is created after connect(); confirm it
    # cannot miss events emitted while the gateway is connecting.
    # The name keeps the recorder referenced for the life of the loop.
    whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)
    main_engine.write_log("开始录制数据")

    # Keep the process alive
    while True:
        sleep(1)
def run_parent():
    """
    Running in the parent process.

    Checks the local wall-clock time every 5 seconds, starts the recorder
    child process while inside a recording window and terminates it once
    outside. Never returns.
    """
    print("启动CTA策略守护父进程")

    # Recording windows (day and night sessions), as (start, end) pairs
    morning = (time(8, 45), time(12, 0))
    afternoon = (time(12, 45), time(15, 35))

    # The night session wraps past midnight, so it is checked as two
    # open-ended halves: from night_start on, and up to night_end.
    night_start = time(20, 45)
    night_end = time(3, 5)

    recorder = None

    while True:
        now = datetime.now().time()

        # Check whether in trading period
        trading = (
            morning[0] <= now <= morning[1]
            or afternoon[0] <= now <= afternoon[1]
            or now >= night_start
            or now <= night_end
        )

        if trading:
            # Start child process in trading period
            if recorder is None:
                print("启动数据录制子进程")
                recorder = multiprocessing.Process(target=run_child)
                recorder.start()
                print("数据录制子进程启动成功")
        elif recorder is not None:
            # Outside the recording windows: shut the child process down
            print("关闭数据录制子进程")
            recorder.terminate()
            recorder.join()
            recorder = None
            print("数据录制子进程关闭成功")

        sys.stdout.flush()
        sleep(5)
if __name__ == "__main__":
    # Script entry point: run the supervising parent loop.
    run_parent()