VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 35
声望: 0

`
import multiprocessing
import sys
from time import sleep
from datetime import datetime, time

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine

from my_ctp_gateway import MyCtpGateway, MyMdApi, MyTdApi

# SimNow24h connection settings.
# The keys are the lookup keys the gateway's connect() reads at runtime,
# so they must stay exactly as written.
ctp_setting = {
    "用户名": "你的SimNow账号",
    "密码": "你的SimNow密码",
    "经纪商代码": "9999",
    "交易服务器": "tcp://180.168.146.187:10130",
    "行情服务器": "tcp://180.168.146.187:10131",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "",
}

# Day session window (local wall-clock time).
DAY_START = time(hour=8, minute=45)
DAY_END = time(hour=15)

# Night session window. NIGHT_END is earlier in the day than NIGHT_START,
# i.e. the window wraps past midnight.
NIGHT_START = time(hour=20, minute=45)
NIGHT_END = time(hour=2, minute=45)

def check_trading_period():
    """
    Return True when the current local time is inside a trading session.

    A time counts as trading when it lies within [DAY_START, DAY_END], or is
    at/after NIGHT_START, or is at/before NIGHT_END (the night session wraps
    past midnight, so it is tested as two open-ended halves).
    """
    current_time = datetime.now().time()

    in_day_session = DAY_START <= current_time <= DAY_END
    in_night_session = current_time >= NIGHT_START or current_time <= NIGHT_END

    return in_day_session or in_night_session

def run_child():
    """
    Running in the child process.

    Builds the event/main engines, registers MyCtpGateway, connects it with
    ctp_setting under the gateway name "CTP", then polls the trading period
    every 10 seconds and exits the process once trading has ended.
    """
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(MyCtpGateway)

    main_engine.connect(ctp_setting, "CTP")

    while True:
        sleep(10)

        trading = check_trading_period()
        if not trading:
            print("关闭子进程")
            # Close the engine before terminating this process.
            main_engine.close()
            sys.exit(0)

def run_parent():
    """
    Running in the parent process.

    Loops forever, checking the trading period every 5 seconds. Starts one
    child process (target: run_child) when trading begins and no child is
    tracked, and forgets the child handle once trading has ended and the
    child is no longer alive.
    """
    print("启动CTA策略守护父进程")

    child_process = None

    while True:
        trading = check_trading_period()

        # Start child process in trading period
        if trading and child_process is None:
            print("启动子进程")
            child_process = multiprocessing.Process(target=run_child)
            child_process.start()
            print("子进程启动成功")

        # Outside the trading period: once the child has exited, clear the
        # handle so a new child can be started at the next session.
        # NOTE(review): a child that dies *during* the trading period is not
        # restarted, because the handle is only cleared when not trading.
        if not trading and child_process is not None:
            if not child_process.is_alive():
                child_process = None
                print("子进程关闭成功")

        sleep(5)

# Entry-point guard: required so that a child process importing this module
# under multiprocessing does not start another parent loop.
if __name__ == "__main__":
    run_parent()

`

Member
avatar
加入于:
帖子: 35
声望: 0

运行后控制台输出打印如下:

启动CTA策略守护父进程
启动子进程
子进程启动成功
发起连接交易服务器请求
订阅私有流
订阅公共流
注册前置机网络地址
初始化......
发起连接行情服务器请求
连接交易接口
初始化查询任务
启动指定网关的连接
交易服务器连接成功
向交易服务器发起授权验证请求
发起用户登录行情服务器请求
行情服务器连接成功
行情服务器登录成功
发起用户登录交易服务器请求:
交易服务器授权验证成功
交易服务器登录成功
结算信息确认成功

合约信息查询成功

Member
avatar
加入于:
帖子: 35
声望: 0

my_ctp_gateway.py 文件内容如下

Member
avatar
加入于:
帖子: 35
声望: 0

from datetime import datetime
from time import sleep

from pathlib import Path
from vnpy_ctp.api import MdApi, TdApi
from vnpy_ctp.api.ctp_constant import *

from vnpy_ctp.gateway.ctp_gateway import *

from vnpy.event import Event, EventEngine
from vnpy_ctp import CtpGateway

from vnpy.trader.constant import (
Direction,
Offset,
Exchange,
OrderType,
Product,
Status,
OptionType
)

from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData,
OrderRequest,
CancelRequest,
SubscribeRequest,
)

from vnpy_ctp.gateway.ctp_gateway import (
EXCHANGE_CTP2VT, symbol_contract_map, CHINA_TZ, adjust_price, DIRECTION_CTP2VT, OFFSET_CTP2VT, PRODUCT_CTP2VT,
OPTIONTYPE_CTP2VT, STATUS_CTP2VT, ORDERTYPE_CTP2VT, OFFSET_VT2CTP, ORDERTYPE_VT2CTP, DIRECTION_VT2CTP, MAX_FLOAT
)

from vnpy.trader.event import (
EVENT_LOG, EVENT_TICK, EVENT_CONTRACT, EVENT_POSITION, EVENT_TIMER
)

from vnpy.trader.utility import get_folder_path, ZoneInfo
from vnpy.trader.event import EVENT_TIMER

class MyCtpGateway(CtpGateway):
    """
    Trading gateway for the futures CTP counter.

    Extends CtpGateway, replacing the trading and market-data API objects
    with MyTdApi / MyMdApi and printing progress messages to the console.
    """

    default_name: str = "CTP"

    # Connection fields; the keys are the runtime lookup keys used by connect().
    default_setting: dict[str, str] = {
        "用户名": "",
        "密码": "",
        "经纪商代码": "",
        "交易服务器": "",
        "行情服务器": "",
        "产品名称": "",
        "授权编码": ""
    }

    exchanges: list[str] = list(EXCHANGE_CTP2VT.values())

    def __init__(self, event_engine: EventEngine, gateway_name: str) -> None:
        """Initialise the base gateway, then install the custom API objects."""
        super().__init__(event_engine, gateway_name)

        self.td_api: MyTdApi = MyTdApi(self)
        self.md_api: MyMdApi = MyMdApi(self)

    @staticmethod
    def _normalize_address(address: str) -> str:
        """Prefix "tcp://" unless the address already starts with tcp://, ssl:// or socks."""
        if address.startswith(("tcp://", "ssl://", "socks")):
            return address
        return "tcp://" + address

    def connect(self, setting: dict) -> None:
        """
        Connect the trading and market-data interfaces.

        Reads the account fields from ``setting`` (a missing key raises
        KeyError), normalises both server addresses, connects the trading API
        and then the market-data API, and finally registers the periodic
        query task.
        """
        userid: str = setting["用户名"]
        password: str = setting["密码"]
        brokerid: str = setting["经纪商代码"]
        td_address: str = setting["交易服务器"]
        md_address: str = setting["行情服务器"]
        appid: str = setting["产品名称"]
        auth_code: str = setting["授权编码"]

        td_address = self._normalize_address(td_address)
        md_address = self._normalize_address(md_address)

        self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid)
        self.md_api.connect(md_address, userid, password, brokerid)
        print("连接交易接口")
        self.init_query()

    def subscribe(self, req: SubscribeRequest) -> None:
        """Subscribe to market data (delegates to the market-data API)."""
        self.md_api.subscribe(req)

    def send_order(self, req: OrderRequest) -> str:
        """Send an order and return the value returned by the trading API."""
        return self.td_api.send_order(req)

    def cancel_order(self, req: CancelRequest) -> None:
        """Cancel an order (delegates to the trading API)."""
        self.td_api.cancel_order(req)

    def query_account(self) -> None:
        """Query account funds."""
        self.td_api.query_account()

    def query_position(self) -> None:
        """Query positions."""
        self.td_api.query_position()

    def close(self) -> None:
        """Close the trading API, then the market-data API."""
        self.td_api.close()
        self.md_api.close()

    def write_error(self, msg: str, error: dict) -> None:
        """Write a log entry combining ``msg`` with the error's ErrorID and ErrorMsg."""
        error_id: int = error["ErrorID"]
        error_msg: str = error["ErrorMsg"]
        msg: str = f"{msg},代码:{error_id},信息:{error_msg}"
        self.write_log(msg)

    def process_timer_event(self, event) -> None:
        """
        Handle a timer event.

        Acts on every second call: runs the query function at the head of the
        rotation, moves it to the back, and refreshes the market-data API's
        cached date.
        """
        self.count += 1
        if self.count < 2:
            return
        self.count = 0

        # Round-robin between the registered query functions.
        func = self.query_functions.pop(0)
        func()
        self.query_functions.append(func)

        self.md_api.update_date()

    def init_query(self) -> None:
        """Set up the query rotation and register the timer handler."""
        self.count: int = 0
        self.query_functions: list = [self.query_account, self.query_position]
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)
        print("初始化查询任务")


class MyMdApi(MdApi):
    """
    CTP market-data API.

    Callback methods (on*) are invoked with the response dictionaries; the
    remaining methods are the active calls used by the gateway.
    """

    def __init__(self, gateway: CtpGateway) -> None:
        """Initialise the base API and the connection/login state."""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.subscribed: set = set()

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""

        # Local date string used to timestamp ticks lacking a usable ActionDay.
        self.current_date: str = datetime.now().strftime("%Y%m%d")

    def onFrontConnected(self) -> None:
        """Callback: server connection established; log in immediately."""
        self.gateway.write_log("行情服务器连接成功")
        self.login()
        print("行情服务器连接成功")

    def onFrontDisconnected(self, reason: int) -> None:
        """Callback: server connection lost; clear the login flag."""
        self.login_status = False
        self.gateway.write_log(f"行情服务器连接断开,原因{reason}")
        print("行情服务器连接断开")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """
        Callback: response to the user login request.

        On success, subscribes the hard-coded symbol "ss2409" and then every
        symbol recorded in ``self.subscribed``. The module-level flag
        ``is_loginMd`` is set to reflect the outcome.
        """
        global is_loginMd
        if not error["ErrorID"]:
            self.login_status = True
            self.gateway.write_log("行情服务器登录成功")
            print("行情服务器登录成功")
            self.subscribeMarketData("ss2409")
            is_loginMd = True
            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
        else:
            self.gateway.write_error("行情服务器登录失败", error)
            is_loginMd = False
            print("行情服务器登录失败")

    def onRspError(self, error: dict, reqid: int, last: bool) -> None:
        """Callback: a request reported an error."""
        self.gateway.write_error("行情接口报错", error)

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback: response to a market-data subscription; logs only failures."""
        if not error or not error["ErrorID"]:
            return

        self.gateway.write_error("行情订阅失败", error)

    def onRtnDepthMarketData(self, data: dict) -> None:
        """Callback: market-data push; converts it to TickData and forwards it."""
        # Drop abnormal ticks that carry no timestamp.
        if not data["UpdateTime"]:
            return

        # Drop ticks that arrive before the contract data is known.
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)
        if not contract:
            return

        # For DCE (or when ActionDay is empty) use the local date instead.
        if not data["ActionDay"] or contract.exchange == Exchange.DCE:
            date_str: str = self.current_date
        else:
            date_str: str = data["ActionDay"]

        timestamp: str = f"{date_str} {data['UpdateTime']}.{data['UpdateMillisec']}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
        dt: datetime = dt.replace(tzinfo=CHINA_TZ)

        tick: TickData = TickData(
            symbol=symbol,
            exchange=contract.exchange,
            datetime=dt,
            name=contract.name,
            volume=data["Volume"],
            turnover=data["Turnover"],
            open_interest=data["OpenInterest"],
            last_price=adjust_price(data["LastPrice"]),
            limit_up=data["UpperLimitPrice"],
            limit_down=data["LowerLimitPrice"],
            open_price=adjust_price(data["OpenPrice"]),
            high_price=adjust_price(data["HighestPrice"]),
            low_price=adjust_price(data["LowestPrice"]),
            pre_close=adjust_price(data["PreClosePrice"]),
            bid_price_1=adjust_price(data["BidPrice1"]),
            ask_price_1=adjust_price(data["AskPrice1"]),
            bid_volume_1=data["BidVolume1"],
            ask_volume_1=data["AskVolume1"],
            gateway_name=self.gateway_name
        )

        # Levels 2-5 are filled only when level-2 volume is present.
        if data["BidVolume2"] or data["AskVolume2"]:
            tick.bid_price_2 = adjust_price(data["BidPrice2"])
            tick.bid_price_3 = adjust_price(data["BidPrice3"])
            tick.bid_price_4 = adjust_price(data["BidPrice4"])
            tick.bid_price_5 = adjust_price(data["BidPrice5"])

            tick.ask_price_2 = adjust_price(data["AskPrice2"])
            tick.ask_price_3 = adjust_price(data["AskPrice3"])
            tick.ask_price_4 = adjust_price(data["AskPrice4"])
            tick.ask_price_5 = adjust_price(data["AskPrice5"])

            tick.bid_volume_2 = data["BidVolume2"]
            tick.bid_volume_3 = data["BidVolume3"]
            tick.bid_volume_4 = data["BidVolume4"]
            tick.bid_volume_5 = data["BidVolume5"]

            tick.ask_volume_2 = data["AskVolume2"]
            tick.ask_volume_3 = data["AskVolume3"]
            tick.ask_volume_4 = data["AskVolume4"]
            tick.ask_volume_5 = data["AskVolume5"]

        self.gateway.on_tick(tick)

    def connect(self, address: str, userid: str, password: str, brokerid: str) -> None:
        """Store the credentials and, on the first call only, create and start the API."""
        print("发起连接行情服务器请求")
        self.userid = userid
        self.password = password
        self.brokerid = brokerid

        # Never connect twice: a repeated connection crashes the API.
        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))

            self.registerFront(address)
            self.init()

            self.connect_status = True

    def login(self) -> None:
        """Send the user login request."""
        print("发起用户登录行情服务器请求")
        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)

    def subscribe(self, req: SubscribeRequest) -> None:
        """Subscribe now if logged in; always remember the symbol for re-subscription."""
        if self.login_status:
            self.subscribeMarketData(req.symbol)
        self.subscribed.add(req.symbol)

    def close(self) -> None:
        """Close the connection if one was opened."""
        print("发起关闭行情服务器连接请求")
        if self.connect_status:
            self.exit()

    def update_date(self) -> None:
        """Refresh the cached local date string."""
        self.current_date = datetime.now().strftime("%Y%m%d")

class MyTdApi(TdApi):
    """
    CTP trading API.

    Callback methods (on*) are invoked with the response dictionaries; the
    remaining methods are the active calls used by the gateway.
    """

    def __init__(self, gateway: CtpGateway) -> None:
        """Initialise the base API and the connection/session state."""
        super().__init__()

        self.gateway: CtpGateway = gateway
        self.gateway_name: str = gateway.gateway_name

        self.reqid: int = 0
        self.order_ref: int = 0

        self.connect_status: bool = False
        self.login_status: bool = False
        self.auth_status: bool = False
        self.login_failed: bool = False
        self.auth_failed: bool = False
        self.contract_inited: bool = False

        self.userid: str = ""
        self.password: str = ""
        self.brokerid: str = ""
        self.auth_code: str = ""
        self.appid: str = ""

        self.frontid: int = 0
        self.sessionid: int = 0
        # Order/trade pushes received before contracts are loaded are buffered here.
        self.order_data: list[dict] = []
        self.trade_data: list[dict] = []
        self.positions: dict[str, PositionData] = {}
        self.sysid_orderid_map: dict[str, str] = {}

    def onFrontConnected(self) -> None:
        """Callback: server connection established; authenticate or log in."""
        self.gateway.write_log("交易服务器连接成功")
        print("交易服务器连接成功")
        if self.auth_code:
            self.authenticate()
        else:
            self.login()

    def onFrontDisconnected(self, reason: int) -> None:
        """Callback: server connection lost; clear the login flags."""
        print("交易服务器连接断开")
        global is_loginTd
        self.login_status = False
        self.gateway.write_log(f"交易服务器连接断开,原因{reason}")
        is_loginTd = False

    def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback: response to the authentication request; logs in on success."""
        if not error['ErrorID']:
            self.auth_status = True
            self.gateway.write_log("交易服务器授权验证成功")
            self.login()
            print("交易服务器授权验证成功")
        else:
            # Error 63 is treated as a wrong auth code: never retry authentication.
            if error['ErrorID'] == 63:
                self.auth_failed = True

            self.gateway.write_error("交易服务器授权验证失败", error)
            print("交易服务器授权验证失败")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """
        Callback: response to the user login request.

        On success, records FrontID/SessionID and requests settlement
        confirmation. The module-level flag ``is_loginTd`` reflects the outcome.
        """
        global is_loginTd
        if not error["ErrorID"]:
            self.frontid = data["FrontID"]
            self.sessionid = data["SessionID"]
            self.login_status = True
            self.gateway.write_log("交易服务器登录成功")
            print("交易服务器登录成功")
            is_loginTd = True

            # Confirm the settlement statement automatically.
            ctp_req: dict = {
                "BrokerID": self.brokerid,
                "InvestorID": self.userid
            }
            self.reqid += 1
            self.reqSettlementInfoConfirm(ctp_req, self.reqid)
        else:
            self.login_failed = True

            self.gateway.write_error("交易服务器登录失败", error)
            print("交易服务器登录失败")
            is_loginTd = False

    def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback: order insertion failed; push a REJECTED order and log the error."""
        order_ref: str = data["OrderRef"]
        orderid: str = f"{self.frontid}_{self.sessionid}_{order_ref}"

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        order: OrderData = OrderData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
            price=data["LimitPrice"],
            volume=data["VolumeTotalOriginal"],
            status=Status.REJECTED,
            gateway_name=self.gateway_name
        )
        self.gateway.on_order(order)

        self.gateway.write_error("交易委托失败", error)

    def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback: order cancellation failed."""
        self.gateway.write_error("交易撤单失败", error)

    def onRspSettlementInfoConfirm(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """
        Callback: settlement confirmation response.

        Sets the module-level flag ``is_settlement`` and then requests the
        instrument list.
        """
        global is_settlement
        self.gateway.write_log("结算信息确认成功")
        print("结算信息确认成功\n")
        is_settlement = True

        # Flow control may reject a single query, so retry once per second
        # until the request is accepted (return value 0).
        while True:
            self.reqid += 1
            n: int = self.reqQryInstrument({}, self.reqid)

            if not n:
                break
            else:
                sleep(1)

    def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """
        Callback: position query response.

        Accumulates the rows per (instrument, direction) and pushes all
        accumulated positions to the gateway when ``last`` is set.
        """
        if not data:
            return

        # Positions can only be processed once the contract data is known.
        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map.get(symbol, None)

        if contract:
            # Fetch (or create) the cached position for this key.
            key: str = f"{data['InstrumentID'], data['PosiDirection']}"
            position: PositionData = self.positions.get(key, None)
            if not position:
                position = PositionData(
                    symbol=data["InstrumentID"],
                    exchange=contract.exchange,
                    direction=DIRECTION_CTP2VT[data["PosiDirection"]],
                    gateway_name=self.gateway_name
                )
                self.positions[key] = position

            # SHFE/INE need special handling of yesterday's position.
            if position.exchange in {Exchange.SHFE, Exchange.INE}:
                if data["YdPosition"] and not data["TodayPosition"]:
                    position.yd_volume = data["Position"]
            # Other exchanges: yesterday's volume is total minus today's.
            else:
                position.yd_volume = data["Position"] - data["TodayPosition"]

            # Contract multiplier.
            size: int = contract.size

            # Total cost of the position accumulated so far.
            cost: float = position.price * position.volume * size

            # Accumulate volume and profit/loss.
            position.volume += data["Position"]
            position.pnl += data["PositionProfit"]

            # Recompute total cost and average price.
            if position.volume and size:
                cost += data["PositionCost"]
                position.price = cost / (position.volume * size)

            # Update the frozen volume.
            if position.direction == Direction.LONG:
                position.frozen += data["ShortFrozen"]
            else:
                position.frozen += data["LongFrozen"]

        if last:
            for position in self.positions.values():
                self.gateway.on_position(position)

            self.positions.clear()

    def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """Callback: account query response; pushes AccountData to the gateway."""
        if "AccountID" not in data:
            return

        account: AccountData = AccountData(
            accountid=data["AccountID"],
            balance=data["Balance"],
            frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
            gateway_name=self.gateway_name
        )
        account.available = data["Available"]

        self.gateway.on_account(account)

    def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
        """
        Callback: instrument query response.

        Registers each supported contract; on the last row, marks contracts
        as initialised and replays any buffered order/trade pushes.
        """
        product: Product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
        if product:
            contract: ContractData = ContractData(
                symbol=data["InstrumentID"],
                exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
                name=data["InstrumentName"],
                product=product,
                size=data["VolumeMultiple"],
                pricetick=data["PriceTick"],
                gateway_name=self.gateway_name
            )

            # Option-specific fields.
            if contract.product == Product.OPTION:
                # Strip the C/P suffix CZCE appends to option product names.
                if contract.exchange == Exchange.CZCE:
                    contract.option_portfolio = data["ProductID"][:-1]
                else:
                    contract.option_portfolio = data["ProductID"]

                contract.option_underlying = data["UnderlyingInstrID"]
                contract.option_type = OPTIONTYPE_CTP2VT.get(data["OptionsType"], None)
                contract.option_strike = data["StrikePrice"]
                contract.option_index = str(data["StrikePrice"])
                contract.option_listed = datetime.strptime(data["OpenDate"], "%Y%m%d")
                contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

            self.gateway.on_contract(contract)

            symbol_contract_map[contract.symbol] = contract

        if last:
            self.contract_inited = True
            self.gateway.write_log("合约信息查询成功")
            print("合约信息查询成功")

            for data in self.order_data:
                self.onRtnOrder(data)
            self.order_data.clear()

            for data in self.trade_data:
                self.onRtnTrade(data)
            self.trade_data.clear()

    def onRtnOrder(self, data: dict) -> None:
        """Callback: order update push; buffered until contracts are initialised."""
        if not self.contract_inited:
            self.order_data.append(data)
            return

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        frontid: int = data["FrontID"]
        sessionid: int = data["SessionID"]
        order_ref: str = data["OrderRef"]
        orderid: str = f"{frontid}_{sessionid}_{order_ref}"

        status: Status = STATUS_CTP2VT.get(data["OrderStatus"], None)
        if not status:
            self.gateway.write_log(f"收到不支持的委托状态,委托号:{orderid}")
            return

        timestamp: str = f"{data['InsertDate']} {data['InsertTime']}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
        dt: datetime = dt.replace(tzinfo=CHINA_TZ)

        tp: tuple = (data["OrderPriceType"], data["TimeCondition"], data["VolumeCondition"])
        order_type: OrderType = ORDERTYPE_CTP2VT.get(tp, None)
        if not order_type:
            self.gateway.write_log(f"收到不支持的委托类型,委托号:{orderid}")
            return

        order: OrderData = OrderData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            type=order_type,
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT[data["CombOffsetFlag"]],
            price=data["LimitPrice"],
            volume=data["VolumeTotalOriginal"],
            traded=data["VolumeTraded"],
            status=status,
            datetime=dt,
            gateway_name=self.gateway_name
        )
        self.gateway.on_order(order)

        # Remember the mapping so trade pushes can be matched to this order.
        self.sysid_orderid_map[data["OrderSysID"]] = orderid

    def onRtnTrade(self, data: dict) -> None:
        """Callback: trade push; buffered until contracts are initialised."""
        if not self.contract_inited:
            self.trade_data.append(data)
            return

        symbol: str = data["InstrumentID"]
        contract: ContractData = symbol_contract_map[symbol]

        orderid: str = self.sysid_orderid_map[data["OrderSysID"]]

        timestamp: str = f"{data['TradeDate']} {data['TradeTime']}"
        dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S")
        dt: datetime = dt.replace(tzinfo=CHINA_TZ)

        trade: TradeData = TradeData(
            symbol=symbol,
            exchange=contract.exchange,
            orderid=orderid,
            tradeid=data["TradeID"],
            direction=DIRECTION_CTP2VT[data["Direction"]],
            offset=OFFSET_CTP2VT[data["OffsetFlag"]],
            price=data["Price"],
            volume=data["Volume"],
            datetime=dt,
            gateway_name=self.gateway_name
        )
        self.gateway.on_trade(trade)

    def connect(
        self,
        address: str,
        userid: str,
        password: str,
        brokerid: str,
        auth_code: str,
        appid: str
    ) -> None:
        """
        Connect to the trading server.

        Stores the credentials. On the first call, creates and starts the
        trader API; on later calls, re-sends the authentication request.
        """
        print("发起连接交易服务器请求")
        self.userid = userid
        self.password = password
        self.brokerid = brokerid
        self.auth_code = auth_code
        self.appid = appid

        if not self.connect_status:
            path: Path = get_folder_path(self.gateway_name.lower())
            self.createFtdcTraderApi((str(path) + "\\Td").encode("GBK"))  # create the TraderApi

            self.subscribePrivateTopic(0)  # subscribe to the private stream
            print("订阅私有流")
            self.subscribePublicTopic(0)  # subscribe to the public stream
            print("订阅公共流")

            self.registerFront(address)  # register the front server address
            print("注册前置机网络地址")
            self.init()
            print("初始化......")

            self.connect_status = True
        else:
            self.authenticate()  # send the authentication request

    def authenticate(self) -> None:
        """Send the authentication request, unless a previous attempt was rejected."""
        if self.auth_failed:
            return

        ctp_req: dict = {
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "AuthCode": self.auth_code,
            "AppID": self.appid
        }

        self.reqid += 1
        self.reqAuthenticate(ctp_req, self.reqid)  # client authentication request
        print("向交易服务器发起授权验证请求")

    def login(self) -> None:
        """Send the user login request, unless a previous login failed."""
        if self.login_failed:
            return

        ctp_req: dict = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid
        }

        self.reqid += 1
        self.reqUserLogin(ctp_req, self.reqid)
        print("发起用户登录交易服务器请求: ")

    def send_order(self, req: OrderRequest) -> str:
        """
        Send an order.

        Returns the order's ``vt_orderid`` on success, or an empty string when
        the offset/order type is unsupported or the request could not be sent.
        """
        if req.offset not in OFFSET_VT2CTP:
            self.gateway.write_log("请选择开平方向")
            return ""

        if req.type not in ORDERTYPE_VT2CTP:
            self.gateway.write_log(f"当前接口不支持该类型的委托{req.type.value}")
            return ""

        self.order_ref += 1

        tp: tuple = ORDERTYPE_VT2CTP[req.type]
        price_type, time_condition, volume_condition = tp

        ctp_req: dict = {
            "InstrumentID": req.symbol,
            "ExchangeID": req.exchange.value,
            "LimitPrice": req.price,
            "VolumeTotalOriginal": int(req.volume),
            "OrderPriceType": price_type,
            "Direction": DIRECTION_VT2CTP.get(req.direction, ""),
            "CombOffsetFlag": OFFSET_VT2CTP.get(req.offset, ""),
            "OrderRef": str(self.order_ref),
            "InvestorID": self.userid,
            "UserID": self.userid,
            "BrokerID": self.brokerid,
            "CombHedgeFlag": THOST_FTDC_HF_Speculation,
            "ContingentCondition": THOST_FTDC_CC_Immediately,
            "ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
            "IsAutoSuspend": 0,
            "TimeCondition": time_condition,
            "VolumeCondition": volume_condition,
            "MinVolume": 1
        }

        self.reqid += 1
        n: int = self.reqOrderInsert(ctp_req, self.reqid)
        if n:
            self.gateway.write_log(f"委托请求发送失败,错误代码:{n}")
            return ""

        orderid: str = f"{self.frontid}_{self.sessionid}_{self.order_ref}"
        order: OrderData = req.create_order_data(orderid, self.gateway_name)
        self.gateway.on_order(order)

        return order.vt_orderid

    def cancel_order(self, req: CancelRequest) -> None:
        """Cancel an order; ``req.orderid`` must have the form frontid_sessionid_orderref."""
        frontid, sessionid, order_ref = req.orderid.split("_")

        ctp_req: dict = {
            "InstrumentID": req.symbol,
            "ExchangeID": req.exchange.value,
            "OrderRef": order_ref,
            "FrontID": int(frontid),
            "SessionID": int(sessionid),
            "ActionFlag": THOST_FTDC_AF_Delete,
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }

        self.reqid += 1
        self.reqOrderAction(ctp_req, self.reqid)

    def query_account(self) -> None:
        """Query account funds."""
        self.reqid += 1
        self.reqQryTradingAccount({}, self.reqid)

    def query_position(self) -> None:
        """Query positions; skipped while no contract data has been received."""
        if not symbol_contract_map:
            return

        ctp_req: dict = {
            "BrokerID": self.brokerid,
            "InvestorID": self.userid
        }

        self.reqid += 1
        self.reqQryInvestorPosition(ctp_req, self.reqid)

    def close(self) -> None:
        """Close the connection if one was opened."""
        if self.connect_status:
            self.exit()
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】