VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 2
声望: 3

直接上代码,不做过多解释了,有需要的朋友拿去稍微改一下就能自用。

import os
import multiprocessing
import sys
from time import sleep
from datetime import datetime, time
from logging import INFO


from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import save_json

from vnpy_ctp import CtpGateway
from vnpy_ctastrategy import CtaStrategyApp
from vnpy_ctastrategy.base import EVENT_CTA_LOG

from vnpy_datarecorder import DataRecorderApp
from vnpy.trader.utility import TRADER_DIR,TEMP_DIR


# Global vn.py logging flags: turn logging on, set the level to INFO and
# enable the "log.console" option. run_recorder() sets the same three keys
# again inside its own process.
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True


# Settings dict handed to main_engine.connect(ctp_setting, "CTP").
# The Chinese keys are runtime strings read by the gateway and must not be
# translated. The user name / password values are placeholders to replace.
ctp_setting = {
    "用户名": "usersimnow",
    "密码": "passwordsimnow",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10201", ## trading server port: 10201 = real-time, 10130 = 7*24
    "行情服务器": "180.168.146.187:10211", ## market-data server port: 10211 = real-time, 10131 = 7*24
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": ""
}

# One tuple per contract, read positionally by the two save_*_setting()
# helpers as:
#   (strategy name, vt_symbol "symbol.EXCHANGE", fixed_size, fixed_offset, strategy class name)
# The trailing "<n>M" figure in each comment is the original author's own
# annotation; its meaning is not shown in this file.
futures_setting = [
    ## Dalian Commodity Exchange (DCE)
    ("AL8","a2301.DCE",1,1.0,"house"),  ## No.1 soybean 20M
    ("BL8","b2301.DCE",1,1.0,"house"),  ## No.2 soybean 8M
    ("CL8","c2303.DCE",1,1.0,"house"),  ## corn 19M
    ("CSL8","cs2301.DCE",1,1.0,"house"),  ## corn starch 9M
    ("VL8","v2301.DCE",1,1.0,"house"),  ## PVC  110M
    ("LL8","l2305.DCE",1,1.0,"house"),  ## polyethylene  40M
    ("ML8","m2301.DCE",1,1.0,"house"),  ## soybean meal  91M
    ("YL8","y2305.DCE",1,1.0,"house"),  ## soybean oil  138M
    ("PL8","p2305.DCE",1,1.0,"house"),  ## palm oil   194M
     ("IL8","i2305.DCE",1,1.0,"house"),  ## iron ore   230M
    ## Shanghai Futures Exchange (SHFE); the first entry uses the INE exchange code
    ("LUL8","lu2303.INE",1,1.0,"house"),  ## low-sulfur fuel oil 26M
    ("SSL8","ss2301.SHFE",1,1.0,"house"),  ## stainless steel  30M
    ("PBL8","pb2301.SHFE",1,1.0,"house"),  ## lead  11M
    ("ALL8","al2301.SHFE",1,1.0,"house"),  ## aluminium  36M
    ("ZNL8","zn2301.SHFE",1,1.0,"house"),  ## zinc  56M
    ("BUL8","bu2302.SHFE",1,1.0,"house"),  ## bitumen  40M
    ## Zhengzhou Commodity Exchange (CZCE)
    ("TAL8","TA305.CZCE",1,1.0,"house"),  ## PTA  141M
    ("RML8","RM305.CZCE",1,1.0,"house"),  ## rapeseed meal 43M



]


# Chinese futures market trading period (day/night).
# check_trading_period() treats both bounds of each window as inclusive and
# compares them against the local wall-clock time.
DAY_START = time(8, 50)
DAY_END = time(15, 0)

# NOTE(review): the night window closes at 23:01 and does not wrap past
# midnight. Confirm that this covers the night session of every contract in
# futures_setting; a later close would also need wrap-around logic in
# check_trading_period().
NIGHT_START = time(20, 50)
NIGHT_END = time(23, 1)

# Persist the CTA strategy configuration.
def save_cta_strategy_setting(futures_setting):
    """
    Build the CTA strategy table and write it to "cta_strategy_setting.json".

    Every entry of ``futures_setting`` is read by position:
    index 0 is the strategy name (used as the key), 1 the vt_symbol,
    2 the fixed size, 3 the fixed offset and 4 the strategy class name.
    The class name is stored both on the strategy record and inside its
    "setting" sub-dict.

    Returns the dict that was passed to ``save_json``.
    """
    strategies = {
        entry[0]: {
            "class_name": entry[4],
            "vt_symbol": entry[1],
            "setting": {
                "class_name": entry[4],
                "fixed_size": entry[2],
                "fixed_offset": entry[3],
            },
        }
        for entry in futures_setting
    }
    save_json("cta_strategy_setting.json", strategies)
    return strategies
# Persist the market-data recorder configuration.
def save_data_recorder_setting(futures_setting):
    """
    Build the recorder table and write it to "data_recorder_setting.json".

    For every entry of ``futures_setting`` the vt_symbol at index 1 is split
    on "." and registered under the "bar" section with its symbol part,
    its exchange part and the gateway name "CTP". The "tick" section is
    always written empty.

    Returns the dict that was passed to ``save_json``.
    """
    bar_tasks = {}
    for entry in futures_setting:
        vt_symbol = entry[1]
        pieces = vt_symbol.split(".")
        bar_tasks[vt_symbol] = {
            "symbol": pieces[0],
            "exchange": pieces[1],
            "gateway_name": "CTP",
        }

    recorder_setting = {"tick": {}, "bar": bar_tasks}
    save_json("data_recorder_setting.json", recorder_setting)
    return recorder_setting








def check_trading_period():
    """
    Tell whether the current local time lies inside a trading window.

    Returns True when ``datetime.now().time()`` falls within
    [DAY_START, DAY_END] or [NIGHT_START, NIGHT_END] (bounds inclusive),
    otherwise False.
    """
    now = datetime.now().time()

    # Day session first; fall through to the night session otherwise.
    if DAY_START <= now <= DAY_END:
        return True
    return NIGHT_START <= now <= NIGHT_END


def run_recorder():
    """
    Run the market-data recorder; intended to execute in a child process.

    Creates an event engine and a main engine, adds the CTP gateway and the
    data recorder app, connects using ``ctp_setting`` and blocks until
    ``gateway.td_api.contract_inited`` becomes truthy. Afterwards the trading
    period is polled every 10 seconds; as soon as it is over the recorder
    engine and the main engine are closed and the process exits with
    status 0. This function never returns normally.
    """
    SETTINGS["log.active"] = True
    SETTINGS["log.level"] = INFO
    SETTINGS["log.console"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    gateway = main_engine.add_gateway(CtpGateway)
    recorder_engine = main_engine.add_app(DataRecorderApp)
    main_engine.write_log("主引擎创建成功")

    # Route log events to the main engine's log engine.
    # NOTE(review): only EVENT_CTA_LOG is registered here; confirm whether
    # the recorder app emits its own log event type that should be routed too.
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    # Wait (without timeout) until the trading API reports that contracts
    # have been initialised.
    while not gateway.td_api.contract_inited:
        sleep(1)
    main_engine.write_log("开始录制数据.....")

    # Keep the process alive during the trading period, checking every 10 s.
    while True:
        sleep(10)
        if check_trading_period():
            continue

        print("关闭recorder子进程")
        recorder_engine.close()
        main_engine.close()
        sys.exit(0)


def run_ctastrategy():
    """
    Run the CTA strategy engine; intended to execute in a child process.

    Creates an event engine and a main engine, adds the CTP gateway and the
    CTA strategy app, connects using ``ctp_setting`` and blocks until
    ``gateway.td_api.contract_inited`` becomes truthy. The CTA engine is then
    initialised, all strategies are initialised (waiting for each one to
    finish) and started. Afterwards the trading period is polled every
    10 seconds; once it is over the main engine is closed and the process
    exits with status 0. This function never returns normally.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)  # main engine driven by the event engine
    gateway = main_engine.add_gateway(CtpGateway)  # add the CTP gateway
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    # Route CTA log events to the main engine's log engine.
    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")  # connect the CTP gateway
    main_engine.write_log("连接CTP接口")

    # Wait (without timeout) until the trading API reports that contracts
    # have been initialised.
    while not gateway.td_api.contract_inited:
        sleep(1)

    cta_engine.init_engine()
    main_engine.write_log("CTA引擎初始化完成")

    # init_all_strategies() is used here as a mapping whose values expose
    # .result(). result() blocks until that initialisation has finished, so a
    # single pass waits for all of them. The previous polling loop re-ran
    # whenever a result was not None, which could never terminate because a
    # finished future keeps returning the same value.
    init_futures = cta_engine.init_all_strategies()
    for init_future in init_futures.values():
        init_future.result()

    main_engine.write_log("CTA策略初始化完成")
    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略启动完成")

    # Keep the process alive during the trading period, checking every 10 s.
    while True:
        sleep(10)
        if check_trading_period():
            continue

        print("关闭子进程")
        main_engine.close()
        sys.exit(0)


def run_parent(enable_ctastrategy: bool = False) -> None:
    """
    Supervise the child processes from the parent process.

    Loops forever, checking the trading period every 5 seconds:

    * Inside the trading period, a recorder child process (``run_recorder``)
      is started if none is tracked. When ``enable_ctastrategy`` is true a
      strategy child process (``run_ctastrategy``) is started first, followed
      by a 5 second pause before the recorder is launched.
    * Outside the trading period, a tracked child that is no longer alive is
      forgotten so it can be started again in the next trading period, and a
      heartbeat line with TRADER_DIR and the current time is printed.

    Parameters:
        enable_ctastrategy: whether to also launch the CTA strategy process.
            Defaults to False, which matches the previous behaviour where
            the strategy launch code was disabled and only the recorder ran.
    """
    print("启动CTA策略守护父进程")

    ctastrategy_process = None
    recorder_process = None

    while True:
        trading = check_trading_period()

        if trading:
            # Start missing child processes during the trading period.
            if enable_ctastrategy and ctastrategy_process is None:
                print("启动策略子进程")
                ctastrategy_process = multiprocessing.Process(target=run_ctastrategy)
                ctastrategy_process.start()
                print("ctastrategy进程启动成功")
                # Pause between the two launches, as in the disabled code.
                sleep(5)

            if recorder_process is None:
                print("启动录制行情子进程")
                recorder_process = multiprocessing.Process(target=run_recorder)
                recorder_process.start()
                print("recorder进程启动成功")
        else:
            # Children exit on their own outside the trading period; drop the
            # handle once the process has actually stopped.
            if ctastrategy_process is not None and not ctastrategy_process.is_alive():
                ctastrategy_process = None
                print("ctastrategy子进程关闭成功")

            if recorder_process is not None and not recorder_process.is_alive():
                recorder_process = None
                print("recorder 子进程关闭成功")

            print(str(TRADER_DIR) + ": loop...." + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        sleep(5)


if __name__ == "__main__":
    # Print TRADER_DIR (imported from vnpy.trader.utility).
    print(TRADER_DIR)
    # Write both JSON setting files from futures_setting before any child
    # process is started.
    save_cta_strategy_setting(futures_setting)
    save_data_recorder_setting(futures_setting)
    # Enter the supervisor loop; run_parent() loops forever.
    run_parent()
Member
avatar
加入于:
帖子: 2
声望: 0

学习了,谢谢分享!

Member
avatar
加入于:
帖子: 1690
声望: 122

感谢分享!

LLM学员
avatar
加入于:
帖子: 27
声望: 0

谢谢分享!

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】