VeighNa量化社区
你的开源社区量化交易平台

置顶主题

提问前请先看:vn.py使用FAQ

策略相关


K线合成器

Q:CTA策略默认传入 1min K 线数据,在哪个函数中传入默认参数 1min?
A:BarGenerator里面的update_tick()用于把tick数据合成1分钟K线数据,update_bar()是用于把1分钟数据合成X分钟数据。可以参考布林带策略示例,那里提供1分钟K线合成为15分钟K线,并且基于15分钟K线来产生买卖信号。

Q:BarGenerator的最大值是不是只能60?
A:合成分钟线的时候,周期最大只能为60(基于60分钟整除来进行N分钟切分);合成小时线的时候,周期可以为任意值(基于多少个小时过去了,进行合成)

 

K线时间序列管理器

Q:ArrayManager的初始化函数默认size=100,size指的是什么?
A:size是指这个K线时间序列容器缓存的数据量的大小,理论上只要超过了策略中要计算的所有技术指标最长的那个周期,就够用了。比如你要算MA20 RSI14 CCI50,那么最少需要size=50,否则CCI计算的数据量就不够,一般情况下还会在size上加上一定的量,来避免talib中某些指标算法可能需要更长的数据,保证计算的正确性。

Q:策略使用5分钟K线,ArrayManager初始化 self.am = ArrayManager(100)。在初始化size这里输入100的话,请问vn.py会从数据库里面提取100根1分钟的bar还是500根1分钟的bar来初始化指标?
A:缓存到100跟5分钟K线后才会完成初始化状态。

Q:talib安装失败,怎么解决?
A:使用手动安装:
1.进入Unofficial Windows Binaries for Python Extension Packages中找到talib对应的版本(如py3.7,64位)
2.下载对应版本的文件TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl
3.下载好whl文件之后,直接在命令行下安装文件即可,如下。成功后会显示“Sucessful installed TA_Lib?0.4.17”

pip install TA_Lib?0.4.17?cp37?cp37m?win_amd64.whl

Q:如何导出计算的技术指标数值
A:最简单的方法就是直接print了,或者也可以写入到文件里。

 

回测数据

Q:CTA回测组件提示K线已经下载完成,但是在sqlite数据库中查看不到记录。
A:数据被插入到当前用户目录的database.db中,如C:\Users\HOME\.vntrader\database.db

Q:实盘时vn.py数据是tick级数据推送,回测时数据是分钟级的,它会识别时间戳吗?
A:bar回测的模式,在策略内部将tick合成为bar或者将1分钟bar合成为x分钟的bar

Q:回测加载数据,显示载入数据量为0。
A:没有连接数据库,或者数据库无数据

Q:初始化策略,默认initDays=10,改成初始一定数量的分钟数是不是更加合理?
A:若载入充足的历史数据,就可以立刻交易了。

Q:数据商也提供逐笔数据如何用来回测?
A:用逐笔自行还原出完整的订单簿tick数据,所以用tick模式来回测。

Q:如何获取A股分钟线数据?
A:目前没有免费的下载渠道(交易所禁止),推荐通过米筐的RQData获取。

Q:如何更改数据库存放盘?
A:举个例子:在D盘创建目录D:\test;在D:\test下创建文件夹.vntrader;使用VN Station的VN Trader Pro切换到D:\test目录启动;此时启动的VN Trader运行时目录已经是D:\test(可以在标题栏看到)

 

参数优化
Q:用capital作为目标,输出结果全是0
A:因为在逐日统计回测中,capital代表的是起始资金,无法作为优化的目标。可以改用endBalance,sharpeRatio,maxDrawdown等目标优化参数试一下。

Q:engine.trades可以输出交易记录,但是记录里只有time,没有具体的date,请问哪里可以输出具体的交易日期和时间?
A:trades记录里的trade对象,有个额外的datetime字段是用来标识该成交的日期时间的。

 

实盘运行

Q:自定义策略需要放在哪里?
A:在当前运行的脚本目录先创建创建strategies文件夹,然后把策略文件放进去就可以了。 配置文件可以创建.vntrader, 然后把配置文件放进去。

Q:怎样在on_bar里output数据,以此检查策略是不是按照我的想法在运转
A:可以直接在策略初始化的时候打开一个txt文件句柄,然后回测过程中随时往里面写记录

Q:点击CTA策略出现了json错误
A:json读取错误,如把.vntrader下的cta_strategy_data.json删除可解决

Q:CTA模块中是否有自带的变量存储合约的最小变动价位数据?
A:策略发出的买卖指令的价格,会自动根据最小价格变动pricetick进行round取整,无需最小价格变动的数据了

Q:如何在无人值守脚本获得所有合约的信息?
A:调用main_engine.get_all_contracts函数

Q:用CTPtest抓所有合约, 郑交所的合同没有菜粕,苹果。
A:调用get_all_contracts前,sleep等待5秒,接收合约数据推送需要时间;某些测试环境里,是会缺少部分合约的。

Q:为什么使用VnTrader的cta策略组件进行日常实盘交易时,每天交易时段结束之后,一定要把VnTrader关闭,然后在下次开盘前15分钟在重启并初始化策略的参数?
A:关闭交易系统主要是为了清空系统内部(接口层、引擎层)的缓存状态,策略层的缓存状态一般倒是不太容易出问题的(除非你逻辑写错了)

Q:请问backtesting里的cross_limit_order和cross_stop_order什么意思
A:撮合限价单委托,撮合本地停止单(条件单)委托。讲最新的行情K线或者TICK和策略之前下达的所有委托进行检查,如果能够撮合成交,则返回并记录数据

Q:如何实现k线内成交?
A:用的是本地停止单
就比如当前价格是100点,策略发出信号,在下一根K线的120线发出买入:
若价格没突破到120点,继续挂着。
若价格从100涨到130,那么在120点的时候停止单变成市价单追上去保证尽可能成交。
若下一根K线的开盘价大于120,那么以开盘价来立刻成交。

Q:回测1h k线数据, 为什么每一笔订单成交记录都比委托记录晚一个小时 , 请问这个是在哪里设置的?
A:委托时间,是你发出委托请求的时间。成交时间,是你的成交发生的事件。CTA策略回测遵循T时刻发出的委托,永远只在T+1后才能进行撮合的规则(否则会有未来函数)。
实盘中,你的成交时间可能和委托时间非常接近,但是回测中受限于数据没法确定,只能用T+1那根K线的时间戳近似。

Q:如果停止单触发下单之后一段时间没有执行的话,会撤单吗,什么时候撤单,撤单之后还会追单吗?
A:不会,除非策略执行撤单操作。

Q:如果停止单触发下单之后,部分成交,接下来会撤单还是追单?
A:策略会收到这笔委托的回报,用户可以自行处理,不会自动撤单或者追单

Q:如果同时持有多空,pos是单向的,该怎么处理? 例如:如果持有1手多单,sell卖平的委托没有成交,紧接着的short卖开的单子成交了,pos 是多少?
A : 对于CTA策略模块来说,策略的持仓,是基于策略本身发出去的委托成交维护的逻辑持仓(而非账户底层的实际持仓),所以pos会为0

Q:假如策略下了1手多单,手动下了1手多单,pos 是多少 ?
A : 人手工下的单子无影响

Q:假如策略下了2手多单,手动平仓1手,pos 是多少 ?
A:人手工下的单子无影响

Q:两个策略都在跑同一个代码,应该是各自有各自的pos 吧 ?
A:对的,这两个逻辑持仓互相无关

Q:框架对pos值的更新,是在onTrade 和 onOrder推送动作前,还是推送动作后?
A:onTrade推送前更新,保证策略收到onTrade回调的时候,pos已经是最新数据。

Q:vt_setting.json的路径到底在哪?
A:在c:\users\administrator.vntrader目录下,是基于Python的pathlib实现的

Q:positionData中的 yd_volume 是指什么?
A:昨仓,这个主要针对中国市场的股票(只能卖出昨日的股票)和期货(昨仓平仓手续费不同)

Q:当前CTA模块是不是不能在策略里获取当前资金情况?
A:不能获取资金和账户级别的持仓。策略的仓位管理(风险分配)应该由交易员来做,而不是让程序自动做

Q:非交易时间报单,是直接返回拒单,还是返回提交中等开盘了再打出去?
A:在策略层,如on_bar()函数里面第一个逻辑应该是self.cancel_all(),目的是清空为成交的委托(包括本地停止单,限价单),保证当前时间点委托状态是唯一的。
若在非交易时间发单,服务器收到这个委托会推送一个拒单的委托回报。

Q:vn.py如何查询实盘账户的历史资金情况
A:大部分交易系统并未提供历史资金查询功能,一般都是只能查当前时间点的资金,所以需要你自己保存。

 

其他问题

Q: 请问下,vn.py中有期权回测的例子么?vn.py 关于期权,是不是就只有OptionMatser模块?
A:期权的波动率交易策略一般无需回测,更多依赖建模;可以用其他组件交易期权,比如CTA策略模块赌趋势,或者SpreadTrading模块赌价差,但这些本质都不是期权交易策略。

Q:如何根据资金量进行下单,而不是固定手数下单?
A:vn.py框架下不建议交易程序在实盘中去获取账户可用资金,并调整交易手数,这是很危险的事情。

 
 
 

接口登录相关


CTP接口

Q:如何实现行情并发推送?
A:C++ API的回调函数只有一个推送线程;用户可以一次性订阅全市场的合约,某个合约有行情变动的时候才会推送.

Q:CTP能可否获取到指定经纪商的手续费?
A:这个手续费应该是连带期货公司的部分,但是不建议在交易程序中去访问这种数据,相关数据应该每天在启动策略前就获取好配置到策略里。

Q:郑商所的品种都收不到14:59这一根bar
A:郑商所的数据推送,没有3点后的最后一个tick用于标识收盘完成,所以要调用BarGenerator.generate函数来做最终的强制K线生成

Q:国内期货模拟,除了sinnow可以模拟,还有别的账号可以模拟吗,可以去期货公司申请模拟账号模拟吗?
A:SimNow是目前最稳定的仿真环境了,可以找期货公司申请,比如中信、上海中期等。

Q:sinnow连接有时会断开,然后传送的tick数据的时间就会延后,就是收盘时,本来应该平仓平不了,超过三点了还在传tick。
A:SimNow环境因为免费,用的人很多,所以服务器有时会卡。

Q:为什么有时候会不停的断开重连呢?
A:服务器关了或者人满了,或者账号密码错误。

Q:连接进 SimNow后,按”市价“下单被拒绝,按”限价“”FAK“”FOK"下单可以,请问,是否不支持“市价”下单。
A:SimNow不支持市价单,实盘支持。

Q:连接SimNow后,下单提示“提交中“无法成交也无法撤单
A:这种情况,一般是委托请求没有到CTP柜台(网络断了),或者CTP柜台挂了

Q:SimNow上不去,上去了注册又总是提示验证码失败,还有其他模拟的推荐吗?
A:SimNow是目前最推荐的仿真环境,建议换交易时间注册,以及SimNow主要支持移动和联通的手机号

Q:CTP配置无法连接:输入论坛登录名,账号,配置对应的交易服务器和行情服务器,点击连接,无任何反应
A:CTP的测试账号请通过SimNow获取,不是vn.py论坛的

Q:已有行情数据显示。 但是不能发单,如rb1905
A:检查是否漏填交易所或者上委托数量的字段。

Q:下单异常,第一种情况:一点击委托就是直接“已撤销”(委托栏里委托状态),双击撤单的时候,又会显示“交易撤单失败,代码25”。第二种情况:一点击委托就是直接“提交中”(委托栏里委托状态),等到双击撤单的时候,又会显示“交易撤单失败,代码25”
A:第一个情况应该是报单不符合服务端的要求,被拒单撤销了;第二个情况感觉是你的网络断了,报单请求发出但没有到服务器。可以顺着这两个方向检查。

Q:CTPTEST测试交易期货公司采集不到硬盘序列号,CPU序列号,BIOS序列号
A:数据采集是通过API内部的代码自动完成的,其他任何上层程序都无法影响。建议换机器。

Q:登录穿透式仿真账号问题,一直报4097的错
A:不要同时import CTPTEST和CTP这两个接口

Q:使用ctp和ctptest登录都显示不合法登录。错误代码3
A:账户密码错误

Q:CTP能否两个账号同时登录并且同时操作。
A:同一个接口只能登录一次。如果要同时登录两个CTP,需要自己扩展修改CtpGateway,然后加载两个CtpGateway;对于不同的接口,比如股票的XTP和期货的CTP,可以直接同时登录使用,并在策略中同时交易这两个接口的合约

 

IB接口

Q:如何链接盈透api ?
A:启动TWS;在配置中打开Socket连接功能;在vn.py中加载ibGateway,然后启动就行。

Q:IB接口连接,错误提示显示:couldn't connect to TWS. confirm that "enable activex and socket clients" is enabled aports for new installations of version 954.1 or newer: TWS:7497:IB gateway: 4002。
A:请检查TWS是否打开了socket访问功能。

Q:启动行情记录,则程序假死。
A:IB的行情订阅函数没有异步缓存逻辑,用DataRecorder脚本的话,会在连接TWS之前就进行了订阅请求,导致死掉。IbGatewa已经加上了历史数据查询获取功能,直接从IB查询K线数据进行初始化就行,不建议自己录制了。

 

富途接口
Q:futu_gateway里面的登陆信息设置,为何只要密码?
A:需要先下载和安装FUTU OPEN API:https://www.futunn.com/openAPI

 

华宝派
Q:华宝派如何申请试用/实盘呢?
A:请在华宝证券开户,然后联系客户经理申请使用华宝PI

 
 
 

模块应用相关


交易复制
Q:当跟单帐户检测到被跟单帐户的仓位变化后,具体操作是什么?
A:TradeCopy的发布者账号,维护一份本地持仓数据表,当有成交推送时立即更新计算最新仓位;发布者每当收到成交推送时,或者每隔一定的时间间隔(默认1秒),会广播一次当前自己的仓位信息;订阅者收到广播推送的发布者仓位后,乘以自身的复制系数,作为目标仓位;订阅者根据目标仓位,和自身实际持仓的偏差,决定具体的下单操作(目标是将实际持仓同步到和目标持仓一致),如果有之前的委托,会先执行撤单。

 

RQData
Q:import rqdatac 失败,没有找到rqdatac包
A:运行以下命令安装

pip install --extra-index-url https://rquser:ricequant99@py.ricequant.com/simple/ rqdatac==1.0.0a66

Q:如何配置RQDAAT账户?
A:申请试用账号:https://www.ricequant.com/purchase
在VN Trader主界面上,点配置,rqdata.username rqdata.password输入
重启VN Trader就能用了

 

工作线程
Q:vn.py运行的时候,会启动哪些线程?
A:
1.主线程:带PyQt界面时运行Qt的循环,无界面时可以直接阻塞或者用while循环
2.事件引擎线程:处理事件引擎队列中的事件,并调用注册的处理函数处理,所以如果是CtaStrategy层,所有回调函数你可以认为都是单线程在驱动的(每次只有一个在调用)
3.API层线程:不同的API不一样了。

Q:eventEngine2.__queue很多数据没有处理、队列一直变大?
A:如果queue的大小只增加,不减少,只可能是没有启动EventEngine,导致事件没有处理持续挤压导致的。否则运行过程中即使没有注册处理函数,该事件的数据也会被抛弃掉,不会继续保存着。

 

行情记录
Q:行情记录后怎么查看和下载历史行情数据?
A:行情记录模块是将tick或者bar直接存入你配置的数据库的,你要单独查看,可以用数据库可视化工具连接本地数据库查看,如果在vn.py里回测使用,不需要下载,是默认从数据库里找相关数据的

Q:datarecoder 和 cta running 的 CTP能不能分开设置?
A:可以,另外新建一个目录,里面创建.vntrader文件夹,在这个目录用run.py或者VN Station启动VN Trader,CTP接口的登录信息就都是独立的了。

 
其他

Q:网站上下载的vnpy ,和Anaconda site-package里面的vnpy有什么区别?
A:进行运行的是ananconda里面的vnpy。如同在anaconda里面调用numpy一样。

Q:修改vnpy代码后需要更新到anaconda site-package对应的文件里?
A:python里import的vnpy就是site-packages里的,你可以修改下环境变量,把你clone的那个目录加入搜索路径,这样你修改了clone的那个vn.py,用的时候就自动改了

Q:一键安装完2.0.5 后,再另外安装anaconda3, spyder无法使用
A:假设你安装到c:\anaconda3。打开cmd,运行c:\anaconda3\scripts\activate,然后再运行python,就会进入anaconda环境了

Q:请问算法交易怎么用,可以策略生成下单指令,由算法下单吗?
A:目前AlgoTrading模块主要通过GUI和篮子委托文件的方式来实现算法下单
尽管可以通过扩展的方式,实现策略调用算法执行交易,但更建议在CTA策略中自行实现算法交易的逻辑,获得更好的细节控制能力

Q:vn.py中配置界面中email各项设置如何填写,有何用处?
A:设置如下
"email.server": "SMTP邮件服务器地址",
"email.port": SMTP邮件服务器端口号,
"email.username": "邮箱用户名",
"email.password": "邮箱密码",
"email.sender": "发送者邮箱",
"email.receiver": "接收者邮箱",

 
 
 

社区操作相关


Q:找回vn.py社区的密码
A:在这个页面可以找回密码:https://www.vnpy.com/auth/reset-password

Q:注册了社区账号,但是登录报错。[WinError 10061] 由于目标计算机积极拒绝,无法连接
A:这个是代理服务器问题吧,换个网络试试。

Q:维恩的派和vn.py社区有什么关系?
A:维恩的派是vn.py社区2015-2018年的老论坛,但由于discuz的各种问题使用体验太差,现在已经停止使用,将在19年底正式下线。

Q:社区怎么上传照片的?
A:直接把图片拖动到编辑框中就能自动上传了

Q:有微信群,QQ群吗?
A:vn.py框架学习群:666359421 ; vn.py框架交流群:262656087



【VNPY进阶】on_tick函数内撤单追单详解,实盘在用的代码,没有坑哦

0.修改OrderData如下:

@dataclass
class OrderData(BaseData):
    """
    Order data contains information for tracking lastest status 
    of a specific order.
    """

    symbol: str
    exchange: Exchange
    orderid: str

    type: OrderType = OrderType.LIMIT
    direction: Direction = Direction.NET
    offset: Offset = Offset.NONE
    price: float = 0
    volume: float = 0
    traded: float = 0
    status: Status = Status.SUBMITTING
    datetime: datetime = None

    cancel_time: str = ""
    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}_{self.exchange.value}/{self.gateway_name}"
        self.vt_orderid = f"{self.gateway_name}_{self.orderid}"
        #未成交量
        self.untrade = self.volume - self.traded

1.策略init初始化参数

        #状态控制初始化
        self.chase_long_trigger = False
        self.chase_sell_trigger = False
        self.chase_short_trigger = False
        self.chase_cover_trigger = False  
        self.cancel_status = False
        self.last_vt_orderid = ""
        self.long_trade_volume = 0
        self.short_trade_volume = 0
        self.sell_trade_volume = 0
        self.cover_trade_volume = 0 
        self.chase_interval   =    10    #拆单间隔:秒

get_position_detail参考这个帖子 https://www.vnpy.com/forum/topic/2167-cha-xun-cang-wei-chi-cang-jun-jie-wei-cheng-jiao-wei-tuo-dan-yi-ge-han-shu-gao-ding

2.on_tick里面的代码如下

from vnpy.trader.object import TickData, BarData, TradeData, OrderData,Status
    def __init__(self, strategy_engine: StrategyEngine, strategy_name: str,vt_symbols: List[str], setting: dict):
        """
        """
        super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
        #撤单条件选择,默认使用超时撤单,为False使用突破价格范围撤单
        self.cancel_timer_trigger = True
    def on_tick(self, tick: TickData):
        active_orders = self.get_position_detail(chase_vt_symbol).active_orders
        vt_orderid = ""
        if active_orders:
            #委托完成状态
            order_finished = False
            self.last_vt_orderid = list(active_orders.items())[0][0]         #委托单vt_orderid
            active_order:OrderData = list(active_orders.items())[0][1]      #委托单类 
            if self.cancel_timer_trigger:
                #撤单触发条件,超时撤单
                trigger_status = (raw_tick.datetime - active_order.datetime).total_seconds() > self.chase_interval
            else:
                price_tick = self.get_contract_detail(chase_vt_symbol).price_tick
                #突破价格范围撤单
                trigger_status = not active_order.price - price_tick * self.cancel_trigger_payup <= raw_tick.last_price <= active_order.price + price_tick * self.cancel_trigger_payup
            #开平仓追单,部分交易没有平仓指令(Offset.NONE)
            if active_order.offset in (Offset.NONE,Offset.OPEN):
                if active_order.direction == Direction.LONG:
                    self.long_trade_volume = active_order.untrade
                    if trigger_status and self.long_trade_volume > 0 and (not self.chase_long_trigger) and self.last_vt_orderid:
                        #撤销之前发出的未成交订单
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_long_trigger = True
                elif active_order.direction == Direction.SHORT:
                    self.short_trade_volume = active_order.untrade    
                    if trigger_status and self.short_trade_volume > 0 and (not self.chase_short_trigger) and self.last_vt_orderid:  
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_short_trigger = True
            #平仓追单
            elif active_order.offset in (Offset.CLOSE,Offset.CLOSETODAY,Offset.CLOSEYESTERDAY):
                if active_order.direction == Direction.SHORT: 
                    self.sell_trade_volume = active_order.untrade
                    if trigger_status and self.sell_trade_volume > 0 and (not self.chase_sell_trigger) and self.last_vt_orderid: 
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_sell_trigger = True                                                    
                if active_order.direction == Direction.LONG:
                    self.cover_trade_volume = active_order.untrade
                    if trigger_status and self.cover_trade_volume > 0 and (not self.chase_cover_trigger) and self.last_vt_orderid:                                                       
                        self.cancel_order(self.last_vt_orderid)
                        self.chase_cover_trigger = True   
        else:
            order_finished = True
            self.cancel_status = False
        #追单的委托单状态是正常的撤销状态则发出追单指令
        if self.get_order(self.last_vt_orderid) and self.get_order(self.last_vt_orderid).status == Status.CANCELLED:
            if self.chase_long_trigger:
                if order_finished:
                    self.buy(chase_vt_symbol,raw_tick.ask_price_1,self.long_trade_volume)
                    self.long_trade_volume = 0
                    self.chase_long_trigger = False  
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_short_trigger:
                if  order_finished:
                    self.short(chase_vt_symbol,raw_tick.bid_price_1,self.short_trade_volume)
                    self.short_trade_volume = 0
                    self.chase_short_trigger = False 
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_sell_trigger:
                if order_finished:
                    self.sell(chase_vt_symbol,raw_tick.bid_price_1,self.sell_trade_volume)
                    self.sell_trade_volume = 0
                    self.chase_sell_trigger = False                      
                else:
                    self.cancel_surplus_order(list(active_orders))
            elif self.chase_cover_trigger:
                if order_finished:
                    self.cover(chase_vt_symbol,raw_tick.ask_price_1,self.cover_trade_volume)
                    self.cover_trade_volume = 0
                    self.chase_cover_trigger = False
                else:
                    self.cancel_surplus_order(list(active_orders))
    #------------------------------------------------------------------------------------
    def cancel_surplus_order(self,orderids:list):
        """
        撤销剩余活动委托单
        """
        if not self.cancel_status:
            for vt_orderid in  orderids:
                self.cancel_order(vt_orderid)
            self.cancel_status = True
# template.py里面增加
    #------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        return self.cta_engine.get_order(vt_orderid) 
# cta_strategy\engine.py里面增加
    #------------------------------------------------------------------------------------
    def get_order(self,vt_orderid:str) -> Union[OrderData,None]:
        """
        通过vt_orderid获取委托单
        """
        self.main_engine.get_order(vt_orderid)


米筐答疑版块上线,欢迎选择 RQData 数据服务

熟悉我们的老用户可能知道,米筐自 2017 年开始与 VN.PY 形成战略合作伙伴关系。米筐的“数据”和“策略”解决方案 VN.PY 的“交易”服务互补,两年多来,我们已经联合为多位用户提供了高性价比的量化投研与交易服务。感兴趣的新朋友可以回顾我们的老帖子《米筐科技和 VN.PY 的战略合作重磅升级》

为了给各位用户带来更好的产品体验,RQData 相关的答疑功能来了。在这个版块中,我们将提供更好的售前答疑和售后服务,帮助您解决从数据购买到产品应用的技术性问题。欢迎大家在本版块中多多交流~

若您在投研过程中有金融数据需求,米筐的 RQData 产品可为您提供对应的数据服务。RQData 是一个基于 Python 的金融数据工具包,提供丰富整齐的历史数据以及简单高效的 API 接口,免除了您进行数据搜索、清洗的烦恼。

针对期货、商品期权,我们提供了对应的方案,欢迎您点击此处进行体验和购买

更多下单疑问,您可以在本帖中进行交流。感谢大家的关注~



VeighNa发布v3.6.0 - 支持Mac系统CTP交易接口!

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2023-02-22
 

《30天精进Python交易GUI》课程更新35-40集,近6集内容中详细讲解了量化系统中的【通用化数据监控表格控件】的开发使用技巧,以及如何通过Qt Style Sheet实现【自定义交易UI界面风格美化】,感兴趣的同学请戳这里

上周发布了VeighNa的3.6.0版本,本次更新的主要内容是增加了Mac系统上的CTP交易接口支持(本文完成于专门为此采购的Mac Mini M2~)。

对于已经安装了VeighNa Studio的用户,可以使用快速更新功能完成自动升级。对于没有安装的用户,请下载VeighNa Studio-3.6.0,体验一键安装的量化交易Python发行版,下载链接:

https://download.vnpy.com/veighna_studio-3.6.0.exe

 

Mac系统的CTP接口支持

 

得益于Python语言本身的跨平台优势(Windows、Linux、Mac三大系统),VeighNa量化交易平台的核心框架部分很早就可以在Mac系统上运行。

但由于C++类交易API对于Mac系统支持的普遍不足,导致之前只有vnpy_ib等少数【纯Python实现】的交易接口可以在Mac系统上运行,对于大部分用户来说没什么实际价值。

从6.6.7版本的CTP API开始,上期技术官方推出了对Mac系统支持,包括Intel(x86_64)和苹果M系(arm64)芯片。终于,VeighNa平台可以在Mac系统上为期货量化用户提供从投研回测到实盘交易的一体化解决方案。

 

Mac系统的VeighNa安装流程

 

目前Mac系统上还没有类似VeighNa Studio的开箱即用发行版(开发团队在抓紧研究中),需要手动完成安装流程:

  1. 前往Python官网下载3.10版本的安装包(或者使用brew安装),安装完成后在终端(Terminal)中运行命令:
python3

检查确认打开的Python解释器为3.10版本。

  1. 使用brew安装TA-Lib的C++开发包:
brew install ta-lib
  1. 安装NumPy和TA-Lib(Python),这里推荐使用豆瓣PyPI镜像解决官方源访问困难的问题:
python3 -m pip install numpy --index=https://pypi.doubanio.com/simple
python3 -m pip install ta-lib==0.4.24 --index=https://pypi.doubanio.com/simple
  1. 安装米筐RQData客户端,注意这里使用的是米筐PyPI源:
python3 -m pip install rqdatac --index=https://pypi2.ricequant.com/simple
  1. 安装VeighNa核心框架,以及需要使用的功能插件模块:
python3 -m pip install vnpy --index=https://pypi.doubanio.com/simple
python3 -m pip install vnpy_ctastrategy vnpy_ctabacktester vnpy_datamanager vnpy_sqlite vnpy_rqdata --index=https://pypi.doubanio.com/simple

这里的例子中包括(具体可以根据自己的需求调整):

  • CTA策略实盘和回测模块:vnpy_ctastrategy、vnpy_ctabacktester
  • 历史数据管理模块:vnpy_datamanager
  • SQLite数据库驱动:vnpy_sqlite
  • RQData数据服务适配器:vnpy_rqdata

pip安装过程中如果出现报错某些依赖库的缺失,可以尝试先pip install该依赖库,然后再次执行上述安装命令。

  1. 安装CTP交易接口模块:
python3 -m pip install vnpy_ctp --index=https://pypi.doubanio.com/simple

如果Intel芯片的机器上安装失败,请在App Store中安装XCode编译器后再次尝试。

完成后即可使用run.py脚本启动VeighNa Trader:

from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_ctp import CtpGateway
from vnpy_ctastrategy import CtaStrategyApp
from vnpy_ctabacktester import CtaBacktesterApp
from vnpy_datamanager import DataManagerApp

def main():
    """Start VeighNa Trader"""
    qapp = create_qapp()

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)

    main_engine.add_gateway(CtpGateway)
    main_engine.add_app(CtaStrategyApp)
    main_engine.add_app(CtaBacktesterApp)
    main_engine.add_app(DataManagerApp)

    main_window = MainWindow(main_engine, event_engine)
    main_window.showMaximized()

    qapp.exec()

if __name__ == "__main__":
    main()

附上几张Mac系统运行VeighNa平台的截图:

description

description

description

如果大家在安装过程中遇到任何问题,欢迎在本文评论区留言交流!

本次在Mac上开发CTP交易接口的过程中也踩了不少的坑,包括:

  • 头文件和Windows/Linux定义不一致
  • GBK到UTF-8的中文编码转换出错
  • Clang的专属编译和链接配置参数
  • Mac版本开发包中使用.a静态链接库

如果对上述内容感兴趣的话也请在评论区告诉我们,后面考虑做一期专门的技术分享文章!

 

CHANGELOG

 

新增

  1. 新增vnpy_ctp的Mac系统支持(M1/M2)

调整

  1. BaseDatafeed的相关功能函数增加output入参用于输出日志
  2. 修改相关数据服务模块适配output参数:vnpy_rqdata/vnpy_ifind/vnpy_wind/vnpy_tushare
  3. 修改相关策略应用模块适配output参数:vnpy_ctastrategy/vnpy_ctabacktester/vnpy_portfoliostrategy/vnpy_spreadtrading/vnpy_datamanager
  4. OffsetConverter增加对于SHFE/INE合约的锁仓模式支持
  5. 在OmsEngine中添加全局的OffsetConverter功能,不再需要各AppEngine自行维护
  6. 添加CTA策略模块在执行参数优化时的进程数量限制参数:vnpy_ctastrategy/vnpy_ctabacktester
  7. 增加穷举优化算法运行过程中基于tqdm的进度条输出
  8. 增加遗传优化算法运行过程中的迭代次数进度输出
  9. 增加vnpy_optionmaster模块的期权产品对应标的合约的匹配函数,不再限制产品范围
  10. 升级vnpy_tts的dll链接库,解决openctp升级导致的资金不显示的问题
  11. 修改vnpy_ctastrategy使用vnpy.trader.database中统一定义的时区来加载数据
  12. 增加vnpy_ctastrategy策略模板的合约乘数查询函数get_size
  13. 增加vnpy_spreadtrading回测中统计绩效时对于爆仓情况的检查
  14. 增加vnpy_scripttrader基于vt_symbol和direction查询持仓数据的函数
  15. 修改vt_positionid的字符串内容,增加gateway_name前缀标识

修复

  1. 修复异常捕捉钩子threading_excepthook的参数错误问题
  2. 修复vnpy_ib获取历史数据时的异常失败问题
  3. 修复vnpy_rest/vnpy_websocket中aiohttp的代理参数proxy传空时必须为None的问题
  4. 修复vnpy_optionmaster模块的Greeks监控表行数设置不足的问题
  5. 修复vnpy_rqdata查询股票期权数据报错的问题
  6. 修复vnpy_rqdata中RqdataGateway获取期货指数和连续合约信息时错误的问题
  7. 修复vnpy_portfoliostrategy中,从缓存文件恢复数据,导致defaultdict变成dict的问题
     


历时一年多,终于等到了正义的判决

发布于veighna社区公众号【vnpy-community】
 
原文作者:用Python的交易员 | 发布时间:2023-03-15
 

看到标题,可能许多VeighNa社区的老用户已经猜到了内容。

2021年底上海量贝信息科技有限公司(简称“量贝公司”)通过知乎、简书、QQ群、量贝公司官网各种公开渠道对我司(上海韦纳软件科技有限公司)进行攻击和诽谤,无端指责我司从事“金融违法行为”以及“剽窃知识产权”,量贝公司更是向证监会举报我司从事“金融违法行为”,继而在毫无凭证、证据的情况下凭空捏造、污蔑我司被证监会“查处”。

我司本不予理会这些无端攻击,正常从事经营,怎奈量贝公司及其实际控制人王登高看到我司未做回应,继续肆无忌惮的造谣、诽谤和攻击我司及我司法定代表人陈晓优。我司及我司法定代表人随即委托律师发起了对量贝公司的名誉侵权(网络侵权责任纠纷)法律诉讼,包括:

  • 我司(上海韦纳软件科技有限公司)作为原告发起对被告量贝公司的诉讼,上海市浦东新区人民法院案号(2022)沪0115民初32590号
  • 我司法定代表人陈晓优作为原告发起对被告量贝公司的诉讼,上海市浦东新区人民法院案号(2022)沪0115民初32589号

经过一年左右的时间,我们终于等到了两起诉讼的一审判决结果:

  • 经法院查明,证监部门并未对原告(韦纳公司及陈晓优)作出过行政处罚

  • 且法院认为,被告量贝公司的评论内容(包括但不限于非法金融,剽窃代码等)均为其推测和臆断极易引发网络不特定人员对原告(韦纳公司及陈晓优)产生$主观过错明显,构成对原告(韦纳公司及陈晓优)名誉权的侵害

  • 故法院判令被告量贝公司停止侵权、删除攻击原告陈晓优的评论和链接、公开向韦纳软件和陈晓优赔礼道歉以及支付部分经济赔偿

以下是法院的民事判决书链接:

目前我司法定代表人陈晓优对量贝公司的诉讼,量贝公司不服浦东新区人民法院(2022)沪0115民初32589号民事判决,已经提起二审上诉,我们对于任何恶意侵犯公司及公司法人名誉的行为,将会奉陪到底。

过去的一年里,我们得到了诸多来自VeighNa开源社区的朋友、上海正策律师事务所、公司内部团队的支持和帮助,在此表示由衷的感谢。

虽然2022年的整体开发进度受到了不少影响,但我们依然会继续坚持【开源量化交易平台】的核心方向,不忘初心努力为大家带来更加优秀的产品和服务!
 



本地部署vnpy$发环境[适用于windows+mac]

安装流程:

  1. 下载 anaconda,并安装
  2. 下载pycharm,并安装
  3. 下载vnpy源码vn.py
    下载源码
  4. 打开 pycharm, file->open,选择下载的vnpy 文件夹
    pycharm打开vnpy文件夹
  5. 配置 python 开发环境, file- settings
    pycharm项目设置
    添加新的anaconda环境
    description
    选择python3.7
    description
    这里我使用的环境名称是 py37_vnpy

打开terminal-注意环境切换成 py37_vnpy,执行以下命令,安装需要的插件

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description
遇到安装失败的可以单独安装:

pip install PyQt5 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

description

  1. 创建run.py文件,复制以下代码,来源 README.md

description

  1. 运行 python run.py,注意环境名称是 py37_vnpy

description

备注:
如果需要在cmd 下使用 py37_vnpy 环境。

打开 CMD
运行: conda activate py37_vnpy
会切换到py37_vnpy环境下

description



一个基于ShareMemery实现多标的多进程遗传算法共享K线数据的实现思路以及部分代码

基于python3.8以上版本,我个人是最新的vnpy3.6.0。
首先该功能的实现可以节省多进程回测时大部分的内存消耗,原有1个进程准备1份K线数据,现在可以16个进程准备1分K线数据,占用率大大降低。
感谢Q群上的“广西_漫步“提供的代码,我在此基础上整理了一下实现的思路与难点,并提供部分代码,为后续有必要实现这个功能的小伙伴提供参考。代码改的部分非常多,所以不建议小白去尝试,除非是确实有这个需求非要实现不可。
以下是实现时要解决的问题:
1,官方目前应该还没支持多标的遗传算法(如果有错可以指出),所以要个人实现多进程遗传算法参数优化。(多看下CTA单合约部分是怎么实现的,再改到多标的)
2,K线数据怎么通过ShareMemory共享到各进程里面。在这里,我花费了很多时间去研究,最后还是通过参考“广西_漫步”的代码,才悟出了实现方式。简单的说,就是将原有的历史数据拆分成12个ShareMemory,拆分前先按时间排序,然后再按次序拆分。backtesting读取的时候,直接去遍历整个history数据,由于已经按时间排序好了,所以就符合原有backtesting的数据按时间推送的逻辑。

代码如下,混杂了很多我自己的代码,如果有确实要实现这个功能的小伙伴可以爬下这个屎山,按上面这两个处理难题将其中精华的部分抽取出来,就可以实现自己的多进程多标的遗传算法优化功能了。难度还是有的,但是这个路子是可以走得通的,值得花时间研究研究。
第一份代码是my_backtesting.py,对应的改动原版是官方的vnpy_portfoliostrategy.backtesting文件

# coding=utf-8
from datetime import date, datetime, timedelta
from typing import Dict, List, Set, Tuple
import traceback
from functools import partial
import os
from multiprocessing.shared_memory import SharedMemory

import numpy as np
import seaborn as sns
from pandas import DataFrame

from empyrical import cagr, annual_volatility, downside_risk
from dataclasses import dataclass
from vnpy.trader.constant import Direction, Offset, Interval, Status
from vnpy.trader.object import OrderData, TradeData, BarData, BaseData
from vnpy.trader.utility import round_to, extract_vt_symbol, generate_vt_symbol
from vnpy.trader.constant import Exchange
from vnpy_ctastrategy.backtesting import get_target_value
from vnpy_portfoliostrategy.backtesting import BacktestingEngine, PortfolioDailyResult, ContractDailyResult,\
    INTERVAL_DELTA_MAP
from vnpy_portfoliostrategy.template import StrategyTemplate
from vnpy_spreadtrading.backtesting import BacktestingMode
from my_vnpy.my_trader.my_optimize import my_run_ga_optimization, OptimizationSetting, check_optimization_setting
from my_vnpy.my_app.my_portfolio_strategy.my_template import MyStrategyTemplate
from my_vnpy.my_trader.my_database import my_database_manager
from FuturesResearch.Definite_value import FuturesSchema, IndexFuturesCode, ALL_FUTURES_CODE_LIST
from my_vnpy.my_app.my_portfolio_strategy.my_utility import set_ratios, UTC
from SaveData.SaveDateUtility import get_next_trade_date

# Set seaborn style
sns.set_style("whitegrid")


class MyBacktestingEngine(BacktestingEngine):
    """"""
    gateway_name = "BACKTESTING"
    def __init__(self):
        """"""
        super().__init__()

        self.mode = BacktestingMode.BAR
        self.inverse = False
        self.strategy: MyStrategyTemplate = None
        self.strategy_class = None

        self.trades_by_datetime = {}  # 2021-11-15用来计算每笔盈亏的魔改参数
        # 2021-11-15用来计算每笔盈亏的魔改参数

        self.dynamic_total_occupy_capital_on_minute = 0  # 这个根据每分钟的数据都改变
        self.dynamic_profit_and_loss = 0
        self.dynamic_commission = 0

        # 2022-11-22新加
        self.margin_ratios: Dict[str, float] = 0

        self.dynamic_total_capital = self.capital  # 这里新增一个动态仓位参数
        self.advanced_stop_dynamic_capital_min = 120000  # 这里设置,如果动态资金小于12万,就提前结束回测
        self.dynamic_total_occupy_capital = 0
        self.risk_free: float = 0.02

        self.vt_symbols_dynamic_pnl: Dict[str, VtSymbolDynamicPnl] = {}

        # 下面是为了共享内存多进程回测加入的参数
        self.optimization_count = 0
        self.zw = None
        self.my_stats = None
        self.fixed_param_dict = {}
        self.current_pid = os.getpid()

    def check_data(self, h_data, d_data):
        for vt_symbol in self.vt_symbols:
            for dt in self.dts:
                bar = self.history_data.get((dt, vt_symbol))
                if bar is None:
                    print(dt, vt_symbol)

        vt_s_len = len(self.vt_symbols)
        if len(d_data) * vt_s_len != len(h_data):
            raise ValueError(f"history的总数必须等于 vt_symbols的数量 乘以 dts,dts长度:{len(d_data)}")

    def new_bars(self, dt: datetime) -> None:
        """"""
        self.datetime = dt

        self.bars.clear()

        # 2022-01-18新加,预期加入动态的vt_symbols
        for vt_symbol in self.vt_symbols:
            bar = self.history_data.get((dt, vt_symbol), None)
            if bar:
                # print('这里是new_bar里面合成bars的bar:', bar.__dict__.items())
                self.bars[vt_symbol] = bar
            else:
                dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")
                if vt_symbol == 'AU.SHFE':
                    if (dt.hour < 9 or
                            (dt.hour == 9 and dt.minute <= 30) or
                            dt.hour > 15 or
                            (dt.hour == 10 and 15 <= dt.minute <= 30) or
                            (dt.hour == 13 and dt.minute < 30)):

                        # self.output(f"数据缺失:{dt_str} {vt_symbol}")
                        return
                    elif dt.year == 2020 and dt.month == 7 and dt.day == 24:
                        return
                    elif dt.year == 2019 and dt.month == 5 and dt.day == 10:
                        return
                    elif dt.year == 2019 and dt.month == 11 and dt.day == 19:
                        return
                    elif dt.year == 2018 and dt.month == 11 and dt.day == 12:
                        return
                    elif dt.year == 2018 and dt.month == 5 and dt.day == 7:
                        return
                    elif dt.year == 2017 and dt.month == 11 and dt.day == 15:
                        return
                    elif dt.year == 2017 and dt.month == 5 and dt.day == 8:
                        return
                    elif dt.year == 2016 and dt.month == 11 and dt.day == 11:
                        return
                    elif dt.year == 2016 and dt.month == 5 and dt.day == 5:
                        return
                self.output(f"数据缺失:{dt_str} {vt_symbol}")
                return

        self.cross_limit_order()
        self.update_daily_close(self.bars, dt)  # 这里调换顺序,先算当日收益,再把bars输入on_bars计算。
        self.update_dynamic_capital(dt, self.bars)  # 更新剩余资金

        self.strategy.on_bars(self.bars)

    def cross_limit_order(self) -> None:
        """
        Cross limit order with last bar/tick data.
        """
        for order in list(self.active_limit_orders.values()):
            bar = self.bars[order.vt_symbol]

            long_cross_price = bar.low_price
            short_cross_price = bar.high_price
            long_best_price = bar.open_price
            short_best_price = bar.open_price

            # Push order update with status "not traded" (pending).
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.update_order(order)

            # Check whether limit orders can be filled.
            long_cross = (
                order.direction == Direction.LONG
                and order.price >= long_cross_price
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT
                and order.price <= short_cross_price
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order update with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.update_order(order)

            self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            self.trade_count += 1

            if long_cross:
                trade_price = min(order.price, long_best_price)
            else:
                trade_price = max(order.price, short_best_price)

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                datetime=self.datetime,
                gateway_name=self.gateway_name,
            )

            self.strategy.update_trade(trade)
            self.trades[trade.vt_tradeid] = trade

            # 这里经过魔改,将trade按datetime为key加入字典
            if trade.datetime in self.trades_by_datetime.keys():
                self.trades_by_datetime[trade.datetime].append(trade)
            else:
                self.trades_by_datetime[trade.datetime] = [trade]

    def update_dynamic_capital(self, dt: datetime, bars) -> None:
        # self.output("开始计算逐日盯市实时资金净值")
        if not self.vt_symbols_dynamic_pnl:
            # 如果vt_symbols_dynamic_pnl是空字典,就初始化,生成每个symbol对应的变量容器
            for vt_symbol in self.vt_symbols:
                vt_symbol_dynamic_pnl = VtSymbolDynamicPnl(
                    vt_symbol=vt_symbol,
                    rate=self.rates[vt_symbol],
                    slippage=self.slippages[vt_symbol],
                    size=self.sizes[vt_symbol],
                    margin_ratio=self.margin_ratios[vt_symbol],
                    gateway_name=self.gateway_name
                )
                self.vt_symbols_dynamic_pnl[vt_symbol] = vt_symbol_dynamic_pnl

        # 归纳每天持仓的市值,计算每天的每个品种累计未平仓的盈亏
        if not self.trades_by_datetime:
            self.strategy.update_capital(self.dynamic_total_capital, self.dynamic_total_occupy_capital)
            return

        if dt in self.trades_by_datetime:
            for trade in self.trades_by_datetime[dt]:
                # 将每一笔交易计算动态盈亏及其保证金持仓
                self.calculate_symbol_dynamic_capital(trade)

        # 2022-11-22改动,将所有持仓手续费汇总再加进去
        all_vt_symbol_dynamic_commission = 0
        all_vt_symbol_submit_dynamic_total_occupy_capital = 0
        _all_symbol_dynamic_pnl = 0
        all_total_pnl = 0
        _dynamic_total_pnl = 0
        for vt_symbol in self.vt_symbols_dynamic_pnl:
            all_vt_symbol_dynamic_commission += self.vt_symbols_dynamic_pnl[
                vt_symbol].dynamic_open_pos_commission
            all_total_pnl += self.vt_symbols_dynamic_pnl[vt_symbol].total_pnl
            dynamic_volume = self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_volume
            if dynamic_volume != 0:
                now_close_price = bars[vt_symbol].close_price
                dynamic_cost_price = self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_cost_price
                size = self.vt_symbols_dynamic_pnl[vt_symbol].size
                _dynamic_pnl = (now_close_price - dynamic_cost_price) * dynamic_volume
                _dynamic_total_pnl += _dynamic_pnl
                single_submit_dynamic_total_occupy_capital = now_close_price * size * self.margin_ratios[vt_symbol] * \
                                                             dynamic_volume
                all_vt_symbol_submit_dynamic_total_occupy_capital += single_submit_dynamic_total_occupy_capital
                sing_commission = now_close_price * dynamic_volume * size * self.vt_symbols_dynamic_pnl[vt_symbol].rate
                if self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_direction == Direction.LONG:
                    _pnl = (now_close_price - dynamic_cost_price) * dynamic_volume * size + \
                           self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_pnl - sing_commission
                else:
                    _pnl = (dynamic_cost_price - now_close_price) * dynamic_volume * size + \
                           self.vt_symbols_dynamic_pnl[vt_symbol].dynamic_pnl - sing_commission
                _all_symbol_dynamic_pnl += _pnl

        self.dynamic_total_occupy_capital = all_vt_symbol_submit_dynamic_total_occupy_capital
        self.dynamic_total_capital = self.capital + all_total_pnl + _dynamic_total_pnl + _all_symbol_dynamic_pnl

        self.strategy.update_capital(self.dynamic_total_capital, all_vt_symbol_submit_dynamic_total_occupy_capital)

    def calculate_symbol_dynamic_capital(self, trade: TradeData):
        # 首先考虑的是原本这个symbol没有仓位的情况下
        trade_vt_symbol = generate_vt_symbol(trade.symbol, trade.exchange)
        _vt_symbol_dynamic_pnl: VtSymbolDynamicPnl = self.vt_symbols_dynamic_pnl[trade_vt_symbol]
        _rate = _vt_symbol_dynamic_pnl.rate
        _size = _vt_symbol_dynamic_pnl.size
        _margin_ratio = _vt_symbol_dynamic_pnl.margin_ratio

        # 这里还要区分多空交易,从而导致的滑点加减方向不同
        if trade.direction == Direction.LONG:
            _actual_trade_price = trade.price + _vt_symbol_dynamic_pnl.slippage
        else:
            _actual_trade_price = trade.price - _vt_symbol_dynamic_pnl.slippage

        _single_turnover = _actual_trade_price * _size * trade.volume
        _single_commission = _single_turnover * _rate

        # 2023-03-06新加考虑空方向
        if _vt_symbol_dynamic_pnl.dynamic_volume == 0:
            # 新开仓
            _vt_symbol_dynamic_pnl.dynamic_direction = trade.direction
            _vt_symbol_dynamic_pnl.dynamic_volume = trade.volume
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission = _single_commission
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital = _single_turnover * _margin_ratio + _single_commission
            _vt_symbol_dynamic_pnl.dynamic_cost_price = _actual_trade_price

        elif _vt_symbol_dynamic_pnl.dynamic_volume != 0 and trade.offset == Offset.OPEN:
            # 原有多仓加仓
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission

            _vt_symbol_dynamic_pnl.dynamic_occupy_capital += _single_turnover * _margin_ratio + _single_commission
            _vt_symbol_dynamic_pnl.dynamic_cost_price = (
                _actual_trade_price * trade.volume +
                _vt_symbol_dynamic_pnl.dynamic_cost_price * _vt_symbol_dynamic_pnl.dynamic_volume) / \
                (trade.volume + _vt_symbol_dynamic_pnl.dynamic_volume)
            _vt_symbol_dynamic_pnl.dynamic_volume += trade.volume
        elif trade.offset == Offset.CLOSE and trade.volume == _vt_symbol_dynamic_pnl.dynamic_volume:
            # 这种是全平了的情况
            if trade.direction == Direction.SHORT:
                # 多头平仓,下的指令单为空单
                trade_pnl_without_commission = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * \
                                               trade.volume
            else:
                trade_pnl_without_commission = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * \
                                               trade.volume

            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission += _single_commission
            trade_pnl = trade_pnl_without_commission - _vt_symbol_dynamic_pnl.dynamic_open_pos_commission
            _vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl
            _vt_symbol_dynamic_pnl.total_pnl += _vt_symbol_dynamic_pnl.dynamic_pnl
            # 计算完了盈亏情况,开始将动态数据清零
            _vt_symbol_dynamic_pnl.dynamic_direction = None
            _vt_symbol_dynamic_pnl.dynamic_open_pos_commission = 0
            _vt_symbol_dynamic_pnl.dynamic_volume = 0
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital = 0
            _vt_symbol_dynamic_pnl.dynamic_cost_price = 0
            _vt_symbol_dynamic_pnl.dynamic_pnl = 0  # 上面已经将盈亏加入总盈亏,所以这里可以清零了

        elif trade.volume != _vt_symbol_dynamic_pnl.dynamic_volume and trade.offset == Offset.CLOSE:
            # 这里是半平仓的情况
            _vt_symbol_dynamic_pnl.dynamic_volume -= trade.volume
            # 根据现在动态交易额,反推该交易额实际会占用多少资金
            _vt_symbol_dynamic_pnl.dynamic_occupy_capital -= _single_turnover * _margin_ratio + \
                                                             _vt_symbol_dynamic_pnl.dynamic_open_pos_commission + \
                                                             _single_commission

            # 计算一次减仓的盈亏
            if trade.direction == Direction.SHORT:
                # 多头平仓,下的指令单为空单
                trade_pnl = (_actual_trade_price - _vt_symbol_dynamic_pnl.dynamic_cost_price) * _size * trade.volume - \
                            _single_commission
            else:
                trade_pnl = (_vt_symbol_dynamic_pnl.dynamic_cost_price - _actual_trade_price) * _size * trade.volume - \
                            _single_commission

            _vt_symbol_dynamic_pnl.dynamic_pnl += trade_pnl

        _vt_symbol_dynamic_pnl.total_trade_times += 1

    def my_set_parameters(
        self,
        vt_symbols: List[str],
        interval: Interval,
        start: datetime,
        rates: Dict[str, float] or int or float,
        slippages: Dict[str, float] or int or float,
        sizes: Dict[str, float] or int or float,
        priceticks: Dict[str, float] or int or float,
        margin_ratios: Dict[str, float] or int or float,
        capital: int = 0,
        end: datetime = None,
        risk_free: float = 0
    ) -> None:
        """"""
        if len(vt_symbols) == 2 and vt_symbols[0] in ALL_FUTURES_CODE_LIST:
            futures_code = vt_symbols[0]
            if futures_code == IndexFuturesCode.IC:
                self.index_symbol = '000905.SSE'
            elif futures_code == IndexFuturesCode.IF:
                self.index_symbol = '000300.SSE'
            elif futures_code == IndexFuturesCode.IH:
                self.index_symbol = '000016.SSE'
            self.vt_symbols.append(self.index_symbol)
            if vt_symbols[1] == 'backtesting':
                this_month_wfq_symbol = futures_code + 'L088.CFFEX'
                this_month_hfq_symbol = futures_code + 'L0889.CFFEX'
                next_month_wfq_symbol = futures_code + 'L188.CFFEX'
                next_month_hfq_symbol = futures_code + 'L1889.CFFEX'
                first_quarter_wfq_symbol = futures_code + 'L288.CFFEX'
                first_quarter_hfq_symbol = futures_code + 'L2889.CFFEX'
                second_quarter_wfq_symbol = futures_code + 'L388.CFFEX'
                second_quarter_hfq_symbol = futures_code + 'L3889.CFFEX'
                self.vt_symbols.append(this_month_wfq_symbol)
                self.vt_symbols.append(this_month_hfq_symbol)
                self.vt_symbols.append(next_month_wfq_symbol)
                self.vt_symbols.append(next_month_hfq_symbol)
                self.vt_symbols.append(first_quarter_wfq_symbol)
                self.vt_symbols.append(first_quarter_hfq_symbol)
                self.vt_symbols.append(second_quarter_wfq_symbol)
                self.vt_symbols.append(second_quarter_hfq_symbol)
        else:
            self.vt_symbols = vt_symbols

        self.interval = interval
        # 其实只要一个是,其他都是,设置统一的比例,手续费,滑点等
        if isinstance(rates, int) or isinstance(rates, float):
            self.rates = set_ratios(rates, self.vt_symbols)
            self.slippages = set_ratios(slippages, self.vt_symbols)
            self.sizes = set_ratios(sizes, self.vt_symbols)
            self.priceticks = set_ratios(priceticks, self.vt_symbols)
            self.margin_ratios = set_ratios(margin_ratios, self.vt_symbols)
        else:
            self.rates = rates
            self.slippages = slippages
            self.sizes = sizes
            self.priceticks = priceticks
            self.margin_ratios = margin_ratios

        self.start = start
        self.end = end
        self.capital = capital
        self.risk_free = risk_free

        self.dynamic_total_capital = capital  # 魔改加入,为了二次使用engine的时候不用初始化

    def load_data(self) -> None:
        """"""
        self.output("开始加载历史数据")

        if not self.end:
            self.end = datetime.now()

        if self.start >= self.end:
            self.output("起始日期必须小于结束日期")
            return

        # Clear previously loaded history data
        self.history_data.clear()
        self.dts.clear()

        # Load 30 days of data each time and allow for progress update
        progress_delta = timedelta(days=365)
        total_delta = self.end - self.start
        interval_delta = INTERVAL_DELTA_MAP[self.interval]

        times = 0

        for vt_symbol in self.vt_symbols:
            start = self.start
            end = self.start + progress_delta
            progress = 0

            data_count = 0
            while start < self.end:
                end = min(end, self.end)  # Make sure end time stays within set range

                data = load_bar_data_without_lru_cache(
                    vt_symbol,
                    self.interval,
                    start,
                    end
                )

                for bar in data:
                    self.dts.add(bar.datetime)
                    self.history_data[(bar.datetime, vt_symbol)] = bar
                    data_count += 1

                progress += progress_delta / total_delta
                progress = min(progress, 1)
                progress_bar = "#" * int(progress * 10)

                start = end + interval_delta
                end += (progress_delta + interval_delta)

            self.output(f"{vt_symbol}历史数据加载完成,数据量:{data_count}")

        self.output("所有历史数据加载完成")

    def run_backtesting(self) -> None:
        """"""
        self.strategy.on_init()

        # Generate sorted datetime list
        dts = list(self.dts)
        dts.sort()

        # Use the first [days] of history data for initializing strategy
        day_count = 0
        ix = 0

        for ix, dt in enumerate(dts):
            if self.datetime and dt.day != self.datetime.day:
                day_count += 1
                if day_count >= self.days:
                    break
            try:
                self.new_bars(dt)
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.strategy.inited = True
        self.output("策略初始化完成")

        self.strategy.on_start()
        self.strategy.trading = True
        self.output("开始回放历史数据")

        # Use the rest of history data for running backtesting

        end_day = 0
        for dt in dts[ix:]:
            try:
                if dt.day == end_day:
                    print("dt:", dt)
                    print("self.dynamic_total_capital:", self.dynamic_total_capital)
                    self.output("历史数据回放提前结束,资金清零")
                    return

                self.new_bars(dt)

                if self.dynamic_total_capital < self.advanced_stop_dynamic_capital_min:
                    if end_day == 0:
                        dt_str = dt.strftime('%Y%m%d')
                        end_day = int(get_next_trade_date(date=dt_str, add_day=1)[6:8])
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return

        self.output("历史数据回放结束")

    def clear_data(self) -> None:
        """
        Clear all data of last backtesting.
        """
        self.strategy = None
        self.bars = {}
        self.datetime = None

        self.limit_order_count = 0
        self.limit_orders.clear()
        self.active_limit_orders.clear()

        self.trade_count = 0
        self.trades.clear()

        self.logs.clear()
        self.daily_results.clear()
        self.daily_df = None

        # 下面加入自己的魔改的变量的清零
        self.trades_by_datetime = {}
        self.dts: Set[datetime] = set()
        self.dynamic_total_capital = 0
        self.dynamic_total_occupy_capital = 0
        self.days = 0
        self.vt_symbols_dynamic_pnl = {}
        self.history_data = {}

    def calculate_statistics(self, df: DataFrame = None, output=True) -> None:
        """"""
        self.output("开始计算策略统计指标")

        # Check DataFrame input exterior
        if df is None:
            df = self.daily_df

        # Check for init DataFrame
        if df is None:
            # Set all statistics to 0 if no trade.
            # 年化复合增长率

            cagr_info = 0  # 这里魔改过
            annual_volatility_info = 0
            annual_downside_risk = 0

            start_date = ""
            end_date = ""
            total_days = 0
            profit_days = 0
            loss_days = 0
            end_balance = 0
            max_drawdown = 0
            max_ddpercent = 0
            max_drawdown_duration = 0
            total_net_pnl = 0
            daily_net_pnl = 0
            total_commission = 0
            daily_commission = 0
            total_slippage = 0
            daily_slippage = 0
            total_turnover = 0
            daily_turnover = 0
            total_trade_count = 0
            daily_trade_count = 0
            total_return = 0
            annual_return = 0
            daily_return = 0
            return_std = 0
            sharpe_ratio = 0
            return_drawdown_ratio = 0

            max_ddpercent_start = 0  # 2022-08-09魔改
            max_drawdown_start = 0
        else:
            # Calculate balance related time series data
            df["balance"] = df["net_pnl"].cumsum() + self.capital
            try:
                df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
            except:
                # 出现了log错误
                print('error1')
                print('............')
            df["highlevel"] = (
                df["balance"].rolling(
                    min_periods=1, window=len(df), center=False).max()
            )
            df["drawdown"] = df["balance"] - df["highlevel"]
            df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

            # Calculate statistics value
            start_date = df.index[0]
            end_date = df.index[-1]

            total_days = len(df)
            profit_days = len(df[df["net_pnl"] > 0])
            loss_days = len(df[df["net_pnl"] < 0])

            end_balance = df["balance"].iloc[-1]
            max_drawdown = df["drawdown"].min()
            max_ddpercent = df["ddpercent"].min()
            max_ddpercent_end = df["ddpercent"].idxmin()
            max_drawdown_end = df["drawdown"].idxmin()

            if isinstance(max_ddpercent_end, date):
                max_ddpercent_start = df["balance"][:max_ddpercent_end].idxmax()
                max_ddpercent_duration = (max_ddpercent_end - max_ddpercent_start).days
            else:
                max_ddpercent_start = False
                max_ddpercent_duration = 0

            if isinstance(max_drawdown_end, date):
                max_drawdown_start = df["balance"][:max_drawdown_end].idxmax()
                max_drawdown_duration = (max_drawdown_end - max_drawdown_start).days
            else:
                max_drawdown_start = False
                max_drawdown_duration = 0

            total_net_pnl = df["net_pnl"].sum()
            daily_net_pnl = total_net_pnl / total_days

            total_commission = df["commission"].sum()
            daily_commission = total_commission / total_days

            total_slippage = df["slippage"].sum()
            daily_slippage = total_slippage / total_days

            total_turnover = df["turnover"].sum()
            daily_turnover = total_turnover / total_days

            total_trade_count = df["trade_count"].sum()
            daily_trade_count = total_trade_count / total_days

            total_return = (end_balance / self.capital - 1) * 100
            annual_return = total_return / total_days * 240
            daily_return = df["return"].mean() * 100
            return_std = df["return"].std() * 100

            # 年化波动率
            annual_volatility_info = annual_volatility(df['return'])
            # 年化复合增长率
            cagr_info = cagr(df['return'])
            # 年化下行风险率
            annual_downside_risk = downside_risk(df['return'])


            if return_std:
                daily_risk_free = self.risk_free / np.sqrt(240)
                sharpe_ratio = (daily_return - daily_risk_free) / return_std * np.sqrt(240)
            else:
                sharpe_ratio = 0

            return_drawdown_ratio = -total_net_pnl / max_drawdown

        # Output
        if output:
            self.output("-" * 30)
            self.output(f"首个交易日:\t{start_date}")
            self.output(f"最后交易日:\t{end_date}")

            self.output(f"总交易日:\t{total_days}")
            self.output(f"盈利交易日:\t{profit_days}")
            self.output(f"亏损交易日:\t{loss_days}")

            self.output(f"起始资金:\t{self.capital:,.2f}")
            self.output(f"结束资金:\t{end_balance:,.2f}")

            self.output(f"总收益率:\t{total_return:,.2f}%")
            self.output(f"年化收益:\t{annual_return:,.2f}%")
            self.output(f"最大回撤: \t{max_drawdown:,.2f}")
            if max_ddpercent_start:
                self.output(f"百分比最大回撤: \t{max_ddpercent:,.2f}%, 最大回撤日期:\t{max_ddpercent_start}至{max_ddpercent_end}, 最长回撤天数: \t{max_ddpercent_duration}")
            if max_drawdown_start:
                self.output(f"最大回撤日期:\t{max_drawdown_start}至{max_drawdown_end}最长回撤天数: \t{max_drawdown_duration}")

            self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
            self.output(f"总手续费:\t{total_commission:,.2f}")
            self.output(f"总滑点:\t{total_slippage:,.2f}")
            self.output(f"总成交金额:\t{total_turnover:,.2f}")
            self.output(f"总成交笔数:\t{total_trade_count}")

            self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
            self.output(f"日均手续费:\t{daily_commission:,.2f}")
            self.output(f"日均滑点:\t{daily_slippage:,.2f}")
            self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
            self.output(f"日均成交笔数:\t{daily_trade_count}")

            self.output(f"日均收益率:\t{daily_return:,.2f}%")
            self.output(f"收益标准差:\t{return_std:,.2f}%")
            self.output(f"年化波动率:\t{annual_volatility_info:,.3f}")
            self.output(f"年化复合增长率:\t{cagr_info:,.3f}")
            self.output(f"年化下行风险率:\t{annual_downside_risk:,.3f}")

            self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
            self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

        statistics = {
            "start_date": start_date,
            "end_date": end_date,
            "total_days": total_days,
            "profit_days": profit_days,
            "loss_days": loss_days,
            "capital": self.capital,
            "end_balance": end_balance,
            "max_drawdown": max_drawdown,
            "max_ddpercent": max_ddpercent,
            "max_drawdown_duration": max_drawdown_duration,
            "total_net_pnl": total_net_pnl,
            "daily_net_pnl": daily_net_pnl,
            "total_commission": total_commission,
            "daily_commission": daily_commission,
            "total_slippage": total_slippage,
            "daily_slippage": daily_slippage,
            "total_turnover": total_turnover,
            "daily_turnover": daily_turnover,
            "total_trade_count": total_trade_count,
            "daily_trade_count": daily_trade_count,
            "total_return": total_return,
            "annual_return": annual_return,
            "daily_return": daily_return,
            "return_std": return_std,
            "sharpe_ratio": sharpe_ratio,
            "return_drawdown_ratio": return_drawdown_ratio,
        }

        # Filter potential error infinite value
        for key, value in statistics.items():
            if value in (np.inf, -np.inf):
                value = 0
            statistics[key] = np.nan_to_num(value)

        self.output("策略统计指标计算完成")
        return statistics

    def my_create_shared_memory_count(self):
        self.zw = None
        optimization_count_np_array = np.array([0], dtype=np.int)
        # 运行次数
        optimization_count_shm = SharedMemory(name=None, create=True, size=optimization_count_np_array.nbytes)
        optimization_count_shm_array = np.ndarray(optimization_count_np_array.shape,
                                                  dtype=optimization_count_np_array.dtype,
                                                  buffer=optimization_count_shm.buf)
        np.copyto(optimization_count_shm_array, optimization_count_np_array)

        return {"shm": optimization_count_shm, 'shape': optimization_count_np_array.shape,
                "dtype": optimization_count_np_array.dtype}

    def my_create_shared_memory(self):
        # 读取数据
        self.output("开始读取共享内存所需数据")
        self.load_data()
        self.check_data(self.history_data, self.dts)  # 校验数据的准确性

        self.output(f"主进程{self.current_pid}:history_data字典数量:{len(self.history_data.keys())},dts数量:{len(self.dts)}")
        self.output("*" * 30)

        # ----------------------这一步非常重要----------------------
        self.dts: List = list(self.dts)
        self.dts.sort()

        # def create_shm_dict():
        dts = self.dts
        symbol_list, exchange_list, datetime_list, interval_list = [], [], [], []
        volume_list, turnover_list, open_interest_list, open_price_list = [], [], [], []
        high_price_list, low_price_list, close_price_list, gateway_name_list = [], [], [], []

        for dt in dts:
            for vt_symbol in self.vt_symbols:
                bar: BarData = self.history_data.get((dt, vt_symbol), None)
                symbol_list.append(bar.symbol)
                exchange_list.append(bar.exchange.value)
                datetime_list.append(bar.datetime.timestamp())
                interval_list.append(bar.interval.value)
                volume_list.append(bar.volume)
                turnover_list.append(bar.turnover)
                open_interest_list.append(bar.open_interest)
                open_price_list.append(bar.open_price)
                high_price_list.append(bar.high_price)
                low_price_list.append(bar.low_price)
                close_price_list.append(bar.close_price)
                gateway_name_list.append(bar.gateway_name)

        # -------------统统转-------------
        # -------------统统转-------------
        symbol_np_array = np.array(symbol_list, dtype=np.str)
        exchange_np_array = np.array(exchange_list, dtype=np.str)
        datetime_np_array = np.array(datetime_list, dtype=np.float64)
        interval_np_array = np.array(interval_list, dtype=np.str)
        volume_np_array = np.array(volume_list, dtype=np.float64)
        turnover_np_array = np.array(turnover_list, dtype=np.float64)
        open_interest_np_array = np.array(open_interest_list, dtype=np.float64)
        open_price_np_array = np.array(open_price_list, dtype=np.float64)
        high_price_np_array = np.array(high_price_list, dtype=np.float64)
        low_price_np_array = np.array(low_price_list, dtype=np.float64)
        close_price_np_array = np.array(close_price_list, dtype=np.float64)
        gateway_name_np_array = np.array(gateway_name_list, dtype=np.str)

        if True:
            symbol_shm = SharedMemory(name=None, create=True, size=symbol_np_array.nbytes)
            symbol_shm_array = np.ndarray(symbol_np_array.shape, dtype=symbol_np_array.dtype, buffer=symbol_shm.buf)
            np.copyto(symbol_shm_array, symbol_np_array)

            exchange_shm = SharedMemory(name=None, create=True, size=exchange_np_array.nbytes)
            exchange_shm_array = np.ndarray(exchange_np_array.shape, dtype=exchange_np_array.dtype,
                                            buffer=exchange_shm.buf)
            np.copyto(exchange_shm_array, exchange_np_array)

            datetime_shm = SharedMemory(name=None, create=True, size=datetime_np_array.nbytes)
            datetime_shm_array = np.ndarray(datetime_np_array.shape, dtype=datetime_np_array.dtype,
                                            buffer=datetime_shm.buf)
            np.copyto(datetime_shm_array, datetime_np_array)

            interval_shm = SharedMemory(name=None, create=True, size=interval_np_array.nbytes)
            interval_shm_array = np.ndarray(interval_np_array.shape, dtype=interval_np_array.dtype,
                                            buffer=interval_shm.buf)
            np.copyto(interval_shm_array, interval_np_array)

            volume_shm = SharedMemory(name=None, create=True, size=volume_np_array.nbytes)
            volume_shm_array = np.ndarray(volume_np_array.shape, dtype=volume_np_array.dtype,
                                          buffer=volume_shm.buf)
            np.copyto(volume_shm_array, volume_np_array)

            turnover_shm = SharedMemory(name=None, create=True, size=turnover_np_array.nbytes)
            turnover_shm_array = np.ndarray(turnover_np_array.shape, dtype=turnover_np_array.dtype,
                                            buffer=turnover_shm.buf)
            np.copyto(turnover_shm_array, turnover_np_array)

            open_interest_shm = SharedMemory(name=None, create=True, size=open_interest_np_array.nbytes)
            open_interest_shm_array = np.ndarray(open_interest_np_array.shape, dtype=open_interest_np_array.dtype,
                                                 buffer=open_interest_shm.buf)
            np.copyto(open_interest_shm_array, open_interest_np_array)

            open_price_shm = SharedMemory(name=None, create=True, size=open_price_np_array.nbytes)
            open_price_shm_array = np.ndarray(open_price_np_array.shape, dtype=open_price_np_array.dtype,
                                              buffer=open_price_shm.buf)
            np.copyto(open_price_shm_array, open_price_np_array)

            high_price_shm = SharedMemory(name=None, create=True, size=high_price_np_array.nbytes)
            high_price_shm_array = np.ndarray(high_price_np_array.shape, dtype=high_price_np_array.dtype,
                                              buffer=high_price_shm.buf)
            np.copyto(high_price_shm_array, high_price_np_array)

            low_price_shm = SharedMemory(name=None, create=True, size=low_price_np_array.nbytes)
            low_price_shm_array = np.ndarray(low_price_np_array.shape, dtype=low_price_np_array.dtype,
                                             buffer=low_price_shm.buf)
            np.copyto(low_price_shm_array, low_price_np_array)

            close_price_shm = SharedMemory(name=None, create=True, size=close_price_np_array.nbytes)
            close_price_shm_array = np.ndarray(close_price_np_array.shape, dtype=close_price_np_array.dtype,
                                               buffer=close_price_shm.buf)
            np.copyto(close_price_shm_array, close_price_np_array)

            gateway_name_shm = SharedMemory(name=None, create=True, size=gateway_name_np_array.nbytes)
            gateway_name_shm_array = np.ndarray(gateway_name_np_array.shape, dtype=gateway_name_np_array.dtype,
                                                buffer=gateway_name_shm.buf)
            np.copyto(gateway_name_shm_array, gateway_name_np_array)

            shm_dict = {
                "symbol": {"shm": symbol_shm, 'shape': symbol_np_array.shape, "dtype": symbol_np_array.dtype},
                "exchange": {"shm": exchange_shm, 'shape': exchange_np_array.shape, "dtype": exchange_np_array.dtype},
                "datetime": {"shm": datetime_shm, 'shape': datetime_np_array.shape, "dtype": datetime_np_array.dtype},
                "interval": {"shm": interval_shm, 'shape': interval_np_array.shape, "dtype": interval_np_array.dtype},
                "volume": {"shm": volume_shm, 'shape': volume_np_array.shape, "dtype": volume_np_array.dtype},
                "turnover": {"shm": turnover_shm, 'shape': turnover_np_array.shape, "dtype": turnover_np_array.dtype},
                "open_interest": {"shm": open_interest_shm, 'shape': open_interest_np_array.shape,
                                  "dtype": open_interest_np_array.dtype},
                "open_price": {"shm": open_price_shm, 'shape': open_price_np_array.shape,
                               "dtype": open_price_np_array.dtype},
                "high_price": {"shm": high_price_shm, 'shape': high_price_np_array.shape,
                               "dtype": high_price_np_array.dtype},
                "low_price": {"shm": low_price_shm, 'shape': low_price_np_array.shape,
                              "dtype": low_price_np_array.dtype},
                "close_price": {"shm": close_price_shm, 'shape': close_price_np_array.shape,
                                "dtype": close_price_np_array.dtype},
                "gateway_name": {"shm": gateway_name_shm, 'shape': gateway_name_np_array.shape,
                                 "dtype": gateway_name_np_array.dtype},
            }

        self.output("开始创建共享内存:")

        self.output(
            f"主进程{self.current_pid}:创建共享内存成功")
        self.output("*" * 30)
        del self.history_data
        del self.dts

        return shm_dict

    def my_run_backtesting(self, shm_dict: dict, shm_count_dict: dict, optimization_count: int,
                           tag_str: str = 'tag_str'):
        """"""
        if shm_dict is None:
            # 如果不进行共享内存,则采用原来的run_backtesting
            self.load_data()
            self.run_backtesting()
            # 运行总数
            tmp_shm = shm_count_dict.get('shm')
            tmp_shape = shm_count_dict.get('shape')
            tmp_dtype = shm_count_dict.get('dtype')
            optimization_count_shm = SharedMemory(name=tmp_shm.name)
            optimization_count_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype,
                                                      buffer=optimization_count_shm.buf)
            optimization_count_shm_array[0] += 1
            self.output(
                f"【{tag_str}】子进程{self.current_pid}:采用默认进程内存方式回测。总:{optimization_count},当前:{optimization_count_shm_array[0]}")
            self.output(f'采用参数组合:{self.strategy.get_parameters()}')
            self.output("*" * 30)
            return

        # --------------------------采用共享内存的方式--------------------------
        self.strategy.on_init()
        symbol_shm, exchange_shm, datetime_shm, interval_shm = None, None, None, None
        volume_shm, turnover_shm, open_interest_shm, open_price_shm = None, None, None, None
        high_price_shm, low_price_shm, close_price_shm, gateway_name_shm = None, None, None, None

        symbol_shm_array, exchange_shm_array, datetime_shm_array, interval_shm_array = None, None, None, None
        volume_shm_array, turnover_shm_array, open_interest_shm_array, open_price_shm_array = None, None, None, None
        high_price_shm_array, low_price_shm_array, close_price_shm_array, gateway_name_shm_array = None, None, None, None
        for k, d in shm_dict.items():
            tmp_shm = d.get('shm')
            tmp_shape = d.get('shape')
            tmp_dtype = d.get('dtype')
            if True:
                if k == 'symbol':
                    symbol_shm = SharedMemory(name=tmp_shm.name)
                    symbol_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=symbol_shm.buf)
                elif k == 'exchange':
                    exchange_shm = SharedMemory(name=tmp_shm.name)
                    exchange_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=exchange_shm.buf)
                elif k == 'datetime':
                    datetime_shm = SharedMemory(name=tmp_shm.name)
                    datetime_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=datetime_shm.buf)
                elif k == 'interval':
                    interval_shm = SharedMemory(name=tmp_shm.name)
                    interval_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=interval_shm.buf)
                elif k == 'volume':
                    volume_shm = SharedMemory(name=tmp_shm.name)
                    volume_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=volume_shm.buf)
                elif k == 'turnover':
                    turnover_shm = SharedMemory(name=tmp_shm.name)
                    turnover_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=turnover_shm.buf)
                elif k == 'open_interest':
                    open_interest_shm = SharedMemory(name=tmp_shm.name)
                    open_interest_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=open_interest_shm.buf)
                elif k == 'open_price':
                    open_price_shm = SharedMemory(name=tmp_shm.name)
                    open_price_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=open_price_shm.buf)
                elif k == 'high_price':
                    high_price_shm = SharedMemory(name=tmp_shm.name)
                    high_price_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=high_price_shm.buf)
                elif k == 'low_price':
                    low_price_shm = SharedMemory(name=tmp_shm.name)
                    low_price_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=low_price_shm.buf)
                elif k == 'close_price':
                    close_price_shm = SharedMemory(name=tmp_shm.name)
                    close_price_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=close_price_shm.buf)
                elif k == 'gateway_name':
                    gateway_name_shm = SharedMemory(name=tmp_shm.name)
                    gateway_name_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=gateway_name_shm.buf)

        # 运行总数
        tmp_shm = shm_count_dict.get('shm')
        tmp_shape = shm_count_dict.get('shape')
        tmp_dtype = shm_count_dict.get('dtype')
        optimization_count_shm = SharedMemory(name=tmp_shm.name)
        optimization_count_shm_array = np.ndarray(shape=tmp_shape, dtype=tmp_dtype, buffer=optimization_count_shm.buf)
        optimization_count_shm_array[0] += 1

        self.output("*" * 30)
        self.output(
            f"【{tag_str}】子进程{self.current_pid}:取出共享内存进行回测。总:{optimization_count},当前:{optimization_count_shm_array[0]}")

        # -------------遍历数据-------------
        bars = []
        bars_count = 0
        vt_c = len(self.vt_symbols)
        day_count = 0
        exchange_members = Exchange.__members__
        interval_members = Interval.__members__
        for symbol, exchange_value, datetime_float, interval_value, volume, turnover, open_interest, open_price, high_price, low_price, close_price, gateway_name in zip(
                symbol_shm_array, exchange_shm_array, datetime_shm_array, interval_shm_array,
                volume_shm_array, turnover_shm_array, open_interest_shm_array, open_price_shm_array,
                high_price_shm_array, low_price_shm_array, close_price_shm_array, gateway_name_shm_array):
            dt = datetime.fromtimestamp(datetime_float - 28800)
            bar: BarData = BarData(
                symbol=symbol,
                exchange=exchange_members.get(Exchange(exchange_value).name),
                datetime=dt,
                interval=interval_members.get(Interval(interval_value).name),
                volume=volume,
                turnover=turnover,
                open_interest=open_interest,
                open_price=open_price,
                high_price=high_price,
                low_price=low_price,
                close_price=close_price,
                gateway_name=gateway_name,
            )

            bars.append(bar)

            bars_count += 1
            if bars_count < vt_c:
                continue
            bars_count = 0

            # 以下判断是否初始化结束
            if not self.strategy.inited:
                if self.datetime and dt.day != self.datetime.day:
                    day_count += 1
                    if day_count >= self.days:
                        self.strategy.inited = True
                        self.output("策略初始化完成")
                        self.strategy.on_start()
                        self.strategy.trading = True
                        self.output("开始回放历史数据")

            try:
                self.my_new_bars(dt, bars)
            except Exception:
                self.output("触发异常,回测终止")
                self.output(traceback.format_exc())
                return
            bars = []

        self.output("历史数据回放结束")

    def my_new_bars(self, dt: datetime, bars_list: List[BarData]) -> None:
        self.datetime = dt

        bars: Dict[str, BarData] = {}
        bars_ix = 0
        for vt_symbol in self.vt_symbols:
            bar: BarData = bars_list[bars_ix]
            if bar:
                self.bars[vt_symbol] = bar
                bars[vt_symbol] = bar
            else:
                old_bar = self.bars[vt_symbol]
                bar = BarData(
                    symbol=old_bar.symbol,
                    exchange=old_bar.exchange,
                    datetime=dt,
                    open_price=old_bar.close_price,
                    high_price=old_bar.close_price,
                    low_price=old_bar.close_price,
                    close_price=old_bar.close_price,
                    gateway_name=old_bar.gateway_name
                )
                self.bars[vt_symbol] = bar
            bars_ix += 1

        self.cross_limit_order()
        self.update_daily_close(self.bars, dt)
        self.update_dynamic_capital(dt, self.bars)
        self.strategy.on_bars(bars)

    def my_run_ga_optimization(self,
                               optimization_setting: OptimizationSetting,
                               max_workers: int,
                               output=True,
                               population_size: int = 200,
                               ngen_size: int = 60,
                               is_shared_memory: bool = False,
                               tag_str: str = '',
                               ):
        """"""
        if not check_optimization_setting(optimization_setting):
            return

        shm_dict = None
        if is_shared_memory:
            shm_dict = self.my_create_shared_memory()

        shm_count_dict = self.my_create_shared_memory_count()

        evaluate_func: callable = my_wrap_evaluate(self,
                                                   optimization_setting.target_name,
                                                   shm_dict,
                                                   shm_count_dict,
                                                   len(optimization_setting.generate_settings()),
                                                   tag_str=tag_str)
        results = my_run_ga_optimization(
            evaluate_func,
            optimization_setting,
            get_target_value,
            max_workers,
            population_size=population_size,
            ngen_size=ngen_size,
            output=self.output
        )

        if output:
            for result in results:
                msg: str = f"参数:{result[0]}, 目标:{result[1]}"
                self.output(msg)

        # close共享内存
        for key, dict_shm in shm_dict.items():
            dict_shm.get('shm').close()
            dict_shm.get('shm').unlink()

        return results


@dataclass
class VtSymbolDynamicPnl(BaseData):
    """
    Trade data contains information of a fill of an order. One order
    can have several trade fills.
    """

    vt_symbol: str
    size: float  # 比率
    rate: float  # 手续费率
    margin_ratio: float  # 保证金比例
    slippage: float  # 滑点

    # 下面这些都是按照逐笔对冲的逻辑的变量
    dynamic_direction: Direction = Direction.LONG
    dynamic_open_pos_commission: float = 0  # 只涉及加仓的时候的手续费累积,平仓的时候不要加进去
    dynamic_volume: float = 0  # 该品种的动态持仓
    dynamic_occupy_capital: float = 0  # 当前占用资金
    dynamic_pnl: float = 0  # 该品种的动态盈亏累计

    dynamic_cost_price: float = 0  # 该品种每次加仓后,得出的平均成本价

    total_pnl: float = 0  # 该品种的所有盈亏累计
    total_trade_times: int = 0

    datetime: datetime = None

    def __post_init__(self):
        """"""
        self.symbol, self.exchange = extract_vt_symbol(self.vt_symbol)


def load_bar_data_without_lru_cache(
    vt_symbol: str,
    interval: Interval,
    start: datetime,
    end: datetime,
):
    """"""
    symbol, exchange = extract_vt_symbol(vt_symbol)
    return my_database_manager.load_bar_data(
        symbol, exchange, interval, start, end, collection_name=vt_symbol
    )


def my_wrap_evaluate(engine: MyBacktestingEngine, target_name: str,
                     shm_dict: dict, shm_count_dict: dict, optimization_count: int, tag_str: str = '',
                     ) -> callable:
    """
    Wrap evaluate function with given setting from backtesting engine.
    """
    func: callable = partial(
        my_evaluate,
        shm_dict,
        shm_count_dict,
        optimization_count,
        tag_str,
        engine.fixed_param_dict,
        target_name,
        engine.strategy_class,
        engine.vt_symbols,
        engine.interval,
        engine.start,
        engine.rates,
        engine.slippages,
        engine.sizes,
        engine.priceticks,
        engine.margin_ratios,
        engine.capital,
        engine.end
    )
    return func


def my_evaluate(
        shm_dict: dict,
        shm_count_dict: dict,
        optimization_count: int,
        tag_str: str,
        fixed_param_dict: dict,  # 固定参数
        target_name: str,
        strategy_class: StrategyTemplate,
        vt_symbols: List[str],
        interval: Interval,
        start: datetime,
        rates: Dict[str, float],
        slippages: Dict[str, float],
        sizes: Dict[str, float],
        priceticks: Dict[str, float],
        margin_ratios: Dict[str, float] or int or float,
        capital: int,
        end: datetime,
        setting: dict
):
    """
    Function for running in multiprocessing. Pool
    """
    engine = MyBacktestingEngine()

    engine.my_set_parameters(
        vt_symbols=vt_symbols,
        interval=interval,
        start=start,
        rates=rates,
        slippages=slippages,
        sizes=sizes,
        priceticks=priceticks,
        margin_ratios=margin_ratios,
        capital=capital,
        end=end,
    )
    setting.update(fixed_param_dict)
    # noinspection PyTypeChecker
    engine.add_strategy(strategy_class, setting)
    engine.my_run_backtesting(shm_dict, shm_count_dict, optimization_count, tag_str=tag_str)
    engine.calculate_result()
    statistics = engine.calculate_statistics(output=True)

    # statistics = engine.my_stats.statistics
    target_value = statistics[target_name]

    engine.clear_data()

    return str(setting), target_value, statistics


编写python脚本实现数据入库

 

如何编写一个python脚本将本地.csv 文件导入数据库是vn.py论坛中新用户提问较多的问题之一。本文的主要目的是帮助新用户解决数据入库问题,以便用户可以快速进入量化策略的开发。
 

本文主要分为三大部分,第一部分介绍了在vn.py中使用MongoDB数据库所需要进行的配置 (只打算使用vn.py默认SQLite数据库的用户,可以简单了解一下 )。第二部分介绍了数据入库的基本流程 (适用于vn.py支持的所有数据库)。最后一部分则是具体的实现:分别将数据导入MongoDB和SQLite数据库(适用于vn.py支持的所有数据库)。
 

另外,在正文开始之前,还需要提醒大家:在将数据导入数据库之前,务必要确保这些数据已经是被清洗过干净 的数据。如果将没有被清洗过,质量差的数据直接入库进行回测,可能会导致各种问题,影响策略开发的效率。因此,建议大家使用 高质量 的数据源。
 

配置数据库

 

vn.py 中默认使用 SQLite 数据库。 因此,如果需要使用 MongoDB 数据库,则需要修改 vn.py 的全局配置。具体流程如下:
 

  1. C:\Users\你的用户名\.vntrader 目录下找到 vt_setting.json 文件

  2. vt_setting.json 文件中的database.driver,database.database,database.host,database.port 进行修改。
     

下图是我自己设置的数据库配置信息:
 

description

 

上图中的两个 "mongodb" 可能让人会有些困扰。实际上第一个"mongodb"是告诉 vn.py 我们使用的数据库类型是 MongoDB 数据库而不是默认的 SQLite 数据库。 第二个 "mongodb" 是告诉 vn.py 回测所需要的数据储存在 MongoDB 数据库中一个叫做 "mongodb" 的 database 中。这样说可能有些绕口,请看下图:
 

description

 

上图是 MongoDB 数据库的官方图形界面。我们可以清楚的看到在该 MongoDB 数据库中一共有四个 database, 分别是 admin,config,local, mongodb。 另外,mongodb database 中分别储存了不同类型的期货数据。比如,AP888,IF888, RB888等。在该 database 中,不同类型的期货数据,分别 储存在不同的 collection(表)中。vn.py默认是将所有的期货数据储存在同一个collection中并进行读取。 如果,你想让 vn.py 将不同类型的期货数据分别储存在不同的collection中。请阅读这两篇文章vn.py社区精选18 - 老用户福音,MongDB分表重构vn.py社区精选19 - 福音收尾,MongoDB分表读取数据
 

数据入库基本流程

 

在配置 MongoDB 数据库和设置好 MongoDB 分表读取之后(不配置数据库或设置分表读取,不影响后面的内容),我们可以正式开始讨论数据入库的基本流程。vn.py 提供了很多工具使数据入库这个过程变得简单,快捷。下面是数据入库的基本流程:
 

  1. 先确定要入库的数据是 Tick 还是 Bar (K线) 类型数据

  2. 将需要入库的数据转化成 vn.py 定义的 TickDataBarData 数据类型

  3. 使用 vn.py 提供的数据入库工具函数 database_manager.save_tick_datadatabase_manager.save_bar_data 将相应的 TickDataBarData 入库
     

从上面的数据入库流程中可以看出,在 vn.py 中数据入库的流程还是比较简单的。难点集中在第二步(将本地数据转换成vn.py 定义的 TickDataBarData)。一般来说,就是对数据的时间戳处理上。另外,如果数据本身的质量不高,比如数据的整个时间戳格式前后不一致,那需要先对数据的时间戳格式进行统一。时间戳相关知识请查看 Python时间日期格式化之time与datetime模块 这篇文章。
 

数据入库具体实现

 

在了解了数据入库的基本流程之后,我们来实现一次数据入库的过程。首先,来看一看我们要入库的数据:
 

description

 

从上图可以看出,C 列储存的是表示时间的数据且 C2 和 C3 的时间间隔是1分钟。所以,要入库的数据是1分钟的 Bar (K线)数据类型。下面我们进行第二步:将需要入库的数据转化成 vn.py 定义的 BarData
 

首先,我们先来认识一下 vn.py 中的BarData
 

description

 

从上图中可以看出,BarData 一共有11个属性。其中,BarData.vt_symbol 会在 BarData 实例化的时候自动生成。另外,需要指出的是BarData.exchangeBarData.inteval 的数据类型分别是 vn.py 中定义好的枚举常量 ExchangeIntevalBarData.datetime 则是 python 标准库 datetime 中的 datetime 数据类型。
 

Exchange 数据结构的代码:
 

description

 

Interval 数据结构的代码:
 

description

 

在认识了vn.py 中的 BarData 之后,我们开始着手将需要入库的数据转化成 BarData类型数据。再来重温一下,需要入库数据的格式:
 

description

 

通过和上文 BarData 的数据结构对比,我们有以下几个发现:
 

  1. csv文件中的 合约代码 时间 开 高 低 收 成交量 持仓量BarData中的 symbol datetime open_price high_price low_price close_price volume open_interest 一一对应(从名称就就可以看出)。

  2. csv文件中 市场代码 没办法和 BarData 中的 exchange 对应。因为csv文件中 市场代码 都是 SC ,而在上图Exchange 数据结构代码截图中找不到和 SC 对应的枚举常量的绑定值。从合约代码 ag1608(沪银1608) 可以推断出这里的 SC 指的就是上海期货交易所,对应的枚举常量是 Exchang.SHFE

  3. csv文件中缺少了和 BarData 中的 interval 相对应的数据。上文我们已经发现了 csv文件中储存的是1分钟的BarData,对应的枚举常量是 Interval.MINUTE
     

基于上面的发现,很自然的,我们需要进行如下的操作:
 

  1. 将csv文件中 市场代码SC 替换成 Exchang.SHFE

  2. 增加一列数据,且该列数据的所有值都是 Interval.MINUTE

 

一般情况下,使用 python 的 pandas 库可以方便的完成上面的操作。如果数据的质量较差,比如数据的分隔符设置存在问题,会使得pd.read_csv函数没办法正确的读取.csv文件。这时则需要使用python的 csv 库。本文的数据入库过程统一使用 pandas 来完成。 具体操作,如下:

 

from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('需要入库的数据的绝对路径',encoding='gbk')

# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE

# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE

 

接下来,我们还需要对每列数据的数据类型进行修改,确保和 BarData 中各个属性的数据类型一致。BarData中属性的数据类型可以分为三大类:float 类, datetime 类 和 自定义枚举类 (IntervalExchange)。因为,上面已经修改过了IntervalExchange,下面只需要修改 floatdatetime 类。
 

修改 float 类代码:
 

# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']

for col in float_columns:
  imported_data[col] = imported_data[col].astype('float')

 

修改 datatime 类代码:
 

# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'

imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)

 

下一步,我们还需要对列名进行修改:
 

# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']

 

另外,因为该csv文件储存的是ag的主力连续数据,即多张ag合约的拼接。因此,symbol列中有多个不同到期日的ag合约代码,这里需要将合约代码统一为ag88
 

imported_data['symbol'] ='ag88'

最后,我们使用 vn.py 封装好的 database_manager.save_bar_data 将数据入库:
 

# 导入 database_manager 模块
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():

        bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )


        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime

    # insert into database
    database_manager.save_bar_data(bars, collection_name)
    print(f"Insert Bar: {count} from {start} - {end}")

 

如果,默认的数据库是其它vn.py支持的数据库,上面的代码需要做略微修改后便可以使用(详情请看结尾Debug部分)。
 

如果,没有设置分表储存不同类型的数据。则需要先将move_df_to_mongodb函数中的collection_name参数删除,同时将上面代码的倒数第二行修改为:
 

database_manager.save_bar_data(bars)

 

如果,想要将数据储存储存在 SQLite 数据库中也很简单(默认数据库不是SQLite)。只需要两步就可以完成。
 

  1. 创建一个sqlite数据库连接对象:

 

from vnpy.trader.database.initialize import init_sql
from vnpy.trader.database.database import Driver

settings={

    "database": "database.db",
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "",
    "authentication_source": "admin"
}
sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)

 

2.使用sqlite数据库连接对象将数据入库

 

# 替换函数 move_df_to_mongodb 的倒数第二行
sqlite_manager.save_bar_data(bars)

 

总结

 

本文尝试从数据库配置,数据入库基本流程,数据入库具体实现,三部分来帮助vn.py新用户解决编写python脚本实现数据入库这个难点。借助vn.py的database_manager模块,用户基本上可以无缝切换SQLite,MongoDB等vn.py支持的数据库来读取和存入数据。希望这篇文章能帮助大家快速进入量化策略的研究和开发。
 

Debug

 

使用上述代码进行Sqlite数据入库的时候,会出现peewee.InterfaceError: Error binding parameter 2 - probably unsupported type错误,解决方法:

  1. 找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
  2. 在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')

详细的Debug过程记录在sqlite数据入库Debug. 将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook就可以复现整个过程.
 

完整代码

 

from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)

# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():

        bar = BarData(

              symbol=row.symbol,
              exchange=row.exchange,
              datetime=row.datetime,
              interval=row.interval,
              volume=row.volume,
              open_price=row.open,
              high_price=row.high,
              low_price=row.low,
              close_price=row.close,
              open_interest=row.open_interest,
              gateway_name="DB",

        )


        bars.append(bar)

        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime

    # insert into database
    database_manager.save_bar_data(bars, collection_name)
    print(f'Insert Bar: {count} from {start} - {end}')


if __name__ == "__main__":

    # 读取需要入库的csv文件,该文件是用gbk编码
    imported_data = pd.read_csv('D:/1分钟数据压缩包/FutAC_Min1_Std_2016/ag主力连续.csv',encoding='gbk')
    # 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
    imported_data['市场代码'] = Exchange.SHFE
    # 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
    imported_data['interval'] = Interval.MINUTE
    # 明确需要是float数据类型的列
    float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
    for col in float_columns:
      imported_data[col] = imported_data[col].astype('float')
    # 明确时间戳的格式
    # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
    datetime_format = '%Y%m%d %H:%M:%S'
    imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
    # 因为没有用到 成交额 这一列的数据,所以该列列名不变
    imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
    imported_data['symbol'] ='ag88'
    move_df_to_mongodb(imported_data,'ag88')


simnow备胎openctp的使用说明

2023-03-08: 免费仿真环境已迁至121.37.90.193,并且取消了行情前置,订阅行情请直连CTP柜台(免费的,不需要账号)。具体见:openctp仿真环境调整

openctp已经开放运营一年多了,帮助了很多CTP用户调试程序、验证策略、学习交易等,有simnow这样的官方平台,谁弄个第三方的干嘛?原因大家也都知道,正如现在所碰到的情况,simnow又停服一个月,已经是今年第二次超长时间停服了,谁知道还会不会有第三、第四次。。

也是机缘巧合,手上积累了相关的技术,这个积累可不是拿来主义啊,咱可是一个字母一个字母敲出来的,妥妥的原创技术,只是说是设计思想跟CTP接近,也仅此而已了。通过接口封装成CTPAPI形式提供了与CTP接口兼容的接入方式,CTP程序只要更改一下CTP的交易dll(thosttraderapi_se.dll)和行情dll(thostmduserapi_se.dll)即可对接到openctp的交易前置和行情前置,当然,这两个dll或so也可以只替换其中一个,比如你想连到openctp的交易前置那就只需要替换交易dll即可,openctp的两套仿真环境也只需要你替换一下交易dll就行了,行情可以直接连接CTP实盘行情前置,因为openctp的行情也是转自CTP实盘前置,又何必接这个二道贩子的数据呢。

openctp的VIP环境直接就没有提供行情前置,你必须去直连实盘行情,但是这可能操作上有点麻烦,因为vn.py默认的TTS通道使用的是openctp的行情dll,你只改个CTP实盘前置地址是不够的,还要把行情dll替换成ctp官方版本,具体位置看你安装路径了,大概是这样的位置:C:\veighna_studio\Lib\site-packages\vnpy_tts\api
description
需要注意的是vn.py使用的是6.5.1的win64版本dll,请不需要弄错版本号。

CTP实盘行情的地址有很多,其实CTP行情前置是不校验用户名、密码的,所以你可以连接任意一家期货公司的行情前置,我随便挑了几个实盘的地址:
tcp://180.169.112.54:42213
tcp://140.207.168.9:42213
tcp://180.168.212.75:41313
tcp://27.115.78.155:41313
tcp://180.168.102.233:41168
tcp://112.64.143.220:41168

另外也还是陆续有vn.py的朋友问4097的错误问题,这里再提一下这个问题的解决方法,因为TTS的dll与CTP的dll同名,所以不能同时勾选这两个通道,只能勾选其中一个:
description

TTS通道的更多信息请到openctp官方页面了解:https://github.com/krenx1983/openctp

或者关注openctp的公众号,TTS的模拟账号也会在你关注的时候自动为你创建,一个微信号可以获得免费的3个7x24和3个仿真模拟账号,需要更全品种、全好体验的可以购买相应的VIP环境模拟账号。
description

最后非常感谢vn.py晓优大佬的支持,希望能够跟vn.py一起为投资者提供更好的服务。



用户策略怎么可以没有资金参数 ?—— CTA策略账户

1 几乎所有例子的仓位都是self.fixed_size=1

进阶课程里看过陈晓优老师讲的很多策略例子,都是讲如何买和如何卖,如何止盈和止损的,又是仿真、优化、实盘的,看到人是心潮澎湃!
于是看是着手仿照例子编写自己的策略,策略经过计算买卖信号后,总是要下单的,那我下多大仓位呢? 回过头这时候参考例子才发现,几乎所有的仓位都是self.fixed_size=1,就没有讲如何动态决定仓位的例子!
于是VNPY的QQ群里问前辈、先知,无人回答,再在论坛里问老师,终于回答了:”不建议动态仓位,这么重要的事情必须交给手工完成!“,这个答复让我有点懵——做量化交易你让为手动决定仓位???

2 是不建议动态仓位,还是无法动态仓位?

动态仓位需要什么条件:

  • 开仓价格(Price)
  • 可用资金(Money)
  • 合约乘数(Size)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)
  • 开仓的量 (N)
  • 开仓资金(KM)

假设是按手续费率(而不是按手收费),那么开仓资金为:

KM = Price*N*Size*Margin (1+ FeeRate)

那么必须符合添加KM < Money,进而推出

 N < Money/(Price*Size*Margin (1+ FeeRate))。

当然你不可能满仓干,也许还要一个最大开仓资金比例R,例如:

N = int(Money*R/(Price*Size*Margin (1+ FeeRate)))。

当R=40%表示用你账户里40%的资金,可以动态开仓的手数。这样不就可以动态开仓了吗?
当然实际开仓时的仓位计算可能比这复杂多了,比如你可以考虑交易合约的波动水平,需要考虑投资者愿意承担的风险水平等等,但不管怎么变化,策略动态开仓都必须要有如下这几个参数:

  • 开仓价格(Price)
  • 合约乘数(Size)
  • 可用资金(Money)
  • 保证金(率)(Margin)
  • 手续费(率)(FeeRate)

3 用户策略怎么可以与”钱“无关?

经过艰苦和漫长的代码研读和梳理,发现CTA策略交易中只有pos、trading和inited这些策略成员,没有与资金相关的东西。我们来看看这几个动态下单必须具备的参数是否提供了:

  • 开仓价格(Price):策略从合约的行情数据中提取
  • 合约乘数(Size):main_engine中可以使用vt_symbol作为参数提取,函数是main_engine.get_contract()
  • 可用资金(Money):目前有,可以通过 main_engine.get_account()函数提供 但是是当你运行多个策略的时候,这些策略是共用同一个账户的,无法直接使用。
  • 保证金(率)(Margin):目前有,但是这是各个交易所公布的统一的保证金(率),不是你开户的期货各种给你的保证金(率),如rb2010.SHFE的保证金是率8%,可是也许你的开户的期货公司给你的保证金率为却为12%或者15%,怎么用?
  • 手续费(率)(FeeRate):目前没有,而且这个费率回是因人而异的,这样是看你怎么和你的期货经纪人怎么谈判的,所以也没有办法用!

结论:

目前VNPY的CTA策略因为缺少上述几个关键参数,无法实现动态仓位交易。是不能也,而非不可以!

4 为CTA账户引入策略账户!

作为交易赚钱的CTA策略,怎么可以不与这些资金相关的参数打交道?因人而异的保证金和手续费不应该成为不提供这些参数的理由!
当多个策略在同时运行的时候,你的实际账户权益的消长,到底是哪个策略赚的,哪个策略赔的都无法说清楚,运行3天后就已经是一本糊涂账了,这怎么可以!

虽然有上面的困难,但是办法总比困难多!可以参考文华财经8.3或者库安9.0的办法(熟悉文华财经客户端的人应该都知道),它们的方法是用模组账户的方法来为每个用户模组创建一个虚拟的模组账户,很好地解决用户算法对资金、保证金和手续费等参数的设定!

策略账户的功能:

  1. 分配交易的初始资金,还可以做出金入金等虚拟操作,目的就是控制策略的交易规模。
  2. 设置各个合约实际使用的保证金(率)
  3. 设置各个合约可以使用的手续费(率),包括开仓、平仓和平今仓手续费(率)
  4. 可以记录策略产出的委托单
  5. 可以记录策略初始的成交单
  6. 可以为策略提供合约的当前可用资金、保证金(率)和手续费(率)
  7. 算策略的历史交易盈亏和当前交易的浮动盈亏
  8. 提供当前策略账户的权益和可用资金
  9. 提供策略自创建以来的所有历史委托单和成交单查询,解决目前CTA策略之知道最新活动委托单,当日成交单和未平仓的仓位,而不知道历史交易的情况的问题。

如何实现策略账户 ?

策略账户的已经基本上实现了,目前只在测试中,且看我一步一步慢慢为大家分享......

5. 最新进展

有兴趣的可以先看看 策略账户界面展示


统计

主题
8127
帖子
31782
已注册用户
39564
最新用户
在线用户
188
在线来宾用户
4401
© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】