VeighNa量化社区
你的开源社区量化交易平台

置顶主题

K线图表的缺点和改进建议

1. vnpy系统提供的K线图表的缺点

如果您在启动vntrader的时候勾选了【ChartWizard 实时K线图表模块】,您会简单主界面上vnpy系统提供的K线图表功能图标,进入该功能模块后就可以输入本地代码,新建K线图表了。
使用了该功能之后,你会发现它有如下缺点:

  • 这个K线图表只能提供一分钟的K线图表
  • 除了K线主图和成交量之外,你不可以增加其他的主图附加指标和副图指标

这样一个太简单的K线图表是远远满足了交易者对K线图表的需求的,有多少人使用就可想而知了。

2. 它应该提供不同周期单位和窗口大小的K线显示能力

绝大多数交易策略都是基于K线来实现的。可是很少部分是只在1分钟K线的基础上运行的,可能是n分钟,n小时,n天...,只能提供一分钟的K线图是不够用的。
所以应该提供用户如下的选择:

  • 窗口大小
  • 单位选择

3. 它应该提供其他的主图附加指标和副图指标的添加和删除功能

用户之所以想看K线图,可能是想看看自己策略的算法是否正确,这一般都是使用了一个或者多个运行在窗口K线上指标计算的值计算的入场和出场信号。
这也是可以显示的,而这种指标不可能全部是系统自带的指标显示控件能够涵盖的,所以应该有方法让用户自己增加自己的指标显示部件。
所以应该提供下面功能:

  • 更改主图指标功能
  • 增加/删除主图附加指标功能
  • 增加/删除副图指标功能


如何更有效地利用合约交易状态信息——让BarGenerator在休市和收市时及时生成K线

1. 问题的由来

实盘中,如果你用K线图表对你的CTA策略生成的K线进行显示,你会发现明明已经休市了或者收盘了,最新的日K线、30分钟、5分钟K线本应该能够有了,可是迟迟见不到这些K线的生成。
以国内期货举例:

  • 10:10~10:15的那根5分钟K线在10:15:00到10:30:00的休市时间段内是出不来的;
  • 11:25:00到11:30:00的那根5分钟K线在11:30:00到13:30:00的休市时间段内是出不来的;
  • 还有日K线,已经到15:00:00收盘了,可是因为没有收到下一秒tick而不能够及时生成出来。如果您不关机,等到下一个交易日的第一个tick收到的时候才能生成这根日K线。
  • 另外如果你是个细心的人,注意下策略在加载历史数据的时候,最后一根日K线、30分钟、5分钟K线乃至最后一根1分钟K线也是见不到的。

目前vnpy的BarGenerator没有出错,但出现这种情况有违常理!

为什么会出现情况?原因是因为你的BarGenerator的生成K线的机制导致的,因为这些大K线是由1分钟K线合成的,就是说它们依赖1分钟K线的生成。
目前1分钟K线只是单纯由tick数据推动的,当收到下一分钟tick才判断当前1分钟K线的结束。如果遇到了中间休市时间或者收盘时间,网关接口就不再有新的tick推送,这样最后1分钟K线也就一直呆在BarGenerator中,无法推动5分钟、30分钟、日等大K线的生成,这就是目前BarGenerator的问题所在。

2. 合约交易状态可以解决这个问题。

交易所用合约交易状态通知交易客户端交易合约的状态已经发生了变化。它表明交易合约当前的交易时间段,在每个交易时间的开始和结束时推送,时间为分钟的开始。这个信息正好可以用于BarGenerator结束各个休市和收盘前1分钟的K线生成,进而一举解决比1分钟大的K线的生成。

3. 实现这个问题一共分成5步

套用一句宋丹丹的话,要把大象关进冰箱总共分三步:第一步把冰箱门打开,第二步把冰箱塞进冰箱中,第三步把冰箱门关上!
让BarGenerator在休市和收市时及时生成K线分五步:

  • 第一步扩展CTA策略引擎CtaEngine
  • 第二步修改CTA策略模板CtaTemplate
  • 第三步扩展K线生成器BarGenerator
  • 第四步修改CTA策略
  • 第五步启用扩展的MyCtaEngine

3.1 第一步扩展CTA策略引擎CtaEngine

在vnpy_ctastrategy\engine.py中增加下面的内容:

class MyCtaEngine(CtaEngine):
    """ 
    CTA策略引擎,对CtaEngine的功能进行扩展。 
    功能:
    1. 订阅集合竞价tick数据,并且转发给各个已经初始化的CTA策略;
    2. 订阅交易状态消息数据,并且转发给各个已经初始化的CTA策略;
    3. 条件单的功能:包括发送、监视、更新和取消条件单的功能。
    """

    condition_filename = "condition_order.json"     # 历史条件单存储文件


    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine,event_engine)

        self.condition_orders:Dict[str,ConditionOrder] = {}         # strategy_name: ConditionOrder

        self.triggered_condition_orders:List[ConditionOrder] = []   # 已经触发点条件单,为流控设计

    def load_active_condtion_orders(self):
        """  """
        return {}

    def register_event(self):
        """"""
        super().register_event()
        self.event_engine.register(EVENT_AUCTION_TICK, self.process_auction_tick_event)
        self.event_engine.register(EVENT_STATUS, self.process_status_event)

    def process_auction_tick_event(self,event:Event):
        """ 集合竞价消息处理 hxxjava add """

        tick:TickData = event.data
        strategies:List[CtaTemplate] = self.symbol_strategy_map[tick.vt_symbol]
        if not strategies:
            return

        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_auction_tick, tick)

    def process_status_event(self,event:Event):
        """ 交易状态消息处理 hxxjava add """
        status:StatusData = event.data
        strategies:List[CtaTemplate] = []

        # step1: find strategies related to this status data 
        vt_instrument0 = get_vt_instrument(status.vt_symbol)
        if vt_instrument0 == status.vt_symbol:
            # 交易品种的交易状态
            for vt_symbol in self.symbol_strategy_map.keys():
                vt_instrument = get_vt_instrument(vt_symbol)
                if vt_instrument == vt_instrument0:
                    # 交易品种的交易状态属于策略交易的合约
                    strategies.extend(self.symbol_strategy_map[vt_symbol]) 

        else:
            # 单独合约的交易状态
            strategies.extend(self.symbol_strategy_map.get(status.vt_symbol,[]))

        if not strategies:
            return

        # step 2: push status data to all relate strategies
        for strategy in strategies:
            if strategy.inited:
                # 执行策略的集合竞价消息处理
                self.call_strategy_func(strategy, strategy.on_status, status)

    def process_tick_event(self,event:Event):
        """ 用tick的价格检查条件单 """
        super().process_tick_event(event)

        tick:TickData = event.data
        all_condition_orders = [order for order in self.condition_orders.values() \
            if order.vt_symbol == tick.vt_symbol and order.status == CondOrderStatus.WAITING]
        for order in all_condition_orders:
            # 检查条件单是否满足条件
            self.check_condition_order(order,tick)

    def check_condition_order(self,order:ConditionOrder,tick:TickData):
        """ 检查条件单是否满足条件 """       
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        price = tick.last_price

        is_be = order.condition == Condition.BE and price >= order.price
        is_le = order.condition == Condition.LE and price <= order.price
        is_bt = order.condition == Condition.BT and price > order.price
        is_lt = order.condition == Condition.LT and price < order.price

        if is_be or is_le or is_bt or is_lt:
            # 满足触发条件
            if order.execute_price == ExecutePrice.MARKET:
                # 取市场价
                price = tick.last_price
            elif order.execute_price == ExecutePrice.EXTREME:
                # 取极限价
                price = tick.limit_up if order.direction == Direction.LONG else tick.limit_down
            else:
                # 取设定价
                price = order.price

            # 执行委托
            order_ids = strategy.send_order(
                    direction = order.direction,
                    offset=order.offset,
                    price=price,
                    volume=order.volume 
                )

            if order_ids:
                order.trigger_time = tick.datetime
                order.status = CondOrderStatus.TRIGGERED
                order.vt_orderids = order_ids

                self.call_strategy_func(strategy,strategy.on_condition_order,order)
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

    def find_condition_order(self,vt_orderid:str):
        """ 根据委托单号查询所属条件单 """
        corder:ConditionOrder = None
        for order in self.condition_orders.values():
            if vt_orderid in order.vt_orderids:
                corder = order
                break

        return corder

    def process_trade_event(self, event: Event):
        """ 委托单推送处理 """
        super().process_trade_event(event)
        trade:TradeData = event.data
        vt_orderid = trade.vt_orderid

        corder = self.find_condition_order(vt_orderid)
        if corder:
            # 该成交单属于某个条件单
            strategy = self.strategies.get(corder.strategy_name,None)
            if strategy and strategy.trading:
                # 找到了该条件单属实策略实例且正在交易中

                # 累计条件单的成交量
                corder.traded += trade.volume
                # 推送该条件单给策略
                self.call_strategy_func(strategy,strategy.on_condition_order,corder)
                # 刷新条件单列表控件
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,corder))

    def send_condition_order(self,order:ConditionOrder):
        """  """
        strategy = self.strategies.get(order.strategy_name,None)
        if not strategy or not strategy.trading:
            return False

        if order.cond_orderid not in self.condition_orders:
            self.condition_orders[order.cond_orderid] = order
            self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
            return True

        return False

    def cancel_condition_order(self,cond_orderid:str):
        """  """
        order:ConditionOrder = self.condition_orders.get(cond_orderid,None)
        if not order:
            return False

        order.status = CondOrderStatus.CANCELLED
        self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))
        return True

    def cancel_all_condition_orders(self,strategy_name:str):
        """  """
        for order in self.condition_orders.values():
            if order.strategy_name == strategy_name and order.status == CondOrderStatus.WAITING:
                order.status = CondOrderStatus.CANCELLED
                self.event_engine.put(Event(EVENT_CONDITION_ORDER,order))

        return True

3.2 第二步修改CTA策略模板CtaTemplate

在vnpy_ctastrategy\template.py中增加一个on_status()虚函数,它是接收交易状态信息数据推送的接口,on_status()代码如下:


class CtaTemplate(ABC):
    """"""

    author: str = ""
    parameters: list = []
    variables: list = []

    def __init__(
        self,
        cta_engine: Any,
        strategy_name: str,
        vt_symbol: str,
        setting: dict,
    ) -> None:
        """"""
        self.cta_engine: Any = cta_engine
        self.strategy_name: str = strategy_name
        self.vt_symbol: str = vt_symbol

        self.inited: bool = False
        self.trading: bool = False
        self.pos: int = 0

        # Copy a new variables list here to avoid duplicate insert when multiple
        # strategy instances are created with the same strategy class.
        self.variables = copy(self.variables)
        self.variables.insert(0, "inited")
        self.variables.insert(1, "trading")
        self.variables.insert(2, "pos")

        self.update_setting(setting)

    def update_setting(self, setting: dict) -> None:
        """
        Update strategy parameter wtih value in setting dict.
        """
        for name in self.parameters:
            if name in setting:
                setattr(self, name, setting[name])

    @classmethod
    def get_class_parameters(cls) -> dict:
        """
        Get default parameters dict of strategy class.
        """
        class_parameters: dict = {}
        for name in cls.parameters:
            class_parameters[name] = getattr(cls, name)
        return class_parameters

    def get_parameters(self) -> dict:
        """
        Get strategy parameters dict.
        """
        strategy_parameters: dict = {}
        for name in self.parameters:
            strategy_parameters[name] = getattr(self, name)
        return strategy_parameters

    def get_variables(self) -> dict:
        """
        Get strategy variables dict.
        """
        strategy_variables: dict = {}
        for name in self.variables:
            strategy_variables[name] = getattr(self, name)
        return strategy_variables

    def get_data(self) -> dict:
        """
        Get strategy data.
        """
        strategy_data: dict = {
            "strategy_name": self.strategy_name,
            "vt_symbol": self.vt_symbol,
            "class_name": self.__class__.__name__,
            "author": self.author,
            "parameters": self.get_parameters(),
            "variables": self.get_variables(),
        }
        return strategy_data

    @virtual
    def on_init(self) -> None:
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_inited(self):    # hxxjava add
        """
        Callback when strategy is inited.
        """
        pass

    @virtual
    def on_start(self):
        """
        Callback when strategy is started.
        """
        pass

    @virtual
    def on_stop(self) -> None:
        """
        Callback when strategy is stopped.
        """
        pass

    @virtual
    def on_auction_tick(self, tick: TickData):
        """
        Callback of new tick data update.   # hxxjava add for auction tick
        """
        pass

    @virtual
    def on_status(self, status: StatusData=None):
        """
        Callback of new status data update.   # hxxjava add for trading status
        """
        pass

    @virtual
    def on_tick(self, tick: TickData) -> None:
        """
        Callback of new tick data update.
        """
        pass

    @virtual
    def on_bar(self, bar: BarData) -> None:
        """
        Callback of new bar data update.
        """
        pass

    @virtual
    def on_trade(self, trade: TradeData) -> None:
        """
        Callback of new trade data update.
        """
        pass

    @virtual
    def on_order(self, order: OrderData) -> None:
        """
        Callback of new order data update.
        """
        pass

    @virtual
    def on_stop_order(self, stop_order: StopOrder) -> None:
        """
        Callback of stop order update.
        """
        pass

    def buy(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send buy order to open a long position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def sell(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send sell order to close a long position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def short(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send short order to open as short position.
        """
        return self.send_order(
            Direction.SHORT,
            Offset.OPEN,
            price,
            volume,
            stop,
            lock,
            net
        )

    def cover(
        self,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send cover order to close a short position.
        """
        return self.send_order(
            Direction.LONG,
            Offset.CLOSE,
            price,
            volume,
            stop,
            lock,
            net
        )

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False,
        net: bool = False
    ) -> list:
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids: list = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock, net
            )
            return vt_orderids
        else:
            return []

    def cancel_order(self, vt_orderid: str) -> None:
        """
        Cancel an existing order.
        """
        if self.trading:
            self.cta_engine.cancel_order(self, vt_orderid)

    def cancel_all(self) -> None:
        """
        Cancel all orders sent by strategy.
        """
        if self.trading:
            self.cta_engine.cancel_all(self)

    def write_log(self, msg: str) -> None:
        """
        Write a log message.
        """
        self.cta_engine.write_log(msg, self)

    def get_engine_type(self) -> EngineType:
        """
        Return whether the cta_engine is backtesting or live trading.
        """
        return self.cta_engine.get_engine_type()

    def get_pricetick(self) -> float:
        """
        Return pricetick data of trading contract.
        """
        return self.cta_engine.get_pricetick(self)

    def load_bar(
        self,
        days: int,
        interval: Interval = Interval.MINUTE,
        callback: Callable = None,
        use_database: bool = False
    ) -> None:
        """
        Load historical bar data for initializing strategy.
        """
        if not callback:
            callback: Callable = self.on_bar

        bars: List[BarData] = self.cta_engine.load_bar(
            self.vt_symbol,
            days,
            interval,
            callback,
            use_database
        )

        for bar in bars:
            callback(bar)

    def load_tick(self, days: int) -> None:
        """
        Load historical tick data for initializing strategy.
        """
        ticks: List[TickData] = self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)

        for tick in ticks:
            self.on_tick(tick)

    def put_event(self) -> None:
        """
        Put an strategy data event for ui update.
        """
        if self.inited:
            self.cta_engine.put_strategy_event(self)

    def send_email(self, msg) -> None:
        """
        Send email to default receiver.
        """
        if self.inited:
            self.cta_engine.send_email(msg, self)

    def sync_data(self) -> None:
        """
        Sync strategy variables value into disk storage.
        """
        if self.trading:
            self.cta_engine.sync_strategy_data(self)

    def get_trading_hours(self):
        """
        Return trading_hours of trading hours.      # hxxjava add
        """
        return self.cta_engine.get_trading_hours(self)

    def get_actual_days(self,last_time:datetime,days:int):  # hxxjava add
        """
        得到从last_time开始往前days天的实际天数。
        """
        if days <= 0:
            return 0

        th = TradingHours(self.get_trading_hours())

        # 找到有效的最后时间
        till_time = last_time
        while not th.get_trade_hours(till_time):
            till_time = timedelta(minutes=1)

        availble_days = 0
        #找到有些多开始时间
        from_time = till_time
        while availble_days <= days:
            from_time -= timedelta(days=1)
            if th.get_trade_hours(from_time):
                availble_days += 1

        actual_days = (last_time-from_time).days
        # print(f"till_time={till_time},from_time{from_time},days={days},acutal_days={actual_days}")
        return actual_days

    def get_contract(self):
        """
        Return trading_hours of trading contract.   # hxxjava add
        """
        return self.cta_engine.get_contract(self)

    @virtual
    def on_condition_order(self, cond_order: ConditionOrder):
        """
        Callback of condition order update.
        """
        pass

    def send_condition_order(self,order:ConditionOrder):    # hxxjava add
        """ """
        if not self.trading:
            return False
        return self.cta_engine.send_condition_order(order)

    def cancel_condition_order(self,cond_orderid:str):      # hxxjava add
        """ """
        return self.cta_engine.cancel_condition_order(cond_orderid)

    def cancel_all_condition_orders(self):                  # hxxjava add
        """ """
        return self.cta_engine.cancel_all_condition_orders(self)

    def send_margin_ratio_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = MarginRequest(symbol=symbol,exchange=exchange)
        main_engine.send_margin_ratio_request(req,contract.gateway_name)

    def send_commission_request(self):   # hxxjava add
        """ """
        main_engine = self.cta_engine.main_engine
        contract:ContractData = self.get_contract()
        symbol,exchange = extract_vt_symbol(self.vt_symbol)
        req = CommissionRequest(symbol=symbol,exchange=exchange)
        main_engine.send_commission_request(req,contract.gateway_name)

3.3 第三步修改K线生成器BarGenerator

修改vnpy\trader\utility.py中的BarGenerator,为其添加下面的update_status()函数:

class BarGenerator:
    ... ...
    def update_status(self,status:StatusData=None):
        """  """
        # if status:
        #     hh,mm = status.enter_time.split(':')
        #     st_time = datetime.now().replace(hour=int(hh),minute=int(mm),second=0,microsecond=0,tzinfo=CHINA_TZ)

        if self.bar:
            # 只要接收到交易状态信息,一定是整分钟,立即推送当前分钟bar
            self.on_bar(self.bar)
            self.bar = None

3.4 第四步修改Cta策略

在您的CTA策略中增加一个on_status()函数,这里只给出一个CTA策略与本文主题相关部分的代码,其他部分用省略号表示,一般的代码如下:

class XxxStrategy(CtaTemplate):
    """ XXX交易策略 """
    self.window = 30

    ... ...

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        """"""
        super(GsjyStrategy, self).__init__(cta_engine, strategy_name, vt_symbol, setting)
        ... ...
        self.bg : BarGenerator:BarGenerator(self.on_bar,self.window,self.on_xmin_bar)
        ... ...

    def on_inited(self):
         """ 
        Callback after strategy is inited. 
        """
        self.bg.update_status()   # 用来强制生成最后的1分钟K线
        self.write_log("策略初始化结束。")

    def on_auction_tick(self, tick: TickData):
        """
        Callback of auction tick data update.
        """
        self.bg.update_auction_tick(tick)

    def on_status(self,status:StatusData=None):  # 
        """ 
        收到合约交易状态信息时,更新所有的K线生成器 。
        注意:合约交易状态信息推送只会在策略初始化后才执行!
        """
        self.bg.update_status(status=status)

    def on_tick(self, tick: TickData):
        """
        Callback of new tick data update.
        """
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """
        Callback of new bar data update.
        """
        self.bg.update_bar(bar)

    def on_xmin_bar(self, bar: BarData):
        """  """
        # 这里应该是利用大周期bar进行出入场交易信号计算和委托交易指令发出的代码
        pass

    ... ...

3.5 第五步启用扩展的MyCtaEngine

修改vnpy_ctastrategy__init__.py中的CtaStrategyApp,将其engine_class更改为扩展的MyCtaEngine:

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    # engine_class = CtaEngine
    engine_class = MyCtaEngine    # hxxjava add
    widget_name = "CtaManager"
    icon_name = str(app_path.joinpath("ui", "cta.ico"))


vnpy 的启动流程总结

作为初学者,面对 vnpy 无所不包、博大精深的丰富内容,试图用图形对 vnpy 的运行流程做一个归纳。
不到之处,还请各位多多指正

description



VeighNa发布v3.0.0 - 3.0大版本的首次发布

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2022-03-23
 
本周发布了VeighNa的3.0.0版本,主要更新的内容是采用Python 3.10作为核心支持(也保持了对3.7、3.8、3.9的兼容),同时对周边插件模块进行了相应的编译升级。

由于内置Python核心的版本升级(3.7 -> 3.10)以及VeighNa Station的开发重构,本次更新无法使用之前的自动升级功能实现,需要用户卸载老版本后下载新的VeighNa Studio-3.0.0,下载链接:

https://download.vnpy.com/veighna_studio-3.0.0.exe

 

Python 3.10核心支持

 

关于把核心支持升级到Python 3.10的原因,在之前的《2022年的项目计划》中已经有详细解析,这里就不再重复了。

目前大部分VeighNa的插件模块都已经完成3.10的编译升级,少数由于依赖库问题尚未支持的模块包括:

  • 交易接口

    • vnpy_tora
    • vnpy_comstar
  • 数据库

    • vnpy_arctic
    • vnpy_dolphindb
    • vnpy_leveldb
  • 数据服务

    • vnpy_tinysoft

以上模块预计将会在相关底层依赖库发布3.10版本后尽快完成支持。

兼容性方面,为了尽可能方便老版本用户的升级,VeighNa 3.0.0版本的代码回避了Python 3.7后引入的新语法特性,从而实现3.7、3.8、3.9的向后兼容。

需要注意的是,部分交易接口在Windows上非Python 3.10的环境安装时,必须安装有Visual Studio 2017以上版本来完成相关的底层编译任务。

 

重构VeighNa Station

 

作为VeighNa框架的图形化管理工具,VeighNa Station在过去基本只是处于【能用】的状态,很难达到【好用】的评价,可能不少老用户都有过以下类似的经历:

  • 同时加载CTP和其他接口(CTPTEST、ROHON等),导致登录报错;
  • 自动更新时不时会遭遇各种异常:更新下载失败,兼容性检查冲突等;
  • 代码加密运行后无法正确生成pyd:easycython的编译限制较多。

所以趁着这次大版本的升级,对整个VeighNa Station进行了一次重构,一方面致力于解决过去经验中发现的各种问题,另一方面也尽可能利用当下Python的新技术特性,来打造一款称得上【好用】的产品。新版本的登录后主界面如下图:

description

主界面变化不大,功能按钮从底部移动了左侧,采用标签页面的方式来替代原本的弹出窗口,视觉上更加清爽一些。点击【交易】按钮,切换到VeighNa Trader配置页面:

description

左侧采用树型表格来选择接口和应用模块的加载,并将VeighNa Trader进程运行过程中的底层输出信息(cmd打印)显示在右侧的监控区域,方便必要时的异常信息排查。

在【投研】页面中,以内嵌方式来启动Jupyter Lab环境,提供更加一体化的交互式开发体验:

description

另外代码加密和自动更新功能也都做了对应的改进优化,感兴趣的用户可以先尝试看看,交互界面上基本还是采用了和之前类似的流程,具体使用文档将会后续推出。

 

CHANGELOG

 

修复

  1. 使用非原生窗口菜单栏,修复Linux/Mac下【配置】按钮不显示的问题

调整

  1. 移除api、gateway、app子模块的目录

  2. 移除requirements.txt对于插件的默认依赖

  3. 简化重构rpc子模块,定位于可靠环境下跨进程通讯(本机、局域网)

  4. 移除rpc子模块对于鉴权的支持

  5. 调整rpc子模块中的心跳机制的实现方式

  6. 移除基于QScintilla开发的代码编辑器,改用VSCode打开代码

  7. 优化MainWindow主窗口中,对于QAction按钮图标的加载逻辑

  8. MainEngine添加交易接口时,支持自定义接口名称

 



我分享一个我自己的按天或周来计算的K线合成器,我也是继承csdn大神的,解决了报错后分享出来给大家采纳或者改良自己的合成器。代码只是添加了2个da_bar和week_bar函数,

"""
General utility functions.
"""

import json
import logging
import sys
import datetime
from pathlib import Path
from typing import Callable, Dict, Tuple, Union, Optional
from decimal import Decimal
from math import floor, ceil

import numpy as np
import talib

from .object import BarData, TickData
from .constant import Exchange, Interval

if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo, available_timezones # noqa
else:
from backports.zoneinfo import ZoneInfo, available_timezones # noqa

log_formatter: logging.Formatter = logging.Formatter('[%(asctime)s] %(message)s')

def extract_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
"""
:return: (symbol, exchange)
"""
symbol, exchange_str = vt_symbol.split(".")
return symbol, Exchange(exchange_str)

def generate_vt_symbol(symbol: str, exchange: Exchange) -> str:
"""
return vt_symbol
"""
return f"{symbol}.{exchange.value}"

def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
"""
Get path where trader is running in.
"""
cwd: Path = Path.cwd()
temp_path: Path = cwd.joinpath(temp_name)

# If .vntrader folder exists in current working directory,
# then use it as trader running path.
if temp_path.exists():
    return cwd, temp_path

# Otherwise use home path of system.
home_path: Path = Path.home()
temp_path: Path = home_path.joinpath(temp_name)

# Create .vntrader folder under home path if not exist.
if not temp_path.exists():
    temp_path.mkdir()

return home_path, temp_path


TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
sys.path.append(str(TRADER_DIR))

def get_file_path(filename: str) -> Path:
"""
Get path for temp file with filename.
"""
return TEMP_DIR.joinpath(filename)

def get_folder_path(folder_name: str) -> Path:
"""
Get path for temp folder with folder name.
"""
folder_path: Path = TEMP_DIR.joinpath(folder_name)
if not folder_path.exists():
folder_path.mkdir()
return folder_path

def get_icon_path(filepath: str, ico_name: str) -> str:
"""
Get path for icon file with ico name.
"""
ui_path: Path = Path(filepath).parent
icon_path: Path = ui_path.joinpath("ico", ico_name)
return str(icon_path)

def load_json(filename: str) -> dict:
"""
Load data from json file in temp path.
"""
filepath: Path = get_file_path(filename)

if filepath.exists():
    with open(filepath, mode="r", encoding="UTF-8") as f:
        data: dict = json.load(f)
    return data
else:
    save_json(filename, {})
    return {}


def save_json(filename: str, data: dict) -> None:
"""
Save data into json file in temp path.
"""
filepath: Path = get_file_path(filename)
with open(filepath, mode="w+", encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
ensure_ascii=False
)

def round_to(value: float, target: float) -> float:
"""
Round price to price tick value.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
rounded: float = float(int(round(value / target)) * target)
return rounded

def floor_to(value: float, target: float) -> float:
"""
Similar to math.floor function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(floor(value / target)) * target)
return result

def ceil_to(value: float, target: float) -> float:
"""
Similar to math.ceil function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(ceil(value / target)) * target)
return result

def get_digits(value: float) -> int:
"""
Get number of digits after decimal point.
"""
value_str: str = str(value)

if "e-" in value_str:
    _, buf = value_str.split("e-")
    return int(buf)
elif "." in value_str:
    _, buf = value_str.split(".")
    return len(buf)
else:
    return 0


class BarGenerator:
"""
For:

1. generating 1 minute bar data from tick data
2. generating x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""

def __init__(
    self,
    on_bar: Callable,
    window: int = 0,
    on_window_bar: Callable = None,
    interval: Interval = Interval.MINUTE

) -> None:
    """Constructor"""
    self.bar: BarData = None
    self.on_bar: Callable = on_bar

    self.interval: Interval = interval
    self.interval_count: int = 0

    self.hour_bar: BarData = None
    self.day_bar: BarData = None
    self.week_bar: BarData = None

    self.window: int = window
    self.window_bar: BarData = None
    self.on_window_bar: Callable = on_window_bar

    self.last_tick: TickData = None

def update_tick(self, tick: TickData) -> None:
    """
    Update new tick data into generator.
    """
    new_minute: bool = False

    # Filter tick data with 0 last price
    if not tick.last_price:
        return

    # Filter tick data with older timestamp
    if self.last_tick and tick.datetime < self.last_tick.datetime:
        return

    if not self.bar:
        new_minute = True
    elif (
        (self.bar.datetime.minute != tick.datetime.minute)
        or (self.bar.datetime.hour != tick.datetime.hour)
    ):
        self.bar.datetime = self.bar.datetime.replace(
            second=0, microsecond=0
        )
        self.on_bar(self.bar)

        new_minute = True

    if new_minute:
        self.bar = BarData(
            symbol=tick.symbol,
            exchange=tick.exchange,
            interval=Interval.MINUTE,
            datetime=tick.datetime,
            gateway_name=tick.gateway_name,
            open_price=tick.last_price,
            high_price=tick.last_price,
            low_price=tick.last_price,
            close_price=tick.last_price,
            open_interest=tick.open_interest
        )
    else:
        self.bar.high_price = max(self.bar.high_price, tick.last_price)
        if tick.high_price > self.last_tick.high_price:
            self.bar.high_price = max(self.bar.high_price, tick.high_price)

        self.bar.low_price = min(self.bar.low_price, tick.last_price)
        if tick.low_price < self.last_tick.low_price:
            self.bar.low_price = min(self.bar.low_price, tick.low_price)

        self.bar.close_price = tick.last_price
        self.bar.open_interest = tick.open_interest
        self.bar.datetime = tick.datetime

    if self.last_tick:
        volume_change: float = tick.volume - self.last_tick.volume
        self.bar.volume += max(volume_change, 0)

        turnover_change: float = tick.turnover - self.last_tick.turnover
        self.bar.turnover += max(turnover_change, 0)

    self.last_tick = tick

def update_bar(self, bar: BarData) -> None:
    """
    Update 1 minute bar into generator
    """
    # if self.interval == Interval.MINUTE:
    #     self.update_bar_minute_window(bar)
    # else:
    #     self.update_bar_hour_window(bar)
    if self.interval == Interval.MINUTE:
        self.update_bar_minute_window(bar)
    elif self.interval == Interval.HOUR:
        self.update_bar_hour_window(bar)
    elif self.interval == Interval.DAILY: 
        self.update_bar_day_window(bar) #处理日线
    else:
        self.update_bar_week_window(bar) #处理周线


def update_bar_minute_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object
    if not self.window_bar:
        dt: datetime = bar.datetime.replace(second=0, microsecond=0)
        self.window_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price
        )
    # Otherwise, update high/low price into window bar
    else:
        self.window_bar.high_price = max(
            self.window_bar.high_price,
            bar.high_price
        )
        self.window_bar.low_price = min(
            self.window_bar.low_price,
            bar.low_price
        )

    # Update close price/volume/turnover into window bar
    self.window_bar.close_price = bar.close_price
    self.window_bar.volume += bar.volume
    self.window_bar.turnover += bar.turnover
    self.window_bar.open_interest = bar.open_interest

    # Check if window bar completed
    if not (bar.datetime.minute + 1) % self.window:
        self.on_window_bar(self.window_bar)
        self.window_bar = None

def update_bar_hour_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object
    if not self.hour_bar:
        dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.hour_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume,
            turnover=bar.turnover,
            open_interest=bar.open_interest
        )
        return

    finished_bar: BarData = None

    # If minute is 59, update minute bar into window bar and push
    if bar.datetime.minute == 59:
        self.hour_bar.high_price = max(
            self.hour_bar.high_price,
            bar.high_price
        )
        self.hour_bar.low_price = min(
            self.hour_bar.low_price,
            bar.low_price
        )

        self.hour_bar.close_price = bar.close_price
        self.hour_bar.volume += bar.volume
        self.hour_bar.turnover += bar.turnover
        self.hour_bar.open_interest = bar.open_interest

        finished_bar = self.hour_bar
        self.hour_bar = None

    # If minute bar of new hour, then push existing window bar
    elif bar.datetime.hour != self.hour_bar.datetime.hour:
        finished_bar = self.hour_bar

        dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.hour_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume,
            turnover=bar.turnover,
            open_interest=bar.open_interest
        )
    # Otherwise only update minute bar
    else:
        self.hour_bar.high_price = max(
            self.hour_bar.high_price,
            bar.high_price
        )
        self.hour_bar.low_price = min(
            self.hour_bar.low_price,
            bar.low_price
        )

        self.hour_bar.close_price = bar.close_price
        self.hour_bar.volume += bar.volume
        self.hour_bar.turnover += bar.turnover
        self.hour_bar.open_interest = bar.open_interest

    # Push finished window bar
    if finished_bar:
        self.on_hour_bar(finished_bar)

添加部分

def update_bar_day_window(self, bar: BarData) -> None:
    """"""
    # 没有日线bar就生成一个

    if not self.day_bar:
        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.day_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            volume=bar.volume
        )
        return

    finished_bar = None
    temp_datetime = (bar.datetime + datetime.timedelta(hours=4))
    # 14:59 更新bar,生成新的日线bar
    if bar.datetime.minute == 59 and bar.datetime.hour == 14:
        self.day_bar.high_price = max(
            self.day_bar.high_price,
            bar.high_price
        )
        self.day_bar.low_price = min(
            self.day_bar.low_price,
            bar.low_price
        )

        self.day_bar.close_price = bar.close_price
        self.day_bar.volume += int(bar.volume)
        self.day_bar.open_interest = bar.open_interest

        finished_bar = self.day_bar #保存日线bar
        self.day_bar = None #因为日线bar已经保存给finished_bar了所以将日线bar设为空,下次新数据来了就会生成新的日线bar

    # 夜盘算新的一天的开始,
    # 现存的bar加上5小时如果是周六的话就那代表是周五的夜盘数据,而它对应的白天数据是下周一的,隔了2天加5个小时还是不够的,
    # 所以特判一下如果现存的self.day_bar是周五的话不要用5小时判断,剩下不用管他,因为下周一的夜盘进来的话会被+5小时的条件判断掉,进而将周五夜盘和周一白天的数据推送出去

    elif temp_datetime.day != (self.day_bar.datetime+  datetime.timedelta(hours=5)).day and  (self.day_bar.datetime+  datetime.timedelta(hours=5)).weekday() != 5:

        finished_bar = self.week_bar

        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.day_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume
        )
    # 更新 现存的day_bar
    else:
        self.day_bar.high_price = max(
            self.day_bar.high_price,
            bar.high_price
        )
        self.day_bar.low_price = min(
            self.day_bar.low_price,
            bar.low_price
        )

        self.day_bar.close_price = bar.close_price
        self.day_bar.volume += int(bar.volume)
        self.day_bar.open_interest = bar.open_interest

    # 推送日线给on_hour_bar处理
    if finished_bar:
        self.on_hour_bar(finished_bar)

    # Cache last bar object
    self.last_bar = bar

添加部分

def update_bar_week_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object

    if not self.week_bar:
        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.week_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            volume=bar.volume
        )
        return

    finished_bar = None

    # If time is Firday 14:59, update day bar into window bar and push
    if bar.datetime.minute == 59 and bar.datetime.hour == 14 and bar.datetime.weekday() == 4:
        self.week_bar.high_price = max(
            self.week_bar.high_price,
            bar.high_price
        )
        self.week_bar.low_price = min(
            self.week_bar.low_price,
            bar.low_price
        )

        self.week_bar.close_price = bar.close_price
        self.week_bar.volume += int(bar.volume)
        self.week_bar.open_interest = bar.open_interest

        finished_bar = self.week_bar
        self.week_bar = None

    # isocalendar() 返回多少年的第几周的第几天 格式如(2018, 27, 5)
    # 周数不相同肯定是新的一周,可以推送出一根完整周k线了

    elif  (bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1] != (self.week_bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1]:
        # print(bar.datetime.isocalendar())
        finished_bar = self.week_bar

        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.week_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume
        )
    # Otherwise only update minute bar
    else:
        self.week_bar.high_price = max(
            self.week_bar.high_price,
            bar.high_price
        )
        self.week_bar.low_price = min(
            self.week_bar.low_price,
            bar.low_price
        )
        self.week_bar.close_price = bar.close_price
        self.week_bar.volume += int(bar.volume)
        self.week_bar.open_interest = bar.open_interest

    # Push finished window bar
    if finished_bar:
        self.on_hour_bar(finished_bar) #on_window_bar只关心bar的数量,不关心bar的类型,所以可以直接调用

    # Cache last bar object
    self.last_bar = bar
 #添加部分

def on_hour_bar(self, bar: BarData) -> None:
    """"""
    if self.window == 1:
        self.on_window_bar(bar)
    else:
        if not self.window_bar:
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

        self.interval_count += 1
        if not self.interval_count % self.window:
            self.interval_count = 0
            self.on_window_bar(self.window_bar)
            self.window_bar = None

def generate(self) -> Optional[BarData]:
    """
    Generate the bar data and call callback immediately.
    """
    bar: BarData = self.bar

    if self.bar:
        bar.datetime = bar.datetime.replace(second=0, microsecond=0)
        self.on_bar(bar)

    self.bar = None
    return bar


simnow备胎openctp的使用说明

openctp已经开放运营一年多了,帮助了很多CTP用户调试程序、验证策略、学习交易等,有simnow这样的官方平台,谁弄个第三方的干嘛?原因大家也都知道,正如现在所碰到的情况,simnow又停服一个月,已经是今年第二次超长时间停服了,谁知道还会不会有第三、第四次。。

也是机缘巧合,手上积累了相关的技术,这个积累可不是拿来主义啊,咱可是一个字母一个字母敲出来的,妥妥的原创技术,只是说是设计思想跟CTP接近,也仅此而已了。通过接口封装成CTPAPI形式提供了与CTP接口兼容的接入方式,CTP程序只要更改一下CTP的交易dll(thosttraderapi_se.dll)和行情dll(thostmduserapi_se.dll)即可对接到openctp的交易前置和行情前置,当然,这两个dll或so也可以只替换其中一个,比如你想连到openctp的交易前置那就只需要替换交易dll即可,openctp的两套仿真环境也只需要你替换一下交易dll就行了,行情可以直接连接CTP实盘行情前置,因为openctp的行情也是转自CTP实盘前置,又何必接这个二道贩子的数据呢。

openctp的VIP环境直接就没有提供行情前置,你必须去直连实盘行情,但是这可能操作上有点麻烦,因为vn.py默认的TTS通道使用的是openctp的行情dll,你只改个CTP实盘前置地址是不够的,还要把行情dll替换成ctp官方版本,具体位置看你安装路径了,大概是这样的位置:C:\veighna_studio\Lib\site-packages\vnpy_tts\api
description
需要注意的是vn.py使用的是6.5.1的win64版本dll,请不需要弄错版本号。

CTP实盘行情的地址有很多,其实CTP行情前置是不校验用户名、密码的,所以你可以连接任意一家期货公司的行情前置,我随便挑了几个实盘的地址:
tcp://180.169.112.54:42213
tcp://140.207.168.9:42213
tcp://180.168.212.75:41313
tcp://27.115.78.155:41313
tcp://180.168.102.233:41168
tcp://112.64.143.220:41168

另外也还是陆续有vn.py的朋友问4097的错误问题,这里再提一下这个问题的解决方法,因为TTS的dll与CTP的dll同名,所以不能同时勾选这两个通道,只能勾选其中一个:
description

TTS通道的更多信息请到openctp官方页面了解:https://github.com/krenx1983/openctp

或者关注openctp的公众号,TTS的模拟账号也会在你关注的时候自动为你创建,一个微信号可以获得免费的3个7x24和3个仿真模拟账号,需要更全品种、全好体验的可以购买相应的VIP环境模拟账号。
description

最后非常感谢vn.py晓优大佬的支持,希望能够跟vn.py一起为投资者提供更好的服务。



查询仓位,持仓均价,未成交委托单一个函数搞定

1.首先完善converter.py

class PositionHolding:
    """"""

    def __init__(self, contract: ContractData = None):
        """"""
        if contract:
            self.vt_symbol = contract.vt_symbol
            self.exchange = contract.exchange

        self.active_orders = {}
        self.order_id = ""
        self.long_pos = 0
        self.long_pnl = 0
        self.long_price = 0
        self.long_yd = 0
        self.long_td = 0
        self.short_pos = 0
        self.short_pnl = 0
        self.short_price = 0        
        self.short_yd = 0
        self.short_td = 0
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

    def update_position(self, position: PositionData):
        """"""
        if position.direction == Direction.LONG:
            self.long_pos = position.volume
            self.long_pnl = position.pnl
            self.long_yd = position.yd_volume
            self.long_td = self.long_pos - self.long_yd
            self.long_price = position.price
            self.long_pos_frozen = position.frozen
        else:
            self.short_pos = position.volume
            self.short_pnl = position.pnl            
            self.short_yd = position.yd_volume
            self.short_td = self.short_pos - self.short_yd
            self.short_price = position.price
            self.short_pos_frozen = position.frozen
    def update_order(self, order: OrderData):
        """"""
        #active_orders只记录未成交和部分成交委托单
        if order.status in [Status.NOTTRADED, Status.PARTTRADED]:
            self.active_orders[order.vt_orderid] = order
        else:
            if order.vt_orderid in self.active_orders:
                self.active_orders.pop(order.vt_orderid)

        self.calculate_frozen()

    def update_order_request(self, req: OrderRequest, vt_orderid: str):
        """"""
        #分离gateway_name和orderid
        gateway_name,*split_orderid = vt_orderid.split("_")
        if len(split_orderid) == 1:
            self.order_id = split_orderid[0]
        elif len(split_orderid) == 2:
            self.order_id = "_".join([split_orderid[0],split_orderid[1]])
        elif len(split_orderid) == 3:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2]])
        elif len(split_orderid) == 4:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2],split_orderid[3]])
        if self.order_id:
            order = req.create_order_data(self.order_id, gateway_name)
            self.update_order(order)

    def update_trade(self, trade: TradeData):
        """"""
        if trade.direction == Direction.LONG:
            if trade.offset == Offset.OPEN:
                self.long_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.short_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.short_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.short_yd -= trade.volume
                else:
                    self.short_td -= trade.volume

                    if self.short_td < 0:
                        self.short_yd += self.short_td
                        self.short_td = 0
        else:
            if trade.offset == Offset.OPEN:
                self.short_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.long_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.long_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.long_yd -= trade.volume
                else:
                    self.long_td -= trade.volume

                    if self.long_td < 0:
                        self.long_yd += self.long_td
                        self.long_td = 0

        self.long_pos = self.long_td + self.long_yd
        self.short_pos = self.short_td + self.short_yd

    def calculate_frozen(self):
        """"""
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

        for order in self.active_orders.values():
            # Ignore position open orders
            if order.offset == Offset.OPEN:
                continue

            frozen = order.volume - order.traded

            if order.direction == Direction.LONG:
                if order.offset == Offset.CLOSETODAY:
                    self.short_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.short_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.short_td_frozen += frozen

                    if self.short_td_frozen > self.short_td:
                        self.short_yd_frozen += (
                            self.short_td_frozen - self.short_td)
                        self.short_td_frozen = self.short_td
            elif order.direction == Direction.SHORT:
                if order.offset == Offset.CLOSETODAY:
                    self.long_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.long_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.long_td_frozen += frozen

                    if self.long_td_frozen > self.long_td:
                        self.long_yd_frozen += (
                            self.long_td_frozen - self.long_td)
                        self.long_td_frozen = self.long_td

            self.long_pos_frozen = self.long_td_frozen + self.long_yd_frozen
            self.short_pos_frozen = self.short_td_frozen + self.short_yd_frozen

    def convert_order_request_shfe(self, req: OrderRequest):
        """"""
        if req.offset == Offset.OPEN:
            return [req]

        if req.direction == Direction.LONG:
            pos_available = self.short_pos - self.short_pos_frozen
            td_available = self.short_td - self.short_td_frozen
        else:
            pos_available = self.long_pos - self.long_pos_frozen
            td_available = self.long_td - self.long_td_frozen

        if req.volume > pos_available:
            return []
        elif req.volume <= td_available:
            req_td = copy(req)
            req_td.offset = Offset.CLOSETODAY
            return [req_td]
        else:
            req_list = []

            if td_available > 0:
                req_td = copy(req)
                req_td.offset = Offset.CLOSETODAY
                req_td.volume = td_available
                req_list.append(req_td)

            req_yd = copy(req)
            req_yd.offset = Offset.CLOSEYESTERDAY
            req_yd.volume = req.volume - td_available
            req_list.append(req_yd)

            return req_list

    def convert_order_request_lock(self, req: OrderRequest):
        """"""
        if req.direction == Direction.LONG:
            td_volume = self.short_td
            yd_available = self.short_yd - self.short_yd_frozen
        else:
            td_volume = self.long_td
            yd_available = self.long_yd - self.long_yd_frozen

        # If there is td_volume, we can only lock position
        if td_volume:
            req_open = copy(req)
            req_open.offset = Offset.OPEN
            return [req_open]
        # If no td_volume, we close opposite yd position first
        # then open new position
        else:
            open_volume = max(0, req.volume - yd_available)
            req_list = []

            if yd_available:
                req_yd = copy(req)
                if self.exchange in [Exchange.SHFE, Exchange.INE]:
                    req_yd.offset = Offset.CLOSEYESTERDAY
                else:
                    req_yd.offset = Offset.CLOSE
                req_list.append(req_yd)

            if open_volume:
                req_open = copy(req)
                req_open.offset = Offset.OPEN
                req_open.volume = open_volume
                req_list.append(req_open)

            return req_list


VeighNa发布v3.4.0 - 杰宜斯资管系统支持

原文作者:用Python的交易员 | 发布时间:2022-10-24

 

本周发布了VeighNa的3.4.0版本,本次更新的主要内容是增加了杰宜斯资管系统的交易接口模块vnpy_jees。

对于已经安装了VeighNa Studio的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-3.4.0,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-3.4.0.exe

 

杰宜斯资管系统支持

 

JEES(杰宜斯)是由武汉杰宜斯科技信息有限公司推出,针对期货FOF和MOM投资交易管理的资管系统。本次3.4.0版本更新中,基于JEES的6.6.1版本API封装开发了vnpy_jees模块。

尽管JEES系统提供了CTP兼容风格的交易API,但由于部分业务功能细节实现上的区别,直接采用替换vnpy_ctp模块中dll文件的方式,使用时可能会出现某些问题。因此vnpy_jees模块中,对以下功能细节做了专门调整。

 

日内委托和成交数据获取

JEES对于当日历史私有流数据(委托和成交),在日内重新登录后没有提供自动重传功能,只会推送登录后新产生的数据(类似TERT_QUICK模式)。

所以vnpy_jees模块在完成连接登录后,会主动发起当日历史委托和成交数据的查询请求(reqQryOrder和reqQryTrade),并在对应的查询回调函数中对返回的数据进行处理。

 

行情服务器接入

作为资管系统的JEES只提供了交易API,需要接入其他渠道的行情服务器(如CTP、飞马等),才能实现行情数据的订阅获取。

vnpy_jees接口中,默认使用vnpy_ctp下的行情API组件(CtpMdApi类)来接入CTP行情服务器。目前vnpy_ctp底层的API版本为较新的6.6.7,如果在使用过程中发现和期货公司的CTP柜台版本不一致,可以根据需求降级使用老版本,例如降级到6.5.1版本:

pip install vnpy_ctp==6.5.1.12

 

CHANGELOG

 

新增

  1. 新增杰宜斯资管系统交易接口vnpy_jees

调整

  1. 开启vnpy.rpc的pyzmq连接keepalive机制,避免在复杂网络环境下闲置连接的断开
  2. 移除vnpy_rpcservice中服务端的EVENT_TIMER定时事件推送
  3. 调整vnpy_postgresql采用批量方式写入数据,提高效率
  4. 添加VeighNa Trader中的子线程异常捕捉(需要Python>=3.8)
  5. 调整vnpy_ib接口查询历史K线数据时,对外汇和贵金属均采用中间价(而非成交价)
  6. 增加vnpy_ctastrategy对于回测过程中资金爆仓(小于等于0)情况的检查
  7. 优化vnpy_webtrader模块的加密鉴权,支持web进程关闭重启

修复

  1. 修复vnpy.rpc模块对于23.0以上版本pyzmq的NOBLOCK兼容性问题
  2. 修复vnpy_taos由于TDengine版本升级,出现d的一系列兼容问题
  3. 修复vnpy_datamanager刷新数据汇总信息显示时,老数据点移除失败的问题
     


hdf5持久化ContractData

shelve持久化有时会出错文件还大,打开也慢,今天刚上hdf5,分享下
首先pip install h5py
我把函数封装在utility.py

import os
import platform
import zlib
import pickle
import platform
import numpy as np
import h5py

if platform.uname().system == "Windows":
    LINK_SIGN = "\\"
elif platform.uname().system == "Linux":
    LINK_SIGN = "/"
#------------------------------------------------------------------------------------
def save_h5(filename:str,data:Any,overwrite:bool=False):
    """
    1.保存hdf5数据
    2.overwrite为True覆盖源文件,为False增量更新文件
    """
    contract_file_path = get_folder_path(filename)
    filepath =f"{contract_file_path}{LINK_SIGN}{filename}.h5"
    if overwrite:
        raw_data = data
    else:
        #增量更新数据
        raw_data = load_h5(filename)
        if isinstance(raw_data,dict):
            raw_data.update(data)
        elif isinstance(raw_data,list):
            for value in data:
                if value not in raw_data:
                    raw_data.append(value)        
    #循环写入h5数据直到写入成功或重试3次后退出循环
    count = 0
    while True:
        count += 1
        status = save_h5_status(filepath,raw_data)
        if status or count > 3:
            break
#------------------------------------------------------------------------------------
def save_h5_status(filepath:str,raw_data:Any):
    """
    获取H5保存数据状态
    """
    try:
        with h5py.File(filepath,"w") as file:
            data = zlib.compress(pickle.dumps(raw_data), 5)
            file["data"] =np.void(data)
        return True
    except:
        return False
#------------------------------------------------------------------------------------
def load_h5(filename:str):
    """
    读取hdf5数据
    """
    contract_file_path = get_folder_path(filename)
    filepath =f"{contract_file_path}{LINK_SIGN}{filename}.h5"
    if not os.path.exists(filepath):
        return {}
    count = 0
    while True:
        count += 1
        status,data = load_h5_status(filepath)
        if status or count > 3:
            return data
#------------------------------------------------------------------------------------        
def load_h5_status(filepath:str):
    """
    获取H5读取状态及数据
    """
    try:
        with  h5py.File(filepath,"r") as file:
            data = file["data"][()]
            data = pickle.loads(zlib.decompress(data))
            return True,data
    except:
        return False,{}


如何给MACD指标的显示控件MacdItem传入不同的参数?

在ChartWidget基础上创建的指标是不能改变主图或副图的参数的,如何做才能给让指标传递不同参数?
以MacdItem为例,步骤如下:

1. MacdItem是一个有参数的显示控件

MacdItem的实现见:为K线图表添砖加瓦——解决MACD绘图部件显示范围BUG,这里就不再贴代码。

2. 修改ChartWidget的add_item()方法,使得它可以接受指标参数

    def add_item(
        self,
        item_class: Type[ChartItem],
        item_name: str,
        plot_name: str,     
        **kwargs     # hxxjava add
    ) -> None:
        """
        Add chart item.
        """
        # item: ChartItem = item_class(self._manager) 
        item: ChartItem = item_class(self._manager) if not kwargs else item_class(self._manager,**kwargs)   # hxxjava change

        self._items[item_name] = item

        plot: pg.PlotItem = self._plots.get(plot_name)
        plot.addItem(item)

        self._item_plot_map[item] = plot

3. MyChartWidget是一个包含ChartWidget的窗口

class MyChartWidget(QtWidgets.QFrame):
    """
    订单流K线图表控件,
    """
    def __init__(self,title:str="K线图表"):
        """"""
        super().__init__()
        self.init_ui()
        self.setWindowTitle(title)

    def init_ui(self):
        """"""
        self.resize(1400, 800)

        # Create chart widget
        self.chart = ChartWidget()
        self.chart.add_plot("candle", hide_x_axis=True)
        self.chart.add_plot("fast_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("slow_macd", maximum_height=100,hide_x_axis=True)
        self.chart.add_plot("volume", maximum_height=100,hide_x_axis=False)

        self.chart.add_item(CandleItem, "candle", "candle")  
        self.chart.add_item(MacdItem, "fast_macd", "fast_macd",short_window=6,long_window=19,M=9)   # 相当于MACD(6,19,9)
        self.chart.add_item(MacdItem, "slow_macd", "slow_macd",short_window=19,long_window=39,M=9)  # 相当于MACD(19,39,9)
        self.chart.add_item(VolumeItem, "volume", "volume")

        self.chart.add_cursor()

统计

主题
7778
帖子
30552
已注册用户
37555
最新用户
在线用户
194
在线来宾用户
3817
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】