第 6 章:调用大语言模型

1 | Browser -> HTTP -> FastAPI -> Runner -> Agent -> Prompt -> ReAct -> [LLM] -> Tool -> Response |
上一章我们跟 ReAct 循环走到了 _reasoning()——Agent 要”思考”了。思考的本质是调用大语言模型(LLM)。这一章我们看看这个调用是怎么发生的:Agent 手里只有一个模型 ID(比如 qwen3-max),它怎么找到对应的 Provider?请求怎么被包装、限流、重试?流式响应怎么一路流回浏览器?
问题
Agent 说”我要调用 LLM”,但 LLM 在哪?怎么调用?如果 API 报错了怎么办?多个请求同时到达,会不会把 API 打爆?
术语其实很简单
术语:Provider(模型提供商)
想象你手机上装了多个通讯软件——微信、钉钉、飞书。每个软件都能打电话,但拨号方式不一样。Provider 就是这些”通讯软件”——OpenAI 是一个 Provider,DashScope 是另一个,Ollama 又是一个。每个 Provider 有自己的地址(base_url)、密钥(api_key)和拨号方式(API 协议)。
术语:装饰器链(Wrapper Chain)
想象快递的层层包装——最里面是商品(原始模型),包一层气泡膜(Token 记录),再套一个纸箱(重试和限流)。每一层只干一件事,组合起来就是完整功能。
术语:限流器(Rate Limiter)
想象高速收费站的闸机——同时只能过 3 辆车,前面有车在等就排队。限流器就是 API 调用的闸机——防止太多请求同时打到 API,触发限速。
探索
从模型 ID 到 API 调用——七步数据流
上一章提到 ReAct 循环中的 _reasoning() 最终调用 LLM。完整的调用路径是:1
2
3
4
5
6Agent._reasoning()
-> model(messages, tools) # 第 3 章创建的 model
-> RetryChatModel.__call__() # 第 1 层:重试 + 限流
-> TokenRecordingModelWrapper.__call__() # 第 2 层:Token 记录
-> OpenAIChatModelCompat.__call__() # 第 3 层:实际 API 调用
-> AsyncOpenAI.chat.completions.create() # HTTP 请求
从 Agent 发出调用到 HTTP 请求飞出,经过三层包装。让我们从最内层往外看。
Provider——模型的”通讯录”
第 3 章我们看到 create_model_and_formatter() 是工厂函数。它做的第一件事是找到 Provider:1
2
3
4
5
6
7
8
9
10
11
12# model_factory.py 中的核心逻辑
def create_model_and_formatter(agent_id=None):
# 1. 加载配置,确定用哪个 Provider 和模型
model_slot = load_agent_config(agent_id).active_model
# 2. 从 ProviderManager 获取 Provider
manager = ProviderManager.get_instance()
provider = manager.get_provider(model_slot.provider_id)
# 3. 从 Provider 获取模型实例
model = provider.get_chat_model_instance(model_slot.model)
...
ProviderManager 是一个单例,管理着所有 Provider。QwenPaw 内置了 20+ 个 Provider:1
2
3
4
5
6
7
8
9
10
11
12ProviderManager (单例)
+-- builtin_providers:
| "openai" -> OpenAIProvider
| "dashscope" -> OpenAIProvider
| "anthropic" -> AnthropicProvider
| "gemini" -> GeminiProvider
| "ollama" -> OllamaProvider
| "deepseek" -> OpenAIProvider
| ... (20+ 个)
+-- custom_providers: # 用户自己添加的
+-- plugin_providers: # 插件注册的
+-- active_model: # 当前激活的模型配置
get_provider() 根据 provider_id 查找——先查插件、再查内置、最后查自定义。
Provider ABC——统一接口
所有 Provider 继承自 Provider(ProviderInfo, ABC),定义在 src/qwenpaw/providers/provider.py:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Provider(ProviderInfo, ABC):
async def check_connection(self, timeout=5) -> tuple[bool, str]:
"""检查 Provider 是否可达"""
async def fetch_models(self, timeout=5) -> List[ModelInfo]:
"""获取可用模型列表"""
async def check_model_connection(self, model_id, timeout=5) -> tuple[bool, str]:
"""检查特定模型是否可用"""
def get_chat_model_instance(self, model_id) -> ChatModelBase:
"""创建聊天模型实例"""
四个抽象方法定义了 Provider 的”四项基本功”:连接检查、模型发现、模型检查、创建实例。不管背后是 OpenAI 还是 Ollama,对外接口一致。
OpenAIProvider——最通用的实现
大部分 Provider 用 OpenAIProvider——因为很多模型服务兼容 OpenAI API 格式。DashScope、DeepSeek、Kimi、SiliconFlow 都是 OpenAI 兼容的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class OpenAIProvider(Provider):
def _client(self, timeout=5) -> AsyncOpenAI:
return AsyncOpenAI(
base_url=self.base_url,
api_key=self.api_key,
timeout=timeout,
)
def get_chat_model_instance(self, model_id) -> ChatModelBase:
return OpenAIChatModelCompat(
model_name=model_id,
stream=True,
api_key=self.api_key,
client_kwargs={"base_url": self.base_url},
generate_kwargs=self.get_effective_generate_kwargs(model_id),
)
_client() 创建 OpenAI SDK 客户端,get_chat_model_instance() 创建 agentscope 的聊天模型实例。注意 stream=True——QwenPaw 默认使用流式调用。
get_effective_generate_kwargs() 合并了两个层级的生成参数——Provider 级别的(所有模型共享)和模型级别的(特定模型覆盖)。用深度合并策略,模型级别可以覆盖 Provider 级别的特定字段。
第一层包装:TokenRecordingModelWrapper
工厂函数拿到原始模型后,开始包装:1
2# model_factory.py
wrapped_model = TokenRecordingModelWrapper(provider_id, model)
TokenRecordingModelWrapper 做一件事:记录每次调用用了多少 Token:1
2
3
4
5
6
7class TokenRecordingModelWrapper(ChatModelBase):
async def __call__(self, messages, tools=None, ...):
result = await self._model(messages=messages, tools=tools, ...)
if isinstance(result, AsyncGenerator):
return self._wrap_stream(result)
await self._record_usage(result.usage)
return result
对于流式响应,它在流结束后记录最后一个 chunk 的 usage(包含了整次调用的完整 Token 统计)。这些数据存入 TokenUsageManager,供前端显示用量统计。
它还做了 vLLM 兼容性处理——当 tool_choice="auto" 时改为 None,因为某些 vLLM 实现不接受 tool_choice="auto"。
第二层包装:RetryChatModel
1 | # model_factory.py |
RetryChatModel 是最外层包装,负责两件事:重试和限流。
重试机制:遇到瞬态错误(429 限速、500 服务器错误、超时、连接失败)自动重试,指数退避:1
2
3
4
5
6
7
8
9
10
11
12
13# retry_chat_model.py 核心逻辑
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504, 529}
async def __call__(self, *args, **kwargs):
for attempt in range(1, attempts + 1):
try:
result = await self._inner(*args, **kwargs)
return result
except Exception as exc:
if not _is_retryable(exc) or attempt >= attempts:
raise # 不可重试或重试用完,抛出异常
delay = _compute_backoff(attempt, self._retry_config)
await asyncio.sleep(delay) # 指数退避
退避公式:base * 2^(attempt-1),有上限封顶。第一次等 1 秒,第二次 2 秒,第三次 4 秒……
限流机制:调用前先通过 LLMRateLimiter 获取许可。限流器有三道防线:1
2
3
4acquire() 执行顺序:
1. 429 冷却期:如果刚收到过 429,等冷却结束 + 随机抖动
2. QPM 滑动窗口:60 秒内的请求数不超过上限
3. 并发信号量:同时进行的 LLM 调用不超过上限
收到 429 时,限流器设置全局暂停时间,所有等待中的请求一起暂停——这避免了”惊群效应”(所有请求同时醒来又同时打过去)。每个请求醒来时加随机抖动,错开唤醒时间。
流式响应的信号量管理更精巧——第一个 chunk 到达后立即释放信号量,因为 API 开始流式输出说明请求已被接受,不会再被限速。这避免了长时间占用并发槽位。
Formatter——消息格式转换
工厂函数还创建了一个 Formatter(格式化器):1
formatter = _create_formatter_instance(model.__class__)
Formatter 负责把 Agent 内部的消息格式转换成 API 需要的格式。不同 Provider 的消息格式不同:1
2
3OpenAI: {"role": "user", "content": [{"type": "text", "text": "..."}]}
Anthropic: {"role": "user", "content": [{"type": "text", "text": "..."}]} (略有差异)
Gemini: 结构完全不同
QwenPaw 用 _create_file_block_support_formatter() 增强了原生 Formatter,增加了文件块处理、视频块处理、推理内容(reasoning_content)保留等能力。
完整的调用链
把所有东西串起来,一次 LLM 调用的完整流程:1
2
3
4
5
6
7
8
9
101. Agent._reasoning() 调用 model(messages, tools)
2. RetryChatModel.__call__()
-> limiter.acquire() # 获取限流许可
-> TokenRecordingModelWrapper.__call__()
-> OpenAIChatModelCompat.__call__()
-> AsyncOpenAI.chat.completions.create() # HTTP 请求飞出
<- 流式响应回来(AsyncGenerator)
<- 包装流,记录 Token
<- 如果失败,等待 backoff 时间后重试
3. 流式 chunks 逐个返回给 ReAct 循环
多 Provider 注册表
ProviderManager 在 provider_manager.py 中硬编码了所有内置 Provider。每个 Provider 预设了模型列表:1
2
3
4
5
6
7
8
9
10
11
12
13DASHSCOPE_MODELS = [
ModelInfo(id="qwen3-max", name="Qwen3 Max", ...),
ModelInfo(id="qwen3-235b-a22b-thinking-2507", name="Qwen3 235B ...", ...),
ModelInfo(id="deepseek-v3.2", name="DeepSeek-V3.2", ...),
]
PROVIDER_DASHSCOPE = OpenAIProvider(
id="dashscope",
name="DashScope",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
models=DASHSCOPE_MODELS,
freeze_url=True,
)
配置持久化到 ~/.qwenpaw/secrets/providers/ 目录,每个 Provider 一个 JSON 文件,API key 加密存储。
实验
在源码中追踪 Provider 注册和模型创建:
- 打开
src/qwenpaw/providers/provider_manager.py,搜索_init_builtins - 数一数有多少个
_add_builtin()调用 - 打开
src/qwenpaw/agents/model_factory.py,搜索create_model_and_formatter - 追踪
ProviderManager.get_instance().get_provider()到provider.get_chat_model_instance()
预期结果:_init_builtins() 中能看到 20+ 个内置 Provider;create_model_and_formatter() 能看到两层包装。
工程权衡
为什么用多层包装而非一个大类?
如果把重试、限流、Token 记录全写在一个 SmartChatModel 类里,改重试策略就要碰 Token 记录的代码。多层包装让每一层独立——改限流参数不影响重试逻辑,改 Token 统计方式不影响 API 调用。这也是经典的装饰器模式。
为什么 Provider 配置要持久化到磁盘?
QwenPaw 重启后需要记住你的 API key 和选择的模型。如果不持久化,每次重启都要重新配置。配置文件存在 ~/.qwenpaw/secrets/ 下,API key 加密存储(encrypt_dict_fields),文件权限设为 0o600(仅 owner 可读写)。
为什么限流器用全局单例而非每个模型一个?
因为 API 的限速是按账户的——你用 qwen3-max 和 deepseek-v3.2 各打了 5 个请求,总共就是 10 个并发。如果每个模型各自管理限流,总并发可能超出 API 限制。全局单例确保所有模型的并发总和受控。
常见误区
误区:每次 Agent 调用 LLM 都要查 ProviderManager?不是很慢?
ProviderManager是单例,启动时加载一次,之后一直在内存里。get_provider()只是一个字典查找——O(1) 时间。真正慢的是 HTTP 请求,不是查找 Provider。
误区:流式响应是不是每个 chunk 都经过 RetryChatModel?
不是。RetryChatModel 只在流开始前做限流检查。如果流开始后中途断了,它会从头重试整个请求(重新发 HTTP 请求),不是重放 chunk。流一旦开始,chunk 直接传递给上层。
动手环节
任务:追踪从 Agent 到 HTTP 请求的完整调用链。
步骤:
- 在 IDE 中打开
src/qwenpaw/agents/model_factory.py - 搜索
create_model_and_formatter函数 - 搜索
TokenRecordingModelWrapper和RetryChatModel的包装顺序 - 打开
src/qwenpaw/providers/retry_chat_model.py,搜索_is_retryable函数,看哪些错误会被重试
预期输出:
create_model_and_formatter能看到model -> TokenRecording -> Retry的包装链_is_retryable能看到 429、500、502、503、504、529 和 OpenAI/Anthropic 的特定异常
自检:
- 理解了 Provider 是模型的”通讯录”,ProviderManager 是单例
- 知道两层包装各自的作用(Token 记录 + 重试限流)
- 知道限流器的三道防线(429 冷却、QPM 窗口、并发信号量)
LLM 调用成功了,Agent 的”思考”有了结果。但如果思考结果里包含工具调用——“我需要执行 get_current_time”——Agent 就要”行动”了。工具是怎么被发现、执行、返回结果的?下一章我们走进工具执行的现场。