VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 19
声望: 1

"""
General utility functions.
"""

import json
import logging
import sys
import datetime
from pathlib import Path
from typing import Callable, Dict, Tuple, Union, Optional
from decimal import Decimal
from math import floor, ceil

import numpy as np
import talib

from .object import BarData, TickData
from .constant import Exchange, Interval

if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo, available_timezones # noqa
else:
from backports.zoneinfo import ZoneInfo, available_timezones # noqa

log_formatter: logging.Formatter = logging.Formatter('[%(asctime)s] %(message)s')

def extract_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
"""
:return: (symbol, exchange)
"""
symbol, exchange_str = vt_symbol.split(".")
return symbol, Exchange(exchange_str)

def generate_vt_symbol(symbol: str, exchange: Exchange) -> str:
"""
return vt_symbol
"""
return f"{symbol}.{exchange.value}"

def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
"""
Get path where trader is running in.
"""
cwd: Path = Path.cwd()
temp_path: Path = cwd.joinpath(temp_name)

# If .vntrader folder exists in current working directory,
# then use it as trader running path.
if temp_path.exists():
    return cwd, temp_path

# Otherwise use home path of system.
home_path: Path = Path.home()
temp_path: Path = home_path.joinpath(temp_name)

# Create .vntrader folder under home path if not exist.
if not temp_path.exists():
    temp_path.mkdir()

return home_path, temp_path


TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
sys.path.append(str(TRADER_DIR))

def get_file_path(filename: str) -> Path:
"""
Get path for temp file with filename.
"""
return TEMP_DIR.joinpath(filename)

def get_folder_path(folder_name: str) -> Path:
"""
Get path for temp folder with folder name.
"""
folder_path: Path = TEMP_DIR.joinpath(folder_name)
if not folder_path.exists():
folder_path.mkdir()
return folder_path

def get_icon_path(filepath: str, ico_name: str) -> str:
"""
Get path for icon file with ico name.
"""
ui_path: Path = Path(filepath).parent
icon_path: Path = ui_path.joinpath("ico", ico_name)
return str(icon_path)

def load_json(filename: str) -> dict:
"""
Load data from json file in temp path.
"""
filepath: Path = get_file_path(filename)

if filepath.exists():
    with open(filepath, mode="r", encoding="UTF-8") as f:
        data: dict = json.load(f)
    return data
else:
    save_json(filename, {})
    return {}


def save_json(filename: str, data: dict) -> None:
"""
Save data into json file in temp path.
"""
filepath: Path = get_file_path(filename)
with open(filepath, mode="w+", encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
ensure_ascii=False
)

def round_to(value: float, target: float) -> float:
"""
Round price to price tick value.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
rounded: float = float(int(round(value / target)) * target)
return rounded

def floor_to(value: float, target: float) -> float:
"""
Similar to math.floor function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(floor(value / target)) * target)
return result

def ceil_to(value: float, target: float) -> float:
"""
Similar to math.ceil function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(ceil(value / target)) * target)
return result

def get_digits(value: float) -> int:
"""
Get number of digits after decimal point.
"""
value_str: str = str(value)

if "e-" in value_str:
    _, buf = value_str.split("e-")
    return int(buf)
elif "." in value_str:
    _, buf = value_str.split(".")
    return len(buf)
else:
    return 0


class BarGenerator:
"""
For:

1. generating 1 minute bar data from tick data
2. generating x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""

def __init__(
    self,
    on_bar: Callable,
    window: int = 0,
    on_window_bar: Callable = None,
    interval: Interval = Interval.MINUTE

) -> None:
    """Constructor"""
    self.bar: BarData = None
    self.on_bar: Callable = on_bar

    self.interval: Interval = interval
    self.interval_count: int = 0

    self.hour_bar: BarData = None
    self.day_bar: BarData = None
    self.week_bar: BarData = None

    self.window: int = window
    self.window_bar: BarData = None
    self.on_window_bar: Callable = on_window_bar

    self.last_tick: TickData = None

def update_tick(self, tick: TickData) -> None:
    """
    Update new tick data into generator.
    """
    new_minute: bool = False

    # Filter tick data with 0 last price
    if not tick.last_price:
        return

    # Filter tick data with older timestamp
    if self.last_tick and tick.datetime < self.last_tick.datetime:
        return

    if not self.bar:
        new_minute = True
    elif (
        (self.bar.datetime.minute != tick.datetime.minute)
        or (self.bar.datetime.hour != tick.datetime.hour)
    ):
        self.bar.datetime = self.bar.datetime.replace(
            second=0, microsecond=0
        )
        self.on_bar(self.bar)

        new_minute = True

    if new_minute:
        self.bar = BarData(
            symbol=tick.symbol,
            exchange=tick.exchange,
            interval=Interval.MINUTE,
            datetime=tick.datetime,
            gateway_name=tick.gateway_name,
            open_price=tick.last_price,
            high_price=tick.last_price,
            low_price=tick.last_price,
            close_price=tick.last_price,
            open_interest=tick.open_interest
        )
    else:
        self.bar.high_price = max(self.bar.high_price, tick.last_price)
        if tick.high_price > self.last_tick.high_price:
            self.bar.high_price = max(self.bar.high_price, tick.high_price)

        self.bar.low_price = min(self.bar.low_price, tick.last_price)
        if tick.low_price < self.last_tick.low_price:
            self.bar.low_price = min(self.bar.low_price, tick.low_price)

        self.bar.close_price = tick.last_price
        self.bar.open_interest = tick.open_interest
        self.bar.datetime = tick.datetime

    if self.last_tick:
        volume_change: float = tick.volume - self.last_tick.volume
        self.bar.volume += max(volume_change, 0)

        turnover_change: float = tick.turnover - self.last_tick.turnover
        self.bar.turnover += max(turnover_change, 0)

    self.last_tick = tick

def update_bar(self, bar: BarData) -> None:
    """
    Update 1 minute bar into generator
    """
    # if self.interval == Interval.MINUTE:
    #     self.update_bar_minute_window(bar)
    # else:
    #     self.update_bar_hour_window(bar)
    if self.interval == Interval.MINUTE:
        self.update_bar_minute_window(bar)
    elif self.interval == Interval.HOUR:
        self.update_bar_hour_window(bar)
    elif self.interval == Interval.DAILY: 
        self.update_bar_day_window(bar) #处理日线
    else:
        self.update_bar_week_window(bar) #处理周线


def update_bar_minute_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object
    if not self.window_bar:
        dt: datetime = bar.datetime.replace(second=0, microsecond=0)
        self.window_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price
        )
    # Otherwise, update high/low price into window bar
    else:
        self.window_bar.high_price = max(
            self.window_bar.high_price,
            bar.high_price
        )
        self.window_bar.low_price = min(
            self.window_bar.low_price,
            bar.low_price
        )

    # Update close price/volume/turnover into window bar
    self.window_bar.close_price = bar.close_price
    self.window_bar.volume += bar.volume
    self.window_bar.turnover += bar.turnover
    self.window_bar.open_interest = bar.open_interest

    # Check if window bar completed
    if not (bar.datetime.minute + 1) % self.window:
        self.on_window_bar(self.window_bar)
        self.window_bar = None

def update_bar_hour_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object
    if not self.hour_bar:
        dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.hour_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume,
            turnover=bar.turnover,
            open_interest=bar.open_interest
        )
        return

    finished_bar: BarData = None

    # If minute is 59, update minute bar into window bar and push
    if bar.datetime.minute == 59:
        self.hour_bar.high_price = max(
            self.hour_bar.high_price,
            bar.high_price
        )
        self.hour_bar.low_price = min(
            self.hour_bar.low_price,
            bar.low_price
        )

        self.hour_bar.close_price = bar.close_price
        self.hour_bar.volume += bar.volume
        self.hour_bar.turnover += bar.turnover
        self.hour_bar.open_interest = bar.open_interest

        finished_bar = self.hour_bar
        self.hour_bar = None

    # If minute bar of new hour, then push existing window bar
    elif bar.datetime.hour != self.hour_bar.datetime.hour:
        finished_bar = self.hour_bar

        dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.hour_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume,
            turnover=bar.turnover,
            open_interest=bar.open_interest
        )
    # Otherwise only update minute bar
    else:
        self.hour_bar.high_price = max(
            self.hour_bar.high_price,
            bar.high_price
        )
        self.hour_bar.low_price = min(
            self.hour_bar.low_price,
            bar.low_price
        )

        self.hour_bar.close_price = bar.close_price
        self.hour_bar.volume += bar.volume
        self.hour_bar.turnover += bar.turnover
        self.hour_bar.open_interest = bar.open_interest

    # Push finished window bar
    if finished_bar:
        self.on_hour_bar(finished_bar)

添加部分

def update_bar_day_window(self, bar: BarData) -> None:
    """"""
    # 没有日线bar就生成一个

    if not self.day_bar:
        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.day_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            volume=bar.volume
        )
        return

    finished_bar = None
    temp_datetime = (bar.datetime + datetime.timedelta(hours=4))
    # 14:59 更新bar,生成新的日线bar
    if bar.datetime.minute == 59 and bar.datetime.hour == 14:
        self.day_bar.high_price = max(
            self.day_bar.high_price,
            bar.high_price
        )
        self.day_bar.low_price = min(
            self.day_bar.low_price,
            bar.low_price
        )

        self.day_bar.close_price = bar.close_price
        self.day_bar.volume += int(bar.volume)
        self.day_bar.open_interest = bar.open_interest

        finished_bar = self.day_bar #保存日线bar
        self.day_bar = None #因为日线bar已经保存给finished_bar了所以将日线bar设为空,下次新数据来了就会生成新的日线bar

    # 夜盘算新的一天的开始,
    # 现存的bar加上5小时如果是周六的话就那代表是周五的夜盘数据,而它对应的白天数据是下周一的,隔了2天加5个小时还是不够的,
    # 所以特判一下如果现存的self.day_bar是周五的话不要用5小时判断,剩下不用管他,因为下周一的夜盘进来的话会被+5小时的条件判断掉,进而将周五夜盘和周一白天的数据推送出去

    elif temp_datetime.day != (self.day_bar.datetime+  datetime.timedelta(hours=5)).day and  (self.day_bar.datetime+  datetime.timedelta(hours=5)).weekday() != 5:

        finished_bar = self.week_bar

        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.day_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume
        )
    # 更新 现存的day_bar
    else:
        self.day_bar.high_price = max(
            self.day_bar.high_price,
            bar.high_price
        )
        self.day_bar.low_price = min(
            self.day_bar.low_price,
            bar.low_price
        )

        self.day_bar.close_price = bar.close_price
        self.day_bar.volume += int(bar.volume)
        self.day_bar.open_interest = bar.open_interest

    # 推送日线给on_hour_bar处理
    if finished_bar:
        self.on_hour_bar(finished_bar)

    # Cache last bar object
    self.last_bar = bar

添加部分

def update_bar_week_window(self, bar: BarData) -> None:
    """"""
    # If not inited, create window bar object

    if not self.week_bar:
        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.week_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            volume=bar.volume
        )
        return

    finished_bar = None

    # If time is Firday 14:59, update day bar into window bar and push
    if bar.datetime.minute == 59 and bar.datetime.hour == 14 and bar.datetime.weekday() == 4:
        self.week_bar.high_price = max(
            self.week_bar.high_price,
            bar.high_price
        )
        self.week_bar.low_price = min(
            self.week_bar.low_price,
            bar.low_price
        )

        self.week_bar.close_price = bar.close_price
        self.week_bar.volume += int(bar.volume)
        self.week_bar.open_interest = bar.open_interest

        finished_bar = self.week_bar
        self.week_bar = None

    # isocalendar() 返回多少年的第几周的第几天 格式如(2018, 27, 5)
    # 周数不相同肯定是新的一周,可以推送出一根完整周k线了

    elif  (bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1] != (self.week_bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1]:
        # print(bar.datetime.isocalendar())
        finished_bar = self.week_bar

        dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
        self.week_bar = BarData(
            symbol=bar.symbol,
            exchange=bar.exchange,
            datetime=dt,
            gateway_name=bar.gateway_name,
            open_price=bar.open_price,
            high_price=bar.high_price,
            low_price=bar.low_price,
            close_price=bar.close_price,
            volume=bar.volume
        )
    # Otherwise only update minute bar
    else:
        self.week_bar.high_price = max(
            self.week_bar.high_price,
            bar.high_price
        )
        self.week_bar.low_price = min(
            self.week_bar.low_price,
            bar.low_price
        )
        self.week_bar.close_price = bar.close_price
        self.week_bar.volume += int(bar.volume)
        self.week_bar.open_interest = bar.open_interest

    # Push finished window bar
    if finished_bar:
        self.on_hour_bar(finished_bar) #on_window_bar只关心bar的数量,不关心bar的类型,所以可以直接调用

    # Cache last bar object
    self.last_bar = bar
 #添加部分

def on_hour_bar(self, bar: BarData) -> None:
    """"""
    if self.window == 1:
        self.on_window_bar(bar)
    else:
        if not self.window_bar:
            self.window_bar = BarData(
                symbol=bar.symbol,
                exchange=bar.exchange,
                datetime=bar.datetime,
                gateway_name=bar.gateway_name,
                open_price=bar.open_price,
                high_price=bar.high_price,
                low_price=bar.low_price
            )
        else:
            self.window_bar.high_price = max(
                self.window_bar.high_price,
                bar.high_price
            )
            self.window_bar.low_price = min(
                self.window_bar.low_price,
                bar.low_price
            )

        self.window_bar.close_price = bar.close_price
        self.window_bar.volume += bar.volume
        self.window_bar.turnover += bar.turnover
        self.window_bar.open_interest = bar.open_interest

        self.interval_count += 1
        if not self.interval_count % self.window:
            self.interval_count = 0
            self.on_window_bar(self.window_bar)
            self.window_bar = None

def generate(self) -> Optional[BarData]:
    """
    Generate the bar data and call callback immediately.
    """
    bar: BarData = self.bar

    if self.bar:
        bar.datetime = bar.datetime.replace(second=0, microsecond=0)
        self.on_bar(bar)

    self.bar = None
    return bar
Member
avatar
加入于:
帖子: 11
声望: 0

方便留个联系方式吗,我之前也是看csdn的代码,发现大商所的品种每周四晚上就推送周线

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】