VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 5
声望: 0

rqdata被剥离后应该怎么改呢,这个rqdataclient现在是什么
database_manger现在是get_database,应该怎么改

Member
avatar
加入于:
帖子: 4704
声望: 287

可参考https://github.com/vnpy/vnpy/blob/dev/docs/database.md
https://github.com/vnpy/vnpy/blob/dev/docs/datafeed.md

Member
avatar
加入于:
帖子: 11
声望: 2

在前面各位基础上更新VN2.9版本

from datetime import datetime
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.object import HistoryRequest
from vnpy.trader.database import get_database

获取数据服务实例

database = get_database()
datafeed = get_datafeed()

updateInfo=[]
barOverViews=database.get_bar_overview()
for barOverView in barOverViews:

#print(f"{barOverView.symbol} start:{barOverView.start} end:{barOverView.end} count:{barOverView.count}")
updateInfo.append((barOverView.symbol,barOverView.exchange,barOverView.end,barOverView.interval))

endDt=datetime.now()

for info in updateInfo:
print(f'更新:{info[0]},Interval:{info[3]}')
req=HistoryRequest(
symbol=info[0],
exchange=info[1],
start=info[2],
end=endDt,
interval=info[3],
)

获取k线历史数据

data = datafeed.query_bar_history(req)
#保存入数据库
database.save_bar_data(data)
print(f"{req.symbol}历史数据下载完成")
Member
avatar
加入于:
帖子: 6
声望: 0

用的是tushare,将数据保存到Mongod。想问下这个报错怎么解决

from vnpy.trader.rqdata import rqdata_client

from vnpy.trader.database import get_database
from vnpy.trader.datafeed import get_datafeed
from typing import List
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import HistoryRequest, BarData
from datetime import datetime
import tushare as ts
from vnpy_tushare.tushare_datafeed import TushareDatafeed
from pymongo import MongoClient
from vnpy_mongodb.mongodb_database import MongodbDatabase

from vnpy.trader.datafeed import BaseDatafeed

database_manager = get_database()
datafeed = get_datafeed()

start_date = datetime(2005,1,1)
end_date = datetime(2030,12,31)

symbols={
"SHFE": ["AG","AL","AU","BU","CU","FU","HC","LU","NI","NR","PB","RB","RU","SC","SN","SP","SS","WR","ZN"],
"DCE": ["A","B","BB","C","CS","EG","EB","FB","I","J","JD","JM","L","LH","M","P","PG","PP","RR","V","Y"],
"CZCE": ["AP","CF","CJ","CY","FG","JR","LR","MA","OI","PF","PM","RI","RM","RS","SA","SF","SM","SR","TA","UR","WH","ZC"],
"CFFEX": ["IC","IF","IH","T","TF","TS"]
}
symbol_type = "301"
print("symbols字典创建成功")

tsdatafeed = TushareDatafeed()
tsdatafeed.init()
mgdb = MongodbDatabase()

def load_data(req):
data = tsdatafeed.query_bar_history(req)
mgdb.save_bar_data(data)
print(f"{req.symbol}历史数据下载完成")

for exchange, symbols_list in symbols.items():
for s in symbols_list:
req = HistoryRequest(
symbol=s+str(symbol_type),
exchange=Exchange(exchange),
start=start_date,
interval=Interval.MINUTE,
end=end_date,
)
load_data(req=req)
print("运行结束!")


InvalidOperation Traceback (most recent call last)
Cell In[19], line 19
11 for s in symbols_list:
12 req = HistoryRequest(
13 symbol=s+str(symbol_type),
14 exchange=Exchange(exchange),
(...)
17 end=end_date,
18 )
---> 19 load_data(req=req)
20 print("运行结束!")

Cell In[19], line 7, in load_data(req)
5 def load_data(req):
6 data = tsdatafeed.query_bar_history(req)
----> 7 mgdb.save_bar_data(data)
8 print(f"{req.symbol}历史数据下载完成")

File E:\VNPY\lib\site-packages\vnpy_mongodb\mongodb_database.py:123, in MongodbDatabase.save_bar_data(self, bars, stream)
107 d: dict = {
108 "symbol": bar.symbol,
109 "exchange": bar.exchange.value,
(...)
118 "close_price": bar.close_price,
119 }
121 requests.append(ReplaceOne(filter, d, upsert=True))
--> 123 self.bar_collection.bulk_write(requests, ordered=False)
125 # 更新汇总
126 filter: dict = {
127 "symbol": bar.symbol,
128 "exchange": bar.exchange.value,
129 "interval": bar.interval.value
130 }

File E:\VNPY\lib\site-packages\pymongo_csot.py:108, in apply.<locals>.csot_wrapper(self, args, **kwargs)
106 with _TimeoutContext(timeout):
107 return func(self,
args, kwargs)
--> 108 return func(self, *args,
kwargs)

File E:\VNPY\lib\site-packages\pymongo\collection.py:571, in Collection.bulk_write(self, requests, ordered, bypass_document_validation, session, comment, let)
568 raise TypeError(f"{request!r} is not a valid request")
570 write_concern = self._write_concern_for(session)
--> 571 bulk_api_result = blk.execute(write_concern, session)
572 if bulk_api_result is not None:
573 return BulkWriteResult(bulk_api_result, True)

File E:\VNPY\lib\site-packages\pymongo\bulk.py:562, in _Bulk.execute(self, write_concern, session)
560 """Execute operations."""
561 if not self.ops:
--> 562 raise InvalidOperation("No operations to execute")
563 if self.executed:
564 raise InvalidOperation("Bulk operations can only be executed once.")

InvalidOperation: No operations to execute

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】