消息调度机制

消息防抖、去重、队列——OpenVort 如何可靠地处理并发消息。

概述

用户通过 IM 或 Web 发送消息时,消息不会直接进入 AI Agent 处理。中间经过 Dispatcher(消息调度器)进行防抖、去重和队列管理,确保消息被正确、高效地处理。

消息处理流程

用户消息
  │
  ▼
IM Channel / Web API
  │
  ▼
Dispatcher (消息调度器)
  ├── 防抖:2 秒内的连续消息合并
  ├── 去重:多实例部署时防止重复处理
  └── 队列:处理期间新消息排队
  │
  ▼
Agent Runtime (AI 处理)
  │
  ▼
回复用户

防抖 (Debounce)

用户在聊天中经常会连续发送多条消息(打字快的人尤其明显)。如果每条消息都单独触发一次 AI 处理,会导致:

  • AI 只看到部分信息就开始回复
  • 多次处理浪费资源
  • 用户收到多条不完整的回复

机制

收到消息后,Dispatcher 等待 2 秒。如果 2 秒内有新消息到达,重置计时器继续等待。直到 2 秒内没有新消息,才将所有累积的消息合并提交给 Agent。

时间线:
  0.0s  收到 "帮我" → 开始 2s 计时
  0.5s  收到 "查一下" → 重置计时器
  1.2s  收到 "Jenkins 构建状态" → 重置计时器
  3.2s  2s 无新消息 → 合并为 "帮我查一下Jenkins 构建状态" → 提交 Agent

图片累积

防抖期间,图片也会被累积。用户先发文字再发图片是常见操作,防抖确保它们一起被 AI 看到。

去重 (Dedup)

场景

IM 平台(尤其是企微)可能会重复推送同一条消息。OpenVort 部署多实例时,同一条消息会被每个实例都收到。

机制:InboxService

基于 PostgreSQL 的分布式消息去重:

  1. 每条消息有唯一 ID(由 IM 平台分配)
  2. 收到消息时,执行 INSERT ... ON CONFLICT DO NOTHING
  3. 插入成功 → 这是新消息,交给 Dispatcher 处理
  4. 插入冲突 → 这是重复消息,直接丢弃

这利用了 PostgreSQL 的原子操作,即使多个实例同时收到同一条消息,也只有一个实例会处理。

队列 (Pending Queue)

场景

AI 正在处理用户的消息 A,这时用户又发了消息 B 和 C。

机制

  • 每个用户有一把 per-user 锁,同一时间只有一个处理任务在执行
  • 处理期间的新消息进入 pending 队列(上限 10 条)
  • 当前处理完成后,从队列中取出所有待处理消息,合并后再次提交 Agent
时间线:
  T0  消息 A 开始处理 (Agent 运行中)
  T1  消息 B 到达 → 入队
  T2  消息 C 到达 → 入队
  T3  消息 A 处理完成 → 取出 B+C 合并 → 开始处理

中间回复丢弃

一个特别的优化:如果 Agent 正在处理消息 A 并即将生成回复,但此时发现队列中有新消息,系统会丢弃当前回复,将新消息和已有上下文一起重新提交 Agent。

这是因为新消息可能改变了用户意图,基于旧上下文生成的回复可能已经不合适了。

对用户的影响

这些机制对用户是完全透明的。用户只需要正常聊天,不用担心:

  • 连续发消息会不会被拆开处理 → 不会(防抖合并)
  • 消息会不会被重复处理 → 不会(去重)
  • AI 在忙时发消息会不会丢 → 不会(队列排队)

可配置参数

参数默认值说明
防抖等待时间2 秒连续消息的合并窗口
队列上限10 条处理期间最多缓存的新消息数