VeighNa量化社区
你的开源社区量化交易平台

置顶主题

tick合成秒级别数据

第一次发帖,因为要用tick合成的秒级别数据,折腾了几天,终于完成,功能如下:
通过tick合成1秒数据,通过1秒合成能整除60秒或者被60秒整除的bar组合,欢迎大家测试看是否还有bug
替换原有BarGenerator,不影响原有功能

class BarGenerator:
    """
    For:
    1. generating 1 minute bar data from tick data
    2. generating x minute bar/x hour bar data from 1 minute data

    Notice:
    1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
    2. for x hour bar, x can be any number
    """

    def __init__(
        self,
        on_bar: Callable,
        window: int = 0,
        on_window_bar: Callable = None,
        interval: Interval = Interval.MINUTE,
        bar_size: list = ['15', '60', '300'],
        on_build_bars: Callable = None,
    ):
        """Constructor"""
        self.bar_s: BarData = None
        self.bar: BarData = None
        self.on_bar: Callable = on_bar

        self.interval: Interval = interval
        self.interval_count: int = 0

        self.hour_bar: BarData = None

        self.window: int = window
        self.window_bar: BarData = None
        self.on_window_bar: Callable = on_window_bar

        # 设置所需要的bar周期,以秒为单位
        self.bar_size = bar_size
        self.bar_list: dict = {x: None for x in self.bar_size}
        self.on_build_bars: Callable = on_build_bars

        self.last_tick: TickData = None
        self.last_bar: BarData = None

    def update_tick(self, tick: TickData) -> None:
        if self.on_build_bars:
            self.update_second(tick)
        self.update_minute(tick)
        self.last_tick = tick

    def update_second(self, tick: TickData) -> None:
        """根据 tick 数据,生成 1秒的 bar线"""

        # print('-----------', tick.datetime)
        # if tick.datetime.microsecond == 0:
        #     tick.datetime += datetime.timedelta(microseconds=500000)
        # if 15 < tick.datetime.hour < 21:
        #     print('是')
        #     return
        # print(tick.datetime)

        # 过滤掉 last_price 为0的情况
        if not tick.last_price:
            return

        # 过滤掉旧日期的 tick 数据
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if self.bar_s:
            self.bar_s.high_price = max(self.bar_s.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar_s.high_price = max(self.bar_s.high_price, tick.high_price)

            self.bar_s.low_price = min(self.bar_s.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar_s.low_price = min(self.bar_s.low_price, tick.low_price)

            self.bar_s.close_price = tick.last_price
            self.bar_s.open_interest = tick.open_interest
            self.bar_s.datetime = tick.datetime

        else:
            self.bar_s = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.SECOND,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar_s.volume += max(volume_change, 0)
        else:
            self.bar_s.volume += max(tick.last_volume, 0)

        if self.last_tick and not (tick.datetime.replace(microsecond=0).second == 0 and self.last_tick.datetime.replace(microsecond=0).second == 0):
            if self.last_tick and (
                    (self.last_tick.datetime.minute != tick.datetime.minute)
                    or (self.last_tick.datetime.hour != tick.datetime.hour)
                    or (self.last_tick.datetime.second != tick.datetime.second)
            ):
                self.bar_s.datetime = self.bar_s.datetime.replace(
                    microsecond=0 
                )
                [self.update_bar_second_window(self.bar_s, size=x) for x in self.bar_size]
                self.last_bar = self.bar_s
                self.bar_s = None

    def update_minute(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if self.bar:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        else:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )

        if self.last_tick:
            volume_change = tick.volume - self.last_tick.volume
            self.bar.volume += max(volume_change, 0)

            turnover_change = tick.turnover - self.last_tick.turnover
            self.bar.turnover += max(turnover_change, 0)

        # if self.last_tick and self.last_tick.datetime + datetime.timedelta(minutes=10) > tick.datetime:

        if self.last_tick and not (tick.datetime.replace(microsecond=0).second == 0 and self.last_tick.datetime.replace(microsecond=0).second == 0):

            if self.last_tick and (
                (self.last_tick.datetime.minute != tick.datetime.minute)
                or (self.last_tick.datetime.hour != tick.datetime.hour)
            ):
                self.bar.datetime = self.bar.datetime.replace(
                    second=0, microsecond=0
                )
                self.on_bar(self.bar)
                self.bar = None

    def update_bar(self, bar: BarData) -> None:
        """
        Update 1 minute bar into generator
        """
        # if self.interval == Interval.SECOND:
        #     [self.update_bar_second_window(self.bar_s, size=x) for x in self.bar_size]
        if self.interval == Interval.MINUTE:
            self.update_bar_minute_window(bar)
        else:
            self.update_bar_hour_window(bar)

    def update_bar_minute_window(self, bar: BarData) -> None:
        """"""
        # If not inited, create window bar object
        if not self.window_bar:
            dt = bar.datetime.replace(second=0, microsecond=0)
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        # Update close price/volume/turnover into window bar
        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

        # Check if window bar completed
        if not (bar.datetime.minute + 1) % self.window:
            self.on_window_bar(self.window_bar)
            self.window_bar = None

    def update_bar_hour_window(self, bar: BarData) -> None:
        """"""
        # If not inited, create window bar object
        if not self.hour_bar:
            dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
            return

        finished_bar = None

        # If minute is 59, update minute bar into window bar and push
        if bar.datetime.minute == 59:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

            finished_bar = self.hour_bar
            self.hour_bar = None

        # If minute bar of new hour, then push existing window bar
        elif bar.datetime.hour != self.hour_bar.datetime.hour:
            finished_bar = self.hour_bar

            dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
            self.hour_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
        # Otherwise only update minute bar
        else:
            self.hour_bar.high_price = max(
                self.hour_bar.high_price,
                bar.high_price
            )
            self.hour_bar.low_price = min(
                self.hour_bar.low_price,
                bar.low_price
            )

            self.hour_bar.close_price = bar.close_price
            self.hour_bar.volume += bar.volume
            self.hour_bar.turnover += bar.turnover
            self.hour_bar.open_interest = bar.open_interest

        # Push finished window bar
        if finished_bar:
            self.on_hour_bar(finished_bar)

    def on_hour_bar(self, bar: BarData) -> None:
        """"""
        if self.window == 1:
            self.on_window_bar(bar)
        else:
            if not self.window_bar:
                self.window_bar = BarData(
                    symbol=bar.symbol,
                    exchange=bar.exchange,
                    datetime=bar.datetime,
                    gateway_name=bar.gateway_name,
                    open_price=bar.open_price,
                    high_price=bar.high_price,
                    low_price=bar.low_price
                )
            else:
                self.window_bar.high_price = max(
                    self.window_bar.high_price,
                    bar.high_price
                )
                self.window_bar.low_price = min(
                    self.window_bar.low_price,
                    bar.low_price
                )

            self.window_bar.close_price = bar.close_price
            self.window_bar.volume += bar.volume
            self.window_bar.turnover += bar.turnover
            self.window_bar.open_interest = bar.open_interest

            self.interval_count += 1
            if not self.interval_count % self.window:
                self.interval_count = 0
                self.on_window_bar(self.window_bar)
                self.window_bar = None

    def update_bar_second_window(self, bar: BarData, size) -> None:
        """"""
        # 如果已存在bar,但是当前时间已超过上根bar合成时间,则将上根bar先推出去
        if self.last_bar and self.bar_list[size]:
            while self.last_bar.datetime < bar.datetime:
                if (int(size) < 60 and not (self.last_bar.datetime.second) % int(size)) \
                    or (self.last_bar.datetime.second == 0 and int(size) >= 60 and not self.last_bar.datetime.minute % (int(size)/60)):
                    self.bar_list[size].datetime = self.last_bar.datetime.replace(microsecond=0)
                    self.on_build_bars({size:self.bar_list[size]})
                    self.bar_list[size] = None
                    break
                else:
                    self.last_bar.datetime+=datetime.timedelta(seconds=1)

        dt = bar.datetime.replace(microsecond=0)
        # If not inited, create window bar object
        if not self.bar_list[size]:
            # dt = bar.datetime.replace(microsecond=0)
            self.bar_list[size] = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=dt,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        # Otherwise, update high/low price into window bar
        else:
            self.bar_list[size].high_price = max(
                self.bar_list[size].high_price,
                bar.high_price
            )
            self.bar_list[size].low_price = min(
                self.bar_list[size].low_price,
                bar.low_price
            )

        # Update close price/volume/turnover into window bar
        self.bar_list[size].close_price = bar.close_price
        self.bar_list[size].volume += bar.volume
        self.bar_list[size].turnover += bar.turnover
        self.bar_list[size].open_interest = bar.open_interest
        self.bar_list[size].datetime = dt

        if (int(size) < 60 and not (bar.datetime.second) % int(size)) \
            or (bar.datetime.second == 0 and int(size) >= 60 and not bar.datetime.minute % (int(size)/60)):
            self.bar_list[size].datetime = bar.datetime.replace(microsecond=0)
            self.on_build_bars({size:self.bar_list[size]})
            self.bar_list[size] = None

    def generate(self) -> Optional[BarData]:
        """
        Generate the bar data and call callback immediately.
        """
        bar = self.bar

        if self.bar:
            bar.datetime = bar.datetime.replace(second=0, microsecond=0)
            self.on_bar(bar)

        self.bar = None
        return bar

使用方法:

self.bg = BarGenerator(self.on_bar, 1, interval=Interval.MINUTE, on_window_bar=self.on_window_bar, bar_size=['30', '60', '120'], on_build_bars=self.on_build_bars)
def on_build_bars(self, bar_data):
        print(['------------%s:%s'%(x,bar_data[x].datetime) for x in bar_data.keys()])


【VNPY进阶】on_tick函数内撤单追单详解,实盘在用的代码,没有坑哦

0.修改OrderData如下:

@dataclass
class OrderData(BaseData):
    """
    Order data contains information for tracking lastest status 
    of a specific order.
    """

    symbol: str
    exchange: Exchange
    orderid: str

    type: OrderType = OrderType.LIMIT
    direction: Direction = Direction.NET
    offset: Offset = Offset.NONE
    price: float = 0
    volume: float = 0
    traded: float = 0
    status: Status = Status.SUBMITTING
    datetime: datetime = None

    cancel_time: str = ""
    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"
        self.vt_orderid = f"{self.gateway_name}_{self.orderid}"
        #未成交量
        self.untrade = self.volume - self.traded

1.策略init初始化参数

        #状态控制初始化
        self.chase_long_trigger = False
        self.chase_sell_trigger = False
        self.chase_short_trigger = False
        self.chase_cover_trigger = False  
        self.cancel_status = False
        self.last_vt_orderid = ""
        self.long_trade_volume = 0
        self.short_trade_volume = 0
        self.sell_trade_volume = 0
        self.cover_trade_volume = 0 
        self.chase_interval   =    10    #拆单间隔:秒

get_position_detail参考这个帖子 https://www.vnpy.com/forum/topic/2167-cha-xun-cang-wei-chi-cang-jun-jie-wei-cheng-jiao-wei-tuo-dan-yi-ge-han-shu-gao-ding

2.on_tick里面的代码如下

from vnpy.trader.object import TickData, BarData, TradeData, OrderData,Status
    def __init__(self, strategy_engine: StrategyEngine, strategy_name: str,vt_symbols: List[str], setting: dict):
        """
        """
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
        #撤单条件选择,默认使用超时撤单,为False使用突破价格范围撤单
        self.cancel_timer_trigger = True
    def on_tick(self, tick: TickData):
        active_orders = self.get_position_detail(chase_vt_symbol).active_orders
        vt_orderid = ""
        if active_orders:
            #委托完成状态
            order_finished = False
            self.last_vt_orderid = list(active_orders.items())[0][0]         #委托单vt_orderid
            active_order:OrderData = list(active_orders.items())[0][1]      #委托单类 
            if self.cancel_timer_trigger:
                #撤单触发条件,超时撤单
                trigger_status = (raw_tick.datetime - active_order.datetime).total_seconds() > self.chase_interval
            else:
                price_tick = self.get_contract_detail(chase_vt_symbol).price_tick
                #突破价格范围撤单
                trigger_status = not active_order.price - price_tick * self.cancel_trigger_payup <= raw_tick.last_price <= active_order.price + price_tick * self.cancel_trigger_payup
            #开平仓追单,部分交易没有平仓指令(Offset.NONE)
            if active_order.offset in (Offset.NONE,Offset.OPEN):
                if active_order.direction == Direction.LONG:
                    self.long_trade_volume = active_order.untrade
                    if trigger_status and self.long_trade_volume > 0 and (not self.chase_long_trigger) and self.last_vt_orderid:
                        #撤销之前发出的未成交订单
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_long_trigger = True
                elif active_order.direction == Direction.SHORT:
                    self.short_trade_volume = active_order.untrade    
                    if trigger_status and self.short_trade_volume > 0 and (not self.chase_short_trigger) and self.last_vt_orderid:  
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_short_trigger = True
            #平仓追单
            elif active_order.offset in (Offset.CLOSE,Offset.CLOSETODAY,Offset.CLOSEYESTERDAY):
                if active_order.direction == Direction.SHORT: 
                    self.sell_trade_volume = active_order.untrade
                    if trigger_status and self.sell_trade_volume > 0 and (not self.chase_sell_trigger) and self.last_vt_orderid: 
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_sell_trigger = True                                                    
                if active_order.direction == Direction.LONG:
                    self.cover_trade_volume = active_order.untrade
                    if trigger_status and self.cover_trade_volume > 0 and (not self.chase_cover_trigger) and self.last_vt_orderid:                                                       
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_cover_trigger = True   
        else:
            order_finished = True
            self.cancel_status = False
        #追单的委托单状态是正常的撤销状态则发出追单指令
        if self.get_order(self.last_vt_orderid) and self.get_order(self.last_vt_orderid).status == Status.CANCELLED:
            if self.chase_long_trigger:
                if order_finished:
                    self.buy(chase_vt_symbol,raw_tick.ask_price_1,self.long_trade_volume)
                    self.long_trade_volume = 0
                    self.chase_long_trigger = False  
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_short_trigger:
                if  order_finished:
                    self.short(chase_vt_symbol,raw_tick.bid_price_1,self.short_trade_volume)
                    self.short_trade_volume = 0
                    self.chase_short_trigger = False 
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_sell_trigger:
                if order_finished:
                    self.sell(chase_vt_symbol,raw_tick.bid_price_1,self.sell_trade_volume)
                    self.sell_trade_volume = 0
                    self.chase_sell_trigger = False                      
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_cover_trigger:
                if order_finished:
                    self.cover(chase_vt_symbol,raw_tick.ask_price_1,self.cover_trade_volume)
                    self.cover_trade_volume = 0
                    self.chase_cover_trigger = False
                else:
                    self.cancel_surplus_order(list(active_orders))
    #------------------------------------------------------------------------------------
    def cancel_surplus_order(self,orderids:list):
        """
        撤销剩余活动委托单
        """
        if not self.cancel_status:
            for vt_orderid in  orderids:
                self.cancel_order(vt_orderid)
            self.cancel_status = True
# template.py里面增加
    #------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        return self.cta_engine.get_order(vt_orderid) 
# cta_strategy\engine.py里面增加
    #------------------------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        self.main_engine.get_order(vt_orderid)


把你编写的指标用图表显示出来

用python编写指标和以往熟悉的文华,博弈大师等有很大的区别,你写的代码最终在图形上是什么样子,看一看才能心安。

于是有了下面这段代码:

from datetime import datetime
from typing import  Dict
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.app.cta_strategy import ArrayManager
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
import pyqtgraph as pg
from vnpy.trader.ui import create_qapp, QtCore, QtGui
from vnpy.trader.object import BarData
from vnpy.chart.manager import BarManager


class ZB(CandleItem):
    """自定义指标显示"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 0

        if not self.sma_data:
            bars = self._manager.get_all_bars()
            sma_array = [bar.down_line for bar in bars]

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        if ix in self.sma_data:
            return self.sma_data[ix]

        sma_value = sma_array[-1]

        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_sma_value)
        end_point = QtCore.QPointF(ix, sma_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"ZB {sma_value:.2f}"
        else:
            text = "ZB -"

        return text

class ZB2(CandleItem):
    """自定义指标显示"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 0

        if not self.sma_data:
            bars = self._manager.get_all_bars()
            sma_array = [bar.up_line for bar in bars]

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        if ix in self.sma_data:
            return self.sma_data[ix]

        sma_value = sma_array[-1]

        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_sma_value)
        end_point = QtCore.QPointF(ix, sma_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"ZB {sma_value:.2f}"
        else:
            text = "ZB -"

        return text


if __name__ == "__main__":
    app = create_qapp()

    symbol = "CL-20210322-USD-FUT"
    exchange = Exchange.NYMEX
    interval = Interval.MINUTE_30
    start = datetime(2021, 1, 1)
    end = datetime(2022, 1, 1)

    bars = database_manager.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    am = ArrayManager(50)
    new_data = bars[:]
    line_up = []
    line_down = []

    while new_data :
        bar = new_data.pop(0)
        am.update_bar(bar)
        up, down = am.boll(20,2)
        line_up.append(up)
        line_down.append(down)   #这里调用合适的公式就好了

    print("K线数量是", len(bars), "指标数据是", len(line_up))

    i = 0
    while line_down :
        bars[i].down_line = line_down.pop(0)
        bars[i].up_line = line_up.pop(0)
        i = i + 1
    print("共处理了", i, "数据")

    widget = ChartWidget()
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=250)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")

    widget.add_item(ZB, "ZB", "candle")
    widget.add_item(ZB2, "ZB2", "candle")
    widget.add_cursor()

    history = bars
    widget.update_history(history)

    def update_bar():
        bar = new_data.pop(0)
        widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)

    widget.show()
    app.exec_()

因为我使用的盈透可以随时下载各种周期的历史数据,所以我只要下载然后存在数据库中。直接用代码读取就可以看见最新的k线。
如果你需要下载盈透的数据,可以参考我的上一个帖子。
https://www.vnpy.com/forum/topic/6123-ying-tou-jie-kou-xia-zai-5fen-zhong-xian-bing-xian-shi-tu-biao?page=1#pid21676

其他的接口我不熟。



提问前请先看:vn.py使用FAQ

策略相关


K线合成器

Q:CTA策略默认传入 1min K 线数据,在哪个函数中传入默认参数 1min?
A:BarGenerator里面的update_tick()用于把tick数据合成1分钟K线数据,update_bar()是用于把1分钟数据合成X分钟数据。可以参考布林带策略示例,那里提供1分钟K线合成为15分钟K线,并且基于15分钟K线来产生买卖信号。

Q:BarGenerator的最大值是不是只能60?
A:合成分钟线的时候,周期最大只能为60(基于60分钟整除来进行N分钟切分);合成小时线的时候,周期可以为任意值(基于多少个小时过去了,进行合成)

 

K线时间序列管理器

Q:ArrayManager的初始化函数默认size=100,size指的是什么?
A:size是指这个K线时间序列容器缓存的数据量的大小,理论上只要超过了策略中要计算的所有技术指标最长的那个周期,就够用了。比如你要算MA20 RSI14 CCI50,那么最少需要size=50,否则CCI计算的数据量就不够,一般情况下还会在size上加上一定的量,来避免talib中某些指标算法可能需要更长的数据,保证计算的正确性。

Q:策略使用5分钟K线,ArrayManager初始化 self.am = ArrayManager(100)。在初始化size这里输入100的话,请问vn.py会从数据库里面提取100根1分钟的bar还是500根1分钟的bar来初始化指标?
A:缓存到100跟5分钟K线后才会完成初始化状态。

Q:talib安装失败,怎么解决?
A:使用手动安装:
1.进入Unofficial Windows Binaries for Python Extension Packages中找到talib对应的版本(如py3.7,64位)
2.下载对应版本的文件TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl
3.下载好whl文件之后,直接在命令行下安装文件即可,如下。成功后会显示“Sucessful installed TA_Lib?0.4.17”

pip install TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl

Q:如何导出计算的技术指标数值
A:最简单的方法就是直接print了,或者也可以写入到文件里。

 

回测数据

Q:CTA回测组件提示K线已经下载完成,但是在sqlite数据库中查看不到记录。
A:数据被插入到当前用户目录的database.db中,如C:\Users\HOME\.vntrader\database.db

Q:实盘时vn.py数据是tick级数据推送,回测时数据是分钟级的,它会识别时间戳吗?
A:bar回测的模式,在策略内部将tick合成为bar或者将1分钟bar合成为x分钟的bar

Q:回测加载数据,显示载入数据量为0。
A:没有连接数据库,或者数据库无数据

Q:初始化策略,默认initDays=10,改成初始一定数量的分钟数是不是更加合理?
A:若载入充足的历史数据,就可以立刻交易了。

Q:数据商也提供逐笔数据如何用来回测?
A:用逐笔自行还原出完整的订单簿tick数据,所以用tick模式来回测。

Q:如何获取A股分钟线数据?
A:目前没有免费的下载渠道(交易所禁止),推荐通过米筐的RQData获取。

Q:如何更改数据库存放盘?
A:举个例子:在D盘创建目录D:\test;在D:\test下创建文件夹.vntrader;使用VN Station的VN Trader Pro切换到D:\test目录启动;此时启动的VN Trader运行时目录已经是D:\test(可以在标题栏看到)

 

参数优化
Q:用capital作为目标,输出结果全是0
A:因为在逐日统计回测中,capital代表的是起始资金,无法作为优化的目标。可以改用endBalance,sharpeRatio,maxDrawdown等目标优化参数试一下。

Q:engine.trades可以输出交易记录,但是记录里只有time,没有具体的date,请问哪里可以输出具体的交易日期和时间?
A:trades记录里的trade对象,有个额外的datetime字段是用来标识该成交的日期时间的。

 

实盘运行

Q:自定义策略需要放在哪里?
A:在当前运行的脚本目录先创建创建strategies文件夹,然后把策略文件放进去就可以了。 配置文件可以创建.vntrader, 然后把配置文件放进去。

Q:怎样在on_bar里output数据,以此检查策略是不是按照我的想法在运转
A:可以直接在策略初始化的时候打开一个txt文件句柄,然后回测过程中随时往里面写记录

Q:点击CTA策略出现了json错误
A:json读取错误,如把.vntrader下的cta_strategy_data.json删除可解决

Q:CTA模块中是否有自带的变量存储合约的最小变动价位数据?
A:策略发出的买卖指令的价格,会自动根据最小价格变动pricetick进行round取整,无需最小价格变动的数据了

Q:如何在无人值守脚本获得所有合约的信息?
A:调用main_engine.get_all_contracts函数

Q:用CTPtest抓所有合约, 郑交所的合同没有菜粕,苹果。
A:调用get_all_contracts前,sleep等待5秒,接收合约数据推送需要时间;某些测试环境里,是会缺少部分合约的。

Q:为什么使用VnTrader的cta策略组件进行日常实盘交易时,每天交易时段结束之后,一定要把VnTrader关闭,然后在下次开盘前15分钟在重启并初始化策略的参数?
A:关闭交易系统主要是为了清空系统内部(接口层、引擎层)的缓存状态,策略层的缓存状态一般倒是不太容易出问题的(除非你逻辑写错了)

Q:请问backtesting里的cross_limit_order和cross_stop_order什么意思
A:撮合限价单委托,撮合本地停止单(条件单)委托。讲最新的行情K线或者TICK和策略之前下达的所有委托进行检查,如果能够撮合成交,则返回并记录数据

Q:如何实现k线内成交?
A:用的是本地停止单
就比如当前价格是100点,策略发出信号,在下一根K线的120线发出买入:
若价格没突破到120点,继续挂着。
若价格从100涨到130,那么在120点的时候停止单变成市价单追上去保证尽可能成交。
若下一根K线的开盘价大于120,那么以开盘价来立刻成交。

Q:回测1h k线数据, 为什么每一笔订单成交记录都比委托记录晚一个小时 , 请问这个是在哪里设置的?
A:委托时间,是你发出委托请求的时间。成交时间,是你的成交发生的事件。CTA策略回测遵循T时刻发出的委托,永远只在T+1后才能进行撮合的规则(否则会有未来函数)。
实盘中,你的成交时间可能和委托时间非常接近,但是回测中受限于数据没法确定,只能用T+1那根K线的时间戳近似。

Q:如果停止单触发下单之后一段时间没有执行的话,会撤单吗,什么时候撤单,撤单之后还会追单吗?
A:不会,除非策略执行撤单操作。

Q:如果停止单触发下单之后,部分成交,接下来会撤单还是追单?
A:策略会收到这笔委托的回报,用户可以自行处理,不会自动撤单或者追单

Q:如果同时持有多空,pos是单向的,该怎么处理? 例如:如果持有1手多单,sell卖平的委托没有成交,紧接着的short卖开的单子成交了,pos 是多少?
A : 对于CTA策略模块来说,策略的持仓,是基于策略本身发出去的委托成交维护的逻辑持仓(而非账户底层的实际持仓),所以pos会为0

Q:假如策略下了1手多单,手动下了1手多单,pos 是多少 ?
A : 人手工下的单子无影响

Q:假如策略下了2手多单,手动平仓1手,pos 是多少 ?
A:人手工下的单子无影响

Q:两个策略都在跑同一个代码,应该是各自有各自的pos 吧 ?
A:对的,这两个逻辑持仓互相无关

Q:框架对pos值的更新,是在onTrade 和 onOrder推送动作前,还是推送动作后?
A:onTrade推送前更新,保证策略收到onTrade回调的时候,pos已经是最新数据。

Q:vt_setting.json的路径到底在哪?
A:在c:\users\administrator.vntrader目录下,是基于Python的pathlib实现的

Q:positionData中的 yd_volume 是指什么?
A:昨仓,这个主要针对中国市场的股票(只能卖出昨日的股票)和期货(昨仓平仓手续费不同)

Q:当前CTA模块是不是不能在策略里获取当前资金情况?
A:不能获取资金和账户级别的持仓。策略的仓位管理(风险分配)应该由交易员来做,而不是让程序自动做

Q:非交易时间报单,是直接返回拒单,还是返回提交中等开盘了再打出去?
A:在策略层,如on_bar()函数里面第一个逻辑应该是self.cancel_all(),目的是清空为成交的委托(包括本地停止单,限价单),保证当前时间点委托状态是唯一的。
若在非交易时间发单,服务器收到这个委托会推送一个拒单的委托回报。

Q:vn.py如何查询实盘账户的历史资金情况
A:大部分交易系统并未提供历史资金查询功能,一般都是只能查当前时间点的资金,所以需要你自己保存。

 

其他问题

Q: 请问下,vn.py中有期权回测的例子么?vn.py 关于期权,是不是就只有OptionMatser模块?
A:期权的波动率交易策略一般无需回测,更多依赖建模;可以用其他组件交易期权,比如CTA策略模块赌趋势,或者SpreadTrading模块赌价差,但这些本质都不是期权交易策略。

Q:如何根据资金量进行下单,而不是固定手数下单?
A:vn.py框架下不建议交易程序在实盘中去获取账户可用资金,并调整交易手数,这是很危险的事情。

 
 
 

接口登录相关


CTP接口

Q:如何实现行情并发推送?
A:C++ API的回调函数只有一个推送线程;用户可以一次性订阅全市场的合约,某个合约有行情变动的时候才会推送.

Q:CTP能可否获取到指定经纪商的手续费?
A:这个手续费应该是连带期货公司的部分,但是不建议在交易程序中去访问这种数据,相关数据应该每天在启动策略前就获取好配置到策略里。

Q:郑商所的品种都收不到14:59这一根bar
A:郑商所的数据推送,没有3点后的最后一个tick用于标识收盘完成,所以要调用BarGenerator.generate函数来做最终的强制K线生成

Q:国内期货模拟,除了sinnow可以模拟,还有别的账号可以模拟吗,可以去期货公司申请模拟账号模拟吗?
A:SimNow是目前最稳定的仿真环境了,可以找期货公司申请,比如中信、上海中期等。

Q:sinnow连接有时会断开,然后传送的tick数据的时间就会延后,就是收盘时,本来应该平仓平不了,超过三点了还在传tick。
A:SimNow环境因为免费,用的人很多,所以服务器有时会卡。

Q:为什么有时候会不停的断开重连呢?
A:服务器关了或者人满了,或者账号密码错误。

Q:连接进 SimNow后,按”市价“下单被拒绝,按”限价“”FAK“”FOK"下单可以,请问,是否不支持“市价”下单。
A:SimNow不支持市价单,实盘支持。

Q:连接SimNow后,下单提示“提交中“无法成交也无法撤单
A:这种情况,一般是委托请求没有到CTP柜台(网络断了),或者CTP柜台挂了

Q:SimNow上不去,上去了注册又总是提示验证码失败,还有其他模拟的推荐吗?
A:SimNow是目前最推荐的仿真环境,建议换交易时间注册,以及SimNow主要支持移动和联通的手机号

Q:CTP配置无法连接:输入论坛登录名,账号,配置对应的交易服务器和行情服务器,点击连接,无任何反应
A:CTP的测试账号请通过SimNow获取,不是vn.py论坛的

Q:已有行情数据显示。 但是不能发单,如rb1905
A:检查是否漏填交易所或者上委托数量的字段。

Q:下单异常,第一种情况:一点击委托就是直接“已撤销”(委托栏里委托状态),双击撤单的时候,又会显示“交易撤单失败,代码25”。第二种情况:一点击委托就是直接“提交中”(委托栏里委托状态),等到双击撤单的时候,又会显示“交易撤单失败,代码25”
A:第一个情况应该是报单不符合服务端的要求,被拒单撤销了;第二个情况感觉是你的网络断了,报单请求发出但没有到服务器。可以顺着这两个方向检查。

Q:CTPTEST测试交易期货公司采集不到硬盘序列号,CPU序列号,BIOS序列号
A:数据采集是通过API内部的代码自动完成的,其他任何上层程序都无法影响。建议换机器。

Q:登录穿透式仿真账号问题,一直报4097的错
A:不要同时import CTPTEST和CTP这两个接口

Q:使用ctp和ctptest登录都显示不合法登录。错误代码3
A:账户密码错误

Q:CTP能否两个账号同时登录并且同时操作。
A:同一个接口只能登录一次。如果要同时登录两个CTP,需要自己扩展修改CtpGateway,然后加载两个CtpGateway;对于不同的接口,比如股票的XTP和期货的CTP,可以直接同时登录使用,并在策略中同时交易这两个接口的合约

 

IB接口

Q:如何链接盈透api ?
A:启动TWS;在配置中打开Socket连接功能;在vn.py中加载ibGateway,然后启动就行。

Q:IB接口连接,错误提示显示:couldn't connect to TWS. confirm that "enable activex and socket clients" is enabled aports for new installations of version 954.1 or newer: TWS:7497:IB gateway: 4002。
A:请检查TWS是否打开了socket访问功能。

Q:启动行情记录,则程序假死。
A:IB的行情订阅函数没有异步缓存逻辑,用DataRecorder脚本的话,会在连接TWS之前就进行了订阅请求,导致死掉。IbGatewa已经加上了历史数据查询获取功能,直接从IB查询K线数据进行初始化就行,不建议自己录制了。

 

富途接口
Q:futu_gateway里面的登陆信息设置,为何只要密码?
A:需要先下载和安装FUTU OPEN API:https://www.futunn.com/openAPI

 

华宝派
Q:华宝派如何申请试用/实盘呢?
A:请在华宝证券开户,然后联系客户经理申请使用华宝PI

 
 
 

模块应用相关


交易复制
Q:当跟单帐户检测到被跟单帐户的仓位变化后,具体操作是什么?
A:TradeCopy的发布者账号,维护一份本地持仓数据表,当有成交推送时立即更新计算最新仓位;发布者每当收到成交推送时,或者每隔一定的时间间隔(默认1秒),会广播一次当前自己的仓位信息;订阅者收到广播推送的发布者仓位后,乘以自身的复制系数,作为目标仓位;订阅者根据目标仓位,和自身实际持仓的偏差,决定具体的下单操作(目标是将实际持仓同步到和目标持仓一致),如果有之前的委托,会先执行撤单。

 

RQData
Q:import rqdatac 失败,没有找到rqdatac包
A:运行以下命令安装

pip install --extra-index-url https://rquser:ricequant99@py.ricequant.com/simple/ rqdatac==1.0.0a66

Q:如何配置RQDAAT账户?
A:申请试用账号:https://www.ricequant.com/purchase
在VN Trader主界面上,点配置,rqdata.username rqdata.password输入
重启VN Trader就能用了

 

工作线程
Q:vn.py运行的时候,会启动哪些线程?
A:
1.主线程:带PyQt界面时运行Qt的循环,无界面时可以直接阻塞或者用while循环
2.事件引擎线程:处理事件引擎队列中的事件,并调用注册的处理函数处理,所以如果是CtaStrategy层,所有回调函数你可以认为都是单线程在驱动的(每次只有一个在调用)
3.API层线程:不同的API不一样了。

Q:eventEngine2.__queue很多数据没有处理、队列一直变大?
A:如果queue的大小只增加,不减少,只可能是没有启动EventEngine,导致事件没有处理持续挤压导致的。否则运行过程中即使没有注册处理函数,该事件的数据也会被抛弃掉,不会继续保存着。

 

行情记录
Q:行情记录后怎么查看和下载历史行情数据?
A:行情记录模块是将tick或者bar直接存入你配置的数据库的,你要单独查看,可以用数据库可视化工具连接本地数据库查看,如果在vn.py里回测使用,不需要下载,是默认从数据库里找相关数据的

Q:datarecoder 和 cta running 的 CTP能不能分开设置?
A:可以,另外新建一个目录,里面创建.vntrader文件夹,在这个目录用run.py或者VN Station启动VN Trader,CTP接口的登录信息就都是独立的了。

 
其他

Q:网站上下载的vnpy ,和Anaconda site-package里面的vnpy有什么区别?
A:进行运行的是ananconda里面的vnpy。如同在anaconda里面调用numpy一样。

Q:修改vnpy代码后需要更新到anaconda site-package对应的文件里?
A:python里import的vnpy就是site-packages里的,你可以修改下环境变量,把你clone的那个目录加入搜索路径,这样你修改了clone的那个vn.py,用的时候就自动改了

Q:一键安装完2.0.5 后,再另外安装anaconda3, spyder无法使用
A:假设你安装到c:\anaconda3。打开cmd,运行c:\anaconda3\scripts\activate,然后再运行python,就会进入anaconda环境了

Q:请问算法交易怎么用,可以策略生成下单指令,由算法下单吗?
A:目前AlgoTrading模块主要通过GUI和篮子委托文件的方式来实现算法下单
尽管可以通过扩展的方式,实现策略调用算法执行交易,但更建议在CTA策略中自行实现算法交易的逻辑,获得更好的细节控制能力

Q:vn.py中配置界面中email各项设置如何填写,有何用处?
A:设置如下
"email.server": "SMTP邮件服务器地址",
"email.port": SMTP邮件服务器端口号,
"email.username": "邮箱用户名",
"email.password": "邮箱密码",
"email.sender": "发送者邮箱",
"email.receiver": "接收者邮箱",

 
 
 

社区操作相关


Q:找回vn.py社区的密码
A:在这个页面可以找回密码:https://www.vnpy.com/auth/reset-password

Q:注册了社区账号,但是登录报错。[WinError 10061] 由于目标计算机积极拒绝,无法连接
A:这个是代理服务器问题吧,换个网络试试。

Q:维恩的派和vn.py社区有什么关系?
A:维恩的派是vn.py社区2015-2018年的老论坛,但由于discuz的各种问题使用体验太差,现在已经停止使用,将在19年底正式下线。

Q:社区怎么上传照片的?
A:直接把图片拖动到编辑框中就能自动上传了

Q:有微信群,QQ群吗?
A:vn.py框架学习群:666359421 ; vn.py框架交流群:262656087



在Mac上安装vnpy,保证一次成功!

一、提前下载需要的安装包:
1、Miniconda3
https://docs.conda.io/en/latest/miniconda.html#
选择MacOSX installers里的最新版本,这里是Python 3.9下载。

2、pycharm
pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/

3、vnpy安装包(解压后,复制文件夹到自己喜欢的位置)
从vnpy在gitee的官方地址下载最新的安装包,采用zip格式下载。
https://gitee.com/vnpy/vnpy

二、安装
1、安装Miniconda,这里是Miniconda3-latest-MacOSX-x86_64.pkg
2、添加国内源:
添加国内源:在当前用户下,编辑.condarc,内容如下:
channels:

3、创建虚拟环境
conda create -n py37_vnpy python=3.7
conda activate py37_vnpy
(退出:conda deactivate)

4、安装python.app
conda install -c conda-forge python.app
可能会因为网络问题不成功,多试几次。

5、安装pycharm-community-2020.3.3.dmg
从官网上下载社区版https://www.jetbrains.com/pycharm/

7、打开vnpy所在的文件夹,进行配置
点击‘PyCharm’菜单->Preferences菜单->Project:vnpy一级菜单->Python Interpreter二级菜单->点击右上齿轮->Add菜单->Conda Environment->Existing enviroment->Interpreter:/opy/miniconda3/envs/py37_vnpy/bin/pythonw(选择前面新建的虚拟环境的pythonw)->点OK->点OK->点OK

8、(确认在PyCharm里已经打开了vnpy项目),在PyCharm的底部,找到Terminal的标签,点击,进入py37_vnpy环境的终端,并且当前路径位于vnpy项目的文件夹。
执行以下的安装语句(requirements.txt是vnpy项目文件夹下面的一个文件),这个安装时间比较长,需要较好的网络。

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

9、创建run.py文件,复制以下代码,来源 README.md

因为mac上不支持ctp接口,所以要注释掉ctp接口,否则运行会报错。

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp

# from vnpy.gateway.ctp import CtpGateway

from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_backtester import CtaBacktesterApp

def main():
"""Start VN Trader"""
qapp = create_qapp()

event_engine = EventEngine()
main_engine = MainEngine(event_engine)

# main_engine.add_gateway(CtpGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)

main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

qapp.exec()


if name == "main":
main()

10、运行 python run.py,注意环境名称是 py37_vnpy



VeighNa发布v3.9.2 - 迅投研多市场实时行情接口!

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-07-10
 
本周二发布了VeighNa的3.9.2版本,本次更新的主要内容是增加了迅投研多市场实时行情接口,提供股票、ETF、可转债、指数、期货、期权等市场的实时行情数据(L1 Tick)订阅推送。

对于已经安装了VeighNa Studio的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-3.9.2,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-3.9.2.exe

 

迅投研实时行情接口

 

在去年末更新的3.9.0版本中,VeighNa就引入了迅投研数据服务XtDatafeed,支持历史K线数据和Tick数据的下载获取。

而本次3.9.2版本中则是新增了迅投研实时数据接口XtGateway,支持订阅多市场行情数据的实时推送,满足需要同时监控现货、期货、期权来实时计算交易信号的复杂量化策略需求。XtGateway提供的实时行情数据包括:

  • 交易所

    • 股票类:上交所(SSE)、深交所(SZSE)、北交所(BSE)
    • 期货类:中金所(CFFEX)、上期所(SHFE)、大商所(DCE)、郑商所(CZCE)、能交所(INE)、广期所(GFEX)
  • 品种行情

    • 股票市场:股票、可转债、ETF、指数
    • 期货市场:期货
    • 期权市场:指数期权、期货期权、ETF期权

实时行情数据订阅功能对于之前已经购买了迅投研专业版(4978元/年)的用户都可以直接使用,无需再另外付费。还没有购买想要先测试一下的同学,可以通过以下链接申请14天试用账号:

https://xuntou.net/#/signup?utm_source=vnpy

XtGateway的使用方法和其他交易接口类似,在VeighNa Station的【交易】页面配置加载【迅投研】接口后启动VeighNa Trader,点击菜单栏【系统 -> 连接XT】即可看到如下图所示的对话框:

description

在token栏中填入迅投研数据服务的token令牌,同时根据需求选择要订阅数据的市场(至少要有一个选【是】),点击【连接】按钮即可连接登录。XtGateway依赖于后台运行的迅投研数据中心服务进程xtdc,由于该进程的启动耗时较长(可能在10-50秒,视乎电脑和网络性能),所以连接登录会需要等待一段时间。

连接登录完成后,即可按需订阅各个市场的实时行情数据:

description

如果不知道要订阅的合约代码,可以通过菜单栏【帮助->合约查询】来搜索查找:

description

需要注意的是,VeighNa平台上的各交易接口本身已经自带了对应市场的行情数据支持,且这些行情大多直接来源于经纪商的柜台行情服务器,速度上可能比迅投研经过转发的行情更具优势,因此建议在XT接口配置中只选择需要额外订阅的行情市场,例如:

  • 通过CTP接口交易期货的情况下,XT接口仅订阅股票市场和期权市场数据(不订阅期货)
  • 通过SOPT接口交易ETF期权的情况下,XT接口仅订阅股票市场和期货市场数据(不订阅期权)

 

CHANGELOG

 

新增

  1. vnpy_xt增加实时行情接口XtGateway
  2. vnpy_xt增加基于文件锁实现的xtdc单例运行
  3. vnpy_ib增加行情退订功能
  4. vnpy_ib的合约乘数支持浮点数
  5. vnpy_ib增加期权链合约数据更新结束回报
  6. vnpy_ctabacktester、vnpy_ctastrategy、vnpy_portfoliostrategy增加i18n国际化支持

调整

  1. vnpy_algotrading增加委托/成交推送时,对于算法状态的过滤
  2. vnpy_tushare模块的to_ts_asset函数增加ETF基金支持
  3. vnpy_xt更新适配xtquant的240613.1.1版本
  4. vnpy_xt开启使用期货真实夜盘时间,增加期货历史数据集合竞价K线合成支持
  5. vnpy_tts更新API版本到6.7.2
  6. vnpy_rohon更新API版本:行情1.4.1.3,交易30.4.1.24
  7. vnpy_tap完善API日志输出功能
  8. vnpy_rest发送REST请求时,增加对于json参数的支持
  9. vnpy_excelrtd优化PyXLL启动时加载模块的方式
  10. vnpy_spreadtrading使用线程池实现策略初始化的异步执行
  11. vnpy_ib移除期权合约的自动查询功能
  12. vnpy_ib缓存查询返回的IB合约数据,简化行情切片查询函数
  13. vnpy_ib查询历史数据时,使用UTC时间戳传参,并将等待时间延长为600秒
  14. vnpy_ctastrategy的绩效统计值增加基于指数移动平均计算的EWM Sharpe比率
  15. vnpy_ctastrategy回测引擎的show_chart函数直接返回图表对象

修复

  1. 修复vnpy_rhon行情登录失败时的判断逻辑问题
  2. 修复vnpy_datarecorder记录价差数据时缺失的localtime字段
  3. 修复vnpy_spreadtraidng从datafeed加载数据时,时间戳传参缺失时区信息的问题
  4. 修复vnpy_paperaccount委托数量为0撮合之后导致的ZeroDivisionError问题
  5. 修复vnpy_portoliostrategy停止策略时,没有自动撤销策略委托的功能

 



彻底解决K线生成器的问题——一个日内对齐等交易时长的K线生成器

先厘清大思路,后面逐步完成。

1.搞量化交易,一个好用的K线生成器是最基本的要求!

vnpy系统自带了一个BarGenerator,它可以帮助我们生成1分钟,n分钟,n小时,日周期的K线,也叫bar。可是除了1分钟比较完美之外,有很多问题。它在读取历史数据、回测的时候多K线的处理和实盘却有不一样的效果。具体的问题我已经在解决vnpy 2.9.0版本的BarGenerator产生30分钟Bar的错误!这个帖子中做过尝试,但也不是很成功。因为系统的BarGenerator靠时间窗口与1分钟bar的时间分钟关系来决定是否该新建和结束一个bar,这个有问题。于是我改用对1分钟bar进行计数来决定是否该新建和结束一个bar,这也是有不可靠的问题,遇到行情比较清淡的时候,可能有的分钟就没有1分钟bar产生,这是完全有可能的!
K线几乎是绝大部分交易策略分析的基础,除非你从事的是极高频交易,否则你就得用它。可是如果你连生成一个稳健可靠的K线都不能够保证,那么运行在K线基础上的指标及由此产生的交易信号就无从谈起,K线错了,它们就是错误的,以此为依据做出点交易指令有可能是南辕北辙,所以必须解决它!

2.日内对齐等交易时长K线需要什么参数

2.1 日内对齐等交易时长K线是最常用的

K线不是交易所发布的,它有很多种产生机制。其对齐方式、表现形式多种多样。关于K线的分类本人在以往的帖子中做出过比较详细的说明,有兴趣的读者可以去我以往的帖子中查看,这里就不再赘述。
市面上的绝大部分软件如通达信、大智慧、文华财经等软件,除非用户特别设定,他们最常提供给用户的K线多是日内对齐等交易时长K线。常用是一定是有道理的,因为它们已经为广大用户和投资者所接受。

2.2 日内对齐等交易时长K线需要什么参数

1)什么是日内对齐等交易时长K线?
它具有这些特点:以每日开盘为起点,每根K线都包含相同交易时间的数据,跳过中间的休市时间,直至当前交易日的收盘,收盘不足n分钟也就是K线。实盘中,每日的第一个n分钟K线含集合竞价的那个tick数据。
2)为什么这种K线能够被普遍接受?
为它尽可能地保证一个交易日内的所有K线所表达的意义内容上是一致的,它们包含相等的交易时长。这非常重要,因为你把一个5分钟时长的K线与一个30分钟时长的K线放在一起谈论是没有意义的。但是如果为了保证K线在交易时长上的一致性,让n分钟K线跨日的话也是不太合理,因为这跨日,跨周末时间太长,这中间会发生什么意外事情,可能会产生出非常巨大的幅度大K线,掩盖了隔日跳空的行情变化,这对解读行情是不利的。当然n日的K线日跨日的,但是它是n个交易日的K线融合而成的,不过其融合的每个日K线也是对齐各自的日开盘的。
另外日内对齐等交易时长K线还有一个好处,那就是你以任何之前的时间为起点,在读取历史数据重新生成该日的n分钟K线的时候,得到的改日的K线是一致的。举个例子,如果我们的CTA策略在init()中常常是这么一句:

self.load_bar(20)  # 先加载20日历史1分钟数据

这么简单的一句,包含着很多你意识不到的变化——你今天运行策略和明天运行你的策略,其中的历史数据的范围发生了变化,也就是说加载数据的起点变了。如果我们合成的K线的对齐方式不采用日内对齐的话,而采用对齐加载时间起点的话,你今天、明天加载出来之前的某日的K线就可能完全是不同的。而采用日内对齐等交易时长的K线则不存在这个问题。

3)需要知道合约的交易时间段
既然要对齐每日开盘,还有跳过各个休市时间,还要知道收市时间,那么我们就知道生成这种K线必须知道其所表达合约或对象的交易时间段,交易时间段中包含了这些信息,不知道这些信息,BarGenerator就不知道如何生成这种bar。这是必须的!

3. 如何获取合约的交易时间段

3.1 vnpy系统和CTP接口找不到交易时间段信息

目前vnpy系统中的是没有合约的交易时间段的。到哪里获取合约的交易时间段的呢?
1) 它与合约相关,应该到保存合约的数据类ContractData中去找,没有找到。
2) 是否可以提供接口,从交易所获得,这个也是比较基础的数据。于是到CTP接口中(我使用的是CTP接口,您也许不一样) ,在最新版本的CTP接口文档中也没有找到任何与交易时间段相关的信息,绝望!

3.2 有两种方法可以得到交易时间段信息

  1. 米筐数据接口中有,其中有个得到市场中所有合约的函数all_instruments(),它的返回值中就包含交易时间段信息trading_hours,还有另外一些如get_trading_hours()也可以直接获得这些交易时间段信息,好!
  2. 实在没有的话,咱们手工创建一个文件,按照一定格式,把我们需要创建K线的合约准备好交易时间段信息,这也是可行的。

3.3 直接从米筐数据接口获取交易时间段信息的问题

  1. 你必须购买过米筐数,否则无从获得
  2. 直接使用从米筐数据接口获取交易时间段信息,会有效率问题。本人试过,运行get_trading_hours()常会用卡顿,其目前是在65000多个合约中为你寻找一个合约的交易时间段信息,在K线合成这种使用非常频繁的地方,效率是必须的!况且米筐对每个用户的流量也是有限制的,如果超过了也是会限流的!
  3. 对于有些米筐没有的品种难道我们就不能使用K线了吗?

解决方法:

  1. 基于以上这些原因,采用解耦具体数据提供商的方法会更好!把这些信息保存到一个文件或者数据库中,只要您能够办法获得这些信息,按照规定的格式存储,哪怕是手工输入也是可以的。
  2. 新的K线生成器只需要对规定格式的交易时间段信息进行处理,按照一定的规则就可以进行K线生成了!

3.4 从米筐获取交易时间段信息

3.4.1 扩展DataFeed

打开vnpy.trader.datafeed.py文件为Datafeed的基类BaseDatafeed扩展下面的接口

class BaseDatafeed(ABC):
    """
    Abstract datafeed class for connecting to different datafeed.
    """

    def init(self) -> bool:
        """
        Initialize datafeed service connection.
        """
        pass

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """
        pass

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        pass

    def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
        """
        Query history bar data.
        """
        pass

    def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
        """
        Query history tick data.
        """
        pass

其中的trading_hours.json文件我会在后面的文章中做详细的介绍。有了它我们才能展开其他的设计。

3.4.2 扩展RqdataDataFeed

在vnpy_rqdata\rqdata_datafeed.py中增加下面的代码

  • 引用部分增加:
from datetime import timedelta,date # hxxjava add
  • 在class RqdataDatafeed(BaseDatafeed)中增加下面的代码 :

    def update_all_trading_hours(self) -> bool:     # hxxjava add 
        """ 更新所有合约的交易时间段到trading_hours.json文件中 """

        if not self.inited:
            self.init()

        if not self.inited:
            return False

        ths_dict = load_json(self.trading_hours_file)

        # instruments = all_instruments(type=['Future','Stock','Index','Spot'])

        trade_hours = {}

        for stype in ['Future','Stock','Index','Fund','Spot']:
            instruments = all_instruments(type=[stype])
            # print(f"{stype} instruments count={len(instruments)}")

            for idx,inst in instruments.iterrows():
                # 获取每个最新发布的合约的建议时间段
                if ('trading_hours' not in inst) or not(isinstance(inst.trading_hours,str)):
                    # 跳过没有交易时间段或者交易时间段无效的合约
                    continue

                inst_name = inst.trading_code if stype == 'Future' else inst.order_book_id 
                inst_name = inst_name.upper() 
                if inst_name.find('.') < 0:
                    inst_name += '.' + inst.exchange

                if inst_name not in ths_dict:
                    str_trading_hours = inst.trading_hours

                    # 把'01-'或'31-'者替换成'00-'或'30-'
                    suffix_pair = [('1-','0-'),('6-','5-')]
                    for s1,s2 in suffix_pair:
                        str_trading_hours = str_trading_hours.replace(s1,s2)

                    # 如果原来没有,提取出来
                    trade_hours[inst_name] = {"name": inst.symbol,"trading_hours": str_trading_hours}

        # print(f"trade_hours old count {len(ths_dict)},append count={len(trade_hours)}")
        if trade_hours:
            ths_dict.update(trade_hours)
            save_json(self.trading_hours_file,ths_dict)

        return True

    def load_all_trading_hours(self) -> dict:       # hxxjava add 
        """ 从trading_hours.json文件中读取所有合约的交易时间段 """
        json_file = get_file_path(self.trading_hours_file)
        if not json_file.exists():
            return {}
        else:
            return load_json(self.trading_hours_file)

3.4.3 为main_engine安装一个可以获取交易时间段信息的接口

在vnpy\trader\engine.py中:

  • 该文件的引用部分:
from .datafeed import get_datafeed                  # hxxjava add
  • 为MainEngine类增加下面函数

    def get_trading_hours(self,vt_symbol:str) -> str:   # hxxjava add
        """ get vt_symbol's trading hours """
        ths = self.all_trading_hours.get(vt_symbol.upper(),"")       
        return ths["trading_hours"] if ths else ""

为什么在MainEngine类增加可以获取交易时间段信息的接口?

因为无论你运行vnpy中的哪个app,你都会启动main_engine,无需绕弯子就可以得到这些信息,而我们的用户策略中都包含各自策略的引擎,这样就方便获取交易时间段信息。

如CTA策略中包含cta_engine,而cta_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取您交易品种的交易时间段信息:

       trading_hours = self.cta_engine.main_engine.get_trading_hours(selt.vt_symbol)

如PortFolioStrategy策略中包含strategy_engine,而strategy_engine它的成员就包含main_engine。那么在策略中执行类似下面的语句就可以获取多个交易品种的交易时间段信息:

       trading_hours_list = [self.cta_engine.main_engine.get_trading_hours(vt_symbol) for vt_symbol in self.vt_symbols]

是不是很方便呢?

3.4.4 在系统的投研中执行更新所有品种(含期货、股票、指数和基金)的交易时间段

vnpy 3.0的启动界面中已经集成了一个叫“投研”的功能,其实它是jupyter lab,启动之后输入下面的代码:

# 测试update_all_trading_hours()函数和load_all_trading_hours()
from vnpy.trader.datafeed import get_datafeed

df = get_datafeed()
df.init()

df.update_all_trading_hours()   # 更新所有合约的交易时间段到本地文件中

ths = df.load_all_trading_hours() # 从本地文件中读取所有合约的交易时间段

当然您可以在vnpy的trader中主界面的菜单中增加一项,方便您在需要的时候执行下面语句。不过这个更新交易时间段的功能并不需要频繁执行,手动也就够了,记得就好。

3.4.5 你还可以手工打开trading_hours.json,直接输入

经过上面步骤3.4.4,您就在本地得到了一个trading_hours.json文件,该文件在您的用户目录下的.vntrader\中,其内容如下:

{
    "A0303.DCE": {
        "name": "豆一0303",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0305.DCE": {
        "name": "豆一0305",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0307.DCE": {
        "name": "豆一0307",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0309.DCE": {
        "name": "豆一0309",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0311.DCE": {
        "name": "豆一0311",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
    "A0401.DCE": {
        "name": "豆一0401",
        "trading_hours": "21:00-23:00,09:00-10:15,10:30-11:30,13:30-15:00"
    },
   ... ...
}

观察其格式,在你没有米筐数据接口或者这里没有的合约,您也可以手动输入合约交易时间段信息。

按照程序中算法,这个文件文件中一共包含约16500多个合约的交易时间段信息。可以覆盖国内金融市场几乎全部都产品,但是不包括金融二次衍生品期权。
为什么没有期权交易时间段信息,因为不需要。期权合约有其对应的标的物,从其名称和编号就可以解析出来。期权合约的交易时间段其和标的物的交易时间段是完全相同的,因此不需要保存到该文件中。



基于迅投研的量化数据自运维下载更新

发布于VeighNa社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2024-01-20
 

K线历史时序数据,不管对于期货CTA、股票多因子还是期权波动率等量化策略的开发来说,都是不可或缺的原材料。在大多数Quant的日常工作流程中,固定一块内容就是对每日增量数据的下载更新。

这块数据更新工作如果通过手动的方式执行各项步骤无疑会相当繁琐(而且容易误操作),所以本文中将会分享一套基于迅投研的量化数据自运维下载更新方案。

关于迅投研数据服务的详细介绍可以参考之前的这篇VeighNa发布v3.9.0 - 迅投研数据服务!》。

 

前期准备

 

试用申请

迅投研数据服务为VeighNa社区用户提供了14天免费试用,点击以下专属链接即可申请:

https://xuntou.net/#/signup?utm_source=vnpy

目前只需要提供手机号验证即可申请,试用账号中包括了股票、指数、期货、期权、基金等数据权限。

试用申请完成后,回到首页(https://xuntou.net/)登录账号,然后点击右上角的用户中心:

description

在弹出页面中底部即可找到【接口TOKEN】,点击【复制】按钮即可快速复制token内容,该token将用于后续VeighNa Trader中的数据服务配置:

description

同时在页面左下方【行情服务】的到期日,可以看到试用权限的到期时间。

 

模块安装

VeighNa Studio的3.9.0版中已经包含了迅投研相关的功能模块。使用其他Python环境的用户可以运行下述命令安装:

pip install xtquant vnpy_xt --index=https://pypi.vnpy.com

 

账号配置

启动VeighNa Trader,点击顶部菜单栏的【配置】按钮,在弹出的对话框中修改下述字段:

  • datafeed.name:xt
  • datafeed.username:token
  • datafeed.password:之前在用户中心复制的token内容

修改完成后点击对话框底部的【确定】按钮,重启VeighNa Trader即可生效。老用户也同样可以通过修改.vntrader目录下的vt_setting.json文件来进行配置。

 

数据更新

 

下载合约信息

迅投研数据服务的客户端xtquant在工作机制上和其他数据服务有一个较大的区别,即需要先从服务器下载所需的数据到本地缓存目录的数据文件,然后才能从数据文件中查询获取所需的数据。

合约更新任务的代码本身较为简单:

def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")

但对于合约信息数据,xtquant模块会在当前进程中首次初始化时读取,后续即使执行了下载更新操作,在当前进程的运行过程中也不会重载,必须等到下一次进程重启后才会更新。

因此对于历史合约信息更新函数update_history_data,需要使用multiprocessing模块在子进程中运行:

# 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

这样后续在主进程中获取合约信息时(等同于重启了新的进程),就是已经更新后的数据。

 

获取合约信息

为了实现自运维交易所数据更新,需要能够获取每日交易所新上市的合约代码以及之前已有存续的合约代码信息,这里需要用到xtquant.xtdata模块中所提供的get_stock_list_in_sector函数:

# 查询交易所历史合约代码
    active_symbols: list[str] = get_stock_list_in_sector(sector_name)
    expire_symbols: list[str] = get_stock_list_in_sector("过期" + sector_name)

    xt_symbols: list[str] = active_symbols + expire_symbols

函数中所传入的sector_name为迅投研定义的交易市场名称信息,一些比较常用的名称包括:

  • 中金所、上期所、大商所、郑商所、能源中心
  • 上证A股、深圳A股
  • 上证期权、深圳期权

同时对于每个市场,都包含了当前处于挂牌状态的合约,以及已经到期退市的合约(市场名称前需要加上【过期】前缀),需要分开查询后拼接成为该市场目前的整体合约代码列表。

 

数据增量过滤

对于本地已有K线数据的合约,每日仅需要下载更新增量的部分,而没有必要从头开始下载(浪费运行时间和数据流量)。这里借助于VeighNa数据库组件所提供的BarOverview汇总来查询当前本地已有的数据范围:

# 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

在遍历整个市场合约代码列表的过程中,可以先查询该合约的到期时间,然后只有在满足下列条件时才执行更新:

  • 本地从未下载过该合约的数据(合约刚上市或者首次运行下载)
  • 合约到期时间大于当前时间(尚未到期所以还有数据更新)

条件过滤代码如下:

# 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

 

下载数据入库

对于通过了前述过滤筛选的合约,则执行K线下载并将数据写入到数据库:

# 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)

以上针对每个市场的K线数据更新功能,整体封装在了update_bar_data函数中,用户可以根据需求传入所需的市场名称参数执行更新任务:

    # 这里只更新两个期货交易所的数据    update_bar_data("中金所")    update_bar_data("上期所")

 

完整代码

 

老规矩还是附上完整的程序代码:

from multiprocessing import Process
from datetime import datetime

from vnpy.trader.database import BarOverview
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.database import get_database
from vnpy.trader.object import BarData, HistoryRequest
from vnpy.trader.constant import Exchange, Interval


# 交易所映射关系
EXCHANGE_XT2VT = {
    "SH": Exchange.SSE,
    "SZ": Exchange.SZSE,
    "BJ": Exchange.BSE,
    "SF": Exchange.SHFE,
    "IF": Exchange.CFFEX,
    "INE": Exchange.INE,
    "DF": Exchange.DCE,
    "ZF": Exchange.CZCE,
    "GF": Exchange.GFEX
}

# 开始查询时间
START_TIME = datetime(2018, 1, 1)


def update_history_data() -> None:
    """更新历史合约信息"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import download_history_data

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 下载历史合约信息
    download_history_data("", "historycontract")

    print("xtquant历史合约信息下载完成")


def update_bar_data(
    sector_name: str,
    interval: Interval = Interval.MINUTE
) -> None:
    """更新K线数据"""
    # 在子进程中加载xtquant
    from xtquant.xtdata import (
        get_stock_list_in_sector,
        get_instrument_detail
    )

    # 初始化数据服务
    datafeed = get_datafeed()
    datafeed.init()

    # 连接数据库
    database = get_database()

    # 获取当前时间戳
    now: datetime = datetime.now()

    # 获取本地已有数据汇总
    data: list[BarOverview] = database.get_bar_overview()

    overviews: dict[str, BarOverview] = {}
    for o in data:
        vt_symbol: str = f"{o.symbol}.{o.exchange.value}"
        overviews[vt_symbol] = o

    # 查询交易所历史合约代码
    xt_symbols: list[str] = get_stock_list_in_sector(sector_name)

    # 遍历列表查询合约信息
    for xt_symbol in xt_symbols:
        # 查询合约信息
        data: dict = get_instrument_detail(xt_symbol, True)

        # 获取合约到期时间
        expiry: datetime = None
        if data["ExpireDate"]:
            expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d")

        # 拆分迅投研代码
        symbol, xt_exchange = xt_symbol.split(".")

        # 生成本地代码
        exchange: Exchange = EXCHANGE_XT2VT[xt_exchange]
        vt_symbol: str = f"{symbol}.{exchange.value}"

        # 查询数据汇总
        overview: BarOverview = overviews.get(vt_symbol, None)

        # 如果已经到期,则跳过
        if overview and expiry and expiry < now:
            continue

        # 实现增量查询
        start: datetime = START_TIME
        if overview:
            start = overview.end

        # 执行数据查询和更新入库
        req: HistoryRequest = HistoryRequest(
            symbol=symbol,
            exchange=exchange,
            start=start,
            end=now,
            interval=interval
        )

        bars: list[BarData] = datafeed.query_bar_history(req)

        if bars:
            database.save_bar_data(bars)

            start_dt: datetime = bars[0].datetime
            end_dt: datetime = bars[-1].datetime
            msg: str = f"{vt_symbol}数据更新成功,{start_dt} - {end_dt}"
            print(msg)


if __name__ == "__main__":
    # 使用子进程更新历史合约信息
    process: Process = Process(target=update_history_data)
    process.start()
    process.join()      # 等待子进程执行完成

    # 更新历史数据
    update_bar_data("上期所")
    update_bar_data("过期上期所")

 



RSRS择时指标的150倍计算加速

该主题文章来源于公众号:Logan投资

上次去深圳参加咱们Veighna开源举办的分享会,听了陈总的分享,会中的一些关于因子统计再构建的感悟文章《最近思考的量化指标再挖掘》我分享在了公众号中,欢迎大家关注和交流。

本帖子主要是分享关于加速RSRS计算的方法,改进后的代码比原版快了150倍左右,而且计算结果完全一致。

RSRS指标是基于光大证券研报《基于阻力支撑相对强度(RSRS)的市场择时》,给出了RSRS斜率指标择时,以及在斜率基础上的标准化指标择时策略。

阻力支撑相对强度(Resistance Support Relative Strength, RSRS)是另一种阻力位与支撑位的运用方式,它不再把阻力位与支撑位当做一个定值,而是看做一个变量,反应了交易者对目前市场状态顶底的一种预期判断。

其他因子的逻辑不多介绍了,大家可以去看这里
知乎的文章:https://zhuanlan.zhihu.com/p/33501881
原始RSRS计算代码如下:

import rqdatac as rq
from tqdm import tqdm
rq.init('账号', '密码')
data = rq.get_price('510330.XSHG', '2023-01-01','2024-01-01', frequency='30m')
​
def RSRS(low,high, regress_window: int,
         zscore_window: int, array: bool = False)
:
    """
    :param
    """
​
    high_array = high
    low_array = low
​
    highs = copy.deepcopy(high_array)
    lows = copy.deepcopy(low_array)
    rsrs_beta = []
    rsrs_rightdev = []
    zscore_rightdev = []
​
    N = regress_window
    M = zscore_window
​
    for i in range(len(highs)):
        try:
            data_high = highs[i - N + 1:i + 1]
            data_low = lows[i - N + 1:i + 1]
            X = sm.add_constant(data_low)
            model = sm.OLS(data_high, X)
​
            results = model.fit()
            beta = results.params[1]
            r2 = results.rsquared
​
            rsrs_beta.append(beta)
            rsrs_rightdev.append(r2)
​
            if len(rsrs_beta) < M:
                zscore_rightdev.append(0)
            else:
                section = rsrs_beta[-M:]
                mu = np.mean(section)
                sigma = np.std(section)
                zscore = (section[-1] - mu) / sigma
                # 计算右偏RSRS标准分
                zscore_rightdev.append(zscore * beta * r2)
​
        except:
            rsrs_beta.append(0)
            rsrs_rightdev.append(0)
            zscore_rightdev.append(0)
​
    if array:
        return zscore_rightdev
    else:
        return zscore_rightdev[-1]
test = []
for i in tqdm(range(10)):
    start = time.time()
    ddaa = RSRS(data.low, data.high, 18, 600,array=True)
    end = time.time()
    t = end - start
    test.append(t)
print(f'原始rsrs代码运行平均用时:{np.mean(test)}')

根据网上的代码,用原始的代码计算RSRS并记录运行了10次的平均运算时间。

description

可以看到用原始代码运行的话平均需要7.89秒(参数为N=18,M=600)

这是仅对于一个股票计算的,若是根据RSRS在3000只票里选股就要计算很多次,进而花费时间大概是6.575个小时,这对于日频策略来说也够呛了,对于高频一点的策略或者平时的因子分析研究中更不用说了,还是很耗费时间的。

所以接下来上神器,numpy!
思想是用numpy矩阵运算实现滚动窗口的批量线性回归和指标计算。代码如下:

import numpy as np
​
from numpy.lib.stride_tricks import as_strided as strided
​
def rolling_window(a:np.array, window: int):
    '生成滚动窗口,以三维数组的形式展示'
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return strided(a, shape=shape, strides=strides)
​
def numpy_rolling_regress(x1, y1, window: int=18, array: bool=False):
    '在滚动窗口内进行,每个矩阵对应进行回归'
    x_series = np.array(x1)
    y_series = np.array(y1)
    # 创建一个一维数组
    dd = x_series
    x = rolling_window(dd, window)
    yT = rolling_window(y_series, window)
    y = np.array([i.reshape(window, 1) for i in yT])
    ones_vector = np.ones((1, x.shape[1]))
    XT = np.stack([np.vstack([ones_vector, row]) for row in x])  #加入常数项
    X = np.array([matrix.T for matrix in XT])  #以行数组表示
    reg_result = np.linalg.pinv(XT @ X) @ XT @ y   #线性回归公示
​
    if array:
        return reg_result
    else:
        frame = pd.DataFrame()
        result_const = np.zeros(x_series.shape[0])
        const = reg_result.reshape(-1, 2)[:,0]
        result_const[-const.shape[0]:] = const
        frame['const'] = result_const
        frame.index = x1.index
        for i in range(1, reg_result.shape[1]):
​
            result = np.zeros(x_series.shape[0])
            beta = reg_result.reshape(-1, 2)[:,i]
            result[-beta.shape[0]:] = beta
            frame[f'factor{i}'] = result
        return frame
​
def numpy_rsrs(low:pd.Series, high:pd.Series, N:int=18, M:int=600):
    beta_series = numpy_rolling_regress(low, high, window=N, array=True)
    beta = beta_series.reshape(-1, 2)[:,1]
​
    beta_rollwindow = rolling_window(beta, M)
    beta_mean = np.mean(beta_rollwindow, axis=1)
    beta_std = np.std(beta_rollwindow, axis=1)
    zscore = (beta[M-1:] - beta_mean) / beta_std
    return zscore
​
test = []
for i in tqdm(range(50)):
    start = time.time()
    numpy_rsrs(data.low, data.high)
    end = time.time()
    test.append(end - start)
print(f'numpy加速后的rsrs代码运行平均用时:{np.mean(test)}')

这里的RSRS没有进行右偏处理,右偏RSRS在下文中。
运行了50次的平均用时
description
可以看到平均运行了约0.05秒,足足快了157.8倍!!!!!
这个速度可能还不足以满足几千只股票的RSRS指标的计算,但是对于研究已经是极大的提速了。若限制1秒以内的延迟上限,再用上多进程multiprocessing,只能满足20~100只标的的RSRS指标的计算,但这也满足宽基指数或者股指期货的分钟级和小时级的择时策略了。
进阶一点就是考虑用DASK框架写成并发过程,用这个函数并发计算几千只股票的因子。

具体速度得看各位的电脑配置了,我就是个小小的破surface pro,16G内存。
description

以上是RSRS计算的代码和过程。
而原版RSRS中还对其进行修正为RSRS右偏标准分,而我也复现了出来,搞了好一会儿的高等数学和线性代数。具体运行时间如下,为了减少rolling次数,所以我全部都写在了同一个函数里面。为了展示运行结果的正确性,我挑出numpy加速后的RSRS右偏标准分最后100条数据和原始代码计算的RSRS右偏标准分进行了对比

description
蓝色粗线为原始的,覆盖在上面的红色细线是numpy进行计算的。可以看到完全一致,计算结果无误。
最后附上回测图,RSRS右偏标准分择时指标在近年回撤得比较厉害。

description

RSRS右偏标准分的代码有需要的朋友可以在关注公众号后再后台私信我“RSRS”即可。
description



【Elite量化策略实验室】Efficiency国债期货CTA策略

发布于VeighNa社区公众号【vnpy-community】
 
原文作者: 李思佳 | 发布时间:2024-04-02
 
此前【Elite量化策略实验室】的期货类策略分享主要围绕商品期货展开,本篇文章要分享的Efficiency策略则基于捕捉国债期货趋势的思路构造。

原策略研报发表于2019年7月,距今已有快5年时间,但思路仍可借鉴。本文通过VeighNa Elite平台对其进行了代码上的实现与改进,让Efficiency指标以另一种方式构成了有效信号。

 

策略基本信息

 

description

 

策略核心原理

 

Efficiency,即效率,定义为回看t期总体涨跌幅与每期涨跌幅绝对值之和的比值。假设p0表示当期价格,pi表示过去i期的价格,t表示回看期,则Efficiency的计算公式如下:

description

上式从曲线形态上理解就是位移除以路程:从t时刻到当前时刻的价格变化量(价格的位移)除以该段时间窗口价格曲线轨迹的路程。Efficiency取值位于-1到1之间,绝对值越接近于1就说明趋势越明显。

由于国债期货的价格波动相对其他期货品种微弱,更多的策略逻辑只会减少交易机会,所以策略没有增加复杂的调仓和平仓方法。那么,基于Efficiency指标,可以构建一个简单的趋势策略:

  • Efficiency绝对值大于阈值,则做多平空
  • Efficiency绝对值小于阈值,则做空平多

策略在开平仓上只有两个参数,即用于计算Efficiency的回看窗口参数efficiency_window和指标阈值efficiency_limit。

原策略中,当Efficiency高于设定阈值时,策略会发出做多信号;而当Efficiency低于负阈值时,策略则发出做空信号。而在本策略的calculate_efficiency函数中实际计算的是原文章Efficiency的绝对值,因此,策略信号均发生了改变:

  • 只要Efficiency的绝对值高于阈值,无论趋势方向如何,均视为强劲趋势并发出做多信号;
  • 相反,当Efficiency的绝对值低于阈值时,则视为市场处于震荡状态,发出做空信号。

 

策略代码实现

 

Efficiency计算函数定义

def calculate_efficiency(
    data: np.ndarray,
    n: int
) -> float:
    """
    计算效率的绝对值

    n参数代表时间窗口

    返回值:t时刻的效率绝对值,取值在0到1间,效率比的值越接近1噪声越低,趋势越明显
    """
    # 计算价格曲线轨迹的位移
    x: float = abs(data[-1] - data[-n])

    # 计算价格净变化,即价格的路程
    diff_array: np.ndarray = np.diff(data)
    abs_diff: np.ndarray = np.abs(diff_array)
    s: float = np.sum(abs_diff[-n:])

    return x / s

策略参数定义

class EfficiencyStrategy(EliteCtaTemplate):
    """效率趋势策略"""

    author = "VeighNa Elite"

    # 基础参数(必填)
    bar_window: int = Parameter(5)              # K线窗口
    bar_interval: int = Parameter("1h")         # K线级别
    bar_buffer: int = Parameter(100)            # K线缓存

    # 策略参数(可选)
    efficiency_window: int = Parameter(85)      # 效率窗口
    efficiency_limit: int = Parameter(0.02)     # 效率比阈值

    risk_window: int = Parameter(10)            # 风险窗口
    risk_capital: int = Parameter(1_000_000)    # 风险敞口

    price_add: int = Parameter(0.05)            # 委托下单超价

    # 策略变量
    trading_size: int = Variable(1)             # 当前委托数量

信号指标计算

在on_init回调函数中初始化一个用于缓存efficiency指标的numpy数组:

def on_init(self) -> None:
        """初始化"""
        self.write_log("策略初始化")

        self.efficiency_arr = np.zeros(self.bar_buffer)

        self.load_bar(100)

在on_history回调函数中使用前文已经定义好的calculate_efficiency函数计算efficiency值,并将其缓存在efficiency_arr中:

def on_history(self, hm: HistoryManager) -> None:
        """K线推送"""
        # 计算效率比
        efficiency: float = calculate_efficiency(hm.close,
                                                 self.efficiency_window)

        # 缓存效率比值
        self.efficiency_arr[:-1] = self.efficiency_arr[1:]
        self.efficiency_arr[-1] = efficiency

        # 判断交易信号
        long_signal: bool = self.efficiency_arr[-1] >= self.efficiency_limit
        short_signal: bool = self.efficiency_arr[-1] <= self.efficiency_limit

动态仓位计算

# 计算交易数量
        self.trading_size = self.calculate_volume(self.risk_capital, self.risk_window, 1000, 1)

目标交易执行

# 获取当前目标
        last_target: int = self.get_target()

        # 初始化新一轮目标(默认不变)
        new_target: int = last_target

        # 执行开仓信号
        if not last_target:
            if long_signal:
                new_target = self.trading_size
            elif short_signal:
                new_target = -self.trading_size

        # 信号反转平仓
        if last_target > 0 and short_signal:
            new_target = 0
            if not last_target:
                new_target = -self.trading_size
        elif last_target < 0 and long_signal:
            new_target = 0
            if not last_target:
                new_target = self.trading_size

        # 设置新一轮目标
        self.set_target(new_target)

        # 执行目标交易
        self.execute_trading(self.price_add)

        # 推送UI更新
        self.put_event()

 

回测结果

 

回测数据上,本文选择使用米筐RQData提供的五年期期货TF99连续指数合约数据,在后续篇幅中还将使用T99十年期国债期货和TS99两年期国债期货数据进行回测,回测配置如下:

  • 本地代码:TF99.CFFEX
  • K线周期:1分钟
  • 开始日期:2021-1-1
  • 结束日期:2024-3-1
  • 手续费率:0
  • 交易滑点:0.005 + 3e-06
  • 合约乘数:1000000
  • 价格跳动:0.005
  • 回测资金:1000000

由于国债期货的交易手续费均为3元,是固定比手续费,所以此处将手续费除以合约乘数后在滑点中体现。

description

原文基于2014年10月至2019年6月的数据所得参数显然对于现在不完全适用,不过其优化参数集合仍可作为借鉴。设定”efficiency_window”在10到140的区间以10的步长、“efficiency_limit”在0.01到0.1的区间以0.01为步长进行调整,并使用2021年1月到2024年3月的数据作为总数据集,选取2021年1月到2023年1月作为样本内进行参数优化,得到以下结果:

  • 策略参数

    • bar_window: 5
    • bar_interval: "1h"
    • bar_buffer: 100
    • efficiency_window: 40
    • efficiency_limit: 0.08

description

description

 

其他国债期货效果

 

使用与前文五年期国债期货相同的回测配置和优化方法,得到以下结果:

TS99.CFFEX(两年期国债期货)

  • 策略参数

    • bar_window: 5
    • bar_interval: "1h"
    • bar_buffer: 100
    • fast_window: 40
    • slow_window: 0.01

description

description

T99.CFFEX(十年期国债期货)

  • 策略参数

    • bar_window: 5
    • bar_interval: "1h"
    • bar_buffer: 100
    • fast_window: 60
    • slow_window: 0.08

description

description

 

免责声明

文章中的信息或观点仅供参考,作者不对其准确性或完整性做出任何保证。读者应以其独立判断做出投资决策,作者不对因使用本报告的内容而引致的损失承担任何责任。

 


新消息

统计

主题
9469
帖子
35824
已注册用户
48621
最新用户
在线用户
136
在线来宾用户
9952
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】