难度 :进阶
ReAct Agent 是”边想边做”。但有些复杂任务需要”先想清楚再动手”——先制定计划,再逐步执行。这就是 Plan-Execute 模式。
上一章:第 24 章 造一个新 Memory Backend
任务目标 实现一个 Plan-Execute Agent,工作流程为:
Plan 阶段 :分析任务,生成步骤列表Execute 阶段 :逐步执行每个步骤Review 阶段 :检查执行结果,决定是否需要修改计划1 2 3 4 5 6 7 8 9 10 flowchart TD A["收到用户消息"] --> B["Plan: 生成步骤列表"] B --> C["Execute: 执行第 1 步"] C --> D["Review: 检查结果"] D --> E{"需要修改计划?"} E -->|是| B E -->|否| F{"还有下一步?"} F -->|是| G["Execute: 执行下一步"] G --> D F -->|否| H["返回最终结果"]
回顾:AgentBase 的接口 AgentBase(_agent_base.py)的核心方法:1 2 3 4 5 6 7 8 async def reply (self, *args, **kwargs ) -> Msg: """子类需要覆盖此方法,实现 Agent 的核心逻辑。""" raise NotImplementedError(...) async def __call__ (self, msg=None , **kwargs ) -> Msg: """入口方法:Hook 前置 → reply() → Hook 后置 → 广播"""
reply 不是 @abstractmethod——它直接 raise NotImplementedError。子类必须覆盖 它。
ReActAgentBase 中间层 ReActAgentBase(_react_agent_base.py)在 AgentBase 和 ReActAgent 之间提供了:
_reasoning、_acting、_summarizing 的骨架方法memory、model、formatter、toolkit 的统一管理max_iters 参数我们的 PlanExecuteAgent 可以直接继承 AgentBase,因为 Plan-Execute 的循环结构和 ReAct 不同。
已有 Agent 类型 src/agentscope/agent/ 目录下:
文件 Agent 类型 特点 _agent_base.pyAgentBase 所有 Agent 的基类 _react_agent.pyReActAgent 推理-行动循环 _user_agent.pyUserAgent 人类输入代理 _a2a_agent.pyA2AAgent Agent-to-Agent 协议 _realtime_agent.pyRealtimeAgent 实时语音交互
Step 1:设计方案 1.1 Plan-Execute vs ReAct 1 2 3 4 5 6 7 8 9 10 11 12 13 14 flowchart LR subgraph ReAct R1["推理"] --> R2["行动"] R2 --> R3["推理"] R3 --> R4["行动"] R4 --> R5["回答"] end subgraph "Plan-Execute" P1["制定计划"] --> P2["执行步骤1"] P2 --> P3["执行步骤2"] P3 --> P4["执行步骤3"] P4 --> P5["总结回答"] end
ReAct 是交错 的(推理-行动-推理-行动……),Plan-Execute 是分离 的(先计划,再执行)。
1.2 类结构 1 2 3 4 5 6 7 8 class PlanExecuteAgent (AgentBase ): """Plan-Execute 模式的 Agent。""" plan_prompt: str execute_prompt: str review_prompt: str max_plan_revisions: int
Step 2:逐步实现 2.1 类定义和初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import jsonfrom typing import Any from agentscope.agent._agent_base import AgentBasefrom agentscope.message import Msg, TextBlock, ToolUseBlockclass PlanExecuteAgent (AgentBase ): """Plan-Execute 模式的 Agent。 先制定计划,再逐步执行每个步骤,适合复杂多步任务。 Args: name (`str`): Agent 名称 sys_prompt (`str`): 系统提示 model: 模型实例(ChatModelBase) formatter: 格式转换器 toolkit: 工具箱 memory: 工作记忆 max_plan_revisions (`int`, optional): 最大计划修改次数,默认 3 """ def __init__ ( self, name: str , sys_prompt: str = "你是一个有帮助的助手。" , model=None , formatter=None , toolkit=None , memory=None , max_plan_revisions: int = 3 , **kwargs: Any , ) -> None : super ().__init__(name=name, **kwargs) self .sys_prompt = sys_prompt self .model = model self .formatter = formatter self .toolkit = toolkit self .memory = memory self .max_plan_revisions = max_plan_revisions self ._plan: list [dict ] = [] self ._current_step: int = 0 self ._results: list [dict ] = []
2.2 reply 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 async def reply (self, msg: Msg | list [Msg] | None = None , **kwargs ) -> Msg: """Plan-Execute 的主循环。""" if msg is not None : if isinstance (msg, Msg): msg = [msg] for m in msg: await self .memory.add(m) self ._plan = await self ._plan_phase() self ._current_step = 0 self ._results = [] plan_revisions = 0 while self ._current_step < len (self ._plan): step_result = await self ._execute_step(self ._plan[self ._current_step]) self ._results.append(step_result) review = await self ._review_phase() if review.get("needs_revision" ) and plan_revisions < self .max_plan_revisions: self ._plan = await self ._plan_phase(is_revision=True ) plan_revisions += 1 else : self ._current_step += 1 final_msg = await self ._summarize_results() return final_msg
2.3 Plan 阶段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 async def _plan_phase (self, is_revision: bool = False ) -> list [dict ]: """生成或修改计划。""" if is_revision: prompt = ( "根据执行结果,修改你的计划。\n" f"已完成步骤: {self._results} \n" "请输出修正后的步骤列表。" ) else : prompt = ( "分析用户的请求,制定一个执行计划。\n" "每个步骤应该包含:step(步骤描述)、tool(需要用的工具)、" "reason(为什么需要这一步)。\n" "以 JSON 数组格式输出。" ) plan_msg = Msg("system" , prompt, "system" ) await self .memory.add(plan_msg) formatted = await self .formatter.format ([ Msg("system" , self .sys_prompt, "system" ), *await self .memory.get_memory(), ]) response = await self .model(formatted, tools=self .toolkit.get_json_schemas() if self .toolkit else None ) plan_text = "" for block in response.content: if isinstance (block, dict ) and block.get("type" ) == "text" : plan_text += block.get("text" , "" ) try : start = plan_text.find("[" ) end = plan_text.rfind("]" ) + 1 if start >= 0 and end > start: plan = json.loads(plan_text[start:end]) else : plan = [{"step" : plan_text, "tool" : None , "reason" : "直接回答" }] except json.JSONDecodeError: plan = [{"step" : plan_text, "tool" : None , "reason" : "直接回答" }] await self .memory.add(Msg("assistant" , plan_text, "assistant" )) return plan
2.4 Execute 阶段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 async def _execute_step (self, step: dict ) -> dict : """执行计划中的一个步骤。""" step_prompt = f"执行步骤: {step.get('step' , '' )} \n原因: {step.get('reason' , '' )} " await self .memory.add(Msg("system" , step_prompt, "system" )) formatted = await self .formatter.format ([ Msg("system" , self .sys_prompt, "system" ), *await self .memory.get_memory(), ]) tools = self .toolkit.get_json_schemas() if self .toolkit else None response = await self .model(formatted, tools=tools) result_text = "" tool_results = [] for block in response.content: if isinstance (block, dict ): if block.get("type" ) == "text" : result_text += block.get("text" , "" ) elif block.get("type" ) == "tool_use" : tool_res = self .toolkit.call_tool_function(block) async for chunk in tool_res: for c in chunk.content: tool_results.append(c.get("text" , "" )) if tool_results: result_text += "\n工具结果: " + "\n" .join(tool_results) await self .memory.add(Msg("assistant" , result_text, "assistant" )) return {"step" : step, "result" : result_text}
2.5 Review 阶段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 async def _review_phase (self ) -> dict : """审查当前步骤的执行结果。""" review_prompt = ( f"检查步骤 '{self._plan[self._current_step].get('step' , '' )} ' 的执行结果。\n" f"结果: {self._results[-1 ].get('result' , '' )} \n" "回答: 结果是否满意?是否需要修改后续计划?\n" '以 JSON 格式回答: {"satisfied": true/false, "needs_revision": true/false, "reason": "..."}' ) await self .memory.add(Msg("system" , review_prompt, "system" )) formatted = await self .formatter.format ([ Msg("system" , self .sys_prompt, "system" ), *await self .memory.get_memory(), ]) response = await self .model(formatted) review_text = "" for block in response.content: if isinstance (block, dict ) and block.get("type" ) == "text" : review_text += block.get("text" , "" ) try : start = review_text.find("{" ) end = review_text.rfind("}" ) + 1 if start >= 0 and end > start: return json.loads(review_text[start:end]) except json.JSONDecodeError: pass return {"satisfied" : True , "needs_revision" : False , "reason" : "解析失败,默认继续" }
2.6 总结阶段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 async def _summarize_results (self ) -> Msg: """总结所有执行结果,返回最终回复。""" summary_prompt = ( "所有步骤已执行完毕。以下是执行结果:\n" + "\n" .join( f"- {r['step' ].get('step' , '' )} : {r.get('result' , '' )} " for r in self ._results ) + "\n\n请给出最终回答。" ) final_msgs = [ Msg("system" , self .sys_prompt, "system" ), Msg("user" , summary_prompt, "user" ), ] formatted = await self .formatter.format (final_msgs) response = await self .model(formatted) result_text = "" for block in response.content: if isinstance (block, dict ) and block.get("type" ) == "text" : result_text += block.get("text" , "" ) final_msg = Msg(self .name, result_text, "assistant" ) await self .memory.add(final_msg) return final_msg
Step 3:使用 PlanExecuteAgent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import agentscopefrom agentscope.agent import AgentBasefrom agentscope.model import OpenAIChatModelfrom agentscope.formatter import OpenAIChatFormatterfrom agentscope.tool import Toolkitfrom agentscope.memory import InMemoryMemoryagentscope.init(project="plan-execute-demo" ) model = OpenAIChatModel(model_name="gpt-4o" ) toolkit = Toolkit() toolkit.register_tool_function(search_web) toolkit.register_tool_function(analyze_data) agent = PlanExecuteAgent( name="planner" , sys_prompt="你是一个数据分析助手。先制定计划,再逐步执行。" , model=model, formatter=OpenAIChatFormatter(), toolkit=toolkit, memory=InMemoryMemory(), max_plan_revisions=3 , )
设计一瞥 设计一瞥 :为什么不继承 ReActAgentBase?ReActAgentBase 提供了 _reasoning、_acting、_summarizing 的骨架,但它的循环结构是固定的(推理-行动交替)。Plan-Execute 模式的循环结构不同(计划-执行-审查),强行复用 ReActAgentBase 的骨架会导致代码更复杂而不是更简单。 直接继承 AgentBase 意味着你需要自己管理 model、formatter、memory 等属性,但换来了完全的循环控制自由。 这是继承 vs 组合 的经典权衡。详见卷四第 31 章。
试一试:添加计划可视化 这个练习不需要 API key(可以用 print 模拟)。
目标 :在 Plan-Execute 的每个阶段打印当前状态。
步骤 :
在 _plan_phase 中添加: 1 2 3 4 5 print (f"\n{'=' *50 } " )print (f"📋 计划({'修改' if is_revision else '新建' } ):" )for i, step in enumerate (plan): marker = "→" if i == self ._current_step else " " print (f" {marker} 步骤 {i+1 } : {step.get('step' , '' )} " )
在 _execute_step 中添加: 1 2 print (f"\n▶ 执行步骤 {self._current_step + 1 } /{len (self._plan)} " )print (f" 工具: {step.get('tool' , '无' )} " )
在 _review_phase 中添加: 1 2 3 print (f"✓ 审查: {'满意' if review.get('satisfied' ) else '需要修改' } " )if review.get("needs_revision" ): print (f" 原因: {review.get('reason' , '' )} " )
运行 Agent,观察 Plan → Execute → Review 的完整流程输出。 PR 检查清单 提交新 Agent 类型的 PR 时:
检查点 你现在理解了:
AgentBase.reply 是唯一需要覆盖的方法,实现 Agent 的核心逻辑Plan-Execute 模式:Plan(制定计划)→ Execute(逐步执行)→ Review(审查结果)与 ReAct 的区别:Plan-Execute 是”先计划再执行”,ReAct 是”边想边做” 直接继承 AgentBase(而非 ReActAgentBase)可以获得完全的循环控制自由 JSON 解析 :从模型文本响应中提取结构化数据是一个常见需求自检练习 :
如果模型在 Plan 阶段返回的不是合法 JSON,代码会怎么处理?(提示:看 _plan_phase 的 try/except) PlanExecuteAgent 的 reply 方法中,哪些阶段会调用工具?(提示:只有 Execute 阶段)参考答案 :
_plan_phase 的 try/except 捕获 JSON 解析错误(json.JSONDecodeError)。解析失败时,通常会把错误信息反馈给模型,让它重新生成合法 JSON,或者回退到纯文本模式。具体的回退策略取决于实现——可以是重试、返回空计划、或直接把原始文本当作计划。只有 Execute 阶段 调用工具。Plan 阶段只让模型制定计划(不执行),Review 阶段只让模型评估执行结果(不执行)。这是 Plan-Execute 模式的核心设计:规划和执行解耦,避免模型在规划时误执行不成熟的操作。下一章预告 我们造了 Tool、Model、Memory、Agent 四个齿轮。下一章,我们接入一个外部工具协议 ——MCP Server,让 Agent 可以调用本地 MCP 服务提供的工具。
下一章:第 26 章 集成 MCP Server