问题描述
联储证券 QMT)python 原生链接 miniqmt,开盘前正常 subscribe_whole_quote 进行行情订阅,存在一定概率(概率还挺高的)在盘中在回调中收不到任何数据推送了。底下的看到是 RPC 的,但具体也没有办法定位问题了,请帮忙回答一下是什么原因,同时有什么办法自愈一下吗?谢谢。
解决方案
在使用联储证券 QMT(通过 Python 原生连接 miniqmt)时,盘中出现 subscribe_whole_quote 行情推送中断是一个比较常见的问题。底层通信确实是基于 RPC(Remote Procedure Call)的。
一、 导致行情推送中断的可能原因
- 网络波动与 RPC 超时:底层 RPC 连接对网络稳定性要求较高。盘中如果出现短暂的网络抖动,可能导致 RPC 连接假死(半连接状态),服务端以为还在推送,但客户端已经收不到了。
- 全市场订阅数据量过大:
subscribe_whole_quote会推送全市场的数据,盘中行情剧烈波动时,瞬间的数据吞吐量极大。如果 Python 端的callback回调函数处理逻辑过重(如进行了复杂的计算、写文件或数据库操作),会导致底层接收队列阻塞,最终触发 RPC 缓冲区溢出或超时断开。 - 券商行情服务器压力:部分券商的行情服务器在早盘高峰期压力较大,可能会主动断开一些占用资源过大的全市场订阅连接。
- QMT 客户端内存泄漏或假死:miniqmt 依赖于运行在本地的 QMT 客户端,如果 QMT 客户端本身出现卡顿或内存占用过高,也会导致推送停止。
二、 自愈方案(心跳检测与自动重连)
要解决这个问题,最有效的方法是在 Python 代码中实现心跳检测(Watchdog)机制。基本思路是:记录最后一次收到行情数据的时间,如果超过一定时间(例如 10 秒)没有收到任何数据,则认为连接已断开,主动执行退订并重新订阅。
自愈代码示例思路
import time
import threading
from xtquant import xtdata
class QuoteManager:
def __init__(self):
self.last_receive_time = time.time()
self.sub_id = None
self.is_running = True
self.timeout_threshold = 10 # 10秒未收到数据认为断线
def on_quote_callback(self, data):
# 1. 更新最后接收时间
self.last_receive_time = time.time()
# 2. 处理行情数据(注意:回调函数内一定要轻量,不要做耗时操作!)
# 如果有复杂逻辑,建议放入队列(queue),由其他线程异步处理
# print("收到行情:", data)
def subscribe(self):
print("开始订阅全市场行情...")
# 假设订阅全市场沪深A股
self.sub_id = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.on_quote_callback)
self.last_receive_time = time.time()
def unsubscribe(self):
if self.sub_id is not None:
print("取消订阅...")
xtdata.unsubscribe_quote(self.sub_id)
self.sub_id = None
def watchdog_loop(self):
while self.is_running:
time.sleep(3)
# 检查当前时间与最后接收时间的差值
if time.time() - self.last_receive_time > self.timeout_threshold:
print(f"警告:超过 {self.timeout_threshold} 秒未收到行情,触发自愈重连机制!")
self.unsubscribe()
time.sleep(1) # 稍微等待底层释放资源
self.subscribe()
def start(self):
self.subscribe()
# 启动看门狗线程
self.watchdog_thread = threading.Thread(target=self.watchdog_loop, daemon=True)
self.watchdog_thread.start()
def stop(self):
self.is_running = False
self.unsubscribe()
# 使用示例
if __name__ == '__main__':
manager = QuoteManager()
manager.start()
try:
# 保持主线程运行
while True:
time.sleep(1)
except KeyboardInterrupt:
manager.stop()
三、 优化建议
- 回调函数极简化:绝对不要在
callback中进行耗时的 IO 操作(如写数据库、复杂计算)。建议将收到的数据直接put到queue.Queue中,由另外的消费者线程去处理。 - 按需订阅代替全市场订阅:如果你的策略只需要监控几百只股票,强烈建议改用
subscribe_quote传入具体的股票列表,而不是使用subscribe_whole_quote。这能极大降低底层 RPC 的压力,显著减少断线概率。 - 定期重启 QMT:如果条件允许,可以通过脚本在每天盘前自动重启一次 QMT 客户端,保持环境的干净。