基于https://www.vnpy.com/forum/topic/3759-vn-pyshe-qu-jing-xuan-22-kan-wan-zhe-pian-che-di-xue-hui-csvli-shi-shu-ju-dao-ru
点击策略“初始化”时,会调用 akshare 下载分时(1 分钟)数据并写入数据库。小白第一次发帖,只做了简单改动,请各位帮忙看看有没有 bug。
!!!akshare只有最近3到5天分时数据!!!
1. 下载数据并导入数据库
# 导入 database_manager 模块
from vnpy.trader.constant import (Exchange, Interval)
from vnpy.trader.object import (BarData,TickData)
import pandas as pd
from pytz import timezone
from vnpy.trader.database.initialize import init_sql
from vnpy.trader.database.database import Driver
from datetime import datetime
CHINA_TZ = timezone("Asia/Shanghai")
# Connection settings handed to init_sql() below.
# NOTE(review): host/port/user/password/authentication_source look like
# options for server-based drivers; presumably only "database" matters for
# the SQLite driver -- confirm against vnpy's init_sql.
settings = dict(
    database="database.db",
    host="localhost",
    port=3306,
    user="root",
    password="",
    authentication_source="admin",
)

# Module-level database manager created with the SQLite driver; the import
# helper in this file saves bars through it.
sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)
# 封装函数
def move_df_to_mongodb(imported_data: pd.DataFrame, input_symbol: str,
                       exchange: Exchange, interval: Interval,
                       collection_name: str = None):
    """
    Convert a DataFrame of OHLCV rows into BarData and save them to the database.

    Despite the name, the bars are written through the module-level
    ``sqlite_manager``.

    Args:
        imported_data: One row per bar. Must contain the columns
            ``datetime``, ``open``, ``high``, ``low``, ``close`` and
            ``volume``; ``open_interest`` is optional and defaults to 0.
            ``datetime`` may hold ISO-format strings or datetime-like
            values (``pd.Timestamp``, ``datetime``). The frame is not
            modified.
        input_symbol: Symbol stored on every bar.
        exchange: Exchange stored on every bar.
        interval: Bar interval stored on every bar.
        collection_name: Unused; kept so existing callers keep working.

    Returns:
        None. A one-line summary is printed.
    """
    bars = []

    for _, row in imported_data.iterrows():
        raw_dt = row['datetime']
        if isinstance(raw_dt, str):
            dt = datetime.fromisoformat(raw_dt)
        else:
            # Already datetime-like; fromisoformat() would raise TypeError.
            dt = pd.Timestamp(raw_dt).to_pydatetime()

        # NOTE(review): dt is left timezone-naive exactly as before; the
        # module defines CHINA_TZ but never applies it -- confirm whether
        # the database layer expects aware datetimes.
        bars.append(
            BarData(
                symbol=input_symbol,
                exchange=exchange,
                datetime=dt,
                interval=interval,
                volume=row['volume'],
                open_price=row['open'],
                high_price=row['high'],
                low_price=row['low'],
                close_price=row['close'],
                open_interest=row.get("open_interest", 0),
                gateway_name="DB",
            )
        )

    # Summary statistics; stay None for an empty frame instead of raising
    # NameError on an unbound ``end``.
    count = len(bars)
    start = bars[0].datetime if bars else None
    end = bars[-1].datetime if bars else None

    # Insert into database (nothing to write when the frame was empty).
    if bars:
        sqlite_manager.save_bar_data(bars)

    print(f"Insert Bar: {count} from {start} - {end}")
2. 修改 D:\vnstudio\Lib\site-packages\vnpy\app\cta_strategy\engine.py:在文件顶部加入下面的导入,并按如下方式修改 _init_strategy 方法(第 1 步的代码需保存为同目录下的“手动下载导入.py”,以便下面的 import 能找到它)
from vnpy.trader.constant import (Exchange, Interval)
import akshare as ak
from vnpy.app.cta_strategy import 手动下载导入 as imp_vn
# 手动下载导入 as imp_vn
def _init_strategy(self, strategy_name: str):
    """
    Initialise one queued strategy.

    Steps, in order:
      1. Look up the strategy's contract and, if found, download recent
         1-minute bars with akshare and store them in the database.
      2. Call the strategy's ``on_init`` callback.
      3. Restore saved strategy variables.
      4. Subscribe to market data (or log that the contract is missing).
      5. Mark the strategy as inited and push a strategy event.

    The download is done before ``on_init`` so the bars are already stored
    when the callback runs (presumably ``on_init`` is where the strategy
    loads its history -- confirm for your strategies).

    Args:
        strategy_name: Key of the strategy in ``self.strategies``.

    Raises:
        Exception: Whatever the download/import step raises is logged and
            re-raised; the strategy is then left un-inited.
    """
    strategy = self.strategies[strategy_name]

    if strategy.inited:
        self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
        return

    self.write_log(f"{strategy_name}开始执行初始化")

    # Resolve the contract first; it is needed for both the download and
    # the subscription, and may be None when the symbol is unknown.
    contract = self.main_engine.get_contract(strategy.vt_symbol)

    # Download and store minute bars. Guarded by the None check so a missing
    # contract falls through to the "subscription failed" log further down
    # instead of raising AttributeError.
    if contract:
        try:
            rbdf = ak.futures_zh_minute_sina(contract.symbol, '1')
            rbdf.rename(columns={'date': 'datetime'}, inplace=True)
            imp_vn.move_df_to_mongodb(
                rbdf, contract.symbol, contract.exchange, Interval.MINUTE)
        except Exception as exc:
            # Make the failure visible in the log, then propagate it
            # unchanged.
            self.write_log(f"{strategy_name}历史数据下载失败:{exc}", strategy)
            raise

    # Call on_init function of strategy
    self.call_strategy_func(strategy, strategy.on_init)

    # Restore strategy data(variables)
    data = self.strategy_data.get(strategy_name, None)
    if data:
        for name in strategy.variables:
            value = data.get(name, None)
            if value:
                setattr(strategy, name, value)

    # Subscribe market data
    if contract:
        req = SubscribeRequest(
            symbol=contract.symbol, exchange=contract.exchange)
        self.main_engine.subscribe(req, contract.gateway_name)
    else:
        self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)

    # Put event to update init completed status.
    strategy.inited = True
    self.put_strategy_event(strategy)
    self.write_log(f"{strategy_name}初始化完成")