作者:时间 ;来源:维恩的派论坛 ;版本:v1.72
 

改写原因:行情记录多个合约的数据时,在下午收盘后依旧有很多tick数据未来得及插入数据库,尤其是在1.7.1版本时(mainEngine.dbUpdate()的模式下)

 

改写方式:

  1. 主要是在陈老师的drengine上进行更改
  2. 每个队列负责记录一个合约,每个线程负责一个合约的插入
  3. 主力合约数据没有单独推送到队列,而是跟随tick 或bar数据直接插入数据库
  4. 采用1.7.2版本的(mainEngine.dbInsert()模式)

 
效果: 七个合约进行数据记录,基本实现生产和消费同步,测试时间不久,错误之处欢迎指正

 
 
代码如下:

# encoding: UTF-8

'''
本文件中实现了行情数据记录引擎,用于汇总TICK数据,并生成K线插入数据库。

使用DR_setting.json来配置需要收集的合约,以及主力合约代码。
'''

import json
import csv
import os
import copy
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue, Empty
from threading import Thread

from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.vtObject import VtSubscribeReq, VtLogData, VtBarData, VtTickData
from vnpy.trader.app.ctaStrategy.ctaTemplate import BarManager

from .drBase import *
from .language import text


########################################################################
# class DrEngine(object):
#     """数据记录引擎"""

#     settingFileName = 'DR_setting.json'
#     settingFilePath = getJsonPath(settingFileName, __file__)
#     # print u"数据记录配置文件地址:",settingFilePath  

#     #----------------------------------------------------------------------
#     def __init__(self, mainEngine, eventEngine):
#         """Constructor"""
#         self.mainEngine = mainEngine
#         self.eventEngine = eventEngine

#         # 当前日期
#         self.today = todayDate()

#         # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
#         self.activeSymbolDict = {}

#         # Tick对象字典
#         self.tickSymbolSet = set()

#         # K线合成器字典
#         self.bmDict = {}

#         # 配置字典
#         self.settingDict = OrderedDict()

#         # 负责执行数据库插入的单独线程相关
#         self.active = False                     # 工作状态
#         self.queue = Queue()                    # 队列
#         self.thread = Thread(target=self.run)   # 线程

#         # 载入设置,订阅行情
#         self.loadSetting()

#         # 启动数据插入线程
#         self.start()

#         # 注册事件监听
#         self.registerEvent()  

#     #----------------------------------------------------------------------
#     def loadSetting(self):
#         """加载配置"""
#         with open(self.settingFilePath) as f:
#             drSetting = json.load(f)

#             # 如果working设为False则不启动行情记录功能
#             working = drSetting['working']
#             if not working:
#                 return

#             # Tick记录配置
#             if 'tick' in drSetting:
#                 l = drSetting['tick']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol

#                     req = VtSubscribeReq()
#                     req.symbol = setting[0]

#                     # 针对LTS和IB接口,订阅行情需要交易所代码
#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     # 针对IB接口,订阅行情需要货币和产品类型
#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]

#                     self.mainEngine.subscribe(req, gateway)

#                     #tick = VtTickData()           # 该tick实例可以用于缓存部分数据(目前未使用)
#                     #self.tickDict[vtSymbol] = tick
#                     self.tickSymbolSet.add(vtSymbol)

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'tick': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['tick'] = True

#             # 分钟线记录配置
#             if 'bar' in drSetting:
#                 l = drSetting['bar']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol

#                     req = VtSubscribeReq()
#                     req.symbol = symbol                    

#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]                    

#                     self.mainEngine.subscribe(req, gateway)  

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'bar': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['bar'] = True     

#                     # 创建BarManager对象
#                     self.bmDict[vtSymbol] = BarManager(self.onBar)

#             # 主力合约记录配置
#             if 'active' in drSetting:
#                 d = drSetting['active']
#                 self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}

#     #----------------------------------------------------------------------
#     def getSetting(self):
#         """获取配置"""
#         return self.settingDict, self.activeSymbolDict

#     #----------------------------------------------------------------------
#     def procecssTickEvent(self, event):
#         """处理行情事件"""
#         tick = event.dict_['data']
#         vtSymbol = tick.vtSymbol

#         # 生成datetime对象
#         if not tick.datetime:
#             tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')            

#         self.onTick(tick)

#         bm = self.bmDict.get(vtSymbol, None)
#         if bm:
#             bm.updateTick(tick)

#     #----------------------------------------------------------------------
#     def onTick(self, tick):
#         """Tick更新"""
#         vtSymbol = tick.vtSymbol

#         if vtSymbol in self.tickSymbolSet:
#             self.insertData(TICK_DB_NAME, vtSymbol, tick)

#             if vtSymbol in self.activeSymbolDict:
#                 activeSymbol = self.activeSymbolDict[vtSymbol]
#                 self.insertData(TICK_DB_NAME, activeSymbol, tick)


#             self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
#                                                              time=tick.time, 
#                                                              last=tick.lastPrice, 
#                                                              bid=tick.bidPrice1, 
#                                                              ask=tick.askPrice1))

#     #----------------------------------------------------------------------
#     def onBar(self, bar):
#         """分钟线更新"""
#         vtSymbol = bar.vtSymbol

#         self.insertData(MINUTE_DB_NAME, vtSymbol, bar)

#         if vtSymbol in self.activeSymbolDict:
#             activeSymbol = self.activeSymbolDict[vtSymbol]
#             self.insertData(MINUTE_DB_NAME, activeSymbol, bar)                    

#         self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, 
#                                                         time=bar.time, 
#                                                         open=bar.open, 
#                                                         high=bar.high, 
#                                                         low=bar.low, 
#                                                         close=bar.close))        

#     #----------------------------------------------------------------------
#     def registerEvent(self):
#         """注册事件监听"""
#         self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

#     #----------------------------------------------------------------------
#     def insertData(self, dbName, collectionName, data):
#         """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
#         self.queue.put((dbName, collectionName, data.__dict__))

#     #----------------------------------------------------------------------
#     def run(self):
#         """运行插入线程"""
#         while self.active:
#             try:
#                 dbName, collectionName, d = self.queue.get(block=True, timeout=1)
#                 flt = {'datetime': d['datetime']}
#                 self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
#             except Empty:
#                 pass

#     #----------------------------------------------------------------------
#     def start(self):
#         """启动"""
#         self.active = True
#         self.thread.start()

#     #----------------------------------------------------------------------
#     def stop(self):
#         """退出"""
#         if self.active:
#             self.active = False
#             self.thread.join()

#     #----------------------------------------------------------------------
#     def writeDrLog(self, content):
#         """快速发出日志事件"""
#         log = VtLogData()
#         log.logContent = content
#         event = Event(type_=EVENT_DATARECORDER_LOG)
#         event.dict_['data'] = log
#         self.eventEngine.put(event)   
#     




########################################################################
# class DrEngine(object):
#     """数据记录引擎"""

#     settingFileName = 'DR_setting.json'
#     settingFilePath = getJsonPath(settingFileName, __file__)
#     print u"----------多线程行情数据记录-------------------"  

#     #----------------------------------------------------------------------
#     def __init__(self, mainEngine, eventEngine):
#         """Constructor"""
#         self.mainEngine = mainEngine
#         self.eventEngine = eventEngine

#         # 当前日期
#         self.today = todayDate()

#         # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000)
#         self.activeSymbolDict = {}

#         # Tick对象字典
#         self.tickSymbolSet = set()

#         # K线合成器字典
#         self.bmDict = {}

#         # 配置字典
#         self.settingDict = OrderedDict()

#         # 负责执行数据库插入的单独线程相关
#         self.active = False                     # 工作状态
#         # self.queue = Queue()                    # 队列
#         # self.thread = Thread(target=self.run)   # 线程
#         self.queue_dict=OrderedDict()
#         self.threadlist=[]

#         # 载入设置,订阅行情
#         self.loadSetting()

#         #加载多线程
#         self.get_threadlist()
#         # 启动数据插入线程
#         self.start()

#         # 注册事件监听
#         self.registerEvent()  

#     #----------------------------------------------------------------------
#     def loadSetting(self):
#         """加载配置"""
#         with open(self.settingFilePath) as f:
#             drSetting = json.load(f)

#             # 如果working设为False则不启动行情记录功能
#             working = drSetting['working']
#             if not working:
#                 return

#             # Tick记录配置
#             if 'tick' in drSetting:
#                 l = drSetting['tick']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol
#                     if symbol not in self.queue_dict:
#                         self.queue_dict[symbol]=Queue()

#                     req = VtSubscribeReq()
#                     req.symbol = setting[0]

#                     # 针对LTS和IB接口,订阅行情需要交易所代码
#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     # 针对IB接口,订阅行情需要货币和产品类型
#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]

#                     self.mainEngine.subscribe(req, gateway)

#                     #tick = VtTickData()           # 该tick实例可以用于缓存部分数据(目前未使用)
#                     #self.tickDict[vtSymbol] = tick
#                     self.tickSymbolSet.add(vtSymbol)

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'tick': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['tick'] = True

#             # 分钟线记录配置
#             if 'bar' in drSetting:
#                 l = drSetting['bar']

#                 for setting in l:
#                     symbol = setting[0]
#                     gateway = setting[1]
#                     vtSymbol = symbol
#                     if symbol not in self.queue_dict:
#                         self.queue_dict[symbol]=Queue()

#                     req = VtSubscribeReq()
#                     req.symbol = symbol                    

#                     if len(setting)>=3:
#                         req.exchange = setting[2]
#                         vtSymbol = '.'.join([symbol, req.exchange])

#                     if len(setting)>=5:
#                         req.currency = setting[3]
#                         req.productClass = setting[4]                    

#                     self.mainEngine.subscribe(req, gateway)  

#                     # 保存到配置字典中
#                     if vtSymbol not in self.settingDict:
#                         d = {
#                             'symbol': symbol,
#                             'gateway': gateway,
#                             'bar': True
#                         }
#                         self.settingDict[vtSymbol] = d
#                     else:
#                         d = self.settingDict[vtSymbol]
#                         d['bar'] = True     

#                     # 创建BarManager对象
#                     self.bmDict[vtSymbol] = BarManager(self.onBar)

#             # 主力合约记录配置
#             if 'active' in drSetting:
#                 d = drSetting['active']
#                 self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}

#     #----------------------------------------------------------------------
#     def getSetting(self):
#         """获取配置"""
#         return self.settingDict, self.activeSymbolDict

#     #----------------------------------------------------------------------
#     def procecssTickEvent(self, event):
#         """处理行情事件"""
#         tick = event.dict_['data']
#         vtSymbol = tick.vtSymbol

#         # 生成datetime对象
#         if not tick.datetime:
#             tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')            

#         self.onTick(tick)

#         bm = self.bmDict.get(vtSymbol, None)
#         if bm:
#             bm.updateTick(tick)

#     #----------------------------------------------------------------------
#     def onTick(self, tick):
#         """Tick更新"""
#         vtSymbol = tick.vtSymbol

#         if vtSymbol in self.tickSymbolSet:
#             self.insertData(TICK_DB_NAME, vtSymbol, tick)

#             if vtSymbol in self.activeSymbolDict:
#                 activeSymbol = self.activeSymbolDict[vtSymbol]
#                 self.insertData(TICK_DB_NAME, activeSymbol, tick)


#             self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
#                                                              time=tick.time, 
#                                                              last=tick.lastPrice, 
#                                                              bid=tick.bidPrice1, 
#                                                              ask=tick.askPrice1))

#     #----------------------------------------------------------------------
#     def onBar(self, bar):
#         """分钟线更新"""
#         vtSymbol = bar.vtSymbol

#         self.insertData(MINUTE_DB_NAME, vtSymbol, bar)

#         if vtSymbol in self.activeSymbolDict:
#             activeSymbol = self.activeSymbolDict[vtSymbol]
#             self.insertData(MINUTE_DB_NAME, activeSymbol, bar)                    

#         self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, 
#                                                         time=bar.time, 
#                                                         open=bar.open, 
#                                                         high=bar.high, 
#                                                         low=bar.low, 
#                                                         close=bar.close))        

#     #----------------------------------------------------------------------
#     def registerEvent(self):
#         """注册事件监听"""
#         self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

#     #----------------------------------------------------------------------
#     def insertData(self, dbName, collectionName, data):
#         """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""
#         symbol=data.vtSymbol
#         symbol_queue=self.queue_dict.get(symbol,None)
#         if symbol_queue:
#             symbol_queue.put((dbName, collectionName, data.__dict__))

#     #----------------------------------------------------------------------
#     def run(self,symbol):
#         """运行插入线程"""
#         while self.active:
#             try:
#                 single_queue=self.queue_dict.get(symbol,None)
#                 if single_queue:
#                     print u"----{}----,队列大小为:{}".format(symbol,single_queue.qsize())
#                     dbName, collectionName, d = single_queue.get(block=True, timeout=1)
#                     flt = {'datetime': d['datetime']}
#                     self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
#             except Empty:
#                 pass

#     #----------------------------------------------------------------------
#     def get_threadlist(self):
#         symbollist=self.queue_dict.keys()
#         for symbol in symbollist:
#             t=Thread(target=self.run,args=(symbol,))
#             self.threadlist.append(t)

#     def start(self):
#         """启动"""
#         self.active = True
#         # self.thread.start()
#         for t in self.threadlist:
#             t.start()

#     #----------------------------------------------------------------------
#     def stop(self):
#         """退出"""
#         if self.active:
#             self.active = False
#             self.thread.join()

#     #----------------------------------------------------------------------
#     def writeDrLog(self, content):
#         """快速发出日志事件"""
#         log = VtLogData()
#         log.logContent = content
#         event = Event(type_=EVENT_DATARECORDER_LOG)
#         event.dict_['data'] = log
#         self.eventEngine.put(event)   




########################################################################
class DrEngine(object):
    """Market data recording engine.

    Subscribes to the contracts listed in DR_setting.json, records incoming
    ticks, feeds them to a BarManager per contract, and inserts ticks and
    bars into the database. Every recorded contract owns one queue and one
    worker thread; the worker also copies each record into the collection
    of the dominant ("active") contract mapped to it, if any.
    """

    settingFileName = 'DR_setting.json'
    settingFilePath = getJsonPath(settingFileName, __file__)

    #----------------------------------------------------------------------
    def __init__(self, mainEngine, eventEngine):
        """Load the settings, start the insert threads and hook tick events.

        mainEngine: used for subscribe() and dbInsert().
        eventEngine: used to register the tick handler and to emit log events.
        """
        self.mainEngine = mainEngine
        self.eventEngine = eventEngine

        # Current date
        self.today = todayDate()

        # Maps a concrete contract vtSymbol (e.g. IF1604) to the
        # dominant-contract code (e.g. IF0000)
        self.activeSymbolDict = {}

        # vtSymbols whose ticks are recorded
        self.tickSymbolSet = set()

        # vtSymbol -> BarManager
        self.bmDict = {}

        # vtSymbol -> {'symbol', 'gateway', 'tick'/'bar'} summary of the settings
        self.settingDict = OrderedDict()

        # Database insert workers: one queue and one thread per vtSymbol
        self.active = False
        self.queue_dict = OrderedDict()
        self.threadlist = []

        # Load settings and subscribe to market data
        self.loadSetting()

        # Create one worker thread per queue, then start them
        self.get_threadlist()
        self.start()

        # Register the event listener
        self.registerEvent()

    #----------------------------------------------------------------------
    def loadSetting(self):
        """Read DR_setting.json and subscribe to every configured contract.

        Does nothing beyond reading the file when 'working' is false.
        Raises KeyError if the 'working' key is missing.
        """
        with open(self.settingFilePath) as f:
            drSetting = json.load(f)

        # Recording is switched off entirely when 'working' is false
        if not drSetting['working']:
            return

        # Tick recording
        for setting in drSetting.get('tick', []):
            symbol, gateway, vtSymbol = self._subscribeSetting(setting)
            self.tickSymbolSet.add(vtSymbol)
            self._markRecorded(vtSymbol, symbol, gateway, 'tick')

        # Bar recording
        for setting in drSetting.get('bar', []):
            symbol, gateway, vtSymbol = self._subscribeSetting(setting)
            self._markRecorded(vtSymbol, symbol, gateway, 'bar')
            self.bmDict[vtSymbol] = BarManager(self.onBar)

        # Dominant contract mapping: the file maps activeSymbol -> vtSymbol,
        # the engine needs the reverse lookup
        if 'active' in drSetting:
            d = drSetting['active']
            self.activeSymbolDict = {vtSymbol: activeSymbol
                                     for activeSymbol, vtSymbol in d.items()}

    #----------------------------------------------------------------------
    def _subscribeSetting(self, setting):
        """Subscribe to one settings entry and make sure it has a queue.

        setting is a list: [symbol, gateway], optionally followed by the
        exchange, and optionally by currency and productClass.
        Returns the tuple (symbol, gateway, vtSymbol).
        """
        symbol = setting[0]
        gateway = setting[1]
        vtSymbol = symbol

        req = VtSubscribeReq()
        req.symbol = symbol

        # LTS and IB gateways need the exchange to subscribe
        if len(setting) >= 3:
            req.exchange = setting[2]
            vtSymbol = '.'.join([symbol, req.exchange])

        # The IB gateway additionally needs currency and product class
        if len(setting) >= 5:
            req.currency = setting[3]
            req.productClass = setting[4]

        # The queue is keyed by vtSymbol because insertData() looks it up by
        # data.vtSymbol and run() uses the same key for activeSymbolDict;
        # keying by the bare symbol loses data for exchange-qualified contracts
        if vtSymbol not in self.queue_dict:
            self.queue_dict[vtSymbol] = Queue()

        self.mainEngine.subscribe(req, gateway)

        return symbol, gateway, vtSymbol

    #----------------------------------------------------------------------
    def _markRecorded(self, vtSymbol, symbol, gateway, kind):
        """Flag kind ('tick' or 'bar') as recorded for vtSymbol in settingDict."""
        if vtSymbol not in self.settingDict:
            self.settingDict[vtSymbol] = {
                'symbol': symbol,
                'gateway': gateway
            }
        self.settingDict[vtSymbol][kind] = True

    #----------------------------------------------------------------------
    def getSetting(self):
        """Return the tuple (settingDict, activeSymbolDict)."""
        return self.settingDict, self.activeSymbolDict

    #----------------------------------------------------------------------
    def procecssTickEvent(self, event):
        """Handle a tick event: record the tick and update its bar manager."""
        tick = event.dict_['data']
        vtSymbol = tick.vtSymbol

        # Build the datetime object when the tick does not carry one yet
        if not tick.datetime:
            tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]),
                                              '%Y%m%d %H:%M:%S.%f')

        self.onTick(tick)

        bm = self.bmDict.get(vtSymbol, None)
        if bm:
            bm.updateTick(tick)

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """Queue the tick for insertion if its contract is tick-recorded."""
        if tick.vtSymbol not in self.tickSymbolSet:
            return

        # The copy for the dominant contract is written by the worker thread
        self.insertData(TICK_DB_NAME, tick.vtSymbol, tick)

        self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
                                                         time=tick.time,
                                                         last=tick.lastPrice,
                                                         bid=tick.bidPrice1,
                                                         ask=tick.askPrice1))

    #----------------------------------------------------------------------
    def onBar(self, bar):
        """Queue a bar for insertion; passed to BarManager as its callback."""
        # The copy for the dominant contract is written by the worker thread
        self.insertData(MINUTE_DB_NAME, bar.vtSymbol, bar)

        self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol,
                                                        time=bar.time,
                                                        open=bar.open,
                                                        high=bar.high,
                                                        low=bar.low,
                                                        close=bar.close))

    #----------------------------------------------------------------------
    def registerEvent(self):
        """Register the tick handler with the event engine."""
        self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)

    #----------------------------------------------------------------------
    def insertData(self, dbName, collectionName, data):
        """Put data on the queue of its contract for later insertion.

        data may be a VtTickData or a VtBarData; its __dict__ is what gets
        queued. Data for a contract without a queue is dropped.
        """
        symbol_queue = self.queue_dict.get(data.vtSymbol, None)
        if symbol_queue is not None:
            symbol_queue.put((dbName, collectionName, data.__dict__))

    #----------------------------------------------------------------------
    def run(self, symbol):
        """Worker loop: insert the queued records of one contract.

        symbol is the queue_dict key this worker serves. The loop ends once
        self.active becomes false; records still queued at that point are
        not inserted.
        """
        single_queue = self.queue_dict.get(symbol, None)
        if single_queue is None:
            # Nothing to consume; return instead of spinning
            return

        while self.active:
            try:
                # The timeout lets the loop re-check self.active regularly
                dbName, collectionName, d = single_queue.get(block=True, timeout=1)
            except Empty:
                continue

            self.mainEngine.dbInsert(dbName, collectionName, d)

            # Insert the same record into the dominant-contract collection
            if symbol in self.activeSymbolDict:
                activeSymbol = self.activeSymbolDict[symbol]
                self.mainEngine.dbInsert(dbName, activeSymbol, d)

    #----------------------------------------------------------------------
    def get_threadlist(self):
        """Create (without starting) one worker thread per queue."""
        for symbol in self.queue_dict:
            self.threadlist.append(Thread(target=self.run, args=(symbol,)))

    #----------------------------------------------------------------------
    def get_total_tasknumber(self):
        """Return the approximate number of records waiting in all queues."""
        return sum(q.qsize() for q in self.queue_dict.values())

    #----------------------------------------------------------------------
    def start(self):
        """Mark the engine active and start all worker threads."""
        self.active = True
        for t in self.threadlist:
            t.start()

    #----------------------------------------------------------------------
    def stop(self):
        """Signal the workers to stop and wait for them to finish."""
        if self.active:
            self.active = False
            for t in self.threadlist:
                # join() raises RuntimeError on a thread that never started
                if t.is_alive():
                    t.join()

    #----------------------------------------------------------------------
    def writeDrLog(self, content):
        """Emit content as a data recorder log event."""
        log = VtLogData()
        log.logContent = content
        event = Event(type_=EVENT_DATARECORDER_LOG)
        event.dict_['data'] = log
        self.eventEngine.put(event)