"""
General utility functions.
"""
import json
import logging
import sys
import datetime
from pathlib import Path
from typing import Callable, Dict, Tuple, Union, Optional
from decimal import Decimal
from math import floor, ceil
import numpy as np
import talib
from .object import BarData, TickData
from .constant import Exchange, Interval
if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo, available_timezones # noqa
else:
from backports.zoneinfo import ZoneInfo, available_timezones # noqa
log_formatter: logging.Formatter = logging.Formatter('[%(asctime)s] %(message)s')
def extract_vt_symbol(vt_symbol: str) -> Tuple[str, Exchange]:
"""
:return: (symbol, exchange)
"""
symbol, exchange_str = vt_symbol.split(".")
return symbol, Exchange(exchange_str)
def generate_vt_symbol(symbol: str, exchange: Exchange) -> str:
"""
return vt_symbol
"""
return f"{symbol}.{exchange.value}"
def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
"""
Get path where trader is running in.
"""
cwd: Path = Path.cwd()
temp_path: Path = cwd.joinpath(temp_name)
# If .vntrader folder exists in current working directory,
# then use it as trader running path.
if temp_path.exists():
return cwd, temp_path
# Otherwise use home path of system.
home_path: Path = Path.home()
temp_path: Path = home_path.joinpath(temp_name)
# Create .vntrader folder under home path if not exist.
if not temp_path.exists():
temp_path.mkdir()
return home_path, temp_path
TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
sys.path.append(str(TRADER_DIR))
def get_file_path(filename: str) -> Path:
"""
Get path for temp file with filename.
"""
return TEMP_DIR.joinpath(filename)
def get_folder_path(folder_name: str) -> Path:
"""
Get path for temp folder with folder name.
"""
folder_path: Path = TEMP_DIR.joinpath(folder_name)
if not folder_path.exists():
folder_path.mkdir()
return folder_path
def get_icon_path(filepath: str, ico_name: str) -> str:
"""
Get path for icon file with ico name.
"""
ui_path: Path = Path(filepath).parent
icon_path: Path = ui_path.joinpath("ico", ico_name)
return str(icon_path)
def load_json(filename: str) -> dict:
"""
Load data from json file in temp path.
"""
filepath: Path = get_file_path(filename)
if filepath.exists():
with open(filepath, mode="r", encoding="UTF-8") as f:
data: dict = json.load(f)
return data
else:
save_json(filename, {})
return {}
def save_json(filename: str, data: dict) -> None:
"""
Save data into json file in temp path.
"""
filepath: Path = get_file_path(filename)
with open(filepath, mode="w+", encoding="UTF-8") as f:
json.dump(
data,
f,
indent=4,
ensure_ascii=False
)
def round_to(value: float, target: float) -> float:
"""
Round price to price tick value.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
rounded: float = float(int(round(value / target)) * target)
return rounded
def floor_to(value: float, target: float) -> float:
"""
Similar to math.floor function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(floor(value / target)) * target)
return result
def ceil_to(value: float, target: float) -> float:
"""
Similar to math.ceil function, but to target float number.
"""
value: Decimal = Decimal(str(value))
target: Decimal = Decimal(str(target))
result: float = float(int(ceil(value / target)) * target)
return result
def get_digits(value: float) -> int:
"""
Get number of digits after decimal point.
"""
value_str: str = str(value)
if "e-" in value_str:
_, buf = value_str.split("e-")
return int(buf)
elif "." in value_str:
_, buf = value_str.split(".")
return len(buf)
else:
return 0
class BarGenerator:
"""
For:
1. generating 1 minute bar data from tick data
2. generating x minute bar/x hour bar data from 1 minute data
Notice:
1. for x minute bar, x must be able to divide 60: 2, 3, 5, 6, 10, 15, 20, 30
2. for x hour bar, x can be any number
"""
def __init__(
self,
on_bar: Callable,
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
) -> None:
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.hour_bar: BarData = None
self.day_bar: BarData = None
self.week_bar: BarData = None
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute: bool = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
volume_change: float = tick.volume - self.last_tick.volume
self.bar.volume += max(volume_change, 0)
turnover_change: float = tick.turnover - self.last_tick.turnover
self.bar.turnover += max(turnover_change, 0)
self.last_tick = tick
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
# if self.interval == Interval.MINUTE:
# self.update_bar_minute_window(bar)
# else:
# self.update_bar_hour_window(bar)
if self.interval == Interval.MINUTE:
self.update_bar_minute_window(bar)
elif self.interval == Interval.HOUR:
self.update_bar_hour_window(bar)
elif self.interval == Interval.DAILY:
self.update_bar_day_window(bar) #处理日线
else:
self.update_bar_week_window(bar) #处理周线
def update_bar_minute_window(self, bar: BarData) -> None:
""""""
# If not inited, create window bar object
if not self.window_bar:
dt: datetime = bar.datetime.replace(second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
# Otherwise, update high/low price into window bar
else:
self.window_bar.high_price = max(
self.window_bar.high_price,
bar.high_price
)
self.window_bar.low_price = min(
self.window_bar.low_price,
bar.low_price
)
# Update close price/volume/turnover into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += bar.volume
self.window_bar.turnover += bar.turnover
self.window_bar.open_interest = bar.open_interest
# Check if window bar completed
if not (bar.datetime.minute + 1) % self.window:
self.on_window_bar(self.window_bar)
self.window_bar = None
def update_bar_hour_window(self, bar: BarData) -> None:
""""""
# If not inited, create window bar object
if not self.hour_bar:
dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.hour_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume,
turnover=bar.turnover,
open_interest=bar.open_interest
)
return
finished_bar: BarData = None
# If minute is 59, update minute bar into window bar and push
if bar.datetime.minute == 59:
self.hour_bar.high_price = max(
self.hour_bar.high_price,
bar.high_price
)
self.hour_bar.low_price = min(
self.hour_bar.low_price,
bar.low_price
)
self.hour_bar.close_price = bar.close_price
self.hour_bar.volume += bar.volume
self.hour_bar.turnover += bar.turnover
self.hour_bar.open_interest = bar.open_interest
finished_bar = self.hour_bar
self.hour_bar = None
# If minute bar of new hour, then push existing window bar
elif bar.datetime.hour != self.hour_bar.datetime.hour:
finished_bar = self.hour_bar
dt: datetime = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.hour_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume,
turnover=bar.turnover,
open_interest=bar.open_interest
)
# Otherwise only update minute bar
else:
self.hour_bar.high_price = max(
self.hour_bar.high_price,
bar.high_price
)
self.hour_bar.low_price = min(
self.hour_bar.low_price,
bar.low_price
)
self.hour_bar.close_price = bar.close_price
self.hour_bar.volume += bar.volume
self.hour_bar.turnover += bar.turnover
self.hour_bar.open_interest = bar.open_interest
# Push finished window bar
if finished_bar:
self.on_hour_bar(finished_bar)
添加部分
def update_bar_day_window(self, bar: BarData) -> None:
""""""
# 没有日线bar就生成一个
if not self.day_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
temp_datetime = (bar.datetime + datetime.timedelta(hours=4))
# 14:59 更新bar,生成新的日线bar
if bar.datetime.minute == 59 and bar.datetime.hour == 14:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
finished_bar = self.day_bar #保存日线bar
self.day_bar = None #因为日线bar已经保存给finished_bar了所以将日线bar设为空,下次新数据来了就会生成新的日线bar
# 夜盘算新的一天的开始,
# 现存的bar加上5小时如果是周六的话就那代表是周五的夜盘数据,而它对应的白天数据是下周一的,隔了2天加5个小时还是不够的,
# 所以特判一下如果现存的self.day_bar是周五的话不要用5小时判断,剩下不用管他,因为下周一的夜盘进来的话会被+5小时的条件判断掉,进而将周五夜盘和周一白天的数据推送出去
elif temp_datetime.day != (self.day_bar.datetime+ datetime.timedelta(hours=5)).day and (self.day_bar.datetime+ datetime.timedelta(hours=5)).weekday() != 5:
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# 更新 现存的day_bar
else:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
# 推送日线给on_hour_bar处理
if finished_bar:
self.on_hour_bar(finished_bar)
# Cache last bar object
self.last_bar = bar
添加部分
def update_bar_week_window(self, bar: BarData) -> None:
""""""
# If not inited, create window bar object
if not self.week_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
# If time is Firday 14:59, update day bar into window bar and push
if bar.datetime.minute == 59 and bar.datetime.hour == 14 and bar.datetime.weekday() == 4:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
finished_bar = self.week_bar
self.week_bar = None
# isocalendar() 返回多少年的第几周的第几天 格式如(2018, 27, 5)
# 周数不相同肯定是新的一周,可以推送出一根完整周k线了
elif (bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1] != (self.week_bar.datetime + datetime.timedelta(days=2,hours=5)).isocalendar()[1]:
# print(bar.datetime.isocalendar())
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# Otherwise only update minute bar
else:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
# Push finished window bar
if finished_bar:
self.on_hour_bar(finished_bar) #on_window_bar只关心bar的数量,不关心bar的类型,所以可以直接调用
# Cache last bar object
self.last_bar = bar
#添加部分
def on_hour_bar(self, bar: BarData) -> None:
""""""
if self.window == 1:
self.on_window_bar(bar)
else:
if not self.window_bar:
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=bar.datetime,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
else:
self.window_bar.high_price = max(
self.window_bar.high_price,
bar.high_price
)
self.window_bar.low_price = min(
self.window_bar.low_price,
bar.low_price
)
self.window_bar.close_price = bar.close_price
self.window_bar.volume += bar.volume
self.window_bar.turnover += bar.turnover
self.window_bar.open_interest = bar.open_interest
self.interval_count += 1
if not self.interval_count % self.window:
self.interval_count = 0
self.on_window_bar(self.window_bar)
self.window_bar = None
def generate(self) -> Optional[BarData]:
"""
Generate the bar data and call callback immediately.
"""
bar: BarData = self.bar
if self.bar:
bar.datetime = bar.datetime.replace(second=0, microsecond=0)
self.on_bar(bar)
self.bar = None
return bar