直接上代码，不做过多解释了，需要的可以拿去稍微改一下就能自用。
import os
import multiprocessing
import sys
from time import sleep
from datetime import datetime, time
from logging import INFO
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import save_json
from vnpy_ctp import CtpGateway
from vnpy_ctastrategy import CtaStrategyApp
from vnpy_ctastrategy.base import EVENT_CTA_LOG
from vnpy_datarecorder import DataRecorderApp
from vnpy.trader.utility import TRADER_DIR,TEMP_DIR
# Logging configuration written into the SETTINGS mapping imported from
# vnpy.trader.setting: logging active, INFO level, console output on.
SETTINGS.update(
    {
        "log.active": True,
        "log.level": INFO,
        "log.console": True,
    }
)
# Connection settings handed to main_engine.connect(ctp_setting, "CTP").
# The keys are runtime strings; presumably they are the field names the CTP
# gateway expects, so they must not be translated or renamed — confirm
# against the gateway's default setting.
# NOTE(review): the user name / password look like placeholders — replace
# them with real account values before running.
ctp_setting = {
    "用户名": "usersimnow",
    "密码": "passwordsimnow",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10201",  # port 10201: real-time; port 10130: 7*24
    "行情服务器": "180.168.146.187:10211",  # port 10211: real-time; port 10131: 7*24
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": ""
}
# Contract table consumed by save_cta_strategy_setting() and
# save_data_recorder_setting(). Each tuple is read positionally:
#   [0] strategy name (key in the CTA strategy JSON)
#   [1] vt_symbol in "symbol.exchange" form
#   [2] fixed_size
#   [3] fixed_offset
#   [4] strategy class name
# The trailing figure in each inline comment (e.g. "20M") is carried over
# from the original comment; its meaning is not stated in this file.
futures_setting = [
    # Dalian Commodity Exchange (DCE)
    ("AL8","a2301.DCE",1,1.0,"house"),  # soybean No.1, 20M
    ("BL8","b2301.DCE",1,1.0,"house"),  # soybean No.2, 8M
    ("CL8","c2303.DCE",1,1.0,"house"),  # corn, 19M
    ("CSL8","cs2301.DCE",1,1.0,"house"),  # starch, 9M
    ("VL8","v2301.DCE",1,1.0,"house"),  # PVC, 110M
    ("LL8","l2305.DCE",1,1.0,"house"),  # ethylene (per original comment), 40M
    ("ML8","m2301.DCE",1,1.0,"house"),  # soybean meal, 91M
    ("YL8","y2305.DCE",1,1.0,"house"),  # soybean oil, 138M
    ("PL8","p2305.DCE",1,1.0,"house"),  # palm oil, 194M
    ("IL8","i2305.DCE",1,1.0,"house"),  # iron ore, 230M
    # Shanghai Futures Exchange (note: the first entry uses the INE suffix)
    ("LUL8","lu2303.INE",1,1.0,"house"),  # low-sulfur fuel oil, 26M
    ("SSL8","ss2301.SHFE",1,1.0,"house"),  # stainless steel, 30M
    ("PBL8","pb2301.SHFE",1,1.0,"house"),  # lead, 11M
    ("ALL8","al2301.SHFE",1,1.0,"house"),  # aluminium, 36M
    ("ZNL8","zn2301.SHFE",1,1.0,"house"),  # zinc, 56M
    ("BUL8","bu2302.SHFE",1,1.0,"house"),  # bitumen, 40M
    # Zhengzhou Commodity Exchange (CZCE)
    ("TAL8","TA305.CZCE",1,1.0,"house"),  # PTA, 141M
    ("RML8","RM305.CZCE",1,1.0,"house"),  # rapeseed meal, 43M
]
# Boundaries of the Chinese futures trading sessions, expressed as
# wall-clock times: one day session and one night session.
DAY_START = time(hour=8, minute=50)
DAY_END = time(hour=15, minute=0)
NIGHT_START = time(hour=20, minute=50)
NIGHT_END = time(hour=23, minute=1)
# Save the CTA strategy configuration
def save_cta_strategy_setting(futures_setting):
    """Build the CTA strategy configuration and write it out as JSON.

    Every entry of ``futures_setting`` is read by position: ``[0]`` is the
    strategy name (used as the key), ``[1]`` the vt_symbol, ``[2]`` the
    ``fixed_size``, ``[3]`` the ``fixed_offset`` and ``[4]`` the strategy
    class name. The class name is stored both at the top level of the
    entry and inside its ``"setting"`` sub-dict.

    The resulting mapping is passed to ``save_json`` under the file name
    ``cta_strategy_setting.json`` and is also returned to the caller.
    """
    strategy_config = {
        entry[0]: {
            "class_name": entry[4],
            "vt_symbol": entry[1],
            "setting": {
                "class_name": entry[4],
                "fixed_size": entry[2],
                "fixed_offset": entry[3],
            },
        }
        for entry in futures_setting
    }
    save_json("cta_strategy_setting.json", strategy_config)
    return strategy_config
# Save the market-data recorder configuration
def save_data_recorder_setting(futures_setting):
    """Build the data-recorder configuration and write it out as JSON.

    For every entry of ``futures_setting`` the vt_symbol at index ``[1]``
    is split on ``"."``; the first part becomes ``"symbol"`` and the second
    ``"exchange"``. Each contract is registered under ``"bar"`` with the
    gateway name ``"CTP"``; the ``"tick"`` section is left empty.

    The resulting mapping is passed to ``save_json`` under the file name
    ``data_recorder_setting.json`` and is also returned to the caller.
    """
    bar_config = {}
    for entry in futures_setting:
        vt_symbol = entry[1]
        parts = vt_symbol.split(".")
        bar_config[vt_symbol] = {
            "symbol": parts[0],
            "exchange": parts[1],
            "gateway_name": "CTP",
        }

    recorder_config = {"tick": {}, "bar": bar_config}
    save_json("data_recorder_setting.json", recorder_config)
    return recorder_config
def check_trading_period():
    """Tell whether the current time falls inside a trading session.

    Reads ``datetime.now().time()`` and returns True when it lies within
    ``[DAY_START, DAY_END]`` or ``[NIGHT_START, NIGHT_END]`` (both ends
    inclusive); otherwise returns False.
    """
    now = datetime.now().time()

    if DAY_START <= now <= DAY_END:
        return True

    return NIGHT_START <= now <= NIGHT_END
def run_recorder():
    """Run the market-data recorder; intended as a child-process target.

    Creates the event engine and main engine, adds the CTP gateway and the
    data-recorder app, connects using ``ctp_setting``, and waits until the
    gateway's trading API reports ``contract_inited``. It then polls
    ``check_trading_period()`` every 10 seconds; as soon as the check
    returns False the recorder engine and the main engine are closed and
    the process exits with status 0.
    """
    # Apply the logging settings inside this process as well.
    SETTINGS["log.active"] = True
    SETTINGS["log.level"] = INFO
    SETTINGS["log.console"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    gateway = main_engine.add_gateway(CtpGateway)
    recorder_engine = main_engine.add_app(DataRecorderApp)
    main_engine.write_log("主引擎创建成功")

    # Route CTA log events to the main engine's log engine.
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    # Wait for the gateway to finish loading contract data.
    # NOTE(review): there is no timeout, so this waits forever if the
    # flag never becomes true (e.g. a failed login) — confirm intended.
    while not gateway.td_api.contract_inited:
        sleep(1)

    main_engine.write_log("开始录制数据.....")

    # Keep the process alive while the market is open; the first check
    # happens only after one full 10-second sleep.
    while True:
        sleep(10)
        if not check_trading_period():
            break

    print("关闭recorder子进程")
    # The recorder engine is closed explicitly before the main engine,
    # in the same order as before.
    recorder_engine.close()
    main_engine.close()
    sys.exit(0)
def run_ctastrategy():
    """Run the CTA strategy engine; intended as a child-process target.

    Creates the event engine and main engine, adds the CTP gateway and the
    CTA strategy app, connects using ``ctp_setting``, and waits until the
    gateway's trading API reports ``contract_inited``. It then initialises
    the CTA engine, initialises all strategies and waits for every
    initialisation to finish, and starts all strategies. Afterwards it
    polls ``check_trading_period()`` every 10 seconds; as soon as the
    check returns False the main engine is closed and the process exits
    with status 0.

    Any exception raised while a strategy is initialising propagates out
    of the corresponding ``result()`` call.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)  # main engine driven by the event engine
    gateway = main_engine.add_gateway(CtpGateway)  # add the CTP gateway
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    # Route CTA log events to the main engine's log engine.
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")  # connect to the CTP gateway
    main_engine.write_log("连接CTP接口")

    # Wait for the gateway to finish loading contract data.
    # NOTE(review): there is no timeout, so this waits forever if the
    # flag never becomes true (e.g. a failed login) — confirm intended.
    while not gateway.td_api.contract_inited:
        sleep(1)

    cta_engine.init_engine()
    main_engine.write_log("CTA引擎初始化完成")

    # Wait for every strategy to finish initialising. The previous loop
    # re-polled result() after a 10-second sleep and repeated while any
    # result was not None; since a finished future always returns the same
    # value, a non-None result would have made that loop spin forever.
    # Assumes the values are concurrent.futures.Future objects whose
    # result() blocks until completion — confirm against the installed
    # vnpy_ctastrategy version.
    futures = cta_engine.init_all_strategies()
    for strategy_name in futures:
        futures[strategy_name].result()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略启动完成")

    # Keep the process alive while the market is open; the first check
    # happens only after one full 10-second sleep.
    while True:
        sleep(10)
        if not check_trading_period():
            break

    print("关闭子进程")
    main_engine.close()
    sys.exit(0)
def run_parent(*, start_ctastrategy=False, start_recorder=True):
    """Supervise the child processes from the parent process.

    Loops forever, calling ``check_trading_period()`` every 5 seconds.
    While the check is True, each enabled child that is not currently
    tracked is started as a ``multiprocessing.Process``. While the check is
    False, a tracked child that is no longer alive is forgotten, so that it
    is started again at the next trading session, and a heartbeat line is
    printed.

    Args:
        start_ctastrategy: When True, also launch ``run_ctastrategy`` in a
            child process. Defaults to False, which matches the previous
            behaviour where this start-up code was disabled.
        start_recorder: When True (the default), launch ``run_recorder``
            in a child process.

    This function never returns.
    """
    print("启动CTA策略守护父进程")

    ctastrategy_process = None
    recorder_process = None

    while True:
        trading = check_trading_period()

        # Start the enabled children once a trading session is active.
        if trading and start_ctastrategy and ctastrategy_process is None:
            print("启动策略子进程")
            ctastrategy_process = multiprocessing.Process(target=run_ctastrategy)
            ctastrategy_process.start()
            print("ctastrategy进程启动成功")
            # Pause kept from the previously disabled start-up code;
            # presumably it staggers the two CTP logins — confirm.
            sleep(5)

        if trading and start_recorder and recorder_process is None:
            print("启动录制行情子进程")
            recorder_process = multiprocessing.Process(target=run_recorder)
            recorder_process.start()
            print("recorder进程启动成功")

        if not trading:
            # Outside trading hours the children exit on their own; drop
            # the handle once a child is gone so it can be restarted later.
            if ctastrategy_process is not None and not ctastrategy_process.is_alive():
                ctastrategy_process = None
                print("ctastrategy子进程关闭成功")

            if recorder_process is not None and not recorder_process.is_alive():
                recorder_process = None
                print("recorder 子进程关闭成功")

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"{TRADER_DIR}: loop....{timestamp}")

        sleep(5)
if __name__ == "__main__":
    # Print TRADER_DIR (imported from vnpy.trader.utility), regenerate both
    # JSON configuration files from futures_setting, then enter the
    # supervising loop, which does not return.
    print(TRADER_DIR)
    for write_setting in (save_cta_strategy_setting, save_data_recorder_setting):
        write_setting(futures_setting)
    run_parent()