问题描述
解决方案
在 QMT 策略代码中连接 MySQL 数据库,通常使用 Python 的第三方库 pymysql 或 sqlalchemy。由于 QMT 自带独立的 Python 环境,首先必须确保在该环境中安装了相应的库。
以下是详细的步骤和完整的策略代码示例。
第一步:安装 pymysql 库
QMT 的 Python 环境通常位于安装目录下的 bin.x64 文件夹中。你需要将库安装到 QMT 的 site-packages 目录。
- 找到 QMT 安装目录(例如
D:\QMT)。 - 打开系统的命令行(CMD)。
- 执行以下命令(请根据你的实际安装路径修改路径):
pip install pymysql --target=D:\QMT\bin.x64\Lib\site-packages
第二步:编写策略代码
以下是一个完整的策略示例,展示了如何在 init 中建立连接,并在 handlebar 中将行情数据写入数据库。
注意:
- 请将
db_config中的数据库信息修改为你自己的配置。 - 建议在数据库中预先创建好表结构(例如表名为
stock_prices)。 - 为了防止回测时产生大量数据库操作,示例中添加了
is_last_bar()判断,仅在实时行情的最新一根 K 线触发写入。
# -*- coding: gbk -*-
import pymysql
import time
def init(ContextInfo):
"""
初始化函数:建立数据库连接
"""
print("策略初始化...")
# 1. 设置股票池 (示例)
ContextInfo.set_universe(['600000.SH'])
# 2. 数据库配置信息 (请修改为你的实际配置)
ContextInfo.db_config = {
'host': '127.0.0.1', # 数据库地址
'port': 3306, # 端口
'user': 'root', # 用户名
'password': 'your_password', # 密码
'database': 'quant_db', # 数据库名
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 3. 建立连接并存储在 ContextInfo 中
connect_db(ContextInfo)
def connect_db(ContextInfo):
"""
辅助函数:建立或重连数据库
"""
try:
ContextInfo.conn = pymysql.connect(**ContextInfo.db_config)
print("MySQL 数据库连接成功")
except Exception as e:
print(f"MySQL 连接失败: {e}")
ContextInfo.conn = None
def handlebar(ContextInfo):
"""
行情驱动函数:获取数据并写入数据库
"""
# 仅在最后一根K线(实时行情)运行,避免回测时频繁写入
if not ContextInfo.is_last_bar():
return
# 获取当前主图代码
stock_code = ContextInfo.stockcode
# 获取最新行情数据 (使用 get_market_data_ex)
# 获取最近1条数据
data = ContextInfo.get_market_data_ex(
['close', 'open', 'high', 'low', 'volume'],
[stock_code],
period=ContextInfo.period,
count=1
)
if stock_code not in data or data[stock_code].empty:
return
# 提取最新的一行数据
latest_bar = data[stock_code].iloc[-1]
current_price = float(latest_bar['close'])
current_time = latest_bar.name # 获取时间索引
print(f"当前 {stock_code} 价格: {current_price}, 时间: {current_time}")
# 写入数据库操作
insert_data_to_db(ContextInfo, stock_code, current_time, current_price)
def insert_data_to_db(ContextInfo, code, time_str, price):
"""
辅助函数:执行 SQL 插入
"""
# 检查连接是否断开,如果断开则重连
if ContextInfo.conn is None or not ContextInfo.conn.open:
print("连接已断开,尝试重连...")
connect_db(ContextInfo)
if ContextInfo.conn is None:
return # 重连失败,放弃本次写入
try:
with ContextInfo.conn.cursor() as cursor:
# 假设数据库中有一个表叫 stock_prices,包含 code, time, price 字段
# SQL 语句
sql = "INSERT INTO stock_prices (code, time, price) VALUES (%s, %s, %s)"
# 执行插入
cursor.execute(sql, (code, str(time_str), price))
# 提交事务
ContextInfo.conn.commit()
print("数据写入成功")
except Exception as e:
print(f"写入数据库出错: {e}")
# 出错回滚
ContextInfo.conn.rollback()
# 注意:QMT 没有显式的 stop 函数来关闭连接,
# 但 Python 进程结束时通常会自动释放资源。
# 如果需要严谨处理,可以在特定逻辑下调用 ContextInfo.conn.close()
关键点说明
-
ContextInfo 全局传递:
我们将数据库连接对象conn挂载在ContextInfo上(ContextInfo.conn)。这是因为init和handlebar是不同的作用域,通过ContextInfo可以实现跨函数共享连接对象,避免每次handlebar触发时都重新建立连接(那样会极其消耗资源并导致连接数耗尽)。 -
断线重连机制:
在insert_data_to_db函数中,我们检查了ContextInfo.conn.open。如果连接断开(例如网络波动或数据库重启),代码会尝试重新调用connect_db。 -
性能影响:
数据库 I/O 操作(尤其是写入)是阻塞式的。如果网络延迟较高,handlebar函数的执行会被卡住,可能导致策略错过后续的行情 Tick 或下单机会。- 建议:对于高频策略,不要在主线程直接写库,建议使用 Python 的
threading模块开启子线程异步写入,或者使用消息队列(如 Redis)做缓冲。
- 建议:对于高频策略,不要在主线程直接写库,建议使用 Python 的
-
编码格式:
文件头部必须包含# -*- coding: gbk -*-,否则中文注释或字符串可能会导致乱码或报错。
Q&A
Q: 为什么运行代码提示 No module named 'pymysql'?
A: 这是因为 QMT 自带的 Python 环境没有安装该库。请务必按照“第一步”中的说明,使用 pip 将库安装到 QMT 的 bin.x64\Lib\site-packages 目录下,而不是安装到你系统默认的 Python 环境中。
Q: 可以使用 SQLAlchemy 吗?
A: 可以。安装方法相同(pip install sqlalchemy 到 QMT 目录)。使用 SQLAlchemy 可以更方便地处理 Pandas DataFrame 的写入(使用 df.to_sql),但对于简单的插入操作,pymysql 更轻量。
Q: 可以在回测模式下连接数据库吗?
A: 可以,但要非常小心。回测速度非常快,如果每一根 K 线都写库,会极大地拖慢回测速度并可能导致数据库崩溃。建议在回测模式下将数据先存入内存(如 List),在回测结束时批量写入,或者仅在实盘模式下开启写库功能。