🚀 AI 一键生成 joinquant 策略代码

如何用 JoinQuant 构建一个经典的配对交易(统计套利)策略?

joinquant | 发布时间: 2025/11/26
以下内容由 EasyQuant 生成。

问题描述

请帮我实现一个两只股票的配对交易策略。

解决方案

这是一个基于 JoinQuant (聚宽) 平台的两只股票配对交易(Pair Trading)策略实现。

策略原理

配对交易是一种统计套利策略。它假设两只相关性很高的股票(通常是同行业、同业务模式的公司),其价格比率(Spread)会围绕一个均值波动。

  1. 选取标的:选取两只相关性高的股票(本例使用 平安银行 000001.XSHE浦发银行 600000.XSHG)。
  2. 计算价差:计算两只股票价格的比值(Ratio = Price A / Price B)。
  3. Z-Score 信号:根据过去 N 天的历史数据,计算比值的均值和标准差,得出 Z-Score。
    • $Z = \frac{\text{当前比值} - \text{均值}}{\text{标准差}}$
  4. 交易逻辑
    • 当 Z > 2:比值过高,说明 A 贵 B 便宜 -> 融券卖出 A,买入 B
    • 当 Z < -2:比值过低,说明 A 便宜 B 贵 -> 买入 A,融券卖出 B
    • 当 abs(Z) < 0.5:比值回归均值 -> 平掉所有仓位

注意事项

由于配对交易涉及做空,必须在初始化时将账户类型设置为融资融券账户 (stock_margin)

策略代码

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd

def initialize(context):
    # 1. 设定账户类型为融资融券账户 (必须步骤,否则无法做空)
    # 获取初始资金
    init_cash = context.portfolio.starting_cash
    # 设定账户为融资融券账户
    set_subportfolios([SubPortfolioConfig(cash=init_cash, type='stock_margin')])
    
    # 2. 设定基准和费率
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
    # 3. 定义配对的股票
    # 这里选取两只银行股作为示例:平安银行(S1) 和 浦发银行(S2)
    g.s1 = '000001.XSHE' 
    g.s2 = '600000.XSHG'
    
    # 4. 策略参数
    g.window = 20      # 计算均值和标准差的窗口期
    g.entry_threshold = 2.0  # 开仓阈值 (2倍标准差)
    g.exit_threshold = 0.5   # 平仓阈值 (0.5倍标准差)
    
    # 运行频率:每天开盘时运行
    run_daily(trade, 'every_bar')

def trade(context):
    # 获取两只股票过去 g.window 天的收盘价
    # 注意:要多取一天数据以包含当前未收盘的价格(回测中current_dt指向的时间点)
    # 但为了稳定性,通常使用过去N天的历史收盘价来计算统计指标
    prices_s1 = attribute_history(g.s1, g.window, '1d', ['close'])['close']
    prices_s2 = attribute_history(g.s2, g.window, '1d', ['close'])['close']
    
    # 检查数据是否足够
    if len(prices_s1) < g.window or len(prices_s2) < g.window:
        return

    # 1. 计算比价 (Ratio)
    ratio = prices_s1 / prices_s2
    
    # 2. 计算统计指标
    mean = ratio.mean()
    std = ratio.std()
    
    # 获取当前的最新价格(用于计算当前的 Z-Score)
    current_p1 = data_get_current_price(g.s1)
    current_p2 = data_get_current_price(g.s2)
    
    if current_p1 is None or current_p2 is None:
        return
        
    current_ratio = current_p1 / current_p2
    
    # 3. 计算 Z-Score
    z_score = (current_ratio - mean) / std
    
    # 打印日志方便调试
    # log.info(f"Z-Score: {z_score:.2f}, Ratio: {current_ratio:.2f}")
    
    # 获取当前仓位情况
    # long_positions: 多单仓位, short_positions: 空单仓位
    # 注意:融资融券账户的仓位结构在 context.portfolio.long_positions 和 short_positions 中
    p = context.portfolio
    
    s1_long = p.long_positions.get(g.s1, None)
    s1_short = p.short_positions.get(g.s1, None)
    s2_long = p.long_positions.get(g.s2, None)
    s2_short = p.short_positions.get(g.s2, None)
    
    has_s1_long = s1_long and s1_long.total_amount > 0
    has_s1_short = s1_short and s1_short.total_amount > 0
    has_s2_long = s2_long and s2_long.total_amount > 0
    has_s2_short = s2_short and s2_short.total_amount > 0
    
    # 4. 交易逻辑
    
    # --- 平仓逻辑 (回归均值) ---
    if abs(z_score) < g.exit_threshold:
        # 如果持有任何仓位,全部平仓
        if has_s1_long:
            order_target_value(g.s1, 0)
        if has_s1_short:
            # 买券还券
            marginsec_close(g.s1, s1_short.total_amount)
            
        if has_s2_long:
            order_target_value(g.s2, 0)
        if has_s2_short:
            # 买券还券
            marginsec_close(g.s2, s2_short.total_amount)
            
    # --- 开仓逻辑 ---
    
    # 情况 A: Z-Score > 2 (S1 贵, S2 便宜) -> 做空 S1, 做多 S2
    elif z_score > g.entry_threshold:
        # 如果当前没有持有该方向的仓位
        if not (has_s1_short and has_s2_long):
            # 先平掉反向仓位 (如果有)
            if has_s1_long: order_target_value(g.s1, 0)
            if has_s2_short: marginsec_close(g.s2, s2_short.total_amount)
            
            # 资金分配:各使用一半可用资金
            cash = context.portfolio.available_cash / 2
            
            # 融券卖出 S1
            marginsec_open(g.s1, int(cash / current_p1 / 100) * 100)
            # 买入 S2
            order_value(g.s2, cash)
            
            log.info(f"触发做空信号 (Z={z_score:.2f}): 卖出 {g.s1}, 买入 {g.s2}")

    # 情况 B: Z-Score < -2 (S1 便宜, S2 贵) -> 做多 S1, 做空 S2
    elif z_score < -g.entry_threshold:
        # 如果当前没有持有该方向的仓位
        if not (has_s1_long and has_s2_short):
            # 先平掉反向仓位 (如果有)
            if has_s1_short: marginsec_close(g.s1, s1_short.total_amount)
            if has_s2_long: order_target_value(g.s2, 0)
            
            # 资金分配
            cash = context.portfolio.available_cash / 2
            
            # 买入 S1
            order_value(g.s1, cash)
            # 融券卖出 S2
            marginsec_open(g.s2, int(cash / current_p2 / 100) * 100)
            
            log.info(f"触发做多信号 (Z={z_score:.2f}): 买入 {g.s1}, 卖出 {g.s2}")

# 辅助函数:获取当前价格
def data_get_current_price(security):
    data = get_current_data()
    if security in data and not data[security].paused:
        return data[security].last_price
    return None

关键代码解释

  1. set_subportfolios(..., type='stock_margin'):

    • 这是最关键的一步。默认的回测账户是普通股票账户,无法进行做空操作。必须将其设置为 stock_margin 才能使用 marginsec_open(融券卖出)。
  2. marginsec_openmarginsec_close:

    • marginsec_open(security, amount): 融券卖出(开空仓)。
    • marginsec_close(security, amount): 买券还券(平空仓)。
    • 对于多头仓位,我们直接使用标准的 order_valueorder_target_value 即可。
  3. Z-Score 计算:

    • 我们使用 attribute_history 获取过去 20 天的数据来构建统计基准(均值和标准差)。
    • Z-Score 衡量当前价差偏离均值的程度。偏离越大,回归的可能性(理论上)越大。
  4. 仓位管理:

    • 代码中加入了逻辑判断 if not (has_s1_short and has_s2_long),防止在已经持有对应方向仓位时重复开仓。
    • 在开新仓前,先检查并平掉反向仓位(止损或反手)。

常见问题 Q&A

Q: 为什么回测时报错 "Order Rejected" 或无法做空?
A: 请检查 initialize 函数中是否正确设置了 set_subportfolios(..., type='stock_margin')。此外,回测标的必须在交易所的融资融券标的名单中(聚宽回测环境会自动校验这一点,非两融标的无法做空)。

Q: 如何选择配对的股票?
A: 代码中硬编码了平安银行和浦发银行。在实际研究中,你需要先进行协整性检验(Cointegration Test)来筛选股票对。

Q: 为什么使用 order_value 买入而不是 margincash_open
A: margincash_open 是融资买入(借钱买),会产生利息并增加杠杆。为了简化策略风险,本示例中多头部分使用自有资金买入(order_value),仅在空头部分使用融券。如果你希望双向都加杠杆,可以将 order_value 替换为 margincash_open