问题描述
/
解决方案
在 ptrade 量化交易平台中,run_interval 函数用于以设定的时间间隔(最小3秒)周期性运行指定函数。根据官方 API 文档,当多次设定 run_interval 时,这些定时任务会以多个线程并行运行。这就意味着,如果多个线程同时访问或修改同一个全局变量(如 g 对象中的属性),就会产生竞态条件(Race Condition),从而导致数据不一致或重复执行下单逻辑。
要处理这种线程间的逻辑关联,可以采用以下几种方法:
1. 使用状态标志位(Flag)进行基础控制
对于简单的防重复执行场景,可以在全局对象 g 中设置状态标志位。在执行关键逻辑(如下单)前检查标志位,执行后更新标志位。
def initialize(context):
g.security = '600570.SS'
set_universe(g.security)
g.has_ordered = False # 下单状态标志
# 设置两个并行运行的定时任务
run_interval(context, task_A, seconds=5)
run_interval(context, task_B, seconds=5)
def task_A(context):
if not g.has_ordered:
# 执行策略逻辑 A
pass
def task_B(context):
if not g.has_ordered:
# 执行策略逻辑 B
pass
注意:这种方法在极高并发下仍可能存在微小的时间差导致同时进入判断,适用于对并发要求不那么严苛的场景。
2. 引入 Python 线程锁(Threading Lock)
为了绝对保证数据的一致性,最标准的多线程处理方式是引入 Python 内置的 threading.Lock。在修改共享变量或执行下单操作时加锁,确保同一时刻只有一个线程能执行该代码块。
import threading
def initialize(context):
g.security = '600570.SS'
set_universe(g.security)
# 初始化线程锁(注意:带有下划线开头的变量不会被持久化保存)
g.__lock = threading.Lock()
g.shared_counter = 0
run_interval(context, task_A, seconds=3)
run_interval(context, task_B, seconds=3)
def task_A(context):
# 获取锁
with g.__lock:
# 在锁的保护下修改共享变量
g.shared_counter += 1
log.info(f"Task A updated counter to {g.shared_counter}")
# 执行下单等关键操作
def task_B(context):
# 获取锁
with g.__lock:
g.shared_counter += 1
log.info(f"Task B updated counter to {g.shared_counter}")
3. 逻辑解耦与数据隔离
尽量避免在多个 run_interval 任务中修改同一个全局变量。可以将不同的任务分配给不同的标的,或者让一个任务专门负责数据收集与计算,另一个任务专门负责下单执行。
- 数据收集线程:负责调用
get_snapshot、get_gear_price等接口获取行情,并更新到g.market_data中。 - 交易执行线程:定期检查
g.market_data中的信号,满足条件则执行order,并更新自身的交易状态。
总结与注意事项
- 明确多线程特性:
run_interval是多线程并行的,必须具备并发编程思维。 - 保护关键操作:下单接口(如
order、order_tick)和共享状态的修改必须被保护(使用锁或严格的逻辑隔离)。 - 持久化问题:如果使用了
threading.Lock(),请务必将其命名为私有变量(如g.__lock),因为锁对象是无法被pickle序列化保存的,否则会导致框架持久化报错。