🚀 AI 一键生成 qmt 策略代码

QMT 策略中如何安全地使用多线程执行耗时操作?

qmt | 发布时间: 2025/12/18
以下内容由 EasyQuant 生成。

问题描述

如何在交易策略中安全地使用多线程?

解决方案

在 QMT(Quantitative Trading Platform)中使用多线程需要非常小心。QMT 的底层架构是 C++,而 Python 策略是在嵌入式环境中运行的。

核心原则:绝对不要在子线程中直接访问或操作 ContextInfo 对象。

ContextInfo 是一个 C++ 封装对象,它不是线程安全的。如果在子线程中调用 ContextInfo.get_market_dataContextInfo.passorder 或访问其属性,极大概率会导致软件崩溃(闪退)或数据竞争错误。

安全使用多线程的模式

为了安全地使用多线程(例如进行耗时的复杂计算、访问外部数据库或爬虫),应遵循以下“生产者-消费者”模式:

  1. 主线程(Handlebar):负责提取数据(将 ContextInfo 中的数据转换为标准的 Python 数据类型,如 list, dict, pandas DataFrame)。
  2. 主线程:启动子线程,并将标准 Python 数据传递给子线程。
  3. 子线程:进行耗时计算,不触碰 ContextInfo
  4. 子线程:将计算结果放入线程安全的队列(queue.Queue)。
  5. 主线程(Handlebar):在下一轮循环中检查队列,取出结果,并调用 ContextInfo 的交易函数进行下单。

代码实现示例

以下是一个完整的策略示例,展示了如何在不阻塞主界面/主策略循环的情况下,在后台线程中执行耗时任务,并安全地返回结果进行交易。

# -*- coding: gbk -*-
import threading
import queue
import time
import pandas as pd

# 模拟一个耗时的计算任务函数
# 注意:此函数不接受 ContextInfo 作为参数,只接受普通数据
def heavy_calculation_task(price_data, result_queue):
    """
    后台计算任务
    :param price_data: 标准 Python/Pandas 数据 (不要传 ContextInfo)
    :param result_queue: 线程安全的队列,用于回传结果
    """
    try:
        # 模拟耗时操作,例如复杂的数学运算或外部 API 请求
        # 在实际场景中,这里不要写死循环,应该是一个有限的任务
        time.sleep(2) 
        
        # 简单的逻辑演示:如果最新价大于均价,返回买入信号
        last_price = price_data['close'].iloc[-1]
        mean_price = price_data['close'].mean()
        
        signal = None
        if last_price > mean_price:
            signal = 'buy'
        else:
            signal = 'sell'
            
        print(f"子线程计算完成: 价格={last_price}, 均价={mean_price:.2f}, 信号={signal}")
        
        # 将结果放入队列
        result_queue.put(signal)
        
    except Exception as e:
        print(f"子线程出错: {e}")

def init(ContextInfo):
    # 1. 初始化一个线程安全的队列
    ContextInfo.msg_queue = queue.Queue()
    
    # 2. 设置一个标志位,防止线程重复启动(根据需求决定是否允许并发)
    ContextInfo.is_calculating = False
    
    # 3. 设置股票池
    ContextInfo.stock = '600000.SH'
    ContextInfo.set_universe([ContextInfo.stock])
    ContextInfo.set_account('6000000248') # 请替换为真实账号

def handlebar(ContextInfo):
    # --- 第一部分:检查子线程的计算结果 ---
    
    # 使用 try-except 块非阻塞地获取队列内容
    try:
        # get_nowait() 如果队列为空会抛出 Empty 异常,不会阻塞主线程
        while not ContextInfo.msg_queue.empty():
            signal = ContextInfo.msg_queue.get_nowait()
            
            # 收到结果,重置计算标志位
            ContextInfo.is_calculating = False
            
            print(f"主线程收到信号: {signal},准备执行交易")
            
            # 在主线程中安全地使用 ContextInfo 进行下单
            if signal == 'buy':
                print("执行买入操作...")
                # passorder(23, 1101, ContextInfo.accid, ContextInfo.stock, 5, -1, 100, ContextInfo)
            elif signal == 'sell':
                print("执行卖出操作...")
                # passorder(24, 1101, ContextInfo.accid, ContextInfo.stock, 5, -1, 100, ContextInfo)
                
    except queue.Empty:
        pass

    # --- 第二部分:触发后台计算任务 ---
    
    # 如果当前没有正在进行的计算,且满足某些触发条件(这里简单演示每根K线触发一次)
    if not ContextInfo.is_calculating:
        if ContextInfo.is_new_bar(): # 仅在K线更新时触发,避免过于频繁
            print("启动后台线程进行计算...")
            
            # 1. 【关键】先在主线程获取数据
            # 必须在这里调用 get_market_data_ex,不能在子线程调用
            data = ContextInfo.get_market_data_ex(
                ['close'], 
                [ContextInfo.stock], 
                period='1d', 
                count=20,
                dividend_type='follow'
            )
            
            if ContextInfo.stock in data:
                df = data[ContextInfo.stock]
                
                # 2. 标记正在计算
                ContextInfo.is_calculating = True
                
                # 3. 创建并启动线程
                # 注意:args 中传递的是 df (pandas对象) 和 queue,没有 ContextInfo
                t = threading.Thread(
                    target=heavy_calculation_task, 
                    args=(df, ContextInfo.msg_queue)
                )
                # 设置为守护线程,防止策略停止后线程还在跑
                t.setDaemon(True) 
                t.start()

关键点解析

  1. 数据隔离

    • handlebar(主线程)中调用 ContextInfo.get_market_data_ex 获取数据。
    • 将获取到的 pandas.DataFrame 传递给子线程。这样子线程操作的是纯 Python 对象,完全安全。
  2. 通信机制

    • 使用 queue.Queue。它是线程安全的,适合多线程间的数据交换。
    • 子线程用 put() 发送数据,主线程用 get_nowait() 接收数据。
  3. 非阻塞设计

    • handlebar 函数必须快速返回,不能被 join() 或死循环阻塞,否则会导致 QMT 界面卡死或行情接收延迟。
    • 通过 ContextInfo.is_calculating 标志位来控制线程的启动频率,避免同时开启过多线程耗尽资源。
  4. 交易执行

    • 所有的 passordercancel_order 等交易指令必须写在 handlebarinit 等主线程回调函数中。

常见错误(请务必避免)

  • 错误做法:把 ContextInfo 传给线程函数 threading.Thread(target=func, args=(ContextInfo,))
    • 后果:随机崩溃,报错 Access Violation。
  • 错误做法:在子线程中 import xtquant 并尝试创建新的连接。
    • 后果:在 QMT 内置 Python 中,通常复用 ContextInfo 提供的连接,重复创建连接可能导致冲突。
  • 错误做法:在 handlebar 中使用 thread.join() 等待子线程结束。
    • 后果:界面卡死,策略无法响应实时行情。

Q&A

Q: 为什么不能在子线程使用 ContextInfo?
A: ContextInfo 是 C++ 对象的 Python 包装器。QMT 的 C++ 宿主程序不是设计为线程安全的,跨线程调用其内存空间会导致未定义行为(通常是内存访问冲突)。

Q: 如果我的计算非常耗时(例如训练模型),会卡住 QMT 吗?
A: 如果你在 handlebar 直接运行耗时代码,会卡住 QMT。如果使用上述的多线程方法,将计算移至后台,主界面和行情接收不会受影响。

Q: 可以使用 multiprocessing 多进程吗?
A: 在 QMT 的嵌入式 Python 环境中,multiprocessing 可能会遇到 pickle 序列化问题或环境启动问题。通常建议使用 threading 处理 I/O 密集型任务。如果是 CPU 密集型任务且必须使用多进程,建议编写一个独立的外部 Python 脚本,通过文件、数据库或 Socket 与 QMT 策略进行通信,而不是在 QMT 内部直接启动子进程。