Chinese futures market trading period (day/night)
DAY_START = time(8, 45)
DAY_END = time(15, 0)
NIGHT_START = time(20, 45)
NIGHT_END = time(23, 0)
def check_trading_period():
""""""
current_time = datetime.now().time()
trading = False
if (
(current_time >= DAY_START and current_time <= DAY_END)
or (current_time >= NIGHT_START)
or (current_time <= NIGHT_END)
):
trading = True
return trading
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
cta_engine = main_engine.add_app(CtaStrategyApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(ctp_setting, "CTP")
main_engine.write_log("连接CTP接口")
sleep(10)
cta_engine.init_engine()
main_engine.write_log("CTA策略初始化完成")
cta_engine.init_all_strategies()
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动")
while True:
sleep(10)
trading = check_trading_period()
if not trading:
print("关闭子进程")
main_engine.close()
sys.exit(0)
def run_parent():
"""
Running in the parent process.
"""
print("启动CTA策略守护父进程")
child_process = None
while True:
trading = check_trading_period()
# Start child process in trading period
if trading and child_process is None:
print("启动子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("子进程启动成功")
# 非记录时间则退出子进程
if not trading and child_process is not None:
if not child_process.is_alive():
child_process = None
print("子进程关闭成功")
sleep(5)
if name == "main":
run_parent()
实盘运营no_ui,启动连接成功,但是有交易信息没有发单成交的问题?
上面的代码,成功启动,是否要添加具体策略信息?
还有是否要增加录制行业主代码才能成交?