🚀 AI 一键生成 qmt 策略代码

QMT 平台 download_history_data2 调用卡住无超时时间问题解析

qmt | 发布时间: 2026/5/22
以下内容由 EasyQuant 生成。

问题描述

download_history_data2等调用没有超时时间的设计?有时候会卡住

download_history_data2 有时会卡在 result 为真的 sleep 调用里面,这个是已知问题吗?是不是可以加个超时时间。同样的,其他一些调用也建议加个全局超时时间以及调用局部的超时时间。

result = client.supply_history_data2(stock_list, spec_period, start_time, end_time, _BSON_.BSON.encode(param), on_progress)
if not result:
import time
try:
while not status[0] and client.is_connected():
_TIME_.sleep(0.1)
except:
if status[1] < status[2]:
client.stop_supply_history_data2()
_TRACEBACK_.print_exc()
if not client.is_connected():
raise Exception('行情服务连接断开')
if status[3]:
raise Exception('下载数据失败:' + status[3])
else:
while not status[0] and client.is_connected():
_TIME_.sleep(0.1)

return status[4]
result = client.supply_history_data2(stock_list, spec_period, start_time, end_time, _BSON_.BSON.encode(param), on_progress)
if not result:
import time
try:
while not status[0] and client.is_connected():
_TIME_.sleep(0.1)
except:
if status[1] < status[2]:
client.stop_supply_history_data2()
_TRACEBACK_.print_exc()
if not client.is_connected():
raise Exception('行情服务连接断开')
if status[3]:
raise Exception('下载数据失败:' + status[3])
else:
while not status[0] and client.is_connected():
_TIME_.sleep(0.1)

return status[4]

解决方案

在 QMT 平台的 Python API 中,download_history_data2(或底层调用的 supply_history_data2)等数据下载接口在设计上确实存在一些可能导致调用卡住的情况。您提到的代码片段显示,当 result 为真时,程序会进入一个 while 循环,不断检查 status[0]client.is_connected(),并在循环中 sleep(0.1)。如果因为网络波动、服务端异常或数据量过大导致回调函数(on_progress)未能正确更新 status[0],且连接未断开,这个循环就会变成死循环,导致程序卡住。

问题分析

  1. 缺乏内置超时机制:目前的 API 实现中,while 循环依赖于状态标志位或连接状态的改变来退出。如果没有设置最大等待时间,一旦服务端没有响应且连接保持,客户端就会无限期等待。
  2. 网络或服务端异常:在请求大量历史数据时,服务端处理可能需要较长时间,或者在传输过程中发生静默丢包,导致客户端收不到完成信号。

解决思路与建议

虽然官方 API 底层可能暂时没有提供直接的超时参数,但您可以在自己的策略代码中通过封装来增加超时控制:

1. 自定义超时封装

您可以使用 Python 的 threadingconcurrent.futures 模块,将数据下载操作放入一个单独的线程中执行,并在主线程中设置超时等待。

import threading
import time

def download_data_with_timeout(client, stock_list, period, start_time, end_time, timeout=60):
    result_container = {'data': None, 'error': None, 'done': False}
    
    def target_func():
        try:
            # 假设这是您调用下载数据的函数
            # data = client.download_history_data2(...)
            # result_container['data'] = data
            pass
        except Exception as e:
            result_container['error'] = e
        finally:
            result_container['done'] = True

    thread = threading.Thread(target=target_func)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        # 超时处理
        # 注意:Python 中强制终止线程比较复杂,通常建议设置标志位让线程自行退出
        # 或者在可能的情况下调用 client.stop_supply_history_data2()
        raise TimeoutError(f"数据下载超时 ({timeout}s)")
    
    if result_container['error']:
        raise result_container['error']
        
    return result_container['data']

2. 分批下载数据

为了减少单次请求卡住的概率,建议将大数据量的请求拆分为多个小时间段或小批量股票的请求。这样即使某次请求失败或超时,重试的成本也较低。

3. 官方反馈

这是一个非常合理的建议。为网络请求和数据下载接口增加全局和局部的超时参数(如 timeout 参数),是提高 API 健壮性的标准做法。建议您将此问题及代码片段反馈给 QMT 官方技术支持,促使他们在后续版本中优化底层 while 循环的逻辑,增加超时退出机制。

在官方更新之前,建议在策略层面做好异常捕获和超时控制,以保证量化程序的稳定运行。