第 19 章:发布-订阅——多 Agent 通信的广播机制

个人公众号

难度:中等

三个 Agent 在讨论中轮流发言,每个人说的话其他人都能看到。这不是轮询,而是自动广播——MsgHub 是怎么做到的?

上一章:第 18 章 中间件与洋葱模型

知识补全:发布-订阅模式

发布-订阅模式(Pub-Sub) 的核心思想:发布者不直接发送消息给接收者,而是把消息广播出去,所有订阅者自动收到。

1
2
3
4
5
6
7
8
9
10
11
发布者 (Agent A)

▼ 发布消息
┌─────────┐
│ MsgHub │ ← 消息总线
│ (调度中心)│
└─────────┘
│ │ │
▼ ▼ ▼
订阅者 订阅者 订阅者
(B) (C) (D)

关键优势:发布者不需要知道谁在监听。新增一个订阅者不需要修改发布者的代码。


AgentScope 的实现:三层协作

发布-订阅在 AgentScope 中由三个部分协作完成:

1
2
3
4
flowchart TD
A["MsgHub (调度中心)"] -->|"进入时注册订阅"| B["Agent._subscribers"]
C["Agent.__call__()"] -->|"reply 后广播"| D["Agent._broadcast_to_subscribers()"]
D -->|"遍历 _subscribers"| E["subscriber.observe()"]

第一层:AgentBase._subscribers

打开 src/agentscope/agent/_agent_base.py,第 168 行:

1
2
# _agent_base.py:168
self._subscribers: dict[str, list[AgentBase]] = {}

字典的 key 是 MsgHub 的名称,value 是订阅者列表。一个 Agent 可以同时参与多个 MsgHub。

第二层:Agent.call() 中的广播

__call__ 方法(第 448 行)在 reply 完成后自动广播:

1
2
3
4
5
6
7
8
9
# _agent_base.py:448-467
async def __call__(self, *args, **kwargs) -> Msg:
reply_msg = await self.reply(*args, **kwargs)
try:
...
finally:
if reply_msg:
await self._broadcast_to_subscribers(reply_msg)
return reply_msg

_broadcast_to_subscribers(第 469 行)遍历所有订阅者:

1
2
3
4
5
6
7
# _agent_base.py:469-485
async def _broadcast_to_subscribers(self, msg):
broadcast_msg = self._strip_thinking_blocks(msg)

for subscribers in self._subscribers.values():
for subscriber in subscribers:
await subscriber.observe(broadcast_msg)

注意 _strip_thinking_blocks——思考块(模型内部推理过程)在广播前被移除,因为它们不应该暴露给其他 Agent。

第三层:MsgHub 的订阅管理

MsgHubsrc/agentscope/pipeline/_msghub.py:14)是一个异步上下文管理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# _msghub.py:42-71
class MsgHub:
def __init__(self, participants, announcement=None,
enable_auto_broadcast=True):
self.name = name or shortuuid.uuid()
self.participants = list(participants)
self.enable_auto_broadcast = enable_auto_broadcast

async def __aenter__(self):
self._reset_subscriber() # 注册订阅关系
if self.announcement:
await self.broadcast(self.announcement) # 广播初始消息
return self

async def __aexit__(self, *args):
if self.enable_auto_broadcast:
for agent in self.participants:
agent.remove_subscribers(self.name) # 清理订阅

_reset_subscriber(第 89 行)把所有参与者注册为彼此的订阅者:

1
2
3
4
5
# _msghub.py:89-93
def _reset_subscriber(self):
if self.enable_auto_broadcast:
for agent in self.participants:
agent.reset_subscribers(self.name, self.participants)

完整的消息流转

假设三个 Agent 在 MsgHub 中:

1
2
3
async with MsgHub(participants=[agent1, agent2, agent3]):
msg1 = await agent1()
msg2 = await agent2()

进入 MsgHub 时

  1. _reset_subscriber() 被调用
  2. agent1 的 _subscribers["hub_x"] = [agent2, agent3]
  3. agent2 的 _subscribers["hub_x"] = [agent1, agent3]
  4. agent3 的 _subscribers["hub_x"] = [agent1, agent2]

agent1() 被调用时

  1. agent1.reply() 执行,返回 msg1
  2. _broadcast_to_subscribers(msg1) 被调用
  3. agent2.observe(msg1) → agent2 的记忆中加入了 msg1
  4. agent3.observe(msg1) → agent3 的记忆中加入了 msg1

agent2() 被调用时

  1. agent2.reply() 执行,此时 agent2 的记忆中已有 msg1
  2. 返回 msg2
  3. agent1.observe(msg2) → agent1 的记忆中加入了 msg2
  4. agent3.observe(msg2) → agent3 的记忆中加入了 msg2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sequenceDiagram
participant A as agent1
participant H as MsgHub
participant B as agent2
participant C as agent3

Note over H: 进入 MsgHub,注册订阅关系
H->>A: reset_subscribers([B, C])
H->>B: reset_subscribers([A, C])
H->>C: reset_subscribers([A, B])

A->>A: reply() → msg1
A->>B: observe(msg1)
A->>C: observe(msg1)

B->>B: reply() → msg2(记忆中有 msg1)
B->>A: observe(msg2)
B->>C: observe(msg2)

Note over H: 退出 MsgHub,清理订阅

为什么要移除思考块?

_strip_thinking_blocks(第 488 行)在广播前调用:

1
2
3
4
5
6
# _agent_base.py:496-498
@staticmethod
def _strip_thinking_blocks_single(msg):
if not isinstance(msg.content, list):
return msg
# 移除 type="thinking" 的 block

这是合理的——Agent 的内部推理过程不应该暴露给其他 Agent,就像你不会把内心独白说给同事听。

设计一瞥:MsgHub 使用异步上下文管理器(async with)来管理订阅的生命周期。进入时注册,退出时清理——这确保了订阅关系不会泄漏到 MsgHub 之外。如果不使用上下文管理器,你需要手动管理订阅,容易忘记清理。

另一个设计选择是:为什么不直接让 Agent 之间互相引用?因为直接引用会产生紧耦合——Agent A 需要知道 Agent B 的存在。通过 MsgHub 解耦后,Agent 只需 reply,不关心谁在监听。

AgentScope 官方文档的 Building Blocks > Multi-Agent Collaboration > MsgHub 页面展示了 MsgHub 的使用方法和 enable_auto_broadcast 参数——开启后,Agent 的每次回复都会自动广播给 Hub 中的其他 Agent。

AgentScope 1.0 论文对多 Agent 协作的设计说明是:

“agent-agent interaction patterns while improving execution efficiency”

— AgentScope 1.0: A Comprehensive Framework for Building Agentic Applications, arXiv:2508.16279, Section 2.3

本章解释了 _broadcast_to_subscribers_subscribers 字典如何实现这一消息分发机制。


试一试:观察广播过程

目标:看到 observe 被自动调用。

步骤

  1. src/agentscope/agent/_agent_base.py_broadcast_to_subscribers 方法(第 469 行)中加一行:
1
2
3
4
5
6
7
8
async def _broadcast_to_subscribers(self, msg):
if msg is None:
return
broadcast_msg = self._strip_thinking_blocks(msg)
print(f"[DEBUG] {self.name} 广播给 {sum(len(s) for s in self._subscribers.values())} 个订阅者")
for subscribers in self._subscribers.values():
for subscriber in subscribers:
await subscriber.observe(broadcast_msg)
  1. 创建测试脚本,观察广播过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
from agentscope.agent import UserAgent
from agentscope.pipeline import MsgHub
from agentscope.message import Msg

async def main():
a = UserAgent(name="Alice")
b = UserAgent(name="Bob")
c = UserAgent(name="Charlie")

# 手动模拟订阅关系(不需要 API key)
a.reset_subscribers("test_hub", [b, c])

msg = Msg(name="Alice", content="Hello!", role="assistant")
await a._broadcast_to_subscribers(msg)

print(f"Bob 的记忆: {b.memory.get_memory()}")
print(f"Charlie 的记忆: {c.memory.get_memory()}")

asyncio.run(main())
  1. 观察:Bob 和 Charlie 的记忆中都收到了 Alice 的消息。

改完后恢复:

1
git checkout src/agentscope/agent/_agent_base.py

调试实践:追踪消息广播

目标:观察 Agent 消息如何通过 MsgHub 广播到多个订阅者。

步骤

  1. src/agentscope/agent/_agent_base.py_broadcast_to_subscribers 方法中加 print:
1
2
3
4
5
6
7
async def _broadcast_to_subscribers(self, msg):
for hub_name, subscribers in self._subscribers.items():
print(f"[DEBUG] MsgHub '{hub_name}': 广播到 {len(subscribers)} 个订阅者")
for sub in subscribers:
if sub is not self:
print(f"[DEBUG] → {sub.name}")
await sub.observe(msg)
  1. 创建 2-3 个 Agent 加入同一个 MsgHub,让一个 Agent 处理消息,观察广播过程。

  2. observe 方法中也加 print,追踪消息到达订阅者的完整路径。

完成后清理:

1
git checkout src/agentscope/agent/_agent_base.py

检查点

  • 发布-订阅模式:发布者不直接知道接收者,通过调度中心(MsgHub)广播
  • _subscribers 字典按 MsgHub 名称分组,一个 Agent 可同时参与多个 MsgHub
  • __call__ 在 reply 后自动调用 _broadcast_to_subscribers
  • MsgHub 是异步上下文管理器,进入时注册订阅,退出时清理
  • 思考块在广播前被移除,保护 Agent 的内部推理

自检练习

  1. 如果两个 MsgHub 有重叠的参与者,_subscribers 字典会怎样?
  2. enable_auto_broadcast=False 时,MsgHub 还能做什么?

observe 方法的内部实现

observe_agent_base.py:185)是订阅者接收广播消息的方法:

1
2
3
# _agent_base.py:185
async def observe(self, msg: Msg) -> None:
"""接收并处理观察到的消息"""

默认实现将消息存入 Agent 的工作记忆。这意味着当一个 Agent 广播消息时,所有订阅者的记忆中都会多出这条消息——它们在下一轮推理时就能看到上下文。

这就是为什么 _strip_thinking_blocks 很重要——如果 Agent 的内部推理(ThinkingBlock)被广播给其他 Agent,后者可能基于不应该看到的信息做出推理,产生”信息泄漏”问题。

设计一瞥observe 也可以被 Hook 包裹(pre_observe / post_observe)。这意味着你可以在消息到达订阅者之前过滤或转换它——比如在多语言 Agent 场景中,把中文消息翻译成英文再传递给英文 Agent。


下一章预告

到目前为止,我们看了继承、元类/Hook、策略、工厂/Schema、中间件、发布-订阅六种模式。最后一章我们把目光投向系统的”可观测性”——日志、追踪和持久化。

下一章:第 20 章 可观测性与持久化