VeighNa量化社区
你的开源社区量化交易平台
Member
avatar
加入于:
帖子: 22
声望: 0

我想写一个三天后平仓的策略
中间一旦出现比我买入的价格低的时候就自动卖出(做多)
或者一旦出现比我卖出价格高的时候自动买入平仓(做空)
三天后自动平仓
from vnpy_ctastrategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData
)

from time import time

class TestStrategy(CtaTemplate):
""""""
author = "用Python的交易员"

test_trigger = 10

tick_count = 0
test_all_done = False

parameters = ["test_trigger"]
variables = ["tick_count", "test_all_done"]

def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
    """"""
    super().__init__(cta_engine, strategy_name, vt_symbol, setting)

    self.test_funcs = [
        self.test_market_order,
        self.test_limit_order,
        self.test_cancel_all,
        self.test_stop_order
    ]
    self.last_tick = None

def on_init(self):
    """
    Callback when strategy is inited.
    """
    self.write_log("策略初始化")

def on_start(self):
    """
    Callback when strategy is started.
    """
    self.write_log("策略启动")

def on_stop(self):
    """
    Callback when strategy is stopped.
    """
    self.write_log("策略停止")

def on_tick(self, tick: TickData):
    """
    Callback of new tick data update.
    """
    if self.test_all_done:
        return

    self.last_tick = tick

    self.tick_count += 1
    if self.tick_count >= self.test_trigger:
        self.tick_count = 0

        if self.test_funcs:
            test_func = self.test_funcs.pop(0)

            start = time()
            test_func()
            time_cost = (time() - start) * 1000
            self.write_log("耗时%s毫秒" % (time_cost))
        else:
            self.write_log("测试已全部完成")
            self.test_all_done = True

    self.put_event()

def on_bar(self, bar: BarData):
    """
    Callback of new bar data update.
    """
    pass

def on_order(self, order: OrderData):
    """
    Callback of new order data update.
    """
    self.put_event()

def on_trade(self, trade: TradeData):
    """
    Callback of new trade data update.
    """
    self.put_event()

def on_stop_order(self, stop_order: StopOrder):
    """
    Callback of stop order update.
    """
    self.put_event()

def test_market_order(self):
    """"""
    self.buy(self.last_tick.limit_up, 1)
    self.write_log("执行市价单测试")

def test_limit_order(self):
    """"""
    self.buy(self.last_tick.limit_down, 1)
    self.write_log("执行限价单测试")

def test_stop_order(self):
    """"""
    self.buy(self.last_tick.ask_price_1, 1, True)
    self.write_log("执行停止单测试")

def test_cancel_all(self):
    """"""
    self.cancel_all()
    self.write_log("执行全部撤单测试")
Member
avatar
加入于:
帖子: 22
声望: 0

大佬 ,, 可以帮我指点一二吗?

© 2015-2022 上海韦纳软件科技有限公司
备案服务号:沪ICP备18006526号

沪公网安备 31011502017034号

【用户协议】
【隐私政策】
【免责条款】