vn.py量化社区
By Traders, For Traders.

置顶主题

通达信股票和期货数据导入数据库

先上使用说明:

  1. future_download状态,下载期货数据时设置为True,下载股票数据时设置为False
  2. file_path:通达信数据保存路径大家自行替换
  3. 通达信期货数据对齐datetime到文华财经后数据与文化财经完全一致,包括指数合约
  4. 单个文件较大时多进程只有两个进程在运行,原因不明
  5. 通达信股票只能下载最近100天数据,期货数据下载没有时间限制
  6. 期货数据存储路径:D:\tdx\vipdoc\ds,上交所股票数据路径:D:\tdx\vipdoc\sh,深交所股票数据路径:D:\tdx\vipdoc\sz
  7. 建议下载通达信期货通可以同时下载股票和期货数据,enjoy it!
  8. 附上vnpy\trader\engine.py里面的合约数据保存读取代码

    import platform
    import shelve
    if platform.uname().system == "Windows":
      LINK_SIGN = "\\"
    elif platform.uname().system == "Linux":
      LINK_SIGN = "/"
    #--------------------------------------------------------------------------------------------------
    class OmsEngine(BaseEngine):
      contract_file_name = 'contract_data'
      contract_file_path = get_folder_path(contract_file_name)
      def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
          """"""
          super(OmsEngine, self).__init__(main_engine, event_engine, "oms")
          self.ticks = {}
          self.orders = {}
          self.trades = {}
          self.positions = {}
          self.accounts = {}
          self.contracts = {}
    
          self.active_orders = {}
          self.add_function()
          self.load_contracts()
          self.register_event()
      #--------------------------------------------------------------------------------------------------
      def load_contracts(self):
          """读取合约数据"""
          try:
              with shelve.open(f"{self.contract_file_path}{LINK_SIGN}contract_data.vt",writeback=True) as file:
                  if 'data' in file:
                      contract_data = file['data']
                      for key, value in list(contract_data.items()):
                          self.contracts[key] = value
          except:
              return {}
          return self.contracts
      #--------------------------------------------------------------------------------------------------
      def save_contracts(self):
          """
          保存合约数据
          """
          try:
              with shelve.open(f"{self.contract_file_path}{LINK_SIGN}contract_data.vt",writeback=True)  as file:
                  file['data'] = self.contracts 
          except:
              return
      #--------------------------------------------------------------------------------------------------
      def add_function(self):
          """
          为MainEngine添加OmsEngine函数
          """
          self.main_engine.save_contracts = self.save_contracts                   #保存合约参数到硬盘
          self.main_engine.load_contracts = self.load_contracts                   #读取硬盘合约数据

    save_conntracts我是写在CLI交易的子进程和cta_engine里面
    #保存合约数据到硬盘
    main_engine.save_contracts()
    #--------------------------------------------------------------------------------------
    修改vnpy\app\cta_strategy里面的engine.py
    def stop_all_strategies(self):

      """
      停止所有策略
      """
      for strategy_name in self.strategies.keys():
          self.stop_strategy(strategy_name)
      #保存合约数据到本地
      self.main_engine.save_contracts()


为K线图表添砖加瓦——MACD

看完了陈老师的线上公开课,化了2天时间终于把MACD幅图曲线给添加上了。
MACD曲线和RSI,SMA之类的不同之处在于它的y方向显示范围是可变的,需要根据K线显示范围的变化及时做出调整,有执行效率问题。
本人采用了字典保存了已经计算的y方向显示范围计算结果,避免了重复计算,执行效率还是相当流畅的。当然会需要一定的存储开销,但
是不大,而且也是值得开销的。代码如下:

from datetime import datetime
from typing import List, Tuple, Dict

import numpy as np
import pyqtgraph as pg
import talib
import copy

from vnpy.trader.ui import create_qapp, QtCore, QtGui, QtWidgets
from vnpy.trader.database import database_manager
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData

from vnpy.chart import ChartWidget, VolumeItem, CandleItem
from vnpy.chart.item import ChartItem
from vnpy.chart.manager import BarManager
from vnpy.chart.base import NORMAL_FONT


class LineItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        last_bar = self._manager.get_bar(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.white_pen)

        # Draw Line
        end_point = QtCore.QPointF(ix, bar.close_price)

        if last_bar:
            start_point = QtCore.QPointF(ix - 1, last_bar.close_price)
        else:
            start_point = end_point

        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture


class SmaItem(CandleItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.blue_pen: QtGui.QPen = pg.mkPen(color=(100, 100, 255), width=2)

        self.sma_window = 10
        self.sma_data: Dict[int, float] = {}

    def get_sma_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 0

        # When initialize, calculate all rsi value
        if not self.sma_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            sma_array = talib.SMA(np.array(close_data), self.sma_window)

            for n, value in enumerate(sma_array):
                self.sma_data[n] = value

        # Return if already calcualted
        if ix in self.sma_data:
            return self.sma_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.sma_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        sma_array = talib.SMA(np.array(close_data), self.sma_window)
        sma_value = sma_array[-1]
        self.sma_data[ix] = sma_value

        return sma_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        sma_value = self.get_sma_value(ix)
        last_sma_value = self.get_sma_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Set painter color
        painter.setPen(self.blue_pen)

        # Draw Line
        start_point = QtCore.QPointF(ix-1, last_sma_value)
        end_point = QtCore.QPointF(ix, sma_value)
        painter.drawLine(start_point, end_point)

        # Finish
        painter.end()
        return picture

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.sma_data:
            sma_value = self.sma_data[ix]
            text = f"SMA {sma_value:.1f}"
        else:
            text = "SMA -"

        return text


class RsiItem(ChartItem):
    """"""

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=2)

        self.rsi_window = 14
        self.rsi_data: Dict[int, float] = {}

    def get_rsi_value(self, ix: int) -> float:
        """"""
        if ix < 0:
            return 50

        # When initialize, calculate all rsi value
        if not self.rsi_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]
            rsi_array = talib.RSI(np.array(close_data), self.rsi_window)

            for n, value in enumerate(rsi_array):
                self.rsi_data[n] = value

        # Return if already calcualted
        if ix in self.rsi_data:
            return self.rsi_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix - self.rsi_window, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        rsi_array = talib.RSI(np.array(close_data), self.rsi_window)
        rsi_value = rsi_array[-1]
        self.rsi_data[ix] = rsi_value

        return rsi_value

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        rsi_value = self.get_rsi_value(ix)
        last_rsi_value = self.get_rsi_value(ix - 1)

        # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # Draw RSI line
        painter.setPen(self.yellow_pen)

        if np.isnan(last_rsi_value) or np.isnan(rsi_value):
            # print(ix - 1, last_rsi_value,ix, rsi_value,)
            pass
        else:
            end_point = QtCore.QPointF(ix, rsi_value)
            start_point = QtCore.QPointF(ix - 1, last_rsi_value)
            painter.drawLine(start_point, end_point)

        # Draw oversold/overbought line
        painter.setPen(self.white_pen)

        painter.drawLine(
            QtCore.QPointF(ix, 70),
            QtCore.QPointF(ix - 1, 70),
        )

        painter.drawLine(
            QtCore.QPointF(ix, 30),
            QtCore.QPointF(ix - 1, 30),
        )

        # Finish
        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        # min_price, max_price = self._manager.get_price_range()
        rect = QtCore.QRectF(
            0,
            0,
            len(self._bar_picutures),
            100
        )
        return rect

    def get_y_range( self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        """  """
        return 0, 100

    def get_info_text(self, ix: int) -> str:
        """"""
        if ix in self.rsi_data:
            rsi_value = self.rsi_data[ix]
            text = f"RSI {rsi_value:.1f}"
            # print(text)
        else:
            text = "RSI -"

        return text


def to_int(value: float) -> int:
    """"""
    return int(round(value, 0))

""" 将y方向的显示范围扩大到1.1 """
def adjust_range(in_range:Tuple[float, float])->Tuple[float, float]:
    ret_range:Tuple[float, float]
    diff = abs(in_range[0] - in_range[1])
    ret_range = (in_range[0]-diff*0.05,in_range[1]+diff*0.05)
    return ret_range

class MacdItem(ChartItem):
    """"""
    _values_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}

    last_range:Tuple[int, int] = (-1,-1)    # 最新显示K线索引范围

    def __init__(self, manager: BarManager):
        """"""
        super().__init__(manager)

        self.white_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 255), width=1)
        self.yellow_pen: QtGui.QPen = pg.mkPen(color=(255, 255, 0), width=1)
        self.red_pen: QtGui.QPen = pg.mkPen(color=(255, 0, 0), width=1)
        self.green_pen: QtGui.QPen = pg.mkPen(color=(0, 255, 0), width=1)

        self.short_window = 12
        self.long_window = 26
        self.M = 9

        self.macd_data: Dict[int, Tuple[float,float,float]] = {}

    def get_macd_value(self, ix: int) -> Tuple[float,float,float]:
        """"""
        if ix < 0:
            return (0.0,0.0,0.0)

        # When initialize, calculate all macd value
        if not self.macd_data:
            bars = self._manager.get_all_bars()
            close_data = [bar.close_price for bar in bars]

            diffs,deas,macds = talib.MACD(np.array(close_data), 
                                    fastperiod=self.short_window, 
                                    slowperiod=self.long_window, 
                                    signalperiod=self.M)

            for n in range(0,len(diffs)):
                self.macd_data[n] = (diffs[n],deas[n],macds[n])

        # Return if already calcualted
        if ix in self.macd_data:
            return self.macd_data[ix]

        # Else calculate new value
        close_data = []
        for n in range(ix-self.long_window-self.M+1, ix + 1):
            bar = self._manager.get_bar(n)
            close_data.append(bar.close_price)

        diffs,deas,macds = talib.MACD(np.array(close_data), 
                                            fastperiod=self.short_window, 
                                            slowperiod=self.long_window, 
                                            signalperiod=self.M) 
        diff,dea,macd = diffs[-1],deas[-1],macds[-1]
        self.macd_data[ix] = (diff,dea,macd)

        return (diff,dea,macd)

    def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
        """"""
        macd_value = self.get_macd_value(ix)
        last_macd_value = self.get_macd_value(ix - 1)

        # # Create objects
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)

        # # Draw macd lines
        if np.isnan(macd_value[0]) or np.isnan(last_macd_value[0]):
            # print("略过macd lines0")
            pass
        else:
            end_point0 = QtCore.QPointF(ix, macd_value[0])
            start_point0 = QtCore.QPointF(ix - 1, last_macd_value[0])
            painter.setPen(self.white_pen)
            painter.drawLine(start_point0, end_point0)

        if np.isnan(macd_value[1]) or np.isnan(last_macd_value[1]):
            # print("略过macd lines1")
            pass
        else:
            end_point1 = QtCore.QPointF(ix, macd_value[1])
            start_point1 = QtCore.QPointF(ix - 1, last_macd_value[1])
            painter.setPen(self.yellow_pen)
            painter.drawLine(start_point1, end_point1)

        if not np.isnan(macd_value[2]):
            if (macd_value[2]>0):
                painter.setPen(self.red_pen)
                painter.setBrush(pg.mkBrush(255,0,0))
            else:
                painter.setPen(self.green_pen)
                painter.setBrush(pg.mkBrush(0,255,0))
            painter.drawRect(QtCore.QRectF(ix-0.3,0,0.6,macd_value[2]))
        else:
            # print("略过macd lines2")
            pass

        painter.end()
        return picture

    def boundingRect(self) -> QtCore.QRectF:
        """"""
        min_y, max_y = self.get_y_range()
        rect = QtCore.QRectF(
            0,
            min_y,
            len(self._bar_picutures),
            max_y
        )
        return rect

    def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
        #   获得3个指标在y轴方向的范围   
        #   hxxjava 修改,2020-6-29
        #   当显示范围改变时,min_ix,max_ix的值不为None,当显示范围不变时,min_ix,max_ix的值不为None,

        offset = max(self.short_window,self.long_window) + self.M - 1

        if not self.macd_data or len(self.macd_data) < offset:
            return 0.0, 1.0

        # print("len of range dict:",len(self._values_ranges),",macd_data:",len(self.macd_data),(min_ix,max_ix))

        if min_ix != None:          # 调整最小K线索引
            min_ix = max(min_ix,offset)

        if max_ix != None:          # 调整最大K线索引
            max_ix = min(max_ix, len(self.macd_data)-1)

        last_range = (min_ix,max_ix)    # 请求的最新范围   

        if last_range == (None,None):   # 当显示范围不变时
            if self.last_range in self._values_ranges:  
                # 如果y方向范围已经保存
                # 读取y方向范围
                result = self._values_ranges[self.last_range]
                # print("1:",self.last_range,result)
                return adjust_range(result)
            else:
                # 如果y方向范围没有保存
                # 从macd_data重新计算y方向范围
                min_ix,max_ix = 0,len(self.macd_data)-1

                macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
                ndarray = np.array(macd_list)           
                max_price = np.nanmax(ndarray)
                min_price = np.nanmin(ndarray)

                # 保存y方向范围,同时返回结果
                result = (min_price, max_price)
                self.last_range = (min_ix,max_ix)
                self._values_ranges[self.last_range] = result
                # print("2:",self.last_range,result)
                return adjust_range(result)

        """ 以下为显示范围变化时 """

        if last_range in self._values_ranges:
            # 该范围已经保存过y方向范围
            # 取得y方向范围,返回结果
            result = self._values_ranges[last_range]
            # print("3:",last_range,result)
            return adjust_range(result)

        # 该范围没有保存过y方向范围
        # 从macd_data重新计算y方向范围
        macd_list = list(self.macd_data.values())[min_ix:max_ix + 1]
        ndarray = np.array(macd_list) 
        max_price = np.nanmax(ndarray)
        min_price = np.nanmin(ndarray)

        # 取得y方向范围,返回结果
        result = (min_price, max_price)
        self.last_range = last_range
        self._values_ranges[self.last_range] = result
        # print("4:",self.last_range,result)
        return adjust_range(result)


    def get_info_text(self, ix: int) -> str:
        # """"""
        if ix in self.macd_data:
            diff,dea,macd = self.macd_data[ix]
            words = [
                f"diff {diff:.3f}"," ",
                f"dea {dea:.3f}"," ",
                f"macd {macd:.3f}"
                ]
            text = "\n".join(words)
        else:
            text = "diff - \ndea - \nmacd -"

        return text



class NewChartWidget(ChartWidget):
    """"""
    MIN_BAR_COUNT = 100

    def __init__(self, parent: QtWidgets.QWidget = None):
        """"""
        super().__init__(parent)

        self.last_price_line: pg.InfiniteLine = None

    def add_last_price_line(self):
        """"""
        plot = list(self._plots.values())[0]
        color = (255, 255, 255)

        self.last_price_line = pg.InfiniteLine(
            angle=0,
            movable=False,
            label="{value:.1f}",
            pen=pg.mkPen(color, width=1),
            labelOpts={
                "color": color,
                "position": 1,
                "anchors": [(1, 1), (1, 1)]
            }
        )
        self.last_price_line.label.setFont(NORMAL_FONT)
        plot.addItem(self.last_price_line)

    def update_history(self, history: List[BarData]) -> None:
        """
        Update a list of bar data.
        """
        self._manager.update_history(history)

        for item in self._items.values():
            item.update_history(history)

        self._update_plot_limits()

        self.move_to_right()

        self.update_last_price_line(history[-1])

    def update_bar(self, bar: BarData) -> None:
        """
        Update single bar data.
        """
        self._manager.update_bar(bar)

        for item in self._items.values():
            item.update_bar(bar)

        self._update_plot_limits()

        if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
            self.move_to_right()

        self.update_last_price_line(bar)

    def update_last_price_line(self, bar: BarData) -> None:
        """"""
        if self.last_price_line:
            self.last_price_line.setValue(bar.close_price)


if __name__ == "__main__":
    app = create_qapp()

    # bars = database_manager.load_bar_data(
    #     "IF888",
    #     Exchange.CFFEX,
    #     interval=Interval.MINUTE,
    #     start=datetime(2019, 7, 1),
    #     end=datetime(2019, 7, 17)
    # )

    symbol = "rb2010"
    exchange = Exchange.SHFE
    interval=Interval.MINUTE
    start=datetime(2020, 6, 1)
    end=datetime(2020, 6, 30)    

    dynamic = False  # 是否动态演示
    n = 200          # 缓冲K线根数


    bars = database_manager.load_bar_data(
        symbol=symbol,
        exchange=exchange,
        interval=interval,
        start=start,
        end=end
    )

    widget = NewChartWidget()
    widget.setWindowTitle(f"K线图表——{symbol}.{exchange.value},{interval},{start}-{end}")
    widget.add_plot("candle", hide_x_axis=True)
    widget.add_plot("volume", maximum_height=150)
    widget.add_plot("rsi", maximum_height=150)
    widget.add_plot("macd", maximum_height=150)
    widget.add_item(CandleItem, "candle", "candle")
    widget.add_item(VolumeItem, "volume", "volume")

    widget.add_item(LineItem, "line", "candle")
    widget.add_item(SmaItem, "sma", "candle")
    widget.add_item(RsiItem, "rsi", "rsi")
    widget.add_item(MacdItem, "macd", "macd")
    widget.add_last_price_line()
    widget.add_cursor()

    if dynamic:
        history = bars[:n]      # 先取得最早的n根bar作为历史
        new_data = bars[n:]     # 其它留着演示
    else:
        history = bars[-n:]     # 先取得最新的n根bar作为历史
        new_data = []           # 演示的为空

    widget.update_history(history)

    def update_bar():
        if new_data:
            bar = new_data.pop(0)
            widget.update_bar(bar)

    timer = QtCore.QTimer()
    timer.timeout.connect(update_bar)
    if dynamic:
        timer.start(100)

    widget.show()
    app.exec_()


查询仓位,持仓均价,未成交委托单一个函数搞定

1.首先完善converter.py

class PositionHolding:
    """"""

    def __init__(self, contract: ContractData = None):
        """"""
        if contract:
            self.vt_symbol = contract.vt_symbol
            self.exchange = contract.exchange

        self.active_orders = {}
        self.order_id = ""
        self.long_pos = 0
        self.long_pnl = 0
        self.long_price = 0
        self.long_yd = 0
        self.long_td = 0
        self.short_pos = 0
        self.short_pnl = 0
        self.short_price = 0        
        self.short_yd = 0
        self.short_td = 0
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

    def update_position(self, position: PositionData):
        """"""
        if position.direction == Direction.LONG:
            self.long_pos = position.volume
            self.long_pnl = position.pnl
            self.long_yd = position.yd_volume
            self.long_td = self.long_pos - self.long_yd
            self.long_price = position.price
            self.long_pos_frozen = position.frozen
        else:
            self.short_pos = position.volume
            self.short_pnl = position.pnl            
            self.short_yd = position.yd_volume
            self.short_td = self.short_pos - self.short_yd
            self.short_price = position.price
            self.short_pos_frozen = position.frozen
    def update_order(self, order: OrderData):
        """"""
        #active_orders只记录未成交和部分成交委托单
        if order.status in [Status.NOTTRADED, Status.PARTTRADED]:
            self.active_orders[order.vt_orderid] = order
        else:
            if order.vt_orderid in self.active_orders:
                self.active_orders.pop(order.vt_orderid)

        self.calculate_frozen()

    def update_order_request(self, req: OrderRequest, vt_orderid: str):
        """"""
        #分离gateway_name和orderid
        gateway_name,*split_orderid = vt_orderid.split("_")
        if len(split_orderid) == 1:
            self.order_id = split_orderid[0]
        elif len(split_orderid) == 2:
            self.order_id = "_".join([split_orderid[0],split_orderid[1]])
        elif len(split_orderid) == 3:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2]])
        elif len(split_orderid) == 4:
            self.order_id = "_".join([split_orderid[0],split_orderid[1],split_orderid[2],split_orderid[3]])
        if self.order_id:
            order = req.create_order_data(self.order_id, gateway_name)
            self.update_order(order)

    def update_trade(self, trade: TradeData):
        """"""
        if trade.direction == Direction.LONG:
            if trade.offset == Offset.OPEN:
                self.long_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.short_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.short_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.short_yd -= trade.volume
                else:
                    self.short_td -= trade.volume

                    if self.short_td < 0:
                        self.short_yd += self.short_td
                        self.short_td = 0
        else:
            if trade.offset == Offset.OPEN:
                self.short_td += trade.volume
            elif trade.offset == Offset.CLOSETODAY:
                self.long_td -= trade.volume
            elif trade.offset == Offset.CLOSEYESTERDAY:
                self.long_yd -= trade.volume
            elif trade.offset == Offset.CLOSE:
                if trade.exchange in [Exchange.SHFE, Exchange.INE]:
                    self.long_yd -= trade.volume
                else:
                    self.long_td -= trade.volume

                    if self.long_td < 0:
                        self.long_yd += self.long_td
                        self.long_td = 0

        #self.long_pos = self.long_td + self.long_yd
        #self.short_pos = self.short_td + self.short_yd

    def calculate_frozen(self):
        """"""
        self.long_pos_frozen = 0
        self.long_yd_frozen = 0
        self.long_td_frozen = 0

        self.short_pos_frozen = 0
        self.short_yd_frozen = 0
        self.short_td_frozen = 0

        for order in self.active_orders.values():
            # Ignore position open orders
            if order.offset == Offset.OPEN:
                continue

            frozen = order.volume - order.traded

            if order.direction == Direction.LONG:
                if order.offset == Offset.CLOSETODAY:
                    self.short_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.short_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.short_td_frozen += frozen

                    if self.short_td_frozen > self.short_td:
                        self.short_yd_frozen += (
                            self.short_td_frozen - self.short_td)
                        self.short_td_frozen = self.short_td
            elif order.direction == Direction.SHORT:
                if order.offset == Offset.CLOSETODAY:
                    self.long_td_frozen += frozen
                elif order.offset == Offset.CLOSEYESTERDAY:
                    self.long_yd_frozen += frozen
                elif order.offset == Offset.CLOSE:
                    self.long_td_frozen += frozen

                    if self.long_td_frozen > self.long_td:
                        self.long_yd_frozen += (
                            self.long_td_frozen - self.long_td)
                        self.long_td_frozen = self.long_td

            self.long_pos_frozen = self.long_td_frozen + self.long_yd_frozen
            self.short_pos_frozen = self.short_td_frozen + self.short_yd_frozen

    def convert_order_request_shfe(self, req: OrderRequest):
        """"""
        if req.offset == Offset.OPEN:
            return [req]

        if req.direction == Direction.LONG:
            pos_available = self.short_pos - self.short_pos_frozen
            td_available = self.short_td - self.short_td_frozen
        else:
            pos_available = self.long_pos - self.long_pos_frozen
            td_available = self.long_td - self.long_td_frozen

        if req.volume > pos_available:
            return []
        elif req.volume <= td_available:
            req_td = copy(req)
            req_td.offset = Offset.CLOSETODAY
            return [req_td]
        else:
            req_list = []

            if td_available > 0:
                req_td = copy(req)
                req_td.offset = Offset.CLOSETODAY
                req_td.volume = td_available
                req_list.append(req_td)

            req_yd = copy(req)
            req_yd.offset = Offset.CLOSEYESTERDAY
            req_yd.volume = req.volume - td_available
            req_list.append(req_yd)

            return req_list

    def convert_order_request_lock(self, req: OrderRequest):
        """"""
        if req.direction == Direction.LONG:
            td_volume = self.short_td
            yd_available = self.short_yd - self.short_yd_frozen
        else:
            td_volume = self.long_td
            yd_available = self.long_yd - self.long_yd_frozen

        # If there is td_volume, we can only lock position
        if td_volume:
            req_open = copy(req)
            req_open.offset = Offset.OPEN
            return [req_open]
        # If no td_volume, we close opposite yd position first
        # then open new position
        else:
            open_volume = max(0, req.volume - yd_available)
            req_list = []

            if yd_available:
                req_yd = copy(req)
                if self.exchange in [Exchange.SHFE, Exchange.INE]:
                    req_yd.offset = Offset.CLOSEYESTERDAY
                else:
                    req_yd.offset = Offset.CLOSE
                req_list.append(req_yd)

            if open_volume:
                req_open = copy(req)
                req_open.offset = Offset.OPEN
                req_open.volume = open_volume
                req_list.append(req_open)

            return req_list


【VNPY进阶】on_tick函数内撤单追单详解,实盘在用的代码,没有坑哦

0.修改OrderData如下:

@dataclass
class OrderData(BaseData):
    """
    Order data contains information for tracking lastest status 
    of a specific order.
    """

    symbol: str
    exchange: Exchange
    orderid: str

    type: OrderType = OrderType.LIMIT
    direction: Direction = Direction.NET
    offset: Offset = Offset.NONE
    price: float = 0
    volume: float = 0
    traded: float = 0
    status: Status = Status.SUBMITTING
    datetime: datetime = None

    cancel_time: str = ""
    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"
        self.vt_orderid = f"{self.gateway_name}_{self.orderid}"
        #未成交量
        self.untrade = self.volume - self.traded

1.策略init初始化参数

        #状态控制初始化
        self.chase_long_trigger = False
        self.chase_sell_trigger = False
        self.chase_short_trigger = False
        self.chase_cover_trigger = False  
        self.cancel_status = False
        self.long_trade_volume = 0
        self.short_trade_volume = 0
        self.sell_trade_volume = 0
        self.cover_trade_volume = 0 
        self.chase_interval   =    10    #拆单间隔:秒

2.on_tick里面的代码如下

get_position_detail参考这个帖子 https://www.vnpy.com/forum/topic/2167-cha-xun-cang-wei-chi-cang-jun-jie-wei-cheng-jiao-wei-tuo-dan-yi-ge-han-shu-gao-ding

    def on_tick(self, tick: TickData):
        working_order_dict = self.get_position_detail(tick.vt_symbol).active_orders
        #working_order_dict = self.order_dict
        if working_order_dict:
            #委托完成状态
            order_finished = False
            vt_orderid = list(working_order_dict.items())[0][0]         #委托单vt_orderid
            working_order = list(working_order_dict.items())[0][1]      #委托单字典 
            #开平仓追单,部分交易没有平仓指令(Offset.NONE)
            """获取到未成交委托单后检查未成交委托量>0,tick.datetime - 未成交委托单的datetime>追单间隔(chase_interval),同时chase_long_trigger状态未触发和有vt_orderid的判定(之前有收过空vt_orderid,所有要加个过滤),撤销该未成交委托单,赋值chase_long_trigger为True.chase_long_trigger为True且没有未成交委托单时执行追单,如有未成交委托单则调用cancel_surplus_order取消所有未成交委托单,追单的委托单发送出去后初始化chase_long_trigger.其他方向的撤单追单也是一样的流程"""
            if working_order.offset in (Offset.NONE,Offset.OPEN):
                if working_order.direction == Direction.LONG:
                    self.long_trade_volume = working_order.untrade
                    if (tick.datetime - working_order.datetime).seconds > self.chase_interval and self.long_trade_volume > 0 and (not self.chase_long_trigger) and vt_orderid:
                        #撤销之前发出的未成交订单
                        self.cancel_order(vt_orderid)
                        self.chase_long_trigger = True
                elif working_order.direction == Direction.SHORT:
                    self.short_trade_volume = working_order.untrade
                    if (tick.datetime - working_order.datetime).seconds > self.chase_interval and self.short_trade_volume > 0 and (not self.chase_short_trigger) and vt_orderid:  
                        self.cancel_order(vt_orderid)
                        self.chase_short_trigger = True
            #平仓追单
            elif working_order.offset in (Offset.CLOSE,Offset.CLOSETODAY):
                if working_order.direction == Direction.SHORT: 
                    self.sell_trade_volume = working_order.untrade
                    if (tick.datetime - working_order.datetime).seconds > self.chase_interval and self.sell_trade_volume > 0 and (not self.chase_sell_trigger) and vt_orderid: 
                        self.cancel_order(vt_orderid)
                        self.chase_sell_trigger = True                                                    
                if working_order.direction == Direction.LONG:
                    self.cover_trade_volume = working_order.untrade
                    if (tick.datetime - working_order.datetime).seconds > self.chase_interval and self.cover_trade_volume > 0 and (not self.chase_cover_trigger) and vt_orderid:                                                       
                        self.cancel_order(vt_orderid)
                        self.chase_cover_trigger = True   
        else:
            order_finished = True
            self.cancel_status = False
        if self.chase_long_trigger:
            if order_finished:
                self.buy(tick.ask_price_1,self.long_trade_volume)
                self.chase_long_trigger = False  
            else:
                self.cancel_surplus_order(list(working_order_dict))
        elif self.chase_short_trigger:
            if  order_finished:
                self.short(tick.bid_price_1,self.short_trade_volume)
                self.chase_short_trigger = False 
            else:
                self.cancel_surplus_order(list(working_order_dict))
        elif self.chase_sell_trigger:
            if order_finished:
                self.sell(tick.bid_price_1,self.sell_trade_volume)
                self.chase_sell_trigger = False                      
            else:
                self.cancel_surplus_order(list(working_order_dict))
        elif self.chase_cover_trigger:
            if order_finished:
                self.cover(tick.ask_price_1,self.cover_trade_volume)
                self.chase_cover_trigger = False
            else:
                self.cancel_surplus_order(list(working_order_dict))
    #------------------------------------------------------------------------------------
    def cancel_surplus_order(self,orderids:list):
        """
        撤销剩余活动委托单
        """
        if not self.cancel_status:
            for vt_orderid in  orderids:
                self.cancel_order(vt_orderid)
            self.cancel_status = True


基于Terminal UI的vn.py图形界面

本文中的代码来自社区成员Aaron Qiu的分享,运行效果如下:

description

启动脚本代码如下,请保存到文件run.py中后既可运行:

"""
Author: Aaron Qiu
"""
from vnpy.trader.ui.widget import (
    TickMonitor,
    LogMonitor,
    TradeMonitor,
    OrderMonitor,
    ActiveOrderMonitor,
    PositionMonitor,
    AccountMonitor,
)
from asciimatics.widgets import (
    Frame,
    MultiColumnListBox,
    Layout,
    Divider,
    Text,
    Label,
    Button,
    TextBox,
    Widget,
    VerticalDivider,
)
import pickle
from enum import Enum
from asciimatics.scene import Scene
from asciimatics.screen import Screen
from asciimatics.exceptions import ResizeScreenError, NextScene, StopApplication
import sys

import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import INFO

from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine

from vnpy.gateway.ctp import CtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG

from vnpy.trader.utility import load_json

CTP_SETTING = load_json("connect_ctp.json")

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True

FRAME_NO = "mainwindows"
TICK_LAYOUT = {
    "name": "行情信息",
    "monitor": TickMonitor,
    "titles": TickMonitor.headers.keys(),
    "data_key": "ticks",
    "height": 6,
    "columns": [
        "<20",
        "<15",
        "<30",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<30",
        "<15",
    ],
}
ACTIVE_ORDERS_LAYOUT = {
    "name": "活动委托信息",
    "monitor": ActiveOrderMonitor,
    "titles": ActiveOrderMonitor.headers.keys(),
    "data_key": "active_orders",
    "height": 6,
    "columns": [
        "<20",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<30",
        "<15",
        "<15",
        "<15",
        "<30",
        "<15",
    ],
}
ORDERS_LAYOUT = {
    "name": "委托信息",
    "monitor": OrderMonitor,
    "titles": OrderMonitor.headers.keys(),
    "data_key": "orders",
    "height": 6,
    "columns": [
        "<20",
        "<15",
        "<15",
        "<30",
        "<30",
        "<30",
        "<15",
        "<15",
        "<15",
        "<30",
        "<15",
        "<30",
        "<15",
    ],
}
TRADE_LAYOUT = {
    "name": "交易信息",
    "monitor": TradeMonitor,
    "titles": TradeMonitor.headers.keys(),
    "data_key": "trades",
    "height": 6,
    "columns": [
        "<20",
        "<15",
        "<15",
        "<15",
        "<30",
        "<30",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<30",
        "<15",
    ],
}
LOG_LAYOUT = {
    "name": "日志信息",
    "monitor": LogMonitor,
    "titles": LogMonitor.headers.keys(),
    "data_key": "logs",
    "height": 20,
    "columns": ["<30", "<40", "<30"],
}
POSITION_LAYOUT = {
    "name": "持仓信息",
    "monitor": PositionMonitor,
    "titles": PositionMonitor.headers.keys(),
    "data_key": "positions",
    "height": 20,
    "columns": [
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
    ],
}
ACCOUNT_LAYOUT = {
    "name": "账户信息",
    "monitor": AccountMonitor,
    "titles": AccountMonitor.headers.keys(),
    "data_key": "accounts",
    "height": 20,
    "columns": [
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
        "<15",
    ],
}


class MainView(Frame):
    def __init__(self, screen, data, logs_data):
        super(MainView, self).__init__(
            screen,
            screen.height,
            screen.width,
            on_load=self._load_data,
            hover_focus=True,
            can_scroll=True,
            title="VNPY Terminal UI",
            name=FRAME_NO,
        )
        self.set_theme("bright")
        self.ticks = []
        self.active_orders = []
        self.orders = []
        self.trades = []
        self.logs = []
        self.positions = []
        self.accounts = []

        self.update_options(data)
        self.draw_header_layouts()
        self.draw_content_layouts()
        self.draw_footer_layouts()
        self.fix()

    def draw_content_layouts(self):
        """"""
        self.create_layout(TICK_LAYOUT)
        self.create_layout(ACTIVE_ORDERS_LAYOUT,)
        self.create_layout(ORDERS_LAYOUT)
        self.create_layout(TRADE_LAYOUT)

    def draw_header_layouts(self):
        """"""
        pass

    def draw_footer_layouts(self):
        """"""
        footer_configs = [LOG_LAYOUT, ACCOUNT_LAYOUT, POSITION_LAYOUT]
        layout = Layout([1, 1, 1])
        self.add_layout(layout)

        for index, config in enumerate(footer_configs):
            layout.add_widget(VerticalDivider(), index)
            layout.add_widget(Divider(), index)
            layout.add_widget(Label(config["name"], align="^"), index)
            layout.add_widget(Divider(), index)
            box = MultiColumnListBox(
                config["height"],
                columns=config["columns"],
                options=self.__dict__[config["data_key"]],
                titles=[d["display"] for d in config["monitor"].headers.values()],
                name=config["name"],
                add_scroll_bar=True,
            )
            layout.add_widget(box, index)

    def update_options(self, data):
        if data:
            self.ticks = self.get_options(
                pickle.loads(data["ticks"]), TICK_LAYOUT["titles"]
            )
            self.active_orders = self.get_options(
                pickle.loads(data["active_orders"]), ACTIVE_ORDERS_LAYOUT["titles"]
            )
            self.orders = self.get_options(
                pickle.loads(data["orders"]), ORDERS_LAYOUT["titles"]
            )
            self.trades = self.get_options(
                pickle.loads(data["trades"]), TRADE_LAYOUT["titles"]
            )
            self.positions = self.get_options(
                pickle.loads(data["positions"]), POSITION_LAYOUT["titles"]
            )
            self.accounts = self.get_options(
                pickle.loads(data["accounts"]), ACCOUNT_LAYOUT["titles"]
            )
            logs = []
            for index, log in enumerate(logs_data):
                log_time, log_data = pickle.loads(log)
                logs.append(([log_time, log_data.msg, log_data.gateway_name], index))
            self.logs = logs
        self.save()

    def get_options(self, data, titles):
        options = []
        if len(data.values()) != 0:
            for index, tick in enumerate(data.values()):
                row = []
                for x in titles:
                    node = tick.__dict__[x]
                    if isinstance(node, Enum):
                        row.append(node.value)
                    elif isinstance(node, datetime):
                        row.append(node.strftime("%Y-%m-%d %H:%M:%S.%f"))
                    else:
                        row.append(str(node))

                options.append((row, index))
        return options

    def create_layout(self, config, fill_frame=False):
        """
        生成layout
        """
        layout = Layout([100], fill_frame)
        self.add_layout(layout)
        layout.add_widget(Divider())
        layout.add_widget(Label(config["name"], align="^"))
        layout.add_widget(Divider())
        box = MultiColumnListBox(
            config["height"],
            columns=config["columns"],
            options=self.__dict__[config["data_key"]],
            titles=[d["display"] for d in config["monitor"].headers.values()],
            name=config["name"],
            add_scroll_bar=True,
        )
        layout.add_widget(box)
        return layout

    def _load_data(self, new_value=None):
        self.update_options(new_value)

    @staticmethod
    def _quit():
        raise StopApplication("User pressed quit")


def run_child(tui_data, logs_data):
    """
    Running in the child process.
    """

    def save_logs(event):
        logs_data.append(
            pickle.dumps((datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), event.data,))
        )

    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    event_engine.register(EVENT_CTA_LOG, save_logs)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(CTP_SETTING, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(5)

    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()
    sleep(5)  # Leave enough time to complete strategy initialization

    main_engine.write_log("CTA策略全部初始化")
    cta_engine.start_all_strategies()

    main_engine.write_log("CTA策略全部启动")
    oms_engine = main_engine.engines["oms"]
    while True:
        tui_data["ticks"] = pickle.dumps(oms_engine.ticks)
        tui_data["active_orders"] = pickle.dumps(oms_engine.active_orders)
        tui_data["orders"] = pickle.dumps(oms_engine.orders)
        tui_data["trades"] = pickle.dumps(oms_engine.trades)
        tui_data["positions"] = pickle.dumps(oms_engine.positions)
        tui_data["accounts"] = pickle.dumps(oms_engine.accounts)
        sleep(0.5)


def run_parent(tui_data, logs_data):
    """
    Running in the parent process.
    """
    # Chinese futures market trading period (day/night)
    DAY_START = time(8, 45)
    DAY_END = time(15, 30)

    NIGHT_START = time(20, 45)
    NIGHT_END = time(2, 45)
    child_process = None
    while True:
        # run cta child process
        current_time = datetime.now().time()
        trading = False

        # Check whether in trading period
        if (
            (current_time >= DAY_START and current_time <= DAY_END)
            or (current_time >= NIGHT_START)
            or (current_time <= NIGHT_END)
        ):
            trading = True

        # Start child process in trading period
        if trading and child_process is None:
            print("启动子进程")
            child_process = multiprocessing.Process(
                target=run_child, args=(tui_data, logs_data)
            )
            child_process.start()
            print("子进程启动成功")

        # 非记录时间则退出子进程
        if not trading and child_process is not None:
            print("关闭子进程")
            child_process.terminate()
            child_process.join()
            child_process = None
            print("子进程关闭成功")
        sleep(1)


def terminal_ui(screen, scene, tui_data, logs_data):
    main_scene = MainView(screen, data=tui_data, logs_data=logs_data)
    scenes = [
        Scene([main_scene], -1, name="Main", clear=False),
    ]
    screen.play(
        scenes, stop_on_resize=True, start_scene=scene, allow_int=True, repeat=True
    )


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        print("启动CTA策略守护父进程")
        tui_data = manager.dict()
        logs_data = manager.list()
        main_scene = None
        last_scene = None
        print("启动守护主进程")
        parent_process = multiprocessing.Process(
            target=run_parent, args=(tui_data, logs_data)
        )
        parent_process.start()
        print("子进程启动成功")
        print("启动界面")
        while True:
            if tui_data:
                try:
                    main_scene = Screen.wrapper(
                        terminal_ui,
                        catch_interrupt=True,
                        arguments=[last_scene, tui_data, logs_data],
                    )
                    sys.exit(0)
                except ResizeScreenError as e:
                    last_scene = e.scene
            if main_scene:
                main_scene.data = tui_data
            sleep(1)


Mac/Linux 下鼠标不能点 鼠标事件不响应 不能激活 的问题! 已解决

终端下执行命令:
pip install pyqt5==5.14.2


Mac/Linux下 鼠标事件不响应, 是pyqt5的锅!
官方默认集成的版本是5.15.0 , 此版本在Mac下事件响应存在BUG
切换pyqt5到上一个版本(5.14.2)即可

*分割线*
print("" 100)
用了 两天Windows 今天终于可以切回Mac了!
O(∩_∩)O哈哈~



Jupyter Notebook实现从IB接口历史数据获取,写入数据库,策略回测和实盘交易

刚好有个同学问怎么实现IB历史数据获取,和策略回测和实盘交易。想着熟悉vnpy2.0操作,就用Jupyter Notebook都是跑了一边。VNPY2.0的整体架构设计很有扩展性,而且调用也比起v1.0先进清晰很多,引擎加载调用非常方便。

讲讲注意点:

  1. IB接口历史数据大多是要收费的订阅的,如果收费会有报错信息提示,这里找个免费的作为使用。另外vnpy是按照最大6个月历史数据设计的。
  2. 数据库定义有个小坑,我是用mongodb的,在第一次填写 trader/setting.py中密码写错了,后面在trader/setting.py改发现怎么也改不好;原来当第一次维护后,配置会写入.vntrader/vt_setting,之后系统只会去.vntrader/vt_setting读取。去改vt_setting,而不是trader/setting.py。
  3. 使用CtaStrategyApp支持加入新策略,系统会自动输出json保持策略信息;所以第二次运行代码时候,会提示已经有了,不是问题。
  4. 我在代码里面把回测和实盘放在一次,如果直接跑下来可能会报错,建议跑实盘时候先注释的回测。
  5. 使用script_engine订阅历史数据是是默认从rqdata获取,vnpy v2.07 IB接口已经提供历史数据获取,这里创建HistoryRequest用main_engine来获取,

为了方便贴出来,改成.py代码格式,直接跑也没有问题。

from vnpy.app.script_trader import init_cli_trading
from vnpy.app.script_trader.cli import process_log_event
from vnpy.gateway.ib import IbGateway
from time import sleep
from datetime import datetime
import pandas as pd
# 连接到服务器
setting = {
    "TWS地址": "127.0.0.1",
    "TWS端口": 7497,
    "客户号":8 #每个链接用一个独立的链接号,一个IBAPI支持32个来同时链接
}
engine = init_cli_trading([IbGateway]) #返回Script_engine 示例,并且给main_engine注册了gateway
engine.connect_gateway(setting, "IB") #链接

# 查询资金 - 自动
sleep(10)
print("***查询资金和持仓***")
print(engine.get_all_accounts(use_df = True))
# 查询持仓
print(engine.get_all_positions(use_df = True))

# 订阅行情
from vnpy.trader.constant import Exchange
from vnpy.trader.object import SubscribeRequest
# 从我测试直接用Script_engine有问题,IB的品种太多,get_all_contracts命令不行,需要指定具体后才可以,这里使用main_engine订阅
req1 = SubscribeRequest("12087792",Exchange.IDEALPRO) #创建行情订阅
engine.main_engine.subscribe(req1,"IB")


# 使用script_engine订阅历史数据是从rqdata获取,vnpy v2.07已经提供历史数据获取,这里创建HistoryRequest来获取,
# 查询如果没有endtime,默认当前。返回历史数据输出到数据库和csv文件
# 关于api更多信息可以参见 https://interactivebrokers.github.io/tws-api/historical_bars.html
print("***从IB读取历史数据, 返回历史数据输出到数据库和csv文件***")
from vnpy.trader.object import HistoryRequest
from vnpy.trader.object import Interval
start = datetime.strptime('20190901', "%Y%m%d")

historyreq = HistoryRequest(
    symbol="12087792",
    exchange=Exchange.IDEALPRO,
    start=start,
    interval=Interval.MINUTE
)
# # 读取历史数据,并把历史数据BarData放入数据库
bardatalist = engine.main_engine.query_history(historyreq,"IB")
from vnpy.trader.database import database_manager
database_manager.save_bar_data(bardatalist)

# 把历史数据BarData输出到csv
pd.DataFrame(bardatalist).to_csv("C:\Project\\"+ str(historyreq.symbol) + ".csv" , index=True, header=True)
print("History data export to CSV")

# # 参考backtesting.ipynb, 使用自带的双均线策略回测,10日上穿60日做多,否则反之
print("***从数据库读取历史数据, 进行回测***")
from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.double_ma_strategy import (
    DoubleMaStrategy,
)
btengine = BacktestingEngine() #新建回测引擎
btengine.set_parameters(
    vt_symbol="12087792.IDEALPRO",
    interval="1m",
    start=datetime(2019, 9, 1),
    end=datetime(2019, 10, 5),
    rate = 0,
    slippage=0.00005,
    size=1000,
    pricetick=0.00005,
    capital=1_000_000,
)
btengine.add_strategy(DoubleMaStrategy, {"fast_window":10, "slow_window": 60})

btengine.load_data()
btengine.run_backtesting()
df = btengine.calculate_result()
btengine.calculate_statistics()
btengine.show_chart()

# 给script_engine载入双均线策略,实盘运行
print("***从数据库读取准备数据, 实盘运行***")
# 使用cta交易引擎
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
engine.event_engine.register(EVENT_CTA_LOG, process_log_event)
cta_engine = engine.main_engine.add_app(CtaStrategyApp) #加入app
cta_engine.init_engine()
cta_engine.add_strategy("DoubleMaStrategy","DoubleMaStrategy_IB_12087792_v1", "12087792.IDEALPRO",{"fast_window":10, "slow_window": 50})
sleep(10)
cta_engine.init_strategy("DoubleMaStrategy_IB_12087792_v1")
sleep(10)
cta_engine.start_strategy("DoubleMaStrategy_IB_12087792_v1")


使用CSV文件进行回测

很多时候,需要用CSV文件直接回测

先看用法:

from vnpy.app.cta_strategy.csv_backtesting import CsvBacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from vnpy.app.cta_strategy.strategies.atr_rsi_strategy import ( 
    AtrRsiStrategy,
)
from datetime import datetime

engine = CsvBacktestingEngine()
engine.set_parameters(
    vt_symbol="IF88.CFFEX",
    interval="1m",
    start=datetime(2016, 1, 1),
    end=datetime(2019, 4, 30),
    rate=0.3/10000,
    slippage=0.2,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
)
engine.add_strategy(AtrRsiStrategy, {})

engine.load_data("data.csv", names = [
    "datetime",
    "open_price",
    "high_price",
    "low_price",
    "close_price",
    "volume",
    "open_interest",
])

engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()

将下列代码命名为csv_backtesting.py保存到 cta_strategy 目录下并且与backtesting.py 同一目录

import pandas as pd
from vnpy.app.cta_strategy.backtesting import *


def generate_bar_from_row(row, symbol, exchange):
    """
    Generate bar from row.
    """
    return BarData(
        symbol=symbol,
        exchange=Exchange(exchange),
        interval=Interval.MINUTE,
        open_price=row["open"],
        high_price=row["high"],
        low_price=row["low"],
        close_price=row["close"],
        open_interest=row["open_interest"] or 0,
        volume=row["volume"],
        datetime=row.name.to_pydatetime(),
        gateway_name="DB",
    )


def generate_tick_from_row(row, symbol, exchange):
    """
    Generate tick from row.
    """
    return TickData(
        symbol=symbol,
        exchange=Exchange(exchange),
        datetime=row["datetime"],
        name=row["name"],
        volume=row["volume"],
        open_interest=row["open_interest"],
        last_price=row["last_price"],
        last_volume=row["last_volume"],
        limit_up=row["limit_up"],
        limit_down=row["limit_down"],
        open_price=row["open_price"],
        high_price=row["high_price"],
        low_price=row["low_price"],
        pre_close=row["pre_close"],
        bid_price_1=row["bid_price_1"],
        bid_price_2=row["bid_price_2"],
        bid_price_3=row["bid_price_3"],
        bid_price_4=row["bid_price_4"],
        bid_price_5=row["bid_price_5"],
        ask_price_1=row["ask_price_1"],
        ask_price_2=row["ask_price_2"],
        ask_price_3=row["ask_price_3"],
        ask_price_4=row["ask_price_4"],
        ask_price_5=row["ask_price_5"],
        bid_volume_1=row["bid_volume_1"],
        bid_volume_2=row["bid_volume_2"],
        bid_volume_3=row["bid_volume_3"],
        bid_volume_4=row["bid_volume_4"],
        bid_volume_5=row["bid_volume_5"],
        ask_volume_1=row["ask_volume_1"],
        ask_volume_2=row["ask_volume_2"],
        ask_volume_3=row["ask_volume_3"],
        ask_volume_4=row["ask_volume_4"],
        ask_volume_5=row["ask_volume_5"],
        gateway_name="DB",
    )


class CsvBacktestingEngine(BacktestingEngine):
    def __init__(self):
        super().__init__()

    def load_data(
        self,
        filename: str,
        names: list = [
            "datetime",
            "open_price",
            "high_price",
            "low_price",
            "close_price",
            "volume",
            "open_interest",
        ],
        compression: any = None,
        parse_dates: bool = True,
        skiprows: int = 1,
    ):
        """
        Load Bar Names: [
            "datetime",
            "open_price",
            "high_price",
            "low_price",
            "close_price",
            "volume",
            "open_interest",
        ]

        Load Tick Names: [
            "datetime",
            "name",
            "volume",
            "open_interest",
            "last_price",
            "last_volume",
            "limit_up",
            "limit_down",
            "open_price",
            "high_price",
            "low_price",
            "pre_close",
            "bid_price_1",
            "bid_price_2",
            "bid_price_3",
            "bid_price_4",
            "bid_price_5",
            "ask_price_1",
            "ask_price_2",
            "ask_price_3",
            "ask_price_4",
            "ask_price_5",
            "bid_volume_1",
            "bid_volume_2",
            "bid_volume_3",
            "bid_volume_4",
            "bid_volume_5",
            "ask_volume_1",
            "ask_volume_2",
            "ask_volume_3",
            "ask_volume_4",
            "ask_volume_5",
        ]
        """
        self.output("开始加载历史数据")

        if not self.end:
            self.end = datetime.now()

        if self.start >= self.end:
            self.output("起始日期必须小于结束日期")
            return

        self.history_data.clear()  # Clear previously loaded history data

        # Load 30 days of data each time and allow for progress update
        progress_delta = timedelta(days=30)
        total_delta = self.end - self.start
        interval_delta = INTERVAL_DELTA_MAP[self.interval]

        start = self.start
        end = self.start + progress_delta
        progress = 0

        while start < self.end:
            end = min(end, self.end)  # Make sure end time stays within set range

            df = pd.read_csv(filename, compression, parse_dates, skiprows, names,)
            # Generate
            symbol, exchange = self.vt_symbol.split(".")
            data = []
            if df is not None and not df.empty:
                for ix, row in df.iterrows():
                    if row["datetime"] > self.start and row["datetime"] < self.end:
                        if self.mode == BacktestingMode.BAR:
                            data.append(generate_bar_from_row(row, symbol, exchange))
                        else:
                            data.append(generate_tick_from_row(row, symbol, exchange))
            else:
                self.output("Csv file has no Data!")
                return

            self.history_data.extend(data)

            progress += progress_delta / total_delta
            progress = min(progress, 1)
            progress_bar = "#" * int(progress * 10)
            self.output(f"加载进度:{progress_bar} [{progress:.0%}]")

            start = end + interval_delta
            end += progress_delta + interval_delta

        self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")


centos7.6 搭建vnpy量化交易环境

作为Python开发的开源项目,vn.py本身具有非常好的跨平台通用性,毕竟Python几乎可以在所有主流操作系统上运行。但对于Linux系统,官方团队只提供了对Ubuntu 18.04版本的支持(主要就是安装脚本)。

本人一直用的是CentOS的服务器,折腾了几天,终于在上面把vnpy跑起来了。没有记录折腾的细节,只是记录了下正常的操作步骤,欢迎大家一起交流。
以下内容全部基于CentOS 7.6版本,首先准备好一个全新安装的系统,然后跟着一步步操作即可。

安装python环境

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
按照提示信息进行安装,当出现以下信息时,选择no,不然后面vncserver无法正常启动。还不清楚具体原因。
by running conda init? [yes|no]
[no] >>> no

安装Mate桌面

yum groups install "X Window System" -y
yum install epel-release -y
yum groups install "MATE Desktop" -y
systemctl set-default graphical.target

安装VNC Server

yum install tigervnc-server -y

# 替换User为root,增加显示分辨率参数设置
sed -r -i "s/^(ExecStart.*)<USER>(.*%i)/\1root\2 -geometry 1920x1200 -depth 16/" /lib/systemd/system/vncserver@.service
sed -r -i "s/^(PIDFile.*)home\/<USER>(.*pid)/\1root\2/" /lib/systemd/system/vncserver@.service

mv /lib/systemd/system/vncserver@.service /lib/systemd/system/vncserver@:1.service
systemctl daemon-reload
vncpasswd
systemctl start vncserver@:1.service
systemctl enable vncserver@:1.service

# 屏蔽默认桌面,启动mate桌面
sed -r -i "s%^/etc/X11/xinit/xinitrc$%# &%" /root/.vnc/xstartup
echo "/usr/bin/mate-session &" >> /root/.vnc/xstartup

# 其它操作
# 禁用selinux
sed -r -i "s/^(SELINUX=).*/\1disabled/" /etc/selinux/config
# 关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service

reboot

# 如果是云服务器,需要确保开放了TCP 5901端口

安装VS Code

rpm --import https://packages.microsoft.com/keys/microsoft.asc
sh -c 'echo -e "[code]\nname=Visual Studio Code\nbaseurl=https://packages.microsoft.com/yumrepos/vscode\nenabled=1\ngpgcheck=1\ngpgkey=https://packages.microsoft.com/keys/microsoft.asc" > /etc/yum.repos.d/vscode.repo'
yum check-update
yum install code -y

升级GCC版本

注意GCC必须要使用9.1.0以上的版本,否则在编译vnpy的时候,会报-std=c++17相关的错误。
GCC的编译时间很长,估计得几个小时。

yum install gcc gcc-c++ bzip2 m4  gmp-devel.x86_64  -y

wget https://mirrors.ustc.edu.cn/gnu/gcc/gcc-9.1.0/gcc-9.1.0.tar.gz
tar xvf gcc-9.1.0.tar.gz
cd gcc-9.1.0/
./contrib/download_prerequisites

cd gmp;mkdir temp;cd temp
../configure --prefix=/usr/local/gmp-6.1.0
make && make install

cd ../../mpfr;mkdir temp;cd temp
../configure --prefix=/usr/local/mpfr-3.1.4 --with-gmp=/usr/local/gmp-6.1.0
make && make install

cd ../../mpc;mkdir temp;cd temp
../configure --prefix=/usr/local/mpc-1.0.3 --with-gmp=/usr/local/gmp-6.1.0 --with-mpfr=/usr/local/mpfr-3.1.4
make && make install

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/mpc-1.0.3/lib:/usr/local/gmp-6.1.0/lib:/usr/local/mpfr-3.1.4/lib

cd ../..;mkdir temp;cd temp
../configure --disable-multilib --enable-languages=c,c++ --with-gmp=/usr/local/gmp-6.1.0 --with-mpfr=/usr/local/mpfr-3.1.4 --with-mpc=/usr/local/mpc-1.0.3
make -j4 && make install

我的服务器是4核的,就在make 后面加了-j4,大家可根据自己的情况调整,缩短编译时间。

安装vnpy

最后终于可以安装vn.py了,在此过程中会自动编译Linux上支持的交易接口,如CTP/OES等。

# 切换到python环境
. ~/miniconda3/bin/activate
yum install postgresql-devel* libxkbcommon-x11 -y
下载vnpy的最新源码包并解压。
cd vnpy
bash install.sh


三行代码 解决国内期货10.00到10.15的数据合成缺失问题

https://www.vnpy.com/forum/topic/3409-wei-shi-yao-ni-de-hui-ce-ce-lue-he-bie-de-ping-tai-bu-tai-yi-yang?page=1#pid12167
这是我之前的帖子,为什么你的回测策略和别的平台“不太一样”

vnpy的商品期货30分钟周期的每天的10点到11点的数据是没有的,别的时间比如9.30-10.00都是
有数据的但是10.00到10.30是没有数据的,直接是10.00到11.00,少了这一部分的分钟的数据,
也就是说商品期货休息的这段时间没有数据但是事实是10.00-10.30之间有15min的数据所以一直是少
了这一个K线,下面就是解决办法,只是国内的商品期货,别的品种大家自行选择

在合成分钟线的代代码下面加上这三句代码,原理大家都懂不多说,之后就会合成10.00-10.15的K线了

            elif self.last_bar and str(bar.datetime)[-8:]=='10:14:00':
                finished = True
                self.interval_count = 0

description

description


统计

主题
3618
帖子
12964
已注册用户
13863
最新用户
在线用户
183
在线来宾用户
125
© 2015-2019 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号-3