问题描述
解决方案
在聚宽(JoinQuant)的模拟交易环境中,由于系统会在每天收盘后保存策略状态并结束进程,第二天开盘前重启进程并恢复状态,因此数据库连接、文件句柄、网络套接字等涉及 I/O 的对象无法被持久化保存(无法被序列化)。
如果直接在 initialize 中建立连接并赋值给 g 变量,第二天重启时连接会丢失或失效。
要解决这个问题,必须使用 process_initialize(context) 函数,并配合 双下划线开头的全局变量。
核心解决方案
- 使用
process_initialize(context):该函数会在每次模拟盘进程启动时(包括每天早上的重启)执行。 - 使用
g.__变量:将数据库连接对象赋值给以双下划线开头(例如g.__conn)的全局变量。这告诉系统在保存状态时忽略该变量,避免因无法序列化而报错,同时确保每次重启后重新赋值。
代码实现示例
以下是一个连接 MySQL 数据库的完整示例代码:
# -*- coding: utf-8 -*-
import pymysql
def initialize(context):
# initialize 只在策略创建时执行一次,第二天重启不会执行
log.info("策略初始化")
# 这里不要建立数据库连接,也不要定义 g.__conn
def process_initialize(context):
"""
该函数会在每次模拟盘/回测进程重启时执行。
用于初始化不能被持久化保存的对象(如数据库连接)。
"""
log.info("进程启动/重启,正在建立数据库连接...")
# 建立数据库连接
# 注意:变量名必须以两个下划线开头 (g.__),防止系统尝试对其进行序列化保存
try:
g.__conn = pymysql.connect(
host='your_host',
user='your_username',
password='your_password',
database='your_db',
port=3306,
charset='utf8'
)
log.info("数据库连接成功")
except Exception as e:
log.error("数据库连接失败:Str(e)")
def handle_data(context, data):
# 在交易逻辑中使用数据库连接
if hasattr(g, '__conn') and g.__conn:
try:
# 使用连接进行查询
with g.__conn.cursor() as cursor:
sql = "SELECT * FROM your_table LIMIT 1"
cursor.execute(sql)
result = cursor.fetchone()
log.info("查询结果: " + str(result))
except Exception as e:
log.error("查询出错: " + str(e))
# 可以在这里添加重连逻辑
process_initialize(context)
def on_strategy_end(context):
# 策略结束时关闭连接
if hasattr(g, '__conn') and g.__conn:
g.__conn.close()
关键点解析
-
为什么不能用
initialize?initialize在模拟盘的整个生命周期中只运行一次。第二天重启时,系统是直接加载前一天保存的g对象,而不会重新运行initialize。因此,如果连接断开,initialize无法提供重连机制。
-
为什么要用
g.__variable(双下划线)?- 聚宽系统在每天收盘后会使用
pickle库将g对象保存到磁盘。 - 数据库连接对象(Connection Object)包含底层的 socket 连接,是不可序列化的。如果将其赋值给普通的
g.conn,系统在保存状态时会抛出异常,导致模拟盘运行失败。 - 以
__开头的变量(如g.__conn)会被系统自动忽略,不进行保存。配合process_initialize在每次启动时重新创建该对象,即可完美解决问题。
- 聚宽系统在每天收盘后会使用
Q&A 常见问题解答
Q: 如果我在盘中网络波动导致数据库断开怎么办?
A: process_initialize 仅处理进程重启时的连接。为了稳健性,建议在 handle_data 或其他定时函数中使用 try...except 捕获数据库异常,并在 except 块中调用 process_initialize(context) 或自定义的重连函数进行重连。
Q: process_initialize 在回测中会执行吗?
A: 会的。在回测中,process_initialize 会在 initialize 之后立即执行一次。如果回测因某种原因中断后恢复(虽然回测通常是一次性跑完),它也会被调用。主要用途还是为了兼容模拟盘的机制。
Q: 除了数据库连接,还有哪些对象需要这样处理?
A: 所有涉及 I/O 操作或无法被序列化的对象都需要这样处理,包括:
- 打开的文件句柄 (
open('file.txt')) - 网络连接对象 (
socket,requests.Session等) - 线程池或进程池对象
query()返回的查询对象(文档中明确提到g.query不能被保存,应使用g.__query并在process_initialize中初始化)。