消息调度机制
消息防抖、去重、队列——OpenVort 如何可靠地处理并发消息。
概述
用户通过 IM 或 Web 发送消息时,消息不会直接进入 AI Agent 处理。中间经过 Dispatcher(消息调度器)进行防抖、去重和队列管理,确保消息被正确、高效地处理。
消息处理流程
用户消息
│
▼
IM Channel / Web API
│
▼
Dispatcher (消息调度器)
├── 防抖:2 秒内的连续消息合并
├── 去重:多实例部署时防止重复处理
└── 队列:处理期间新消息排队
│
▼
Agent Runtime (AI 处理)
│
▼
回复用户
防抖 (Debounce)
用户在聊天中经常会连续发送多条消息(打字快的人尤其明显)。如果每条消息都单独触发一次 AI 处理,会导致:
- AI 只看到部分信息就开始回复
- 多次处理浪费资源
- 用户收到多条不完整的回复
机制
收到消息后,Dispatcher 等待 2 秒。如果 2 秒内有新消息到达,重置计时器继续等待。直到 2 秒内没有新消息,才将所有累积的消息合并提交给 Agent。
时间线:
0.0s 收到 "帮我" → 开始 2s 计时
0.5s 收到 "查一下" → 重置计时器
1.2s 收到 "Jenkins 构建状态" → 重置计时器
3.2s 2s 无新消息 → 合并为 "帮我查一下Jenkins 构建状态" → 提交 Agent
图片累积
防抖期间,图片也会被累积。用户先发文字再发图片是常见操作,防抖确保它们一起被 AI 看到。
去重 (Dedup)
场景
IM 平台(尤其是企微)可能会重复推送同一条消息。OpenVort 部署多实例时,同一条消息会被每个实例都收到。
机制:InboxService
基于 PostgreSQL 的分布式消息去重:
- 每条消息有唯一 ID(由 IM 平台分配)
- 收到消息时,执行
INSERT ... ON CONFLICT DO NOTHING - 插入成功 → 这是新消息,交给 Dispatcher 处理
- 插入冲突 → 这是重复消息,直接丢弃
这利用了 PostgreSQL 的原子操作,即使多个实例同时收到同一条消息,也只有一个实例会处理。
队列 (Pending Queue)
场景
AI 正在处理用户的消息 A,这时用户又发了消息 B 和 C。
机制
- 每个用户有一把 per-user 锁,同一时间只有一个处理任务在执行
- 处理期间的新消息进入 pending 队列(上限 10 条)
- 当前处理完成后,从队列中取出所有待处理消息,合并后再次提交 Agent
时间线:
T0 消息 A 开始处理 (Agent 运行中)
T1 消息 B 到达 → 入队
T2 消息 C 到达 → 入队
T3 消息 A 处理完成 → 取出 B+C 合并 → 开始处理
中间回复丢弃
一个特别的优化:如果 Agent 正在处理消息 A 并即将生成回复,但此时发现队列中有新消息,系统会丢弃当前回复,将新消息和已有上下文一起重新提交 Agent。
这是因为新消息可能改变了用户意图,基于旧上下文生成的回复可能已经不合适了。
对用户的影响
这些机制对用户是完全透明的。用户只需要正常聊天,不用担心:
- 连续发消息会不会被拆开处理 → 不会(防抖合并)
- 消息会不会被重复处理 → 不会(去重)
- AI 在忙时发消息会不会丢 → 不会(队列排队)
可配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| 防抖等待时间 | 2 秒 | 连续消息的合并窗口 |
| 队列上限 | 10 条 | 处理期间最多缓存的新消息数 |