🚀 AI 一键生成 qmt 策略代码

QMT 统计套利进阶:如何基于协整关系进行股票配对交易?

qmt | 发布时间: 2025/12/16
以下内容由 EasyQuant 生成。

问题描述

请帮我编写一个基于协整检验的股票配对交易策略

解决方案

这是一个基于协整检验(Cointegration)和均值回归原理的股票配对交易(Pair Trading)策略。

策略逻辑说明

  1. 配对选择:选取两只在历史上价格走势高度相关且通过协整检验的股票(例如:浦发银行 600000.SH 和 招商银行 600036.SH)。
  2. 数据获取:在 handlebar 中获取两只股票过去 N 天的收盘价数据。
  3. 协整回归 (OLS)
    • 使用最小二乘法(OLS)计算两只股票价格之间的线性关系:$Price_Y = \alpha + \beta \times Price_X + \epsilon$。
    • 计算残差(Spread):$Spread = Price_Y - \beta \times Price_X$。
  4. Z-Score 标准化
    • 计算 Spread 的均值和标准差。
    • 计算当前 Spread 的 Z-Score:$Z = \frac{Spread - Mean}{Std}$。
  5. 交易信号
    • 做空价差 (Short Spread):当 $Z > \text{Upper Threshold}$ (如 2.0),说明 Y 相对 X 被高估。操作:卖出 Y,买入 X。
    • 做多价差 (Long Spread):当 $Z < \text{Lower Threshold}$ (如 -2.0),说明 Y 相对 X 被低估。操作:买入 Y,卖出 X。
    • 平仓 (Exit):当 $|Z| < \text{Exit Threshold}$ (如 0.5),说明价差回归均值。操作:平掉所有仓位。

注意事项

  • 融券限制:在 A 股市场,做空(卖出)股票需要通过**信用账户(融资融券)**进行。本策略代码为了演示逻辑,使用了标准的下单函数。如果是实盘或精确回测,需要确保账户有融券资格,或者将策略修改为“持有底仓进行轮动”的模式。
  • 第三方库:本策略使用了 statsmodels 进行 OLS 回归,QMT 内置的 Python 环境通常包含此库。

策略代码

# -*- coding: gbk -*-
import pandas as pd
import numpy as np
import statsmodels.api as sm

def init(ContextInfo):
    """
    策略初始化函数
    """
    # 1. 设置配对股票 (Y, X)
    # 这里以浦发银行(Y)和招商银行(X)为例,实际使用请先进行协整检验筛选
    ContextInfo.stock_y = '600000.SH' 
    ContextInfo.stock_x = '600036.SH'
    ContextInfo.pair = [ContextInfo.stock_y, ContextInfo.stock_x]
    
    # 2. 设置策略参数
    ContextInfo.window_size = 60      # 回归和计算均值的窗口长度
    ContextInfo.entry_threshold = 2.0 # 开仓阈值 (Z-score)
    ContextInfo.exit_threshold = 0.5  # 平仓阈值 (Z-score)
    
    # 3. 设置账号 (请替换为您的实际资金账号)
    ContextInfo.account_id = 'YOUR_ACCOUNT_ID'
    ContextInfo.account_type = 'STOCK' # 股票账号
    ContextInfo.set_account(ContextInfo.account_id)
    
    # 4. 初始化持仓状态
    # 0: 空仓, 1: 做多价差(Long Y, Short X), -1: 做空价差(Short Y, Long X)
    ContextInfo.position_status = 0 

def handlebar(ContextInfo):
    """
    K线周期运行函数
    """
    # 仅在K线结束时运行,或者是回测模式
    if not ContextInfo.is_last_bar():
        return

    # 1. 获取历史数据
    # 获取过去 window_size + 1 根K线的数据,保证有足够的数据进行回归
    count = ContextInfo.window_size + 1
    
    # 使用 get_market_data_ex 获取数据
    # 返回格式: {code: dataframe}
    market_data = ContextInfo.get_market_data_ex(
        fields=['close'], 
        stock_code=ContextInfo.pair, 
        period='1d', 
        count=count,
        dividend_type='front' # 前复权
    )
    
    # 检查数据是否获取完整
    if ContextInfo.stock_y not in market_data or ContextInfo.stock_x not in market_data:
        print("数据获取失败")
        return
        
    df_y = market_data[ContextInfo.stock_y]
    df_x = market_data[ContextInfo.stock_x]
    
    # 确保数据长度一致且足够
    common_index = df_y.index.intersection(df_x.index)
    if len(common_index) < ContextInfo.window_size:
        print("公共时间段数据不足,跳过计算")
        return
        
    # 对齐数据
    price_y = df_y.loc[common_index]['close']
    price_x = df_x.loc[common_index]['close']
    
    # 2. 计算协整关系 (OLS 回归)
    # 我们使用对数价格进行回归,通常比直接价格更稳定
    log_y = np.log(price_y)
    log_x = np.log(price_x)
    
    # 添加常数项 (截距)
    x_with_const = sm.add_constant(log_x)
    
    # 执行 OLS 回归
    model = sm.OLS(log_y, x_with_const).fit()
    
    # 获取回归系数 (Hedge Ratio)
    beta = model.params[1]
    alpha = model.params[0]
    
    # 3. 计算残差 (Spread) 序列
    # Spread = Log(Y) - (alpha + beta * Log(X))
    spread_series = log_y - (alpha + beta * log_x)
    
    # 4. 计算 Z-Score
    # 使用窗口内的均值和标准差
    spread_mean = spread_series.mean()
    spread_std = spread_series.std()
    
    # 获取最新的 Spread 值
    current_spread = spread_series.iloc[-1]
    
    # 计算 Z-Score
    if spread_std == 0:
        z_score = 0
    else:
        z_score = (current_spread - spread_mean) / spread_std
        
    print(f"当前时间: {common_index[-1]}, Z-Score: {z_score:.4f}, Beta: {beta:.4f}")

    # 5. 交易逻辑
    # 获取当前资金用于计算仓位 (这里简单假设每只股票分配一半资金)
    # 注意:实盘中需要更复杂的资金管理
    capital = 1000000 # 假设名义本金,回测中可用 ContextInfo.capital
    target_value = capital / 2 
    
    # --- 信号判断 ---
    
    # 情况 A: Z-Score > 上轨 (价差过大,Y贵X便宜 -> 卖Y买X)
    if z_score > ContextInfo.entry_threshold:
        if ContextInfo.position_status != -1:
            print(f"触发做空价差信号 (Z > {ContextInfo.entry_threshold}) -> 卖出 {ContextInfo.stock_y}, 买入 {ContextInfo.stock_x}")
            
            # 卖出 Y (做空)
            # 注意:A股普通账户无法直接做空,此处代码为逻辑演示。
            # 如果是信用账户,需使用 passorder(28, ...) 融券卖出
            order_target_value(ContextInfo.stock_y, 0, ContextInfo, ContextInfo.account_id) 
            
            # 买入 X (做多)
            order_target_value(ContextInfo.stock_x, target_value, ContextInfo, ContextInfo.account_id)
            
            ContextInfo.position_status = -1

    # 情况 B: Z-Score < 下轨 (价差过小,Y便宜X贵 -> 买Y卖X)
    elif z_score < -ContextInfo.entry_threshold:
        if ContextInfo.position_status != 1:
            print(f"触发做多价差信号 (Z < -{ContextInfo.entry_threshold}) -> 买入 {ContextInfo.stock_y}, 卖出 {ContextInfo.stock_x}")
            
            # 买入 Y (做多)
            order_target_value(ContextInfo.stock_y, target_value, ContextInfo, ContextInfo.account_id)
            
            # 卖出 X (做空)
            order_target_value(ContextInfo.stock_x, 0, ContextInfo, ContextInfo.account_id)
            
            ContextInfo.position_status = 1

    # 情况 C: Z-Score 回归均值 (平仓)
    elif abs(z_score) < ContextInfo.exit_threshold:
        if ContextInfo.position_status != 0:
            print(f"价差回归均值 (|Z| < {ContextInfo.exit_threshold}) -> 平仓")
            
            # 平掉所有仓位
            order_target_value(ContextInfo.stock_y, 0, ContextInfo, ContextInfo.account_id)
            order_target_value(ContextInfo.stock_x, 0, ContextInfo, ContextInfo.account_id)
            
            ContextInfo.position_status = 0

# 辅助函数:下单目标价值
def order_target_value(stock_code, target_value, ContextInfo, account_id):
    """
    封装下单函数,适配 QMT 的 order_target_value
    注意:此函数在普通账户下无法实现真正的'做空'(持有负仓位),
    只能将持仓调整为 0。如需融券交易,请使用 passorder 函数。
    """
    # 获取最新价格
    last_price = ContextInfo.get_market_data_ex(
        ['close'], [stock_code], period='1d', count=1, dividend_type='front'
    )[stock_code].iloc[-1]['close']
    
    if last_price <= 0:
        return

    # 获取当前持仓
    positions = get_trade_detail_data(account_id, ContextInfo.account_type, 'POSITION')
    current_vol = 0
    for pos in positions:
        if pos.m_strInstrumentID + '.' + pos.m_strExchangeID == stock_code:
            current_vol = pos.m_nVolume
            break
            
    target_vol = int(target_value / last_price / 100) * 100
    
    # 计算差额
    diff_vol = target_vol - current_vol
    
    if diff_vol == 0:
        return
        
    # 下单
    if diff_vol > 0:
        # 买入
        order_shares(stock_code, diff_vol, 'fix', last_price, ContextInfo, account_id)
    else:
        # 卖出
        order_shares(stock_code, diff_vol, 'fix', last_price, ContextInfo, account_id)

常见问题 (Q&A)

Q1: 为什么代码中使用了 order_target_value 而不是融券接口?
A: 为了保证代码的通用性和安全性,示例使用了标准接口。在 A 股普通账户中,order_target_value 设为 0 只能卖出持仓,无法建立空头头寸。如果您拥有信用账户(两融账户),需要将下单逻辑替换为 passorder 函数,并指定 opType 为 27 (融资买入) 或 28 (融券卖出)。

Q2: 如何选择配对的股票?
A: 可以在策略外部(如 Jupyter Notebook)先下载一批同行业股票的历史数据,使用 statsmodels.tsa.stattools.coint 进行两两协整检验,P值小于 0.05 的组合适合作为配对标的。

Q3: get_market_data_ex 获取的数据包含停牌吗?
A: 默认情况下包含。在配对交易中,如果一只股票停牌,会导致价差计算失真。建议在代码中增加 ContextInfo.is_suspended_stock 判断,如果任一标的停牌,则暂停当次计算或平仓。

Q4: 为什么使用对数价格 (Log Price)?
A: 股票价格通常服从对数正态分布,取对数后计算价差(即 Log Spread)实际上反映的是两只股票的收益率比率关系,比直接价格差更稳定,且不受股价绝对值大小的影响。