问题描述
解决方案
这是一个基于聚宽(JoinQuant)API 编写的简单 Tick 级价差套利策略。
策略逻辑说明
- 核心思想:利用高频 Tick 数据中的买一价(Bid 1)和卖一价(Ask 1)的价差。当价差超过设定的阈值时,认为存在套利空间(或流动性提供空间)。
- 买入触发:当空仓时,如果
卖一价 - 买一价 >= 阈值,则以 买一价 挂单买入(试图作为 Maker 吃掉价差的一端)。 - 卖出触发:当持仓时,以 卖一价 挂单卖出(试图在价差的另一端获利平仓)。
- 运行频率:Tick 级(每 3 秒或更短时间触发一次)。
策略代码
# -*- coding: utf-8 -*-
from jqdata import *
def initialize(context):
"""
初始化函数,设定基准、股票池、参数等
"""
# 1. 设定基准
set_benchmark('000300.XSHG')
# 2. 开启真实价格模式 (Tick级回测必须开启)
set_option('use_real_price', True)
# 3. 设置手续费 (模拟股票交易,印花税等)
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
# 4. 定义要操作的标的 (这里以平安银行为例)
g.security = '000001.XSHE'
# 5. 定义价差阈值 (单位:元)
# 如果 卖一价 - 买一价 >= 0.02元,则触发交易
g.spread_threshold = 0.02
# 6. 订阅 Tick 事件 (必须步骤)
subscribe(g.security, 'tick')
# 7. 记录上一次操作,防止同一tick重复下单
g.last_order_time = None
def handle_tick(context, tick):
"""
Tick处理函数,每有一个新的Tick数据推送时调用一次
"""
security = g.security
# 获取当前的买一价和卖一价
# a1_p: 卖一价 (Ask 1 Price)
# b1_p: 买一价 (Bid 1 Price)
ask_price = tick.a1_p
bid_price = tick.b1_p
# 过滤无效数据 (如集合竞价阶段可能价格为0)
if ask_price == 0 or bid_price == 0:
return
# 计算价差
spread = ask_price - bid_price
# 获取当前持仓数量 (可卖出的持仓)
position = context.portfolio.positions[security]
current_amount = position.total_amount
# --- 交易逻辑 ---
# 1. 买入逻辑:空仓 且 价差足够大
if current_amount == 0 and spread >= g.spread_threshold:
# 使用可用资金买入
cash = context.portfolio.available_cash
# 预留一点资金防止手续费不足,这里简单计算买入数量(向下取整到100股)
amount = int(cash / bid_price / 100) * 100
if amount >= 100:
# 挂限价单:以买一价买入 (LimitOrderStyle)
order(security, amount, style=LimitOrderStyle(bid_price))
log.info("触发买入: 卖一:%.2f, 买一:%.2f, 价差:%.2f >= 阈值:%.2f, 挂单价:%.2f" % (
ask_price, bid_price, spread, g.spread_threshold, bid_price))
# 2. 卖出逻辑:持有仓位 (这里简化为只要有货就尝试以卖一价卖出获利)
# 在实际套利中,通常是买入成交后,立即挂卖单
elif current_amount > 0:
# 获取可卖持仓
closeable_amount = position.closeable_amount
if closeable_amount > 0:
# 挂限价单:以卖一价卖出
order(security, -closeable_amount, style=LimitOrderStyle(ask_price))
log.info("触发卖出: 卖一:%.2f, 买一:%.2f, 挂单价:%.2f" % (
ask_price, bid_price, ask_price))
def on_strategy_end(context):
"""
策略结束时调用
"""
# 取消订阅
unsubscribe_all()
log.info("策略结束,取消所有订阅")
代码关键点解析
-
set_option('use_real_price', True):- 这是 Tick 级回测的强制要求。Tick 数据必须基于真实价格撮合,不能使用前复权价格。
-
subscribe(g.security, 'tick'):- 在
initialize中调用此函数订阅标的。只有订阅了,handle_tick函数才会被触发。
- 在
-
handle_tick(context, tick):- 这是 Tick 策略的核心驱动函数。
tick对象包含了当前的盘口快照数据。tick.a1_p:卖一价。tick.b1_p:买一价。
-
LimitOrderStyle(限价单):- 套利或做市策略通常对价格非常敏感,因此使用限价单(Limit Order)而不是市价单(Market Order)。
- 代码中
order(..., style=LimitOrderStyle(price))明确指定了挂单价格。
-
价差逻辑:
spread = ask_price - bid_price。- 逻辑假设:如果价差大,我们在 Bid 1 排队买入,如果成交了,再在 Ask 1 卖出,理论上可以赚取这个 Spread 差价。
注意事项
- 回测速度:Tick 级回测数据量巨大,回测速度会比日线或分钟线慢很多,建议选择较短的时间段进行测试(如几天)。
- 成交机制:在回测中,限价单的撮合逻辑取决于是否开启了“盘口撮合”。如果挂在买一价,需要有后续的成交价打破该价格或有足够的对手盘才能成交。
- 未来函数:Tick 数据本身是快照,但在编写逻辑时要小心不要用到下一秒的数据(本代码使用的是当前推送的 tick,无未来函数)。
Q&A
Q: 为什么我的 Tick 策略回测非常慢?
A: Tick 数据量是分钟数据的 240 倍以上(每天约 4800 个 Tick 快照)。处理每一个 Tick 都需要消耗计算资源。建议在调试阶段只回测 1-3 天的数据,确认逻辑无误后再扩大范围。
Q: handle_tick 和 handle_data 有什么区别?
A: handle_data 是按分钟或按天定时触发的(例如每分钟的第 0 秒)。handle_tick 是由行情驱动的,每当交易所推送一个新的 Tick 快照(有成交或盘口变化)时触发。Tick 策略必须使用 handle_tick。
Q: 如何查看 Tick 对象里还有哪些数据?
A: Tick 对象还包含 current (最新价), volume (累计成交量), money (累计成交额), 以及五档盘口数据 (a1_v~a5_v, b1_v~b5_v 等)。你可以使用 print(tick) 或 log.info(tick) 在日志中查看完整结构。