问题描述
解决方案
在量化交易中,为了买入大量股票而不引起价格剧烈波动(即减少“市场冲击”成本),最常用的方法是**拆单(Order Splitting)或使用算法交易(Algorithmic Trading)**策略。
在 PTrade 中,你可以通过编写策略来实现类似 TWAP(时间加权平均价格) 或 冰山订单(Iceberg Order) 的逻辑。
核心思路
- 化整为零:将一个大单(例如 10 万股)拆分成许多小单(例如每次 1000 股)。
- 分时执行:利用
run_interval或handle_data,每隔一段时间(如每分钟或每 30 秒)执行一次小单买入。 - 限价保护:使用限价单(Limit Order)而非市价单,防止吃掉卖盘深度导致价格瞬间拉升。
PTrade 策略代码示例:简易 TWAP 拆单策略
以下是一个具体的代码示例。假设我们要买入 600570.SS(恒生电子)共 10,000 股,我们将其拆分,在交易时间内每隔 1 分钟买入一部分,直到买够为止。
def initialize(context):
"""
初始化函数
"""
# 设定要操作的股票
g.security = '600570.SS'
set_universe(g.security)
# 设定总目标买入数量
g.target_amount = 10000
# 设定每次拆单买入的数量(例如每次买500股)
# 注意:股票买入最小单位为100股
g.batch_size = 500
# 记录已买入数量
g.filled_amount = 0
# 设置定时任务,每60秒运行一次 trade_func 函数
# 这种方式比在 handle_data 中判断时间更灵活
run_interval(context, trade_func, seconds=60)
def trade_func(context):
"""
定时执行的拆单交易函数
"""
# 如果已经买够了,就不再执行
if g.filled_amount >= g.target_amount:
return
# 获取当前时间,避免在非交易时间段下单(虽然PTrade回测会自动过滤,但实盘需注意)
current_hour = context.blotter.current_dt.hour
current_minute = context.blotter.current_dt.minute
# 简单的时间过滤:只在 9:30 到 11:30 和 13:00 到 14:50 之间下单
# 避开尾盘集合竞价和午休
is_trading_time = (
(current_hour == 9 and current_minute >= 30) or
(current_hour == 10) or
(current_hour == 11 and current_minute <= 30) or
(current_hour == 13) or
(current_hour == 14 and current_minute <= 50)
)
if not is_trading_time:
return
# 计算本次下单数量
# 如果剩余需要买的数量小于单次批次量,则只买剩余的
remaining = g.target_amount - g.filled_amount
order_vol = min(g.batch_size, remaining)
if order_vol <= 0:
return
# 获取当前快照数据以确定价格
snapshot = get_snapshot(g.security)
if not snapshot:
return
# 获取卖一价(Ask 1)和 最新价
# 为了不推高价格,我们可以选择挂在"最新价"或者"买一价"等待成交(被动买入)
# 或者为了成交速度,挂在"卖一价"(主动买入,但量小影响不大)
# 这里演示:以卖一价限价下单,确保大概率成交但限制最高成本
# 注意:snapshot数据结构在回测和实盘可能略有差异,回测中通常用 last_px
current_price = snapshot[g.security]['last_px']
# 下单
# limit_price 设置为当前价格,防止市价单滑点过大
order_id = order(g.security, order_vol, limit_price=current_price)
if order_id:
g.filled_amount += order_vol
log.info("拆单执行:买入 %s, 数量 %s, 价格 %.2f, 进度 %s/%s" % (
g.security, order_vol, current_price, g.filled_amount, g.target_amount
))
def handle_data(context, data):
"""
必须实现的函数,这里留空,逻辑主要在 run_interval 中
"""
pass
策略详解
-
run_interval(context, trade_func, seconds=60):- 这是实现拆单的核心。它告诉 PTrade 引擎每隔 60 秒调用一次
trade_func。这样可以将大单分散到全天的交易时间中,避免在某一时刻集中释放需求。
- 这是实现拆单的核心。它告诉 PTrade 引擎每隔 60 秒调用一次
-
g.batch_size(批次数量):- 这是控制市场冲击的关键参数。如果该数值相对于该股票的分钟成交量很小(例如占分钟成交量的 1% 以下),那么对价格的影响几乎可以忽略不计。
-
limit_price(限价保护):- 在
order函数中传入limit_price是非常重要的。如果使用市价单(不传价格),在流动性瞬间缺失的情况下,可能会以极高的价格成交(“乌龙指”效果)。设置为当前最新价或卖一价,可以保证成交成本可控。
- 在
-
进阶优化方向 (Tick 级被动交易):
- 如果是在实盘交易且对冲击成本极其敏感,可以使用
tick_data函数。 - 在
tick_data中,你可以监控盘口挂单(买一、卖一)。 - 策略可以是:只在买一价(Bid 1)挂单排队,等待卖家主动成交。如果长时间未成交,再撤单重挂。这种方式被称为Maker(做市商)策略,对市场价格的冲击最小,甚至能赚取买卖价差,但成交速度较慢。
- 如果是在实盘交易且对冲击成本极其敏感,可以使用
总结
要在 PTrade 中买入大量股票而不影响价格,核心原则是**“小单、高频、限价”**。通过上述代码,你可以将大额买入需求平滑地分布在时间轴上,从而隐藏交易意图,降低平均持仓成本。
Q&A
Q1: 这种拆单策略在回测中能看到减少冲击的效果吗?
A1: 在普通的回测模式下,很难精确模拟“由于你下单导致市场价格上涨”的反身性效果(除非使用非常高级的冲击成本模型)。但在实盘中,这种策略对减少滑点非常有效。
Q2: 如果我想按成交额而不是股数拆单怎么办?
A2: 可以使用 order_value(security, value) 接口,将 g.batch_size 设定为金额(例如每次买 5000 元),逻辑是一样的。
Q3: run_interval 在回测和实盘中表现一致吗?
A3: 基本一致。但在回测中,如果选择了日线级别回测,run_interval 可能不会按秒触发。建议在回测时选择分钟级别回测以验证拆单逻辑。