1. 价差数据类SpreadData的目前代码:
class SpreadData:
    """Spread made of several legs, with its combined quote and net position.

    Leg prices are combined with ``price_multipliers``; quote volumes,
    positions and order volumes are converted with ``trading_multipliers``.
    All per-leg settings are keyed by the leg's ``vt_symbol``.
    """

    def __init__(
        self,
        name: str,
        legs: List[LegData],
        price_multipliers: Dict[str, int],
        trading_multipliers: Dict[str, int],
        active_symbol: str,
        inverse_contracts: Dict[str, bool],
        min_volume: float
    ):
        """Index the legs, build both formula strings and reset the quote.

        The leg whose ``vt_symbol`` equals ``active_symbol`` becomes the
        active leg; every other leg is appended to ``passive_legs``.
        """
        self.name: str = name
        self.min_volume: float = min_volume

        # Used for calculating the spread price
        self.price_multipliers: Dict[str, int] = price_multipliers
        # Used for calculating spread position and sending orders
        self.trading_multipliers: Dict[str, int] = trading_multipliers
        self.inverse_contracts: Dict[str, bool] = inverse_contracts

        self.legs: Dict[str, LegData] = {}
        self.active_leg: LegData = None
        self.passive_legs: List[LegData] = []
        self.pricetick: float = 0

        price_terms = []
        trading_terms = []

        for leg in legs:
            symbol = leg.vt_symbol
            self.legs[symbol] = leg

            if symbol == active_symbol:
                self.active_leg = leg
            else:
                self.passive_legs.append(leg)

            price_terms.append(
                self._format_term(self.price_multipliers[symbol], symbol)
            )
            trading_terms.append(
                self._format_term(self.trading_multipliers[symbol], symbol)
            )

            # The spread pricetick ends up as the smallest leg pricetick;
            # a falsy current value is simply replaced by the leg's value.
            if self.pricetick:
                self.pricetick = min(self.pricetick, leg.pricetick)
            else:
                self.pricetick = leg.pricetick

        self.price_formula: str = "".join(price_terms)
        self.trading_formula: str = "".join(trading_terms)

        # Spread quote, position and last calculation time
        self.bid_price: float = 0
        self.ask_price: float = 0
        self.bid_volume: float = 0
        self.ask_volume: float = 0
        self.net_pos: float = 0
        self.datetime: datetime = None

    @staticmethod
    def _format_term(multiplier, vt_symbol):
        """Render one formula term; positive multipliers get an explicit '+'."""
        if multiplier > 0:
            return f"+{multiplier}*{vt_symbol}"
        return f"{multiplier}*{vt_symbol}"

    def calculate_price(self):
        """Recalculate the spread bid/ask price and volume from the legs.

        If any leg has a falsy bid or ask volume the quote is cleared and
        the method returns early without touching ``datetime``.
        """
        self.clear_price()

        for index, leg in enumerate(self.legs.values()):
            # Stop as soon as one leg has no two-sided quote yet
            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return

            symbol = leg.vt_symbol

            # Price: a non-positive multiplier swaps the bid and ask side
            price_mult = self.price_multipliers[symbol]
            if price_mult > 0:
                bid_source, ask_source = leg.bid_price, leg.ask_price
            else:
                bid_source, ask_source = leg.ask_price, leg.bid_price
            self.bid_price += bid_source * price_mult
            self.ask_price += ask_source * price_mult

            # The running sums are rounded to pricetick after every leg
            if self.pricetick:
                self.bid_price = round_to(self.bid_price, self.pricetick)
                self.ask_price = round_to(self.ask_price, self.pricetick)

            # Volume: inverse contracts are converted first
            trade_mult = self.trading_multipliers[symbol]
            if self.inverse_contracts[symbol]:
                bid_qty = calculate_inverse_volume(
                    leg.bid_volume, leg.bid_price, leg.size)
                ask_qty = calculate_inverse_volume(
                    leg.ask_volume, leg.ask_price, leg.size)
            else:
                bid_qty = leg.bid_volume
                ask_qty = leg.ask_volume

            # A non-positive trading multiplier swaps the sides as well
            if trade_mult > 0:
                bid_base, ask_base, divisor = bid_qty, ask_qty, trade_mult
            else:
                bid_base, ask_base, divisor = ask_qty, bid_qty, abs(trade_mult)

            leg_bid_volume = floor_to(bid_base / divisor, self.min_volume)
            leg_ask_volume = floor_to(ask_base / divisor, self.min_volume)

            # First leg initializes, later legs can only shrink the volume
            if index == 0:
                self.bid_volume = leg_bid_volume
                self.ask_volume = leg_ask_volume
            else:
                self.bid_volume = min(self.bid_volume, leg_bid_volume)
                self.ask_volume = min(self.ask_volume, leg_ask_volume)

        # Stamp the calculation time
        self.datetime = datetime.now(LOCAL_TZ)

    def calculate_pos(self):
        """Recalculate ``net_pos`` of the spread from the leg net positions.

        Legs with a zero trading multiplier are skipped.
        """
        long_pos = 0
        short_pos = 0

        for index, leg in enumerate(self.legs.values()):
            symbol = leg.vt_symbol

            multiplier = self.trading_multipliers[symbol]
            if not multiplier:
                continue

            if self.inverse_contracts[symbol]:
                leg_net = calculate_inverse_volume(
                    leg.net_pos, leg.net_pos_price, leg.size)
            else:
                leg_net = leg.net_pos

            # Leg position expressed in spread units, rounded toward zero
            scaled = leg_net / multiplier
            if scaled > 0:
                leg_long = floor_to(scaled, self.min_volume)
                leg_short = 0
            else:
                leg_long = 0
                leg_short = abs(ceil_to(scaled, self.min_volume))

            if index == 0:
                long_pos = leg_long
                short_pos = leg_short
            else:
                long_pos = min(long_pos, leg_long)
                short_pos = min(short_pos, leg_short)

        self.net_pos = long_pos if long_pos > 0 else -short_pos

    def clear_price(self):
        """Reset the spread bid/ask price and volume to 0."""
        for field in ("bid_price", "ask_price", "bid_volume", "ask_volume"):
            setattr(self, field, 0)

    def calculate_leg_volume(self, vt_symbol: str, spread_volume: float) -> float:
        """Return ``spread_volume`` times the leg's trading multiplier."""
        leg = self.legs[vt_symbol]
        return spread_volume * self.trading_multipliers[leg.vt_symbol]

    def calculate_spread_volume(self, vt_symbol: str, leg_volume: float) -> float:
        """Convert a leg volume to spread volume, rounded toward zero."""
        leg = self.legs[vt_symbol]
        raw_volume = leg_volume / self.trading_multipliers[leg.vt_symbol]

        # Positive values are floored, everything else is ceiled
        rounder = floor_to if raw_volume > 0 else ceil_to
        return rounder(raw_volume, self.min_volume)

    def to_tick(self):
        """Build a TickData from the current quote; last price is the mid."""
        mid_price = (self.bid_price + self.ask_price) / 2
        return TickData(
            symbol=self.name,
            exchange=Exchange.LOCAL,
            datetime=self.datetime,
            name=self.name,
            last_price=mid_price,
            bid_price_1=self.bid_price,
            ask_price_1=self.ask_price,
            bid_volume_1=self.bid_volume,
            ask_volume_1=self.ask_volume,
            gateway_name="SPREAD"
        )

    def is_inverse(self, vt_symbol: str) -> bool:
        """Return the inverse-contract flag stored for ``vt_symbol``."""
        return self.inverse_contracts[vt_symbol]

    def get_leg_size(self, vt_symbol: str) -> float:
        """Return the ``size`` of the leg registered under ``vt_symbol``."""
        return self.legs[vt_symbol].size
2. 腿数据的LegData定义
class LegData:
    """Quote, position and contract state of one leg of a spread."""

    def __init__(self, vt_symbol: str):
        """Create a leg for ``vt_symbol`` with all numeric fields at 0."""
        self.vt_symbol: str = vt_symbol

        # Latest quote copied from ticks
        self.bid_price: float = 0
        self.ask_price: float = 0
        self.bid_volume: float = 0
        self.ask_volume: float = 0
        self.last_price: float = 0

        # Position data
        self.long_pos: float = 0
        self.short_pos: float = 0
        self.net_pos: float = 0
        self.net_pos_price: float = 0   # Average entry price of net position

        # Last tick received
        self.tick: TickData = None

        # Contract data
        self.size: float = 0
        self.net_position: bool = False
        self.min_volume: float = 0
        self.pricetick: float = 0

    def update_contract(self, contract: ContractData):
        """Copy size, net_position, min_volume and pricetick from contract."""
        for field in ("size", "net_position", "min_volume", "pricetick"):
            setattr(self, field, getattr(contract, field))

    def update_tick(self, tick: TickData):
        """Copy level-1 quote and last price from ``tick`` and keep the tick."""
        field_map = (
            ("bid_price", "bid_price_1"),
            ("ask_price", "ask_price_1"),
            ("bid_volume", "bid_volume_1"),
            ("ask_volume", "ask_volume_1"),
            ("last_price", "last_price"),
        )
        for target, source in field_map:
            setattr(self, target, getattr(tick, source))
        self.tick = tick

    def update_position(self, position: PositionData):
        """Overwrite position fields from a position update.

        A NET position sets ``net_pos`` and ``net_pos_price`` directly;
        otherwise the long or short side is replaced and ``net_pos`` is
        recomputed as long minus short.
        """
        if position.direction == Direction.NET:
            self.net_pos = position.volume
            self.net_pos_price = position.price
            return

        if position.direction == Direction.LONG:
            self.long_pos = position.volume
        else:
            self.short_pos = position.volume
        self.net_pos = self.long_pos - self.short_pos

    def update_trade(self, trade: TradeData):
        """Apply a trade, using net mode when ``net_position`` is set."""
        if self.net_position:
            self._apply_net_trade(trade)
        else:
            self._apply_long_short_trade(trade)

    def _apply_net_trade(self, trade):
        """Update ``net_pos`` and its average price for a net-mode contract."""
        trade_cost = trade.volume * trade.price
        held_cost = self.net_pos * self.net_pos_price

        if trade.direction == Direction.LONG:
            new_pos = self.net_pos + trade.volume

            if self.net_pos >= 0:
                # Adding to a long (or flat) position: average the cost
                self.net_pos_price = (held_cost + trade_cost) / new_pos
            elif not new_pos:
                # Previous short position fully closed
                self.net_pos_price = 0
            elif new_pos > 0:
                # Short position closed and reversed to long
                self.net_pos_price = trade.price
        else:
            new_pos = self.net_pos - trade.volume

            if self.net_pos <= 0:
                # Adding to a short (or flat) position: average the cost
                self.net_pos_price = (held_cost - trade_cost) / new_pos
            elif not new_pos:
                # Previous long position fully closed
                self.net_pos_price = 0
            elif new_pos < 0:
                # Long position closed and reversed to short
                self.net_pos_price = trade.price

        self.net_pos = new_pos

    def _apply_long_short_trade(self, trade):
        """Update long/short volumes, then recompute ``net_pos``."""
        if trade.direction == Direction.LONG:
            # A long trade opens long, or otherwise reduces short
            if trade.offset == Offset.OPEN:
                self.long_pos += trade.volume
            else:
                self.short_pos -= trade.volume
        else:
            # A non-long trade opens short, or otherwise reduces long
            if trade.offset == Offset.OPEN:
                self.short_pos += trade.volume
            else:
                self.long_pos -= trade.volume

        self.net_pos = self.long_pos - self.short_pos
3. SpreadData的to_tick()一定能够返回有效tick吗?
3.1 SpreadData的3个价格的计算
从上面的代码可以知道,SpreadData中包含若干个腿,它的tick数据应该是由各腿的tick合成的,可是我们看SpreadData的to_tick()的代码,却不是这样的!to_tick()只是直接使用calculate_price()已经算好的结果,并不检查这些结果是否有效:
def to_tick(self):
    """Build a TickData from the current spread quote.

    The last price is the mid of the spread bid and ask price; no check is
    made here on whether the quote is valid.
    """
    mid_price = (self.bid_price + self.ask_price) / 2
    return TickData(
        symbol=self.name,
        exchange=Exchange.LOCAL,
        datetime=self.datetime,
        name=self.name,
        last_price=mid_price,
        bid_price_1=self.bid_price,
        ask_price_1=self.ask_price,
        bid_volume_1=self.bid_volume,
        ask_volume_1=self.ask_volume,
        gateway_name="SPREAD"
    )
举例吧:
假如价差(SpreadData)的实例S中包含两腿(LegData)L1和L2,L1、L2的价格乘数分别为1和-1,那么:
在任意时刻,当L1得到了最新tick1,L2得到最新tick2,
L1的部分数据
L1.last_price=tick1.last_price
L1.bid_price=tick1.bid_price_1
L1.ask_price=tick1.ask_price_1
L2的部分数据
L2.last_price=tick2.last_price
L2.bid_price=tick2.bid_price_1
L2.ask_price=tick2.ask_price_1
那么经过价差S的calculate_price()的计算后(价格乘数为负的腿,其买价和卖价要交换使用),
S.bid_price=L1.bid_price-L2.ask_price
S.ask_price=L1.ask_price-L2.bid_price
而to_tick()生成的tick中,last_price=(S.bid_price+S.ask_price)/2
价差S的价格的有效性来自于腿L1和腿L2的价格的有效性!
问题来了:如果腿L1已经得到了有效数据,而腿L2还没有得到有效数据,那么价差S的价格将是无效的!
3.2 注意价差的calculate_price()函数中的判断条件
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return
这里的条件意思是说如果价差的某个腿中的数据是无意义的,那么就清空价差的所有价格,那么此时SpreadData的to_tick()得到的tick就不是一个有效的tick数据!
3.3 何时会发生这种情况?
只要价差的多个腿中有一个腿的数据没有使用实际的tick更新过,就会发生这种情况!
3.4 出现这种情况时,价差的价格是多少?
全部是0,因为clear_price()的代码如下:
def clear_price(self):
    """Reset the spread bid/ask price and volume to 0."""
    for field in ("bid_price", "ask_price", "bid_volume", "ask_volume"):
        setattr(self, field, 0)
4. 这样的数据应该被推送给价差和价差策略吗?
4.1 SpreadDataEngine是如何推送价差数据给价差和价差策略的?
我们知道价差的数据计算和更新是由SpreadDataEngine维护的,下面是SpreadDataEngine的process_tick_event():
def process_tick_event(self, event: Event) -> None:
    """Feed a tick to its leg, then recalculate and push related spreads.

    Ticks whose vt_symbol has no registered leg are ignored.
    """
    tick = event.data
    symbol = tick.vt_symbol

    leg = self.legs.get(symbol)
    if not leg:
        return
    leg.update_tick(tick)

    for spread in self.symbol_spread_map[symbol]:
        # NOTE: nothing here checks whether the spread price calculation
        # was valid; the spread is pushed unconditionally
        spread.calculate_price()
        self.put_data_event(spread)
def put_data_event(self, spread: SpreadData) -> None:
    """Wrap ``spread`` in an EVENT_SPREAD_DATA event and queue it."""
    self.event_engine.put(Event(EVENT_SPREAD_DATA, spread))
这里并没有对价差的价格计算是否有效的判断,就直接向价差发送了EVENT_SPREAD_DATA消息,这会引起价差和价差策略通过推送接口on_spread_data()得到错误的价差数据!!!
4.2 如何改正此错误?
4.2.1 修改价差SpreadData的calculate_price()函数,使其可以返回价差是否有效:
def calculate_price(self) -> bool:  # hxxjava change
    """Recalculate the spread quote and report whether it is valid.

    The quote is always cleared first. The new quote is accumulated in
    local variables and only written to the spread once every leg has
    been processed, so a failed calculation leaves the quote at 0.

    Returns:
        True if every leg had a two-sided quote and the spread bid/ask
        price, volume and ``datetime`` were updated; False if the spread
        has no legs or any leg has a falsy bid or ask volume.
    """
    self.clear_price()

    # A spread without legs has nothing to quote
    if not self.legs:
        return False

    bid_price = 0
    ask_price = 0
    bid_volume = 0
    ask_volume = 0
    volume_inited = False

    # Go through all legs to calculate price
    for leg in self.legs.values():
        # Filter not all leg price data has been received
        if not leg.bid_volume or not leg.ask_volume:
            return False  # hxxjava add

        # Calculate price: a non-positive multiplier swaps bid and ask
        price_multiplier = self.price_multipliers[leg.vt_symbol]
        if price_multiplier > 0:
            bid_price += leg.bid_price * price_multiplier
            ask_price += leg.ask_price * price_multiplier
        else:
            bid_price += leg.ask_price * price_multiplier
            ask_price += leg.bid_price * price_multiplier

        # Calculate volume. A leg with zero trading multiplier takes no
        # part in trading, so it must not limit the spread volume (and
        # dividing by it would raise ZeroDivisionError).
        trading_multiplier = self.trading_multipliers[leg.vt_symbol]
        if not trading_multiplier:
            continue

        inverse_contract = self.inverse_contracts[leg.vt_symbol]
        if not inverse_contract:
            leg_bid_volume = leg.bid_volume
            leg_ask_volume = leg.ask_volume
        else:
            leg_bid_volume = calculate_inverse_volume(
                leg.bid_volume, leg.bid_price, leg.size)
            leg_ask_volume = calculate_inverse_volume(
                leg.ask_volume, leg.ask_price, leg.size)

        if trading_multiplier > 0:
            adjusted_bid_volume = floor_to(
                leg_bid_volume / trading_multiplier,
                self.min_volume
            )
            adjusted_ask_volume = floor_to(
                leg_ask_volume / trading_multiplier,
                self.min_volume
            )
        else:
            adjusted_bid_volume = floor_to(
                leg_ask_volume / abs(trading_multiplier),
                self.min_volume
            )
            adjusted_ask_volume = floor_to(
                leg_bid_volume / abs(trading_multiplier),
                self.min_volume
            )

        # For the first traded leg, just initialize. A flag is used
        # instead of the loop index because skipped legs do not count.
        if not volume_inited:
            bid_volume = adjusted_bid_volume
            ask_volume = adjusted_ask_volume
            volume_inited = True
        # For following legs, use min value of each leg quoting volume
        else:
            bid_volume = min(bid_volume, adjusted_bid_volume)
            ask_volume = min(ask_volume, adjusted_ask_volume)

    # Round the final price to pricetick once, rather than rounding
    # every partial sum inside the loop
    if self.pricetick:
        bid_price = round_to(bid_price, self.pricetick)
        ask_price = round_to(ask_price, self.pricetick)

    self.bid_price = bid_price
    self.ask_price = ask_price
    self.bid_volume = bid_volume
    self.ask_volume = ask_volume

    # Update calculate time
    self.datetime = datetime.now(LOCAL_TZ)
    return True  # hxxjava add
4.2.2 修改灵活价差AdvancedSpreadData的calculate_price()函数,使其可以返回价差是否有效:
class AdvancedSpreadData(SpreadData):
    def calculate_price(self) -> bool:  # hxxjava change
        """Recalculate the formula-based spread quote and report validity.

        Returns False, with the quote cleared, as soon as one variable leg
        has a falsy bid or ask volume; returns True after price, volume
        and ``datetime`` have been updated.
        """
        self.clear_price()

        # Per-variable prices fed into the price formula
        bid_inputs = {}
        ask_inputs = {}
        have_volume = False

        for variable, leg in self.variable_legs.items():
            # Stop when a leg has no two-sided quote yet
            if not leg.bid_volume or not leg.ask_volume:
                self.clear_price()
                return False  # hxxjava change

            # A non-positive variable direction swaps bid and ask
            if self.variable_directions[variable] > 0:
                bid_inputs[variable] = leg.bid_price
                ask_inputs[variable] = leg.ask_price
            else:
                bid_inputs[variable] = leg.ask_price
                ask_inputs[variable] = leg.bid_price

            # Legs with zero trading multiplier do not limit the volume
            symbol = leg.vt_symbol
            multiplier = self.trading_multipliers[symbol]
            if not multiplier:
                continue

            if self.inverse_contracts[symbol]:
                bid_qty = calculate_inverse_volume(
                    leg.bid_volume, leg.bid_price, leg.size)
                ask_qty = calculate_inverse_volume(
                    leg.ask_volume, leg.ask_price, leg.size)
            else:
                bid_qty = leg.bid_volume
                ask_qty = leg.ask_volume

            # A negative trading multiplier swaps the sides as well
            if multiplier > 0:
                bid_base, ask_base, divisor = bid_qty, ask_qty, multiplier
            else:
                bid_base, ask_base, divisor = ask_qty, bid_qty, abs(multiplier)

            leg_bid_volume = floor_to(bid_base / divisor, self.min_volume)
            leg_ask_volume = floor_to(ask_base / divisor, self.min_volume)

            # First traded leg initializes, later legs can only shrink
            if have_volume:
                self.bid_volume = min(self.bid_volume, leg_bid_volume)
                self.ask_volume = min(self.ask_volume, leg_ask_volume)
            else:
                self.bid_volume = leg_bid_volume
                self.ask_volume = leg_ask_volume
                have_volume = True

        # Evaluate the price formula for both sides
        self.bid_price = self.parse_formula(self.price_code, bid_inputs)
        self.ask_price = self.parse_formula(self.price_code, ask_inputs)

        # Round price to pricetick
        if self.pricetick:
            self.bid_price = round_to(self.bid_price, self.pricetick)
            self.ask_price = round_to(self.ask_price, self.pricetick)

        # Stamp the calculation time
        self.datetime = datetime.now(LOCAL_TZ)
        return True  # hxxjava add
4.2.3 修改SpreadDataEngine的process_tick_event()函数:
def process_tick_event(self, event: Event) -> None:
    """Feed a tick to its leg and push only spreads with a valid price.

    Ticks whose vt_symbol has no registered leg are ignored. A spread is
    pushed only when its ``calculate_price()`` returns a truthy value.
    """
    tick = event.data
    symbol = tick.vt_symbol

    leg = self.legs.get(symbol)
    if not leg:
        return
    leg.update_tick(tick)

    for spread in self.symbol_spread_map[symbol]:
        # Skip spreads whose price could not be calculated
        if not spread.calculate_price():  # hxxjava change
            continue
        self.put_data_event(spread)
5. 如果不做上述改动会发生什么问题?
如果不做上述改动,可能会出现这样的情况:在开盘的时候,由于价差还没有收齐所有腿的tick,
导致价差的last_price等数据为0,可是价差数据仍然被推送,进而产生错误的价差tick。
错误的价差tick会引发错误的价差交易信号,并且以错误的价格进行价差的开仓和平仓!!!
本人在实际的价差策略交易中已经发生过上述的错误!