rqdata 被剥离后应该怎么改呢?这个 rqdataclient 现在对应的是什么?
database_manager 现在变成了 get_database,应该怎么改?
rqdata 被剥离后应该怎么改呢?这个 rqdataclient 现在对应的是什么?
database_manager 现在变成了 get_database,应该怎么改?
from datetime import datetime
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.datafeed import get_datafeed
from vnpy.trader.object import HistoryRequest
from vnpy.trader.database import get_database
# Incrementally update every bar series already stored in the database:
# for each stored (symbol, exchange, interval) overview, download bars from
# the stored end time up to "now" and save them back.
database = get_database()
datafeed = get_datafeed()

# One (symbol, exchange, end, interval) tuple per stored bar overview.
barOverViews = database.get_bar_overview()
updateInfo = [
    (overview.symbol, overview.exchange, overview.end, overview.interval)
    for overview in barOverViews
]

# Every request in this run shares the same end time.
endDt = datetime.now()

for symbol, exchange, last_end, interval in updateInfo:
    print(f'更新:{symbol},Interval:{interval}')
    req = HistoryRequest(
        symbol=symbol,
        exchange=exchange,
        start=last_end,
        end=endDt,
        interval=interval,
    )
    data = datafeed.query_bar_history(req)

    # Skip the save when the datafeed returned nothing: handing an empty
    # batch to the MongoDB backend makes its bulk_write raise
    # InvalidOperation("No operations to execute").
    if not data:
        print(f"{req.symbol}没有获取到新的历史数据,跳过保存")
        continue

    # Save into the database.
    database.save_bar_data(data)
    print(f"{req.symbol}历史数据下载完成")
用的是 tushare,将数据保存到 MongoDB。想问下这个报错怎么解决?
from vnpy.trader.database import get_database
from vnpy.trader.datafeed import get_datafeed
from typing import List
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import HistoryRequest, BarData
from datetime import datetime
import tushare as ts
from vnpy_tushare.tushare_datafeed import TushareDatafeed
from pymongo import MongoClient
from vnpy_mongodb.mongodb_database import MongodbDatabase
# Handles obtained through the vn.py factory helpers.
database_manager = get_database()
datafeed = get_datafeed()

# Requested download window.
start_date = datetime(year=2005, month=1, day=1)
end_date = datetime(year=2030, month=12, day=31)

# Product codes to download, grouped under the exchange value that is later
# passed to Exchange(...).
symbols = {
    "SHFE": [
        "AG", "AL", "AU", "BU", "CU", "FU", "HC", "LU", "NI", "NR",
        "PB", "RB", "RU", "SC", "SN", "SP", "SS", "WR", "ZN",
    ],
    "DCE": [
        "A", "B", "BB", "C", "CS", "EG", "EB", "FB", "I", "J", "JD",
        "JM", "L", "LH", "M", "P", "PG", "PP", "RR", "V", "Y",
    ],
    "CZCE": [
        "AP", "CF", "CJ", "CY", "FG", "JR", "LR", "MA", "OI", "PF", "PM",
        "RI", "RM", "RS", "SA", "SF", "SM", "SR", "TA", "UR", "WH", "ZC",
    ],
    "CFFEX": ["IC", "IF", "IH", "T", "TF", "TS"],
}

# Suffix appended to each product code to build the requested symbol.
symbol_type = "301"
print("symbols字典创建成功")

# Explicit Tushare datafeed and MongoDB database instances used for the
# actual download and storage.
tsdatafeed = TushareDatafeed()
tsdatafeed.init()
mgdb = MongodbDatabase()
def load_data(req):
    """Download the bars described by *req* and store them via ``mgdb``.

    Queries ``tsdatafeed`` for the requested history and hands the result to
    ``mgdb.save_bar_data``. When the query yields nothing, the save is
    skipped: an empty batch makes the MongoDB backend's ``bulk_write`` raise
    ``InvalidOperation("No operations to execute")``.

    Args:
        req: The ``HistoryRequest`` describing symbol, exchange, interval and
            time range to download.
    """
    data = tsdatafeed.query_bar_history(req)

    # Nothing came back for this request, so report it and move on
    # instead of letting the empty save abort the whole run.
    if not data:
        print(f"{req.symbol}未获取到历史数据,跳过保存")
        return

    mgdb.save_bar_data(data)
    print(f"{req.symbol}历史数据下载完成")
# Request minute bars for every configured product on every exchange.
for exchange_value, product_codes in symbols.items():
    for product_code in product_codes:
        history_request = HistoryRequest(
            # Requested symbol = product code followed by the shared suffix.
            symbol=f"{product_code}{symbol_type}",
            exchange=Exchange(exchange_value),
            start=start_date,
            interval=Interval.MINUTE,
            end=end_date,
        )
        load_data(req=history_request)

print("运行结束!")
InvalidOperation Traceback (most recent call last)
Cell In[19], line 19
11 for s in symbols_list:
12 req = HistoryRequest(
13 symbol=s+str(symbol_type),
14 exchange=Exchange(exchange),
(...)
17 end=end_date,
18 )
---> 19 load_data(req=req)
20 print("运行结束!")
Cell In[19], line 7, in load_data(req)
5 def load_data(req):
6 data = tsdatafeed.query_bar_history(req)
----> 7 mgdb.save_bar_data(data)
8 print(f"{req.symbol}历史数据下载完成")
File E:\VNPY\lib\site-packages\vnpy_mongodb\mongodb_database.py:123, in MongodbDatabase.save_bar_data(self, bars, stream)
107 d: dict = {
108 "symbol": bar.symbol,
109 "exchange": bar.exchange.value,
(...)
118 "close_price": bar.close_price,
119 }
121 requests.append(ReplaceOne(filter, d, upsert=True))
--> 123 self.bar_collection.bulk_write(requests, ordered=False)
125 # 更新汇总
126 filter: dict = {
127 "symbol": bar.symbol,
128 "exchange": bar.exchange.value,
129 "interval": bar.interval.value
130 }
File E:\VNPY\lib\site-packages\pymongo\_csot.py:108, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
106 with _TimeoutContext(timeout):
107 return func(self, *args, **kwargs)
--> 108 return func(self, *args, **kwargs)
File E:\VNPY\lib\site-packages\pymongo\collection.py:571, in Collection.bulk_write(self, requests, ordered, bypass_document_validation, session, comment, let)
568 raise TypeError(f"{request!r} is not a valid request")
570 write_concern = self._write_concern_for(session)
--> 571 bulk_api_result = blk.execute(write_concern, session)
572 if bulk_api_result is not None:
573 return BulkWriteResult(bulk_api_result, True)
File E:\VNPY\lib\site-packages\pymongo\bulk.py:562, in _Bulk.execute(self, write_concern, session)
560 """Execute operations."""
561 if not self.ops:
--> 562 raise InvalidOperation("No operations to execute")
563 if self.executed:
564 raise InvalidOperation("Bulk operations can only be executed once.")
InvalidOperation: No operations to execute