问题描述
解决方案
在 QMT(Quantitative Trading Platform)中使用多线程需要非常小心。QMT 的底层架构是 C++,而 Python 策略是在嵌入式环境中运行的。
核心原则:绝对不要在子线程中直接访问或操作 ContextInfo 对象。
ContextInfo 是一个 C++ 封装对象,它不是线程安全的。如果在子线程中调用 ContextInfo.get_market_data、ContextInfo.passorder 或访问其属性,极大概率会导致软件崩溃(闪退)或数据竞争错误。
安全使用多线程的模式
为了安全地使用多线程(例如进行耗时的复杂计算、访问外部数据库或爬虫),应遵循以下“生产者-消费者”模式:
- 主线程(Handlebar):负责提取数据(将
ContextInfo中的数据转换为标准的 Python 数据类型,如 list, dict, pandas DataFrame)。 - 主线程:启动子线程,并将标准 Python 数据传递给子线程。
- 子线程:进行耗时计算,不触碰
ContextInfo。 - 子线程:将计算结果放入线程安全的队列(
queue.Queue)。 - 主线程(Handlebar):在下一轮循环中检查队列,取出结果,并调用
ContextInfo的交易函数进行下单。
代码实现示例
以下是一个完整的策略示例,展示了如何在不阻塞主界面/主策略循环的情况下,在后台线程中执行耗时任务,并安全地返回结果进行交易。
# -*- coding: gbk -*-
import threading
import queue
import time
import pandas as pd
# 模拟一个耗时的计算任务函数
# 注意:此函数不接受 ContextInfo 作为参数,只接受普通数据
def heavy_calculation_task(price_data, result_queue):
"""
后台计算任务
:param price_data: 标准 Python/Pandas 数据 (不要传 ContextInfo)
:param result_queue: 线程安全的队列,用于回传结果
"""
try:
# 模拟耗时操作,例如复杂的数学运算或外部 API 请求
# 在实际场景中,这里不要写死循环,应该是一个有限的任务
time.sleep(2)
# 简单的逻辑演示:如果最新价大于均价,返回买入信号
last_price = price_data['close'].iloc[-1]
mean_price = price_data['close'].mean()
signal = None
if last_price > mean_price:
signal = 'buy'
else:
signal = 'sell'
print(f"子线程计算完成: 价格={last_price}, 均价={mean_price:.2f}, 信号={signal}")
# 将结果放入队列
result_queue.put(signal)
except Exception as e:
print(f"子线程出错: {e}")
def init(ContextInfo):
# 1. 初始化一个线程安全的队列
ContextInfo.msg_queue = queue.Queue()
# 2. 设置一个标志位,防止线程重复启动(根据需求决定是否允许并发)
ContextInfo.is_calculating = False
# 3. 设置股票池
ContextInfo.stock = '600000.SH'
ContextInfo.set_universe([ContextInfo.stock])
ContextInfo.set_account('6000000248') # 请替换为真实账号
def handlebar(ContextInfo):
# --- 第一部分:检查子线程的计算结果 ---
# 使用 try-except 块非阻塞地获取队列内容
try:
# get_nowait() 如果队列为空会抛出 Empty 异常,不会阻塞主线程
while not ContextInfo.msg_queue.empty():
signal = ContextInfo.msg_queue.get_nowait()
# 收到结果,重置计算标志位
ContextInfo.is_calculating = False
print(f"主线程收到信号: {signal},准备执行交易")
# 在主线程中安全地使用 ContextInfo 进行下单
if signal == 'buy':
print("执行买入操作...")
# passorder(23, 1101, ContextInfo.accid, ContextInfo.stock, 5, -1, 100, ContextInfo)
elif signal == 'sell':
print("执行卖出操作...")
# passorder(24, 1101, ContextInfo.accid, ContextInfo.stock, 5, -1, 100, ContextInfo)
except queue.Empty:
pass
# --- 第二部分:触发后台计算任务 ---
# 如果当前没有正在进行的计算,且满足某些触发条件(这里简单演示每根K线触发一次)
if not ContextInfo.is_calculating:
if ContextInfo.is_new_bar(): # 仅在K线更新时触发,避免过于频繁
print("启动后台线程进行计算...")
# 1. 【关键】先在主线程获取数据
# 必须在这里调用 get_market_data_ex,不能在子线程调用
data = ContextInfo.get_market_data_ex(
['close'],
[ContextInfo.stock],
period='1d',
count=20,
dividend_type='follow'
)
if ContextInfo.stock in data:
df = data[ContextInfo.stock]
# 2. 标记正在计算
ContextInfo.is_calculating = True
# 3. 创建并启动线程
# 注意:args 中传递的是 df (pandas对象) 和 queue,没有 ContextInfo
t = threading.Thread(
target=heavy_calculation_task,
args=(df, ContextInfo.msg_queue)
)
# 设置为守护线程,防止策略停止后线程还在跑
t.setDaemon(True)
t.start()
关键点解析
-
数据隔离:
- 在
handlebar(主线程)中调用ContextInfo.get_market_data_ex获取数据。 - 将获取到的
pandas.DataFrame传递给子线程。这样子线程操作的是纯 Python 对象,完全安全。
- 在
-
通信机制:
- 使用
queue.Queue。它是线程安全的,适合多线程间的数据交换。 - 子线程用
put()发送数据,主线程用get_nowait()接收数据。
- 使用
-
非阻塞设计:
handlebar函数必须快速返回,不能被join()或死循环阻塞,否则会导致 QMT 界面卡死或行情接收延迟。- 通过
ContextInfo.is_calculating标志位来控制线程的启动频率,避免同时开启过多线程耗尽资源。
-
交易执行:
- 所有的
passorder、cancel_order等交易指令必须写在handlebar或init等主线程回调函数中。
- 所有的
常见错误(请务必避免)
- 错误做法:把
ContextInfo传给线程函数threading.Thread(target=func, args=(ContextInfo,))。- 后果:随机崩溃,报错 Access Violation。
- 错误做法:在子线程中
import xtquant并尝试创建新的连接。- 后果:在 QMT 内置 Python 中,通常复用
ContextInfo提供的连接,重复创建连接可能导致冲突。
- 后果:在 QMT 内置 Python 中,通常复用
- 错误做法:在
handlebar中使用thread.join()等待子线程结束。- 后果:界面卡死,策略无法响应实时行情。
Q&A
Q: 为什么不能在子线程使用 ContextInfo?
A: ContextInfo 是 C++ 对象的 Python 包装器。QMT 的 C++ 宿主程序不是设计为线程安全的,跨线程调用其内存空间会导致未定义行为(通常是内存访问冲突)。
Q: 如果我的计算非常耗时(例如训练模型),会卡住 QMT 吗?
A: 如果你在 handlebar 直接运行耗时代码,会卡住 QMT。如果使用上述的多线程方法,将计算移至后台,主界面和行情接收不会受影响。
Q: 可以使用 multiprocessing 多进程吗?
A: 在 QMT 的嵌入式 Python 环境中,multiprocessing 可能会遇到 pickle 序列化问题或环境启动问题。通常建议使用 threading 处理 I/O 密集型任务。如果是 CPU 密集型任务且必须使用多进程,建议编写一个独立的外部 Python 脚本,通过文件、数据库或 Socket 与 QMT 策略进行通信,而不是在 QMT 内部直接启动子进程。