VeighNa量化社区
你的开源社区量化交易平台 | vn.py | vnpy
Member
avatar
加入于:
帖子: 1
声望: 0

有1min线合成日线的代码吗?

Super Moderator
avatar
加入于:
帖子: 107
声望: 12

我来帮你查找1分钟线合成日线的代码。让我先搜索相关的文档和策略示例。

现在让我查找具体的策略代码示例,看看如何实现1分钟线合成日线:

现在让我查找VeighNa框架中的BarGenerator源代码,看看它是如何实现日线合成的:

让我查看template.py文件,这是包含BarGenerator类的文件:

现在让我查看BarGenerator的源代码。我需要查找vnpy_ctastrategy中的BarGenerator实现:

让我查找BarGenerator的具体实现代码。我需要在vnpy_ctastrategy中找到BarGenerator的定义:

让我查看vnpy.trader.utility.py文件,因为BarGenerator很可能在其中:

现在我找到了完整的BarGenerator实现!让我为你提供1分钟线合成日线的完整代码示例。

根据VeighNa框架的源代码,BarGenerator类已经内置了日线合成功能。以下是完整的实现方法:

方法一:使用内置的BarGenerator(推荐)

from datetime import time
from vnpy.trader.constant import Interval
from vnpy_ctastrategy import CtaTemplate, BarGenerator, ArrayManager
from vnpy.trader.object import BarData, TickData

class DailyBarStrategy(CtaTemplate):
    """1分钟线合成日线的策略示例"""

    author = "VeighNa"

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)

        # 创建1分钟K线生成器
        self.bg = BarGenerator(self.on_bar)

        # 创建日线K线生成器,指定每日收盘时间为14:59(适用于股票)
        # 如果是期货,可能需要调整为15:00或其他时间
        daily_end = time(14, 59)
        self.daily_bg = BarGenerator(
            on_bar=self.on_bar, 
            window=1, 
            on_window_bar=self.on_daily_bar, 
            interval=Interval.DAILY, 
            daily_end=daily_end
        )

        # 创建数组管理器用于技术指标计算
        self.am = ArrayManager()
        self.daily_am = ArrayManager()

    def on_init(self):
        """策略初始化"""
        self.write_log("策略初始化")
        self.load_bar(10)  # 加载10天的历史数据

    def on_start(self):
        """策略启动"""
        self.write_log("策略启动")

    def on_stop(self):
        """策略停止"""
        self.write_log("策略停止")

    def on_tick(self, tick: TickData):
        """Tick数据更新"""
        self.bg.update_tick(tick)

    def on_bar(self, bar: BarData):
        """1分钟K线更新"""
        # 兼容2.5.0+版本,设置成交额为0
        bar.turnover = 0

        # 更新到日线生成器
        self.daily_bg.update_bar(bar)

        # 更新1分钟K线到数组管理器(如果需要基于1分钟线交易)
        self.am.update_bar(bar)
        if self.am.inited:
            # 在这里可以添加基于1分钟线的交易逻辑
            pass

    def on_daily_bar(self, bar: BarData):
        """日线K线更新"""
        self.write_log(f"收到日线: {bar.datetime} 收盘价: {bar.close_price}")

        # 更新日线到数组管理器
        self.daily_am.update_bar(bar)
        if self.daily_am.inited:
            # 在这里添加基于日线的交易逻辑
            # 例如计算日线级别的技术指标
            ma20 = self.daily_am.sma(20)
            rsi = self.daily_am.rsi(14)

            # 示例交易逻辑
            if bar.close_price > ma20 and rsi < 70:
                self.buy(bar.close_price + 1, 1)
            elif bar.close_price < ma20 and rsi > 30:
                self.short(bar.close_price - 1, 1)

方法二:自定义日线生成器

如果你需要更灵活的控制,也可以使用自定义的日线生成器:

from datetime import time
from typing import Callable
from vnpy.trader.object import BarData
from vnpy_ctastrategy import CtaTemplate, BarGenerator, ArrayManager

class DailyBarGenerator:
    """自定义日线生成器"""

    def __init__(self, on_daily_bar: Callable):
        self.on_daily_bar = on_daily_bar
        self.last_bar = None
        self.daily_bar = None

    def update_bar(self, bar: BarData):
        """更新1分钟K线,合成日线"""
        # 如果是新的一天,推送前一天的日线
        if self.last_bar and bar.datetime.date() != self.last_bar.datetime.date():
            if self.daily_bar:
                # 设置日线时间为当天0点
                self.daily_bar.datetime = self.daily_bar.datetime.replace(
                    hour=0, minute=0, second=0, microsecond=0
                )
                self.on_daily_bar(self.daily_bar)
                self.daily_bar = None

        # 初始化或更新日线
        if self.daily_bar is None:
            self.daily_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                interval="d",  # 日线
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price,
                close_price=bar.close_price,
                volume=bar.volume,
                turnover=bar.turnover,
                open_interest=bar.open_interest
            )
        else:
            # 更新日线的最高、最低、收盘价和成交量
            self.daily_bar.high_price = max(self.daily_bar.high_price, bar.high_price)
            self.daily_bar.low_price = min(self.daily_bar.low_price, bar.low_price)
            self.daily_bar.close_price = bar.close_price
            self.daily_bar.volume += bar.volume
            self.daily_bar.turnover += bar.turnover
            self.daily_bar.open_interest = bar.open_interest

        self.last_bar = bar

    def generate(self):
        """强制生成日线(在策略停止时调用)"""
        if self.daily_bar:
            self.daily_bar.datetime = self.daily_bar.datetime.replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            self.on_daily_bar(self.daily_bar)
            self.daily_bar = None

class CustomDailyStrategy(CtaTemplate):
    """使用自定义日线生成器的策略"""

    def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
        super().__init__(cta_engine, strategy_name, vt_symbol, setting)

        self.bg = BarGenerator(self.on_bar)
        self.daily_bg = DailyBarGenerator(self.on_daily_bar)
        self.daily_am = ArrayManager()

    def on_bar(self, bar: BarData):
        """1分钟K线处理"""
        bar.turnover = 0
        self.bg.update_bar(bar)  # 这里其实不需要,因为on_tick已经处理了
        self.daily_bg.update_bar(bar)

    def on_daily_bar(self, bar: BarData):
        """日线处理"""
        self.daily_am.update_bar(bar)
        if self.daily_am.inited:
            # 添加你的日线交易逻辑
            pass

关键要点说明:

  1. 每日收盘时间设置:使用daily_end=time(14, 59)参数指定每日的收盘时间。对于A股通常是14:59或15:00,期货可能不同。

  2. BarGenerator参数

    • on_bar: 1分钟K线回调函数
    • window=1: 合成窗口为1(日线)
    • on_window_bar: 日线回调函数
    • interval=Interval.DAILY: 指定为日线
    • daily_end: 每日收盘时间
  3. 数据流:Tick → 1分钟K线 → 日K线

  4. 时间处理:日线的时间通常设置为当天的0点,便于后续处理。

  5. 兼容性:注意在较新版本的VeighNa中,需要手动设置bar.turnover = 0以兼容数据格式。

使用方法一(内置BarGenerator)是最简单和推荐的方式,因为它已经经过充分测试并且处理了各种边界情况。

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】