第 23 章:造一个新 Model Provider——接入 FastLLM API

难度:进阶
你想接入一个新的大模型服务 “FastLLM”。它的 API 格式和 OpenAI 不一样——你需要同时写 Model 和 Formatter。本章分三步走:非流式 → 流式 → 结构化输出。
上一章:第 22 章 造一个新 Tool
任务目标
接入一个假想的 “FastLLM” API(教学用,读者可以用 mock 替代真实 API),完整实现 Model Provider 的三个层次。1
2
3
4
5
6
7
8
9
10
11
12flowchart TD
A["Step 1: 非流式"] --> B["继承 ChatModelBase"]
B --> C["实现 __call__ → ChatResponse"]
C --> D["Step 2: 流式"]
D --> E["改造为 AsyncGenerator"]
E --> F["解析 SSE chunk"]
F --> G["Step 3: 结构化输出"]
G --> H["伪装为工具调用"]
H --> I["完整的 Model Provider"]
J["配套 Formatter"] --> K["继承 TruncatedFormatterBase"]
K --> L["实现 _format / _count / _truncate"]
回顾:Model 和 Formatter 的接口
ChatModelBase
ChatModelBase(_model_base.py:13)是所有模型适配器的基类:1
2
3
4
5
6
7
8
9class ChatModelBase:
model_name: str
stream: bool
async def __call__(
self, *args, **kwargs,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
pass
只有一个抽象方法 __call__。还有辅助方法 _validate_tool_choice(_model_base.py:46),验证 tool_choice 参数合法性("auto" / "none" / "required" / 具体函数名)。
ChatResponse 和 ChatUsage
ChatResponse(_model_response.py:20):1
2
3
4
5
6
7
8
class ChatResponse(DictMixin):
content: Sequence[TextBlock | ToolUseBlock | ThinkingBlock | AudioBlock]
id: str
created_at: str
type: Literal["chat"]
usage: ChatUsage | None
metadata: dict | None
ChatUsage(_model_usage.py:11):1
2
3
4
5
6
7
class ChatUsage(DictMixin):
input_tokens: int
output_tokens: int
time: float
type: Literal["chat"]
metadata: dict | None
FormatterBase 接口
FormatterBase(_formatter_base.py:11):1
2
3
4class FormatterBase:
async def format(self, *args, **kwargs) -> list[dict[str, Any]]:
pass
TruncatedFormatterBase(_truncated_formatter_base.py:19)需要实现 _format、_count、_truncate 三个方法。
Step 1:非流式版本
1.1 设计 FastLLM 的 API 格式
假设 FastLLM 的 API 格式如下(教学用虚构协议):
请求:1
2
3
4
5
6
7POST /v1/chat
{
"model": "fastllm-v1",
"messages": [{"role": "user", "content": "你好"}],
"tools": [...],
"stream": false
}
响应:1
2
3
4
5
6
7
8{
"id": "resp_123",
"output": {
"text": "你好!有什么可以帮你?",
"tool_calls": null
},
"usage": {"input_tokens": 5, "output_tokens": 8}
}
1.2 实现 FastLLMFormatter
FastLLM 的消息格式和 OpenAI 类似,但有细微差别:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40from typing import Any
from agentscope.formatter import TruncatedFormatterBase
from agentscope.message import Msg
class FastLLMFormatter(TruncatedFormatterBase):
"""FastLLM API 的消息格式转换器。
FastLLM 的格式和 OpenAI 基本相同,
但系统消息放在单独的 system 字段,不在 messages 数组中。
"""
async def _format(self, msgs: list[Msg], **kwargs) -> list[dict[str, Any]]:
result = []
for msg in msgs:
# 提取文本内容
if isinstance(msg.content, str):
text = msg.content
elif isinstance(msg.content, list):
texts = []
for block in msg.content:
if isinstance(block, dict) and block.get("type") == "text":
texts.append(block["text"])
text = "\n".join(texts)
else:
text = str(msg.content)
result.append({"role": msg.role, "content": text})
return result
async def _count(self, formatted: list[dict]) -> int | None:
# 简单估算,生产环境应该用 tokenizer
total = sum(len(m.get("content", "")) for m in formatted)
return total
async def _truncate(self, msgs: list[Msg]) -> list[Msg]:
# 移除最早的非系统消息
if len(msgs) <= 1:
return msgs
return [msgs[0]] + msgs[2:]
1.3 实现 FastLLMModel(非流式)
1 | import time |
Step 2:流式版本
2.1 FastLLM 的 SSE 流格式
假设 FastLLM 的流式响应格式:1
2
3data: {"id": "resp_123", "delta": {"text": "你"}, "done": false}
data: {"id": "resp_123", "delta": {"text": "好"}, "done": false}
data: {"id": "resp_123", "delta": {"text": "!"}, "done": true, "usage": {...}}
2.2 流式实现
在 __call__ 中根据 self.stream 分流:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54from typing import AsyncGenerator
from collections import OrderedDict
from agentscope.message import ToolUseBlock
class FastLLMModel(ChatModelBase):
# ... __init__ 和非流式部分同上 ...
async def __call__(
self,
messages: list[dict],
tools: list[dict] | None = None,
tool_choice: str | None = None,
**kwargs,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
if self.stream:
return self._stream_call(messages, tools)
else:
return await self._non_stream_call(messages, tools)
async def _non_stream_call(self, messages, tools) -> ChatResponse:
"""非流式调用。"""
# 同 Step 1 的逻辑
...
async def _stream_call(
self, messages, tools
) -> AsyncGenerator[ChatResponse, None]:
"""流式调用。"""
start_time = time.time()
# 模拟 SSE 流(实际代码用 aiohttp SSE client)
chunks = ["你", "好", "!"]
total_text = ""
for i, chunk_text in enumerate(chunks):
total_text += chunk_text
is_last = i == len(chunks) - 1
usage = None
if is_last:
usage = ChatUsage(
input_tokens=10,
output_tokens=len(total_text),
time=time.time() - start_time,
)
yield ChatResponse(
content=[TextBlock(type="text", text=total_text)],
id="resp_stream",
created_at=time.strftime("%Y-%m-%dT%H:%M:%S"),
type="chat",
usage=usage,
)
2.3 参考真实实现
OpenAI 的流式解析在 _parse_openai_stream_response(_openai_model.py:346)中。核心思路:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# _openai_model.py:376(简化)
text = "" # 累积文本
tool_calls = OrderedDict() # 累积工具调用
async for chunk in response:
delta = chunk.choices[0].delta
if delta.content:
text += delta.content
# 在句子边界 yield
yield ChatResponse(content=[TextBlock(type="text", text=text)])
if delta.tool_calls:
# 累积工具调用的 name 和 arguments
for tc in delta.tool_calls:
tool_calls[tc.index]["arguments"] += tc.arguments
关键点:流式解析是一个累积过程——每个 chunk 只包含增量(delta),解析器需要把增量累积起来。
Step 3:结构化输出
结构化输出的原理在第 9 章已经介绍过:把 Pydantic 模型伪装成”工具调用”,让模型以为要调用工具,从而获得特定格式的 JSON。
3.1 实现思路
1 | from pydantic import BaseModel |
3.2 参考 OpenAI 的实现
OpenAIChatModel 在 _structured_via_tool_call(_openai_model.py:730)中实现了完整的结构化输出路径。核心步骤:
- 将 Pydantic 模型转为工具 JSON Schema
- 设置
tool_choice={"type": "function", "function": {"name": "generate_response"}} - 调用模型,从响应中提取
ToolUseBlock - 用
structured_model.model_validate()验证输出格式
设计一瞥
设计一瞥:为什么 Model 和 Formatter 要分开?
你可能觉得”格式转换应该是 Model 自己的事”。但分开有好处:同一个FastLLMFormatter可能适用于所有兼容 FastLLM 协议的服务(不同部署、不同版本)。如果 Formatter 和 Model 绑定,换一个兼容服务就要写新的 Model 类。
代价:用户需要自己选择 Formatter 和 Model 的组合。详见卷四第 35 章。
完整流程图
1 | sequenceDiagram |
试一试:对比 OpenAI 和 FastLLM 的 Formatter
这个练习不需要 API key。
目标:理解不同 API 格式的差异。
步骤:
打开
src/agentscope/formatter/_openai_formatter.py,找到_format方法打开
src/agentscope/formatter/_anthropic_formatter.py,对比系统消息的处理方式回答以下问题:
- OpenAI 如何处理系统消息?(放在 messages 数组中,role=”system”)
- Anthropic 如何处理系统消息?(放在单独的
system字段中) - 你的 FastLLM Formatter 应该如何处理?为什么?
进阶:查看 Formatter 的截断逻辑:
1 | grep -n "_truncate" src/agentscope/formatter/_truncated_formatter_base.py |
理解 _truncate 在什么条件下被调用,以及它如何决定删除哪些消息。
PR 检查清单
提交新 Model Provider 的 PR 时:
- ChatModelBase 子类:
__call__正确处理流式/非流式 - 配套 Formatter:继承
TruncatedFormatterBase,实现_format/_count/_truncate - ChatResponse 构造:
content使用正确的 ContentBlock 类型 - ChatUsage 记录:准确记录 token 消耗和耗时
-
__init__.py导出:在model/__init__.py和formatter/__init__.py中导出 - 测试:覆盖非流式、流式(用 mock)、结构化输出
- Docstring:所有公共方法按项目规范写 docstring
- pre-commit 通过
检查点
你现在理解了:
- Model Provider 的三步开发流程:非流式 → 流式 → 结构化输出
- ChatModelBase 只有一个抽象方法
__call__,返回ChatResponse或AsyncGenerator - 流式解析是增量累积过程——每个 chunk 包含 delta,解析器累积后构造
ChatResponse - 结构化输出通过把 Pydantic 模型伪装成工具调用实现
- Formatter 和 Model 分离,使得同一个格式转换可以复用于多个兼容服务
自检练习:
- 如果 FastLLM 的 API 不支持
tools参数,你的 Model 还能支持工具调用吗?(提示:思考tool_choice的处理) - 流式模式下,
yield ChatResponse的content应该是累积文本还是增量文本?(提示:看_openai_model.py:376的text变量)
下一章预告
我们造了 Tool 和 Model。下一章,我们造一个 Memory Backend——用 SQLite 实现持久化记忆,让 Agent 重启后还能记住之前的对话。