问题描述
解决方案
这是一个非常经典的算法交易需求。VWAP(成交量加权平均价)策略的核心思想是让你的下单节奏与该股票的历史成交量分布保持一致。
简单来说,如果某只股票在上午 9:30-10:00 的成交量通常占全天的 10%,那么你的策略也应该在这段时间内完成总委托量的 10%。这样可以最大程度地隐藏交易行为,获得接近市场当天的平均成交价。
策略实现逻辑
- 盘前准备 (9:00):
- 获取该股票过去一段时间(如过去 20 个交易日)的分时成交量数据。
- 计算每一分钟成交量占全天总成交量的比例,生成一条“累计成交量比例曲线”。
- 盘中执行 (9:30 - 15:00):
- 每分钟检查一次当前时间。
- 根据“累计成交量比例曲线”,计算截止当前时刻应该完成的累计成交量(目标持仓)。
- 计算(目标持仓 - 当前已成交量),得出本分钟需要下单的数量。
- 下单。
JoinQuant 策略代码实现
以下是一个完整的 VWAP 策略代码。你可以直接将其复制到聚宽的回测环境中运行。
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
def initialize(context):
"""
初始化函数
"""
# 1. 设定要操作的股票
g.security = '000001.XSHE' # 平安银行
# 2. 设定总共需要买入/卖出的数量
g.total_order_amount = 100000 # 例如:计划买入10万股
# 3. 设定交易方向: 'buy' (买入) 或 'sell' (卖出)
g.side = 'buy'
# 4. 记录已经成交的数量
g.traded_amount = 0
# 5. 存储历史成交量分布比例的变量
g.volume_profile = None
# 设定基准
set_benchmark('000300.XSHG')
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
# 过滤报错日志
log.set_level('order', 'error')
# 每天开盘前计算当天的VWAP分布曲线
run_daily(calculate_vwap_profile, '9:00')
# 盘中每分钟执行一次交易逻辑
run_daily(execute_vwap_trade, 'every_bar')
def calculate_vwap_profile(context):
"""
盘前计算:根据过去N天的分钟数据,计算每分钟的累计成交量比例
"""
# 获取过去 20 天的分钟级行情数据
# 每天 240 分钟,20天 = 4800 分钟
days = 20
count = 240 * days
# 获取分钟级别的成交量数据
# fields=['volume'] 返回的是一个 panel 或者 dataframe,这里我们需要处理一下
prices = get_price(g.security, count=count, end_date=context.previous_date, frequency='1m', fields=['volume'])
if prices.empty:
log.error("无法获取历史数据,策略暂停")
return
# 将数据按时间(分钟索引 0-239)分组平均
# 聚宽的分钟数据索引是 datetime,我们需要将其转换为 0-239 的分钟序号
# 创建一个长度为 240 的数组来存储每分钟的平均成交量
minute_volumes = np.zeros(240)
# 遍历数据计算分布 (这里使用简单的循环处理,也可以用 pandas groupby 优化)
# 为了演示清晰,我们模拟将数据切分成天,然后按分钟累加
volumes = prices['volume'].values
# 确保数据量是 240 的倍数
valid_len = (len(volumes) // 240) * 240
volumes = volumes[-valid_len:]
# 重塑数组为 (天数, 240分钟),然后沿轴0(天数)求平均
daily_matrix = volumes.reshape(-1, 240)
avg_minute_volume = daily_matrix.mean(axis=0)
# 计算累计成交量
cumsum_volume = np.cumsum(avg_minute_volume)
total_volume = cumsum_volume[-1]
# 计算累计比例 (0.0 到 1.0)
if total_volume > 0:
g.volume_profile = cumsum_volume / total_volume
else:
# 如果数据异常,使用线性分布作为备选
g.volume_profile = np.linspace(1/240.0, 1.0, 240)
# 重置当日已交易数量 (如果是按日回测,每天重置;如果是实盘单次任务,需根据实际情况调整)
# 这里假设是每天都要执行 g.total_order_amount 的量。
# 如果是大单分多天执行,这里需要修改逻辑,不要重置 g.traded_amount
g.traded_amount = 0
log.info("VWAP 曲线计算完成,准备执行交易。")
def get_current_minute_index(current_dt):
"""
辅助函数:获取当前时间是交易日的第几分钟 (0-239)
"""
hour = current_dt.hour
minute = current_dt.minute
# 上午 9:30 - 11:30
if 9 <= hour < 11 or (hour == 11 and minute <= 30):
# 9:31 是第 0 分钟
minutes_passed = (hour - 9) * 60 + minute - 30 - 1
# 下午 13:00 - 15:00
elif 13 <= hour < 15:
# 13:01 是第 120 分钟
minutes_passed = 120 + (hour - 13) * 60 + minute - 1
else:
return -1 # 非交易时间
# 边界修正
if minutes_passed < 0: minutes_passed = 0
if minutes_passed > 239: minutes_passed = 239
return int(minutes_passed)
def execute_vwap_trade(context):
"""
盘中执行:根据当前时间和VWAP曲线进行下单
"""
if g.volume_profile is None:
return
# 1. 获取当前是第几分钟 (0-239)
idx = get_current_minute_index(context.current_dt)
if idx == -1:
return
# 2. 获取当前时刻应该完成的累计比例
target_percent = g.volume_profile[idx]
# 3. 计算截止当前应该成交的总数量
target_cum_amount = int(g.total_order_amount * target_percent)
# 4. 计算本次需要下单的数量 (目标累计 - 实际已成交)
# 注意:这里我们用策略内部变量 g.traded_amount 记录已成交,
# 也可以使用 context.portfolio.positions[g.security].total_amount 来判断,视需求而定。
order_amount = target_cum_amount - g.traded_amount
# 5. 下单逻辑 (最小交易单位 100 股)
if order_amount >= 100:
# 向下取整到 100 的倍数
order_amount = int(order_amount / 100) * 100
if g.side == 'buy':
# 买入
order(g.security, order_amount)
log.info("VWAP买入: 时间 %s, 进度 %.2f%%, 下单 %d 股" % (context.current_dt, target_percent*100, order_amount))
elif g.side == 'sell':
# 卖出
order(g.security, -order_amount)
log.info("VWAP卖出: 时间 %s, 进度 %.2f%%, 下单 %d 股" % (context.current_dt, target_percent*100, order_amount))
# 更新已交易数量
g.traded_amount += order_amount
# 6. 收盘前的兜底逻辑 (14:55之后)
# 如果临近收盘还有未完成的单子,强制执行完
if idx >= 235 and g.traded_amount < g.total_order_amount:
remain = g.total_order_amount - g.traded_amount
if remain >= 100:
if g.side == 'buy':
order(g.security, remain)
else:
order(g.security, -remain)
g.traded_amount += remain
log.info("收盘兜底执行剩余 %d 股" % remain)
代码关键点解析
-
历史数据计算 (
calculate_vwap_profile):- 我们取了过去 20 天的分钟数据。
reshape(-1, 240)是核心步骤,它将连续的时间序列转换成“天 x 分钟”的矩阵,方便按列(即按每天的同一分钟)求平均值。np.cumsum计算出一条从 0 到 1 递增的曲线,代表了该股票正常的日内成交节奏。
-
时间索引转换 (
get_current_minute_index):- A股的交易时间是断开的(中午休市)。
- 我们需要将
9:31映射为索引0,13:01映射为索引120,以此类推直到239。
-
下单控制 (
execute_vwap_trade):- 目标驱动:我们不关心这一分钟具体要买多少,我们关心的是“截止到现在,我应该总共买多少”。
本次下单量 = 目标累计量 - 已成交量。这种差分计算方式具有自我修正功能。如果上一分钟下单失败或未完全成交,下一分钟会自动补上差额。
-
兜底逻辑:
- 由于取整(100股一手)或行情波动,可能会导致最后几分钟还剩一点零头。代码在 14:55 后会检查并强制执行剩余数量。
如何优化?
- 使用限价单:目前的示例使用的是市价单(
order默认是市价)。为了进一步减少冲击,可以将order改为order_target_value或使用LimitOrderStyle,挂在买一或卖一价上。 - 动态调整:如果当日股价大幅偏离均线,或者出现极端行情,可以加入风控逻辑暂停 VWAP 执行。
- 多日拆单:如果资金量特别大(例如超过该股票日均成交量的 5%),建议将
g.total_order_amount拆分到多天执行。
Q&A
Q: 为什么使用 VWAP 而不是简单的定时等量下单(TWAP)?
A: 简单的定时下单(例如每5分钟买1000股)在早盘和尾盘成交活跃时可能对市场影响较小,但在中午成交清淡时,同样的单量可能会造成巨大的价格滑点。VWAP 遵循成交量分布,在成交活跃时多买,清淡时少买,能更有效地降低冲击成本。
Q: 这段代码可以直接用于实盘吗?
A: 这段代码是基于 JoinQuant 回测 API 编写的。如果要用于实盘(如通过 JoinQuant 的实盘交易接口),逻辑是通用的,但需要注意实盘中的信号延迟、废单处理以及资金同步问题。建议先在模拟盘充分测试。
Q: 如何处理停牌的股票?
A: 在 calculate_vwap_profile 中获取数据时,如果股票停牌,get_price 获取的数据可能为空或全为0。代码中加入了 if prices.empty 的判断。在实盘中,下单函数 order 如果遇到停牌股票会返回 None 或报错,建议在下单前检查 get_current_data()[security].paused 属性。