VeighNa量化社区
你的开源社区量化交易平台
Member
加入于:
帖子: 59
声望: 8

一,事件引擎的创初始化。这部分没啥好说的,主要是创建_queue, 创建_run、_run_timer线程对象,创建_handlers默认字典(一个事件类型对能够处理该事件的函数列表)、创建_general_handlers列表。再来说说这个事件引擎的主要功能:_run线程被开启之后会创建一个死循环,不断从队列里取出事件交由_process方法处理,_process方法根据事件类型处理与之绑定的函数列表,遍历函数列表分别调用各函数. _run_timer线程包含一个每隔一个时间间隔创建一个计时器事件的死循环(用于定时查询持仓以及账户信息,可以在ctp_gateway.py中看到,一旦调用了connect方法, 就会调用self.int_query方法, 而这个方法会立马将process_timer_event方法与EVENT_TIMER事件绑定,而process_timer_event方法是包含两个查询方法的[self.query_account, self.query_position]的)

二、主引擎的初始化。接收事件引擎作为入参,调用self.event_engine.start(),开始调用事件引擎的_run,_run_timer两个线程,此时_run_timer线程会不断往事件引擎的_queue队列里放Event_timer事件, 但是由于还没有执行connect操作(进而还没有执行connect方法里的init_query方法,进而还没有执行EVENT_TIMER事件注册), self._queue.get(block=True, timeout=1)会一直"白取".
而后执行最关键的self.init_engines()方法: self.add_engine(LogEngine),self.add_engine(OmsEngine),self.add_engine(EmailEngine),这三个方法都是拿参数的class文件, 将这三个class文件分别实例化, 而后返回. 这里分析最关键的self.add_engine(OmsEngine)方法, 一些字典(self.ticks,self.orders,self.trades等)的初始化这里略过, 先看self.add_function()的调用, 这个方法主要是将oms引擎中的查询方法绑定到main_engine的实例中,如self.main_engine.get_tick = self.get_tick,self.main_engine.get_order = self.get_order等; 再看self.register_event()方法,这里会注册六个事件(EVENT_TICK,EVENT_ORDER等)同时绑定相关方法

三, 添加ctpgateway接口. 获取获取ctpgateway的class文件, ctpgateway自身实例化,其实主要是实例化行情api对象CtpMdApi和交易api对象CTPTdApi, 添加完ctpgateway接口之后主引擎就创建成功了,但是还是一个空壳, 事件引擎里的_queue还是只有EVENT_TIMER 事件, 啥都没有

四,连接ctp接口, 这个connect方法其实是为主引擎注入灵魂的操作.CtpGateway类继承了Base_gateway类, 同时绑定了CtpMdApi和CTPTdApi两个类,里面有个self.init()方法, 具体逻辑看不到, 但是可以推测其功能, 如何推测呢, 看执行完connect之后oms引擎里头变量的变化,里面原本的部分空字典已经有数据了, 挑一个最复杂的self.contracts字典. 可以发现oms的self.contracts字典里已经充满了ctp接口内包含的所有合约代码的基本信息,那合约信息是怎么样进入到oms引擎的呢? 打开OmsEngine引擎类发现往self.contracts字典里填充数据的方法是process_contract_event,它接收的是Event事件参数,而Event都是通过事件引擎从_queue队列里头取出来的,说明这个self.init()方法会从交易所那边把所有合约的基本信息包装成事件放入到_queue队列中,那在哪里包装的呢? 目前我们所拥有的对象有:CtpGateway, 父类BaseGateway, CtpMdApi,CtpTdApi, 一个一个去找,发现在BaseGateway抽象类里有个函数on_event()就是用来做这个事情的(可以判断,除了合约的基本信息以外,其他从交易所收到的信息, 都要经过这个方法的包装,比如tickdata,orderdata等), 再往前回溯,有一个on_contract()方法正好调用了这个on_event()方法,而后在ctp_gateway.py搜索on_contract,发现on_contract方法在CtpTdapi的onRspQryInstrument()方法中被调用, 说明onRspQryInstrument这个方法就是用于处理原始从交易所里面我们登陆自己账号后返回的信息(原始程度不详).同理:self.accounts,self.positions字典的数据填充过程也是类似. 综上我们可以了解到数据流过程: CtpMdApi和CTPTdApi对象的on开头的部分函数负责将交易所的原始数据进行初步处理,包装秤vnpy的各类数据对象,如TickData,OrderData等,而后回调给BaseGateway的各类回调函数如gateway.on_tick,gateway.on_oreder等, 而后由这些回调函进一步回调给on_event函数,on_event函数将其包装成各类事件,继而将各类事件放入事件引擎的_queue队列中,交由事件引擎中已注册的事件的各类函数去处理. 除此之外, ctp_gateway.py中还调用了init_query方法,用于定时查询账户信息和持仓,此时事件引擎的self._queue.get(block=True, timeout=1)就不会"白取"了

五, 数据记录引擎RecorderEngine的初始化,这里先以直接初始化RecorderEngine类为例, market_recorder = RecorderEngine(main_engine, event_engine)
在RecorderEngine里实例化一个queue队列
为需要做行情录制的合约代码初始化空字典
创建数据库写入线程run
self.register_event()注册事件:
self.event_engine.register(EVENT_TICK, self.process_tick_event) ,为eTick事件添加来自RecorderEngine的process_tick_event方法
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event), 为eContract事件添加来自RecorderEngine的process_contract_event方法
self.event_engine.register(EVENT_SPREAD_DATA, self.process_spread_event)
根据第四步,连接完ctp接口的时候,oms的self.contracts字典中已经充满了合约基本信息,也即说明事件引擎已经完全消耗了eContracts事件,所以在RecorderEngine中为eContract事件再去添加来自RecorderEngine的process_contract_event方法, 其实这个来自RecorderEngine的process_contract_event方法永远不会执行, 然而在主线程中main_engine.connect(CTP_SETTING, "CTP")之后紧接着实例化 RecorderEngine(main_engine, event_engine), 来自oms的process_contract_event方法和来自RecorderEngine的process_contract_event方法都会得到执行, 这就很奇怪了,因为connect之后按理说队列中已经没有eContract事件了啊, 所以由此判定connect方法有阻塞子线程的作用, 通过在connect方法下一行代码增加阻塞主线程的sleep函数之后,再去实例化RecorderEngine(main_engine, event_engine), 来自RecorderEngine的process_contract_event方法确实永远得不到执行. 所以我个人觉得在"丘"的帖子"全市场录制行情数据"(https://www.vnpy.com/forum/topic/3046-quan-shi-chang-lu-zhi-xing-qing-shu-ju)帖子中, 会有一定的bug几率,
作者是这么写的
main_engine.connect(CTP_SETTING, "CTP")
main_engine.write_log("连接CTP接口")
whole_market_recorder = WholeMarketRecorder(main_engine, event_engine)
个人觉得应该在执行完main_engine.connect(CTP_SETTING, "CTP")之后,在主线程中添加阻塞,以等待connect方法完全执行, 如果不阻塞一下主线程的话, 有一定的概率有部分eContract事件会先"跑掉",做不到在执行完来自oms的process_contract_event之后紧接着执行来自RecorderEngine的process_contract_event方法, 这是有风险的,最保险的做法是在主线程中增加sleep, 阻塞主线程一些时间.

六, 添加订阅合约代码
在” 全市场录制行情数据”帖子中一直搞不懂是如何将ctp接口的所有合约代码进行录制的,因为在源代码中一直没找到是如何调用add_tick_recording()方法”显式”的订阅所有合约的过程, 看完第五条之后应该明白了, 其是在connect方法执行的时候, 利用用户连接ctp服务器之后返回的所有合约基本信息得到合约代码, 同时覆写父类recorderEngine的process_contract_event方法, 在eContract事件中同时绑定来自oms的process_contract_event和来自RecorderEngine的process_contract_event方法, 在一个死循环中, 既将oms引擎的self.contracts字典填充完毕,又同时将所有合约代码进行订阅了.
如果只想订阅某个合约, 将recorderEngine实例化后,直接调用add_tick_recording方法就好了, 需要注意的是, 如果connect之后没有添加主线程阻塞, recorderEngine实例化后需要在主线程中进行阻塞,否则无法添加成功.

七, 开始数据库存储
当调用subscribe方法订阅之后, 其实调用的是ctpMdapi的subscribeMarketData方法进行订阅的,具体订阅逻辑不明, 反正订阅之后, 事件引擎中就会填充上你订阅的合约代码的etick事件了.之前在recorderEngine中是增加(oms引擎中也注册了该事件)注册了EVENT_TICK事件process_tick_event方法的,该方法继而调用update_tick方法判断记录类型,以记录bar行情为例,首先获取包含某合约代码的bar_generator(RecorderEngine.record_bar),而后调用Bar_generator的update_tick方法开始生成第一根bar,当生成bar之后回调RecorderEngine.record_bar(bar),往RecorderEngine的queue中放入bar数据,一旦放入数据, RecorderEngine的run线程早就开始待命了,根据不同事件类型调用不同的数据库保存方法,完成数据记录过程

Member
avatar
加入于:
帖子: 8
声望: 2

。。。明天也没看到帖子

Member
加入于:
帖子: 59
声望: 8

callingpulse wrote:

。。。明天也没看到帖子

不出意外的话,今天晚上更新, 白天要工作啊..

Member
加入于:
帖子: 59
声望: 8

无法编辑铁子了,只能回复了

Member
avatar
加入于:
帖子: 2
声望: 0

我这边录制数据最近发现上午少了好多合约 ,晚上是好的,到早上之后只剩下 几个金银合约了,具体在哪里加sleep,楼主能说一下吗,上面的逻辑没有看懂

Member
avatar
加入于:
帖子: 8
声望: 2

不出意外的话,今天晚上更新, 白天要工作啊..

一不小心催更成功了。哈哈,谢谢楼主的分享。

Member
avatar
加入于:
帖子: 61
声望: 2

callingpulse wrote:

不出意外的话,今天晚上更新, 白天要工作啊..

一不小心催更成功了。哈哈,谢谢楼主的分享。
感谢楼主分享

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】