
想问一下,因为我这个模拟账户里面是有很多其他持仓的,这个可用资金应该是小于我的账户总资金,这里为啥会查不到保证金呀?
from vnpy_ctastrategy import (
CtaTemplate, StopOrder, TickData, BarData, TradeData, OrderData,
BarGenerator, ArrayManager
)
from vnpy.trader.constant import Direction, Exchange
from vnpy.trader.object import BarData, AccountData
from datetime import datetime, timedelta
import numpy as np
class DA_MACROSS2(CtaTemplate):
"""快速启动版本 - 与直达国际FutureApi兼容"""
author = "June_DA_FastStart"
# ====== 策略核心参数 ======
fast_window = 8
slow_window = 21
# ====== 仓位管理参数 ======
target_position = 50
base_lots = 2
# ====== 合约特定参数 ======
multiplier = 20
margin_ratio_broker = 0.05
# ====== 风险控制参数 ======
maintenance_margin_ratio = 0.25
force_close_ratio = 0.35
safety_buffer = 0.08
# ====== 止盈止损参数 ======
stop_loss_pct = 0.015
take_profit_pct = 0.025
take_profit_ratio = 0.6
# ====== 冷却参数 ======
buy_cooldown_bars = 5
# ====== 数据流监控参数 ======
warmup_bars = 3
data_timeout_threshold = 60
parameters = [
"fast_window", "slow_window",
"target_position", "base_lots",
"multiplier", "margin_ratio_broker",
"maintenance_margin_ratio", "force_close_ratio", "safety_buffer",
"stop_loss_pct", "take_profit_pct", "take_profit_ratio",
"buy_cooldown_bars", "warmup_bars", "data_timeout_threshold"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager(50)
# 初始化变量
self.fast_ma = 0
self.slow_ma = 0
self.entry_price = 0
self.unrealized_pnl = 0
self.contract_value = 0
self.current_margin_per_lot = 0
# 交易状态
self.buy_cooldown = 0
self.trade_count = 0
self.force_close_triggered = False
# 数据流监控
self.bar_count = 0
self.tick_count = 0
self.am_inited_logged = False
self.data_subscription_time = None
self.last_data_warning_time = None
self.last_tick = None
self._bars_since_start = 0
self._warmed_up = False
self._last_bar_dt = None
# 账户资金信息 - 基于FutureApi字段
self.account_balance = 0 # 账户权益 (TodayInitialBalance)
self.account_available = 0 # 可用资金 (TodayTradableFund)
self.account_margin = 0 # 占用保证金 (InitialMargin)
self.account_connected = False
self.target_account_id = "USD" # 币种代码
# 资金查询控制 - 根据文档要求最小1秒间隔
self.last_capital_query_time = None
self.capital_query_interval = 2 # 设为2秒更安全
# 账户映射缓存
self.currency_account_map = {} # 币种->资金账户映射
self.account_currency_map = {} # 资金账户->币种映射
def on_init(self):
"""策略初始化"""
self.write_log("=== 策略初始化开始 ===")
self.write_log(f"[初始化] 快速启动模式 - 只需要 {self.am.size} 根K线初始化")
# 尝试加载历史数据,但不阻塞策略
try:
self.write_log(f"[初始化] 尝试加载 {self.am.size + 10} 根历史K线")
self.load_bar(self.am.size + 10)
self.write_log("[初始化] 历史数据加载完成")
except Exception as e:
self.write_log(f"[初始化警告] 历史数据加载失败: {str(e)}")
self.write_log("[初始化] 将等待实时数据初始化")
def on_start(self):
"""策略启动"""
self.write_log("=== 策略启动 ===")
self.write_log(f"[启动] 目标账户币种: {self.target_account_id}")
self.write_log(f"[数据订阅] 订阅合约: {self.vt_symbol}")
# 记录数据订阅时间
self.data_subscription_time = datetime.now()
self.write_log(f"[数据订阅] 订阅时间: {self.data_subscription_time.strftime('%H:%M:%S')}")
# 初始化状态
self._bars_since_start = 0
self._warmed_up = False
# 开始诊断
self.diagnose_account_connection()
def diagnose_account_connection(self):
"""诊断账户连接状态 - 基于直达国际API规范"""
self.write_log("=== 账户连接诊断开始 ===")
try:
# 获取主引擎
main_engine = self.cta_engine.main_engine
if not main_engine:
self.write_log("[诊断错误] 主引擎未找到")
return
# 获取所有账户
accounts = main_engine.get_all_accounts()
if not accounts:
self.write_log("[诊断错误] 未找到任何账户")
return
# 详细记录所有账户信息
self.write_log(f"[诊断] 找到 {len(accounts)} 个账户:")
for i, account in enumerate(accounts):
account_info = f"[诊断] 账户{i+1}: ID={account.accountid}"
# 添加账户详细信息
if hasattr(account, 'balance'):
account_info += f", 余额={account.balance}"
if hasattr(account, 'available'):
account_info += f", 可用={account.available}"
if hasattr(account, 'frozen'):
account_info += f", 冻结={account.frozen}"
if hasattr(account, 'currency_no'):
account_info += f", 币种={account.currency_no}"
self.write_log(account_info)
# 查找目标账户 - 精确匹配账户ID为 "USD"
target_account = None
for account in accounts:
# 精确匹配账户ID为 "USD"
if account.accountid == "USD":
target_account = account
# 建立映射关系
if hasattr(account, 'currency_no'):
self.currency_account_map[account.currency_no] = account.accountid
self.account_currency_map[account.accountid] = account.currency_no
break
# 如果没找到精确匹配的USD账户,使用原来的逻辑
if not target_account:
self.write_log("[诊断] 未找到精确匹配的USD账户,尝试其他匹配方式")
for account in accounts:
# 多种匹配方式
currency_match = (
hasattr(account, 'currency_no') and
account.currency_no == self.target_account_id
)
accountid_match = (
account.accountid == self.target_account_id or
account.accountid.endswith(self.target_account_id)
)
if currency_match or accountid_match:
target_account = account
# 建立映射关系
if hasattr(account, 'currency_no'):
self.currency_account_map[account.currency_no] = account.accountid
self.account_currency_map[account.accountid] = account.currency_no
break
if target_account:
# 使用FutureApi字段映射资金信息
self.account_balance = self._get_account_balance(target_account)
self.account_available = self._get_account_available(target_account)
self.account_margin = self._get_account_margin(target_account)
self.account_connected = True
self.write_log(f"[诊断成功] 连接到目标账户: {target_account.accountid}")
self.write_log(f"[诊断成功] 账户权益: {self.account_balance:,.2f}")
self.write_log(f"[诊断成功] 可用资金: {self.account_available:,.2f}")
self.write_log(f"[诊断成功] 占用保证金: {self.account_margin:,.2f}")
# 记录币种映射
if hasattr(target_account, 'currency_no'):
self.write_log(f"[诊断成功] 账户币种: {target_account.currency_no}")
# 检查资金状态
if self.account_available <= 0:
self.write_log("[诊断警告] 可用资金≤0,策略将无法开新仓")
else:
self.write_log(f"[诊断成功] 账户正常,等待 {self.am.size} 根K线初始化后开始交易")
# 记录其他重要资金信息
self._log_additional_capital_info(target_account)
else:
self.write_log(f"[诊断错误] 未找到目标币种账户 '{self.target_account_id}'")
self.write_log(f"[诊断] 可用账户ID: {[acc.accountid for acc in accounts]}")
except Exception as e:
self.write_log(f"[诊断异常] {str(e)}")
def _get_account_balance(self, account) -> float:
"""根据FutureApi获取账户权益"""
# 优先使用 TodayInitialBalance (当日期初权益)
if hasattr(account, 'today_initial_balance'):
return float(account.today_initial_balance)
# 其次使用 TodayRealtimeBalance (当日实时浮动权益)
elif hasattr(account, 'today_realtime_balance'):
return float(account.today_realtime_balance)
# 最后使用 balance
else:
return account.balance or 0
def _get_account_available(self, account) -> float:
"""根据FutureApi获取可用资金"""
# 优先使用 TodayTradableFund (今日可用于交易的资金量)
if hasattr(account, 'today_tradable_fund'):
return float(account.today_tradable_fund)
# 其次使用 Available (今可用)
elif hasattr(account, 'available'):
return account.available or 0
else:
return 0
def _get_account_margin(self, account) -> float:
"""根据FutureApi获取占用保证金"""
# 优先使用 InitialMargin (初始保证金)
if hasattr(account, 'initial_margin'):
return float(account.initial_margin)
# 其次使用 Deposit (保证金)
elif hasattr(account, 'deposit'):
return float(account.deposit)
# 最后使用 margin + frozen_margin
else:
margin = getattr(account, 'margin', 0) or 0
frozen_margin = getattr(account, 'frozen_margin', 0) or 0
return margin + frozen_margin
def _log_additional_capital_info(self, account):
"""记录额外的资金信息 - 基于FutureApi字段"""
additional_info = []
# 可取资金 (CanCashOutMoneyAmount)
if hasattr(account, 'can_cash_out_money_amount'):
cash_out = float(account.can_cash_out_money_amount)
additional_info.append(f"可取资金: {cash_out:,.2f}")
# 冻结资金 (FrozenFund)
if hasattr(account, 'frozen_fund'):
frozen_fund = float(account.frozen_fund)
additional_info.append(f"冻结资金: {frozen_fund:,.2f}")
# 浮动盈亏 (ProfitLoss)
if hasattr(account, 'profit_loss'):
profit_loss = float(account.profit_loss)
additional_info.append(f"浮动盈亏: {profit_loss:,.2f}")
# 昨日期初权益 (YdInitialBalance)
if hasattr(account, 'yd_initial_balance'):
yd_balance = float(account.yd_initial_balance)
additional_info.append(f"昨日权益: {yd_balance:,.2f}")
if additional_info:
self.write_log("[资金详情] " + ", ".join(additional_info))
def check_data_timeout(self):
"""检查数据是否超时"""
if not self.data_subscription_time:
return
current_time = datetime.now()
time_since_subscription = (current_time - self.data_subscription_time).total_seconds()
# 每30秒检查一次数据超时
if (time_since_subscription > self.data_timeout_threshold and
(not self.last_data_warning_time or
(current_time - self.last_data_warning_time).total_seconds() > 30)):
self.last_data_warning_time = current_time
self.write_log(f"[数据超时警告] 订阅后{time_since_subscription:.0f}秒未收到任何K线数据")
def on_tick(self, tick: TickData):
"""Tick数据更新"""
self.last_tick = tick
self.tick_count += 1
# 检查数据超时
self.check_data_timeout()
# 更新K线生成器
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""K线数据更新 - 主逻辑入口"""
# 恢复空档检测
if self._last_bar_dt is not None:
gap_min = (bar.datetime - self._last_bar_dt).total_seconds() / 60.0
if gap_min >= 3: # 3分钟空档
self._warmed_up = False
self._bars_since_start = 0
self.write_log(f"[RESUME] 检测到 {gap_min:.1f} 分钟空档,重新进入 WARMUP")
# 冷启动/恢复等待
if not self._warmed_up:
self._bars_since_start += 1
self.am.update_bar(bar)
if self._bars_since_start < max(0, int(self.warmup_bars)):
if self._bars_since_start % 5 == 0: # 每5根报告一次
self.write_log(f"[WARMUP] 第 {self._bars_since_start}/{self.warmup_bars} 根K线")
self._last_bar_dt = bar.datetime
self.put_event()
return
else:
self._warmed_up = True
self.write_log("[WARMUP] 完成,开始允许交易逻辑")
self.bar_count += 1
# 重置数据超时警告(因为收到了数据)
self.last_data_warning_time = None
# 更新K线数据
self.am.update_bar(bar)
if not self.am.inited:
progress = len(self.am.close_array)
total = self.am.size
# 进度报告
if progress % 10 == 0:
self.write_log(f"[初始化进度] {progress}/{total} ({progress/total*100:.0f}%)")
self._last_bar_dt = bar.datetime
self.put_event()
return
else:
# 第一次初始化完成时记录
if not self.am_inited_logged:
self.write_log(f"✅ K线数据初始化完成! 耗时 {self.bar_count} 根K线")
self.write_log("🎯 策略正式开始交易!")
self.am_inited_logged = True
# 撤销所有未完成订单
self.cancel_all()
# 计算技术指标
self.fast_ma = self.am.sma(self.fast_window)
self.slow_ma = self.am.sma(self.slow_window)
# 更新冷却期
if self.buy_cooldown > 0:
self.buy_cooldown -= 1
# 同步账户数据 - 添加查询间隔控制
self.sync_account_data()
# 使用实时可用资金进行判断
if self.account_available <= 0:
if self.bar_count % 20 == 0:
self.write_log(f"[等待] 可用资金≤0,无法开新仓 - 快线: {self.fast_ma:.2f}, 慢线: {self.slow_ma:.2f}")
self._last_bar_dt = bar.datetime
self.put_event()
return
# 定期记录状态
if self.bar_count % 20 == 0:
self.write_log(f"[状态] 快线: {self.fast_ma:.2f}, 慢线: {self.slow_ma:.2f}, 持仓: {self.pos}")
self.write_log(f"[资金状态] 权益: {self.account_balance:,.2f}, 可用: {self.account_available:,.2f}, 保证金: {self.account_margin:,.2f}")
# 计算保证金要求
self.calculate_margin_requirements_by_price(bar.close_price)
# 执行风控(最高优先级)
if self.execute_margin_control(bar.close_price):
self._last_bar_dt = bar.datetime
self.put_event()
return
# 执行止损
if self.execute_stop_loss(bar):
self._last_bar_dt = bar.datetime
self.put_event()
return
# 执行止盈
if self.execute_take_profit(bar):
self._last_bar_dt = bar.datetime
self.put_event()
return
# 执行交易逻辑
self.execute_trading_logic(bar)
self._last_bar_dt = bar.datetime
self.put_event()
def sync_account_data(self):
"""同步账户数据 - 基于FutureApi的查询间隔控制"""
if not self.account_connected:
return
# 检查查询间隔 - 文档要求最小1秒间隔
current_time = datetime.now()
if (self.last_capital_query_time and
(current_time - self.last_capital_query_time).total_seconds() < self.capital_query_interval):
return
try:
main_engine = self.cta_engine.main_engine
accounts = main_engine.get_all_accounts()
if accounts:
target_account = None
# 精确查找账户ID为 "USD" 的账户
for account in accounts:
if account.accountid == "USD":
target_account = account
# 更新映射
if hasattr(account, 'currency_no'):
self.currency_account_map[account.currency_no] = account.accountid
self.account_currency_map[account.accountid] = account.currency_no
break
# 如果没有找到精确匹配,使用原来的逻辑
if not target_account:
self.write_log("[资金同步] 未找到精确匹配的USD账户,尝试其他匹配方式")
# 优先使用币种映射查找账户
if self.target_account_id in self.currency_account_map:
target_account_id = self.currency_account_map[self.target_account_id]
for account in accounts:
if account.accountid == target_account_id:
target_account = account
break
# 如果没有映射,尝试直接匹配
if not target_account:
for account in accounts:
currency_match = (
hasattr(account, 'currency_no') and
account.currency_no == self.target_account_id
)
accountid_match = (
account.accountid == self.target_account_id or
account.accountid.endswith(self.target_account_id)
)
if currency_match or accountid_match:
target_account = account
# 更新映射
if hasattr(account, 'currency_no'):
self.currency_account_map[account.currency_no] = account.accountid
self.account_currency_map[account.accountid] = account.currency_no
break
if target_account:
# 使用FutureApi字段映射
self.account_balance = self._get_account_balance(target_account)
self.account_available = self._get_account_available(target_account)
self.account_margin = self._get_account_margin(target_account)
# 更新查询时间
self.last_capital_query_time = current_time
# 定期记录详细资金信息
if self.bar_count % 50 == 0:
self._log_additional_capital_info(target_account)
except Exception as e:
# 静默处理错误,不影响主逻辑
if self.bar_count % 50 == 0: # 每50根K线报告一次错误
self.write_log(f"[资金查询异常] {str(e)}")
def calculate_margin_requirements_by_price(self, price: float):
"""基于价格计算保证金要求"""
if price <= 0:
return 0
# 计算合约价值
self.contract_value = price * self.multiplier
# 计算每手保证金要求
self.current_margin_per_lot = self.contract_value * self.margin_ratio_broker
# 计算当前策略持仓的已用保证金
strategy_used_margin = self.pos * self.current_margin_per_lot
# 计算浮动盈亏
if self.pos > 0 and self.entry_price > 0:
price_diff = price - self.entry_price
self.unrealized_pnl = price_diff * self.multiplier * self.pos
else:
self.unrealized_pnl = 0
return self.account_balance + self.unrealized_pnl
def calculate_max_safe_position_by_price(self, price: float) -> int:
"""基于价格计算最大安全持仓量 - 使用实时可用资金"""
if price <= 0:
return 0
total_equity = self.calculate_margin_requirements_by_price(price)
if self.current_margin_per_lot <= 0:
return self.target_position
if total_equity <= 0:
return 0
# 基于维持保证金和安全缓冲计算
safety_line = self.maintenance_margin_ratio - self.safety_buffer
# 计算还能承受的额外保证金
available_margin_capacity = total_equity * safety_line
# 基于可用资金计算
max_by_available = int(self.account_available / self.current_margin_per_lot)
# 取较小值,并不超过目标仓位
max_safe = min(int(available_margin_capacity / self.current_margin_per_lot), max_by_available, self.target_position)
return max(0, max_safe)
def execute_margin_control(self, price: float) -> bool:
"""执行保证金风控"""
max_safe_position = self.calculate_max_safe_position_by_price(price)
current_position = self.pos
# 需要减仓的情况
if current_position > max_safe_position:
reduce_lots = current_position - max_safe_position
# 计算当前保证金比率
total_equity = self.account_balance + self.unrealized_pnl
if total_equity > 0:
margin_ratio = (self.account_margin + self.current_margin_per_lot * current_position) / total_equity
else:
margin_ratio = 1.0
if margin_ratio >= self.force_close_ratio:
reduce_lots = current_position
vt_orderid = self.sell(price, reduce_lots)
if vt_orderid:
self.write_log(f"[强制平仓] 保证金比率{margin_ratio:.2%},全平{reduce_lots}手")
self.force_close_triggered = True
return True
elif margin_ratio >= self.maintenance_margin_ratio:
vt_orderid = self.sell(price, reduce_lots)
if vt_orderid:
self.write_log(f"[危险减仓] 保证金比率{margin_ratio:.2%},减仓{reduce_lots}手")
return True
elif margin_ratio >= (self.maintenance_margin_ratio - self.safety_buffer):
reduce_lots = min(reduce_lots, (current_position - max_safe_position) // 2)
if reduce_lots > 0:
vt_orderid = self.sell(price, reduce_lots)
if vt_orderid:
self.write_log(f"[警告减仓] 保证金比率{margin_ratio:.2%},减仓{reduce_lots}手")
return True
return False
def execute_stop_loss(self, bar: BarData) -> bool:
"""执行止损逻辑"""
if self.pos <= 0 or self.entry_price <= 0:
return False
stop_loss_price = self.entry_price * (1 - self.stop_loss_pct)
if bar.close_price <= stop_loss_price:
vt_orderid = self.sell(bar.close_price, abs(self.pos))
if vt_orderid:
self.write_log(f"[止损] 价格{bar.close_price:.2f}低于止损价{stop_loss_price:.2f},平仓{self.pos}手")
self.entry_price = 0
return True
return False
def execute_take_profit(self, bar: BarData) -> bool:
"""执行止盈逻辑"""
if self.pos <= 0 or self.entry_price <= 0:
return False
take_profit_price = self.entry_price * (1 + self.take_profit_pct)
if bar.close_price >= take_profit_price:
take_profit_lots = int(self.pos * self.take_profit_ratio)
if take_profit_lots > 0:
vt_orderid = self.sell(bar.close_price, take_profit_lots)
if vt_orderid:
self.write_log(f"[止盈] 价格{bar.close_price:.2f}高于止盈价{take_profit_price:.2f},平仓{take_profit_lots}手")
return True
return False
def execute_trading_logic(self, bar: BarData):
"""执行交易逻辑"""
if self.force_close_triggered:
return
# 计算可开仓数量
max_safe_position = self.calculate_max_safe_position_by_price(bar.close_price)
available_lots = max_safe_position - self.pos
if available_lots <= 0:
return
# 金叉买入信号
if (self.fast_ma > self.slow_ma and
self.fast_ma > 0 and self.slow_ma > 0 and
self.buy_cooldown == 0 and
available_lots > 0):
buy_lots = min(self.base_lots, available_lots)
vt_orderid = self.buy(bar.close_price, buy_lots)
if vt_orderid:
self.write_log(f"[金叉买入] 买入{buy_lots}手,价格{bar.close_price:.2f}")
self.buy_cooldown = self.buy_cooldown_bars
# 死叉卖出信号
elif (self.fast_ma < self.slow_ma and
self.fast_ma > 0 and self.slow_ma > 0 and
self.pos > 0):
vt_orderid = self.sell(bar.close_price, abs(self.pos))
if vt_orderid:
self.write_log(f"[死叉卖出] 平仓{self.pos}手,价格{bar.close_price:.2f}")
self.entry_price = 0
def on_order(self, order: OrderData):
"""订单状态更新"""
self.put_event()
def on_trade(self, trade: TradeData):
"""成交回报"""
self.trade_count += 1
# 更新入场价格
if trade.direction == Direction.LONG:
if self.entry_price == 0:
self.entry_price = trade.price
else:
total_volume = self.pos
if total_volume > 0:
self.entry_price = (self.entry_price * (total_volume - trade.volume) +
trade.price * trade.volume) / total_volume
if trade.direction == Direction.LONG and self.force_close_triggered:
self.force_close_triggered = False
# 成交后重新同步账户数据
self.sync_account_data()
self.write_log(f"[成交#{self.trade_count}] {trade.direction.value} {trade.volume}手 @ {trade.price}")
if self.pos == 0:
self.entry_price = 0
self.put_event()