🚀 AI 一键生成 qmt 策略代码
立即体验
👉 AI 生成代码 👈

联储证券QMT/miniqmt盘中行情推送中断怎么办?

qmt | 发布时间: 2026/6/2
以下内容由 EasyQuant 生成。

问题描述

联储证券QMT数据推送问题

联储证券 QMT)python 原生链接 miniqmt,开盘前正常 subscribe_whole_quote 进行行情订阅,存在一定概率(概率还挺高的)在盘中在回调中收不到任何数据推送了。底下的看到是 RPC 的,但具体也没有办法定位问题了,请帮忙回答一下是什么原因,同时有什么办法自愈一下吗?谢谢。

解决方案

在使用联储证券 QMT(通过 Python 原生连接 miniqmt)时,盘中出现 subscribe_whole_quote 行情推送中断是一个比较常见的问题。底层通信确实是基于 RPC(Remote Procedure Call)的。

一、 导致行情推送中断的可能原因

  1. 网络波动与 RPC 超时:底层 RPC 连接对网络稳定性要求较高。盘中如果出现短暂的网络抖动,可能导致 RPC 连接假死(半连接状态),服务端以为还在推送,但客户端已经收不到了。
  2. 全市场订阅数据量过大subscribe_whole_quote 会推送全市场的数据,盘中行情剧烈波动时,瞬间的数据吞吐量极大。如果 Python 端的 callback 回调函数处理逻辑过重(如进行了复杂的计算、写文件或数据库操作),会导致底层接收队列阻塞,最终触发 RPC 缓冲区溢出或超时断开。
  3. 券商行情服务器压力:部分券商的行情服务器在早盘高峰期压力较大,可能会主动断开一些占用资源过大的全市场订阅连接。
  4. QMT 客户端内存泄漏或假死:miniqmt 依赖于运行在本地的 QMT 客户端,如果 QMT 客户端本身出现卡顿或内存占用过高,也会导致推送停止。

二、 自愈方案(心跳检测与自动重连)

要解决这个问题,最有效的方法是在 Python 代码中实现心跳检测(Watchdog)机制。基本思路是:记录最后一次收到行情数据的时间,如果超过一定时间(例如 10 秒)没有收到任何数据,则认为连接已断开,主动执行退订并重新订阅。

自愈代码示例思路

import time
import threading
from xtquant import xtdata

class QuoteManager:
    def __init__(self):
        self.last_receive_time = time.time()
        self.sub_id = None
        self.is_running = True
        self.timeout_threshold = 10  # 10秒未收到数据认为断线

    def on_quote_callback(self, data):
        # 1. 更新最后接收时间
        self.last_receive_time = time.time()
        
        # 2. 处理行情数据(注意:回调函数内一定要轻量,不要做耗时操作!)
        # 如果有复杂逻辑,建议放入队列(queue),由其他线程异步处理
        # print("收到行情:", data)

    def subscribe(self):
        print("开始订阅全市场行情...")
        # 假设订阅全市场沪深A股
        self.sub_id = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.on_quote_callback)
        self.last_receive_time = time.time()

    def unsubscribe(self):
        if self.sub_id is not None:
            print("取消订阅...")
            xtdata.unsubscribe_quote(self.sub_id)
            self.sub_id = None

    def watchdog_loop(self):
        while self.is_running:
            time.sleep(3)
            # 检查当前时间与最后接收时间的差值
            if time.time() - self.last_receive_time > self.timeout_threshold:
                print(f"警告:超过 {self.timeout_threshold} 秒未收到行情,触发自愈重连机制!")
                self.unsubscribe()
                time.sleep(1)  # 稍微等待底层释放资源
                self.subscribe()

    def start(self):
        self.subscribe()
        # 启动看门狗线程
        self.watchdog_thread = threading.Thread(target=self.watchdog_loop, daemon=True)
        self.watchdog_thread.start()

    def stop(self):
        self.is_running = False
        self.unsubscribe()

# 使用示例
if __name__ == '__main__':
    manager = QuoteManager()
    manager.start()
    
    try:
        # 保持主线程运行
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        manager.stop()

三、 优化建议

  1. 回调函数极简化:绝对不要在 callback 中进行耗时的 IO 操作(如写数据库、复杂计算)。建议将收到的数据直接 putqueue.Queue 中,由另外的消费者线程去处理。
  2. 按需订阅代替全市场订阅:如果你的策略只需要监控几百只股票,强烈建议改用 subscribe_quote 传入具体的股票列表,而不是使用 subscribe_whole_quote。这能极大降低底层 RPC 的压力,显著减少断线概率。
  3. 定期重启 QMT:如果条件允许,可以通过脚本在每天盘前自动重启一次 QMT 客户端,保持环境的干净。