问题描述
解决方案
这是一个基于PTrade API实现的**TWAP(时间加权平均价格)**算法交易策略源码。
策略原理
TWAP(Time Weighted Average Price)的核心思想是将大额订单拆分为多个小额订单,在设定的时间段内按照固定的时间间隔进行下单,从而减小对市场的冲击,使成交均价尽可能接近该时间段的市场均价。
策略功能
- 自动拆单:根据设定的总交易量和时间间隔,自动计算单笔委托数量。
- 定时执行:利用
run_interval实现定时触发交易。 - 撤单机制:每次下单前会自动检查并撤销上一轮未成交的挂单,防止资金占用和重复下单。
- 尾盘处理:在策略结束前,会检查剩余未成交数量,确保总目标达成(可选)。
- 午休过滤:自动跳过 11:30 - 13:00 的非交易时间段。
策略源码
import math
def initialize(context):
"""
初始化函数,设置策略参数
"""
# 1. 设置要交易的标的
g.security = '600570.SS' # 示例:恒生电子
set_universe(g.security)
# 2. TWAP 策略参数设置
g.target_amount = 10000 # 计划买入的总股数
g.interval = 60 # 下单间隔(秒),例如每60秒下一次单
# 3. 交易时间段设置 (格式:HH:MM)
g.start_time = '09:31' # 开始时间
g.end_time = '14:50' # 结束时间(留最后10分钟处理尾单或防止无法成交)
# 4. 内部状态变量
g.traded_amount = 0 # 已委托数量
g.batch_amount = 0 # 每次拆单的数量
g.total_batches = 0 # 预计总拆单次数
# 5. 计算拆单数量
calculate_batch_amount(context)
# 6. 注册定时任务,每隔 g.interval 秒执行一次 twap_trade
run_interval(context, twap_trade, seconds=g.interval)
def calculate_batch_amount(context):
"""
计算每次下单的股数
"""
# A股每天交易时间大约为 240 分钟 (4小时)
# 计算总交易时长(分钟)
total_minutes = 240
# 计算总共会触发多少次 run_interval
# interval 是秒,所以总次数 = 总分钟 * 60 / 间隔秒数
g.total_batches = int((total_minutes * 60) / g.interval)
if g.total_batches > 0:
# 计算单次下单数量
raw_batch = g.target_amount / g.total_batches
# 向下取整到100的倍数(A股买入最小单位为100股)
g.batch_amount = int(raw_batch / 100) * 100
# 如果计算结果小于100股,则强制设为100股,避免无法下单
if g.batch_amount < 100:
g.batch_amount = 100
log.info("TWAP策略初始化完成:目标总数 %s, 预计拆单次数 %s, 单笔委托数量 %s" % (
g.target_amount, g.total_batches, g.batch_amount))
def twap_trade(context):
"""
定时交易执行函数
"""
# 获取当前时间字符串,格式 HH:MM
current_time = context.blotter.current_dt.strftime('%H:%M')
# 1. 时间过滤
# 过滤掉盘前和盘后
if current_time < g.start_time or current_time > g.end_time:
return
# 过滤掉午休时间 (11:30 - 13:00)
if '11:30' <= current_time < '13:00':
return
# 2. 检查目标是否已完成
if g.traded_amount >= g.target_amount:
log.info("目标交易数量已完成,停止下单。")
return
# 3. 撤销未完成的订单
# 在下新单之前,先处理旧的挂单,避免资金被占用或重复成交
cancel_open_orders(context)
# 4. 计算本次下单数量
# 剩余需要买入的数量
remaining_amount = g.target_amount - g.traded_amount
# 默认下单数量为计算好的 batch_amount
order_vol = g.batch_amount
# 如果剩余数量小于单笔数量,则最后一次下单为剩余数量
if remaining_amount < g.batch_amount:
order_vol = remaining_amount
# 再次确保下单数量是100的倍数(如果是买入)
# 注意:如果是卖出,最后一次零股是可以卖出的,这里演示的是买入逻辑
order_vol = int(order_vol / 100) * 100
if order_vol == 0:
return
# 5. 执行下单
# 这里使用市价单(对手方最优)以保证成交速度,也可以改为限价单
# order_market(g.security, order_vol, market_type=0)
# 或者使用限价单(例如以卖一价下单,模拟市价效果但可控)
# 获取盘口数据
snapshot = get_snapshot(g.security)
if snapshot:
# 获取卖一价
ask_price = snapshot[g.security]['offer_grp'][1][0]
if ask_price > 0:
order(g.security, order_vol, limit_price=ask_price)
# 更新已委托数量
g.traded_amount += order_vol
log.info("TWAP执行:时间 %s, 下单 %s 股, 价格 %s, 累计委托 %s/%s" % (
current_time, order_vol, ask_price, g.traded_amount, g.target_amount))
else:
log.warning("获取行情失败或跌停,暂停下单")
def cancel_open_orders(context):
"""
撤销当前标的的所有未完成订单
"""
open_orders = get_open_orders(g.security)
if open_orders:
for _order in open_orders:
cancel_order(_order.id)
log.info("撤销未成交订单:%s" % _order.id)
# 注意:严格来说,撤单后应该减去 g.traded_amount 中对应的未成交部分
# 但为了简化逻辑,这里假设TWAP主要追求按时发出委托。
# 如果需要严格控制成交量,应使用 get_position(g.security).today_amount 来判断实际进度
def handle_data(context, data):
"""
必须实现的函数,这里留空,逻辑都在 run_interval 中
"""
pass
代码关键点解析
-
run_interval的使用:- 这是实现TWAP的关键。不同于
handle_data(通常随行情快照触发),run_interval允许我们精确控制时间间隔(如每60秒触发一次),这完全符合TWAP的时间切片逻辑。
- 这是实现TWAP的关键。不同于
-
calculate_batch_amount(拆单计算):- 我们在
initialize中预估了全天的交易次数。 - 公式:
单笔数量 = 总数量 / (240分钟 * 60 / 间隔秒数)。 - 加入了
int(x / 100) * 100的逻辑,确保买入数量符合A股一手(100股)的规定。
- 我们在
-
时间过滤:
- 在
twap_trade函数中,我们显式过滤了11:30到13:00。这是因为run_interval是基于服务器时间的定时器,它不会自动跳过午休时间,如果不加判断,可能会在午休期间产生无效信号或报错。
- 在
-
撤单逻辑 (
cancel_open_orders):- TWAP策略通常希望紧跟时间表。如果上一分钟的单子还没成交(例如价格波动导致挂单未成交),通常的做法是撤单,然后用最新的价格重新下单,以保证交易进度。
-
下单价格:
- 示例中使用的是
get_snapshot获取实时的卖一价(offer_grp)进行限价买入。这是一种激进的限价单策略,通常能保证较高成交率且价格可控。你也可以将其改为order_market使用市价单。
- 示例中使用的是
问答 (Q&A)
Q1: 如何将此策略改为卖出策略?
A1:
- 将
g.target_amount设置为负数(或者在下单时乘以 -1)。 - 在
calculate_batch_amount中计算绝对值。 - 在
twap_trade中,获取行情时改用买一价(bid_grp)。 - 卖出时不需要强制整百股(零股可以卖出),可以去掉
int(order_vol / 100) * 100的限制。
Q2: 如果我想在回测中使用这个策略,需要注意什么?
A2:
PTrade的回测模式支持 run_interval,但需要注意回测频率。如果回测选择的是“日线”级别,run_interval 不会生效。必须选择**“分钟”**级别回测,这样 run_interval 才能在盘中按秒级触发。
Q3: 为什么不直接在 handle_data 里写逻辑?
A3:
handle_data 的触发频率依赖于行情推送或回测数据的频率(通常是3秒或1分钟)。如果使用 handle_data,很难精确控制“每隔X秒”执行一次。使用 run_interval 可以将策略逻辑与行情推送频率解耦,更适合算法交易。