第 8 章:响应的归途

1 | Browser <- HTTP <- FastAPI <- Runner <- Agent <- Prompt <- ReAct <- LLM <- Tool <- [Response] |
前七章我们跟请求从浏览器走到了 LLM 调用和工具执行。现在请求的”去程”结束了,响应要原路返回。Agent 生成了文字回复,这段文字是怎么一步步流回浏览器的?为什么你能看到逐字输出的效果?这一章,我们走”回程”。
问题
Agent 的最终文字回复在 Python 对象里(Msg 对象)。它怎么变成浏览器屏幕上的文字?中间经过了几道转换?为什么不是等全部生成完再显示,而是一个字一个字地冒出来?
术语其实很简单
术语:消息队列(Message Queue)
想象一条流水线——工人(Agent)不断把产品放到传送带上,质检员(Runner)在另一端不断取走。Agent 内部的msg_queue就是这条传送带——Agent 每生成一段文字就放上去,Runner 不断取走并转发。
术语:流适配器(Stream Adapter)
想象一个翻译器——输入是 Agent 内部格式的消息,输出是 SSE 协议格式的事件。流适配器做格式转换——把Msg对象转成前端能理解的 JSON 事件。
术语:生产者-消费者模型(Producer-Consumer Pattern)
想象面包店——面包师不断做面包(生产者),顾客不断买面包(消费者)。两者通过柜台(队列)连接,互不等待。QwenPaw 的流式输出就是这种模型——Agent 生产文字,浏览器消费文字,中间通过队列传递。
探索
回程的完整路径
Agent 生成最终文字回复后,数据经过五道关口回到浏览器:1
2
3
4
51. Agent.print(msg) # Agent 把消息放入 msg_queue
2. Runner 从 msg_queue 取出 # _stream_printing_messages_interruptible()
3. Stream Adapter 格式转换 # Msg -> SSE Event JSON
4. TaskTracker 分发 # 放入所有订阅者的队列
5. event_generator() 逐个发送 # SSE 字符串 -> HTTP Response -> 浏览器
第 1 关:Agent.print()——消息入队
上一章我们看到 ReActAgent.reply() 最终返回一个 Msg 对象。但在此之前,Agent 在流式生成过程中不断调用 self.print():1
2
3
4
5# agentscope _agent_base.py 核心逻辑
async def print(self, msg, last=False):
if not self._disable_msg_queue:
# 把消息放入队列,供 Runner 消费
await self.msg_queue.put((deepcopy(msg), last, speech))
print() 不是打印到终端——它是把消息放入 msg_queue。每次 LLM 返回一个 chunk(几个 Token),Agent 就把这个 chunk 包装成 Msg 放入队列。last=True 表示这条消息是最终版本(不再更新)。
流式生成过程中:1
2
3LLM chunk 1: "你" -> print(msg{"你"}, last=False) -> msg_queue
LLM chunk 2: "好" -> print(msg{"你好"}, last=False) -> msg_queue
LLM chunk 3: "!" -> print(msg{"你好!"}, last=True) -> msg_queue # 最终版
注意每个 chunk 的 msg 包含完整的已生成内容(不是增量),last 标记是否还有后续。
第 2 关:Runner 取出消息
AgentRunner.query_handler() 是异步生成器,它通过 _stream_printing_messages_interruptible() 从 Agent 的 msg_queue 读取:1
2
3
4
5
6
7
8
9
10
11# runner.py 核心逻辑
async def query_handler(self, msgs, request=None, **kwargs):
# ... 创建 Agent, 恢复 Session ...
async for msg, last in _stream_printing_messages_interruptible(
agents=[agent],
coroutine_task=agent(msgs), # 启动 Agent
):
yield msg, last # 逐条 yield 给上层
# ... 保存 Session ...
_stream_printing_messages_interruptible() 做了两件事:
- 设置 Agent 的 msg_queue(
agent.set_msg_queue_enabled(True, queue)) - 在后台启动
agent(msgs)任务,同时从 msg_queue 不断读取(msg, last)元组
当 Agent 任务完成时,放入一个终止信号,循环结束。
第 3 关:Stream Adapter——格式转换
Runner 基类的 stream_query() 接收 (msg, last) 元组后,通过 adapt_agentscope_message_stream() 做格式转换:1
2
3
4
5
6
7
8
9
10
11
12
13Agent 内部格式 (Msg) SSE 事件格式 (Event)
===================== ====================
Msg(content=[TextBlock]) -> Message(in_progress) + TextContent(delta)
+ Message(completed)
Msg(content=[ToolUseBlock]) -> Message(type=plugin_call)
+ DataContent(function_call)
Msg(content=[ThinkingBlock])-> Message(type=reasoning)
+ TextContent(delta)
Msg(content=[ToolResultBlock]) -> Message(type=plugin_call_output)
+ DataContent(function_call_output)
每种内容块被转换成对应的事件类型。前端根据事件类型决定如何渲染——文字直接显示,工具调用显示为”正在执行 XX 工具”,工具结果显示为折叠卡片。
一个完整的对话产生的事件序列:1
2
3
4
5
6
7data: {"object": "response", "status": "created"} # 开始
data: {"object": "message", "type": "message", ...} # 文字消息开始
data: {"object": "content", "type": "text", "delta": "你"} # 逐字
data: {"object": "content", "type": "text", "delta": "好"}
data: {"object": "content", "type": "text", "delta": "!"}
data: {"object": "message", "status": "completed"} # 文字消息完成
data: {"object": "response", "status": "completed"} # 结束
第 4 关:TaskTracker——分发到订阅者
转换后的 SSE 事件字符串进入 TaskTracker。第 2 章介绍的 _producer() 把每个事件分发给所有订阅者:1
2
3
4
5
6# task_tracker.py _producer() 核心逻辑
async def _producer(run_state, stream_fn, payload):
async for sse_string in stream_fn(payload):
run_state.buffer.append(sse_string) # 存入缓冲(断线重连用)
for q in run_state.subscribers:
q.put_nowait(sse_string) # 分发给每个订阅者
每个订阅者有自己的 asyncio.Queue。如果用户刷新页面,新的连接会创建新队列,先回放 buffer 中的历史事件,再接收新事件——这就是断线重连。
第 5 关:event_generator()——HTTP 响应
最终,console.py 中的 event_generator() 从订阅队列读取 SSE 字符串,通过 StreamingResponse 发给浏览器:1
2
3
4
5
6
7# console.py
async def event_generator():
stream_it = tracker.stream_from_queue(queue, chat.id)
async for event_data in stream_it:
yield event_data # 每个 SSE 字符串直接发给浏览器
return StreamingResponse(event_generator(), media_type="text/event-stream")
stream_from_queue() 从队列阻塞读取,直到收到终止信号(None)。浏览器收到的是标准 SSE 格式:1
2
3
4
5
6
7
8
9data: {"object":"response","status":"created"}
data: {"object":"message","type":"message","status":"in_progress"}
data: {"object":"content","type":"text","text":"你好"}
data: {"object":"message","status":"completed"}
data: {"object":"response","status":"completed"}
对话历史的保存
响应回程的同时,Session 被持久化。query_handler() 的 finally 块:1
2
3
4
5
6
7finally:
if agent is not None and session_state_loaded:
await self.session.save_session_state(
session_id=session_id,
user_id=user_id,
agent=agent,
)
save_session_state() 把 Agent 的完整记忆(包含所有消息——用户输入、工具调用、工具结果、最终回复)序列化为 JSON 文件。下次请求时 load_session_state() 恢复这些记忆,Agent 就”记得”之前说了什么。
回程时序——从 Agent 到浏览器
把整个回程的时间线画出来:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15时间 ──────────────────────────────────────────────────>
Agent: [思考] -> print(你) -> print(你好) -> print(你好!last) -> [结束]
| | |
Runner: v v v
yield (msg,False) yield (msg,False) yield (msg,True)
| | |
Adapter: v v v
SSE事件 SSE事件 SSE事件
| | |
Tracker: v v v
put(queue) put(queue) put(queue)
| | |
Browser: v v v
显示"你" 显示"你好" 显示"你好!"(完成)
整个链条是流式的——Agent 生成一个 chunk,立刻传递给浏览器,不等全部生成完。
实验
用 curl 观察原始 SSE 事件的格式:
- 启动 QwenPaw:
qwenpaw app - 发送请求:
1 | curl -N -X POST http://127.0.0.1:8088/api/console/chat \ |
- 观察
data:开头的每一行
预期输出:能看到 object 字段为 response、message、content 的多种事件类型,最后一个事件的 status 为 completed。
工程权衡
为什么用 msg_queue 而不是直接 yield?
Agent(agentscope 库)和 Runner(QwenPaw)是两个独立的代码库。Agent 通过 msg_queue 解耦——它不知道 Runner 的存在,只管往队列里放消息。Runner 从队列取消息,做格式转换后发给浏览器。这种解耦让 QwenPaw 可以用不同的方式消费 Agent 的输出(SSE、WebSocket、日志等),不需要改 agentscope 的代码。
为什么每个 chunk 包含完整内容而非增量?
print() 放入队列的每个 Msg 包含完整的已生成内容(”你好”而非只有”好”)。这简化了消费端的逻辑——消费端不需要自己拼接增量。对于需要增量的场景(如 SSE),Stream Adapter 会计算 delta(当前内容减去上次内容)再发送。
为什么 TaskTracker 要 buffer 所有事件?
断线重连。用户在 AI 回复过程中刷新页面,新连接需要先看到之前已经发送的所有事件。buffer 存储了完整的事件历史,新订阅者先回放 buffer 再接收新事件。代价是内存占用——一个长对话的 buffer 可能很大。但这是实时应用的标准做法。
常见误区
误区:Agent 的最终回复是等全部生成完才发给浏览器的?
不是。Agent 在流式生成过程中,每收到 LLM 的一个 chunk 就调用
print()放入 msg_queue。Runner 立刻取走,经过格式转换后通过 SSE 推给浏览器。从 LLM 生成第一个字到浏览器显示,延迟通常在毫秒级。
误区:对话历史存在数据库里?
不是。对话历史存在 JSON 文件里(
<workspace>/sessions/<user_id>_<session_id>.json)。QwenPaw 是单机部署的个人助手,不需要数据库。JSON 文件简单、可读、可手动编辑。SafeJSONSession保证写入的原子性——断电不会损坏文件。
动手环节
任务:观察 SSE 事件的原始格式。
步骤:
- 启动 QwenPaw:
qwenpaw app - 用 curl 发一条请求(加
-N关闭缓冲) - 观察
data:行中的object和type字段 - 数一数一个简单对话产生了多少个事件
预期输出:
- 至少看到
response(开始/结束)和message(内容)两种 object - 文字内容在
content类型的text字段中 - 最后一个事件的
status为completed
自检:
- 理解了 msg_queue 是 Agent 和 Runner 之间的桥梁
- 知道 Stream Adapter 把
Msg转换成 SSE 事件 - 知道 TaskTracker 的 buffer 支持断线重连
至此,一个请求的完整生命周期走完了——从浏览器按下回车,经过 FastAPI、Runner、Agent 创建、提示词拼装、ReAct 循环、LLM 调用、工具执行,最终流回浏览器。第一卷”启程”结束。下一卷”图纸”,我们看看系统是怎么配置的——Provider 管理、Agent 配置、工作空间结构。