VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 23
声望: 0

因为veighna(原名vn.py)升级到3.0版本,但是社区精选的代码还停留在旧版本,所以我就小小的维护下,若有问题欢迎交流

首先在site-packages文件里的vnpy包里打开trader文件夹的database,在输入的参数里面增加collection_name: str = None

    @abstractmethod
    def load_bar_data(
        ...
        collection_name: str = None
    ) -> List[BarData]:
        """
        Load bar data from database.
        """
        pass

    @abstractmethod
    def load_tick_data(
        ...
        collection_name: str = None
    ) -> List[TickData]:
        """
        Load tick data from database.
        """
        pass

然后同样在这database文件里修改get_database函数,增加collection_name输入参数,在里面增加个判断逻辑,若有collection_name则使用mongodb数据库

def get_database(collection_name: str = None) -> BaseDatabase:
    ...
    # Read database related global setting
    database_name: str = SETTINGS["database.name"]
    module_name: str = f"vnpy_{database_name}"

    if collection_name:
        module_name: str = f"vnpy_mongodb"

    # Try to import database module
    try:
        module = import_module(module_name)
    except ModuleNotFoundError:
        print(f"找不到数据库驱动{module_name},使用默认的SQLite数据库")
        module = import_module("vnpy_sqlite")
    ...

打开site-packages文件里的vnpy_ctastrategy包里打开backtesting.py,修改load_bar_data、load_tick_data、set_parameters和load_data函数,此处修改和旧方法一样,但是要往get_database函数里面传入collection_name

@lru_cache(maxsize=999)
def load_bar_data(
    ...
    collection_name: str = None
):
    """"""
    database = get_database(collection_name)
    if collection_name:

        return database.load_bar_data(
            symbol, exchange, interval, start, end, collection_name
        )
    else:
        return database.load_bar_data(
            symbol, exchange, interval, start, end)


@lru_cache(maxsize=999)
def load_tick_data(
    ...
    collection_name: str = None
):
    """"""
    database = get_database(collection_name)
    if collection_name:
        return database.load_tick_data(
            symbol, exchange, start, end, collection_name
        )
    else:
        return database.load_tick_data(
            symbol, exchange, start, end)


def set_parameters(
        ...
        collection_name: str = None
    ):
        self.collection_name = collection_name
        ...

def load_data():
     ...
            if self.mode == BacktestingMode.BAR:
                data = load_bar_data(
                    self.symbol,
                    self.exchange,
                    self.interval,
                    start,
                    end,
                    self.collection_name,
                )
            else:
                data = load_tick_data(
                    self.symbol,
                    self.exchange,
                    start,
                    end,
                    self.collection_name
                )
     ...

最后打开site-packages文件里的vnpy_mongodb包里打开mongodb_database.py修改load_bar_data和load_tick_data函数
在增加传入参数collection_name,然后增加判断若是collection_name不为None则从mongodb处读入这个表的数据

   def load_bar_data(
        ...
        collecion_name: str = None
    ) -> List[BarData]:
        """读取K线数据"""
        filter = {
            "symbol": symbol,
            "exchange": exchange.value,
            "interval": interval.value,
            "datetime": {
                "$gte": start,
                "$lte": end
            }
        }

        if collecion_name:
            self.bar_collection = self.db[collecion_name]
            self.bar_collection.create_index(
            [
                ("exchange", ASCENDING),
                ("symbol", ASCENDING),
                ("interval", ASCENDING),
                ("datetime", ASCENDING),
            ],
            unique=True
        )
        c: Cursor = self.bar_collection.find(filter)

        ...

    def load_tick_data(
        ...
        collection_name:str = None
    ) -> List[TickData]:
        """读取TICK数据"""
        filter = {
            "symbol": symbol,
            "exchange": exchange.value,
            "datetime": {
                "$gte": start,
                "$lte": end
            }
        }
        if collection_name:

            self.tick_collection: Collection = self.db[collection_name]
            self.tick_collection.create_index(
                [
                    ("exchange", ASCENDING),
                    ("symbol", ASCENDING),
                    ("datetime", ASCENDING),
                ],
                unique=True
            )

        c: Cursor = self.tick_collection.find(filter)

        ...

然后参数优化的话不怎么用改,无界面参数优化时穷举算法可行,效果如下。

description

但是无界面遗传算法倒是出现了bug,报的错是 ValueError: empty range for randrange() (1, 1, 0)
在有UI界面的时候两种优化算法都可以运行,不过在UI界面上读取数据时是从mongodb名为bar_data和tick_data的表读取数据的,要修改的话要往UI界面增加个PyQt的部件来传输collection_name参数。

对于无界面遗传算法若是有同样问题但解决了的大佬,欢迎评论!

Member
avatar
加入于:
帖子: 1468
声望: 105

感谢更新!

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】