第 18 章:接入一个新频道

个人公众号
1
2
卷三:工坊实战
[15] 造新工具 -> [16] 造新技能 -> [17] 接入新模型 -> [18] 接入新频道 <- you are here

第 12 章我们理解了 Channel 的适配器模式。现在动手——接入一个新的聊天平台。有两种路径:内置频道和自定义频道。


目标

为一个假设的 “ChatX” 平台写一个 Channel 适配器。ChatX 用 Webhook 接收消息,用 REST API 发送消息。

最小实现——6 个方法

每个 Channel 必须实现以下方法:

方法职责
channel(类属性)频道标识符字符串
from_config从配置创建实例
build_agent_request_from_native平台消息 → AgentRequest
start启动(连接平台)
stop停止(断开连接)
send发送文字消息给用户

第一步:选择路径

方式一:内置频道(需要改源码)

src/qwenpaw/app/channels/chatx/ 下创建模块,然后在 registry.py_BUILTIN_SPECS 中注册。

方式二:自定义频道(不改源码)

~/.qwenpaw/custom_channels/ 下放一个 .py 文件。_discover_custom_channels() 会自动加载。

两种方式的核心代码完全相同。

第二步:创建 Channel 类

src/qwenpaw/app/channels/chatx/ 下创建 channel.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import logging
from typing import Any, Optional, Dict
import httpx
from qwenpaw.app.channels.base import (
BaseChannel, ProcessHandler, OnReplySent,
)

logger = logging.getLogger(__name__)


class ChatXChannel(BaseChannel):
channel = "chatx"

def __init__(
self,
process: ProcessHandler,
bot_token: str = "",
enabled: bool = True,
on_reply_sent: OnReplySent = None,
):
super().__init__(process, on_reply_sent=on_reply_sent)
self.bot_token = bot_token
self.enabled = enabled

@classmethod
def from_config(cls, process, config, **kwargs):
return cls(
process=process,
bot_token=config.get("bot_token", ""),
enabled=config.get("enabled", True),
)

@classmethod
def from_env(cls, process, on_reply_sent=None):
import os
return cls(
process=process,
bot_token=os.getenv("CHATX_BOT_TOKEN", ""),
on_reply_sent=on_reply_sent,
)

第三步:实现消息转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def build_agent_request_from_native(
self, native_payload: Any
):
data = native_payload
sender_id = str(data.get("from", {}).get("id", ""))
text = data.get("content", {}).get("text", "")
session_id = self.resolve_session_id(sender_id, data)

content_parts = [{"type": "text", "text": text}]

return self.build_agent_request_from_user_content(
channel_id=self.channel,
sender_id=sender_id,
session_id=session_id,
content_parts=content_parts,
channel_meta=data,
)

关键:最后调用 build_agent_request_from_user_content()——这是 BaseChannel 提供的共享方法。

第四步:实现发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def send(
self, to_handle: str, text: str,
meta: Optional[Dict] = None,
):
async with httpx.AsyncClient() as client:
await client.post(
"https://api.chatx.io/v1/messages",
headers={
"Authorization": f"Bearer {self.bot_token}"
},
json={
"to": to_handle,
"content": {"type": "text", "text": text},
},
)

第五步:实现生命周期

1
2
3
4
5
async def start(self):
logger.info("ChatX channel started")

async def stop(self):
logger.info("ChatX channel stopped")

第六步:注册频道

registry.py_BUILTIN_SPECS 中添加:

1
2
3
4
_BUILTIN_SPECS = {
# ... 已有频道 ...
"chatx": (".chatx", "ChatXChannel"),
}

如果用自定义频道方式,直接把文件放到 ~/.qwenpaw/custom_channels/ 即可,不需要改注册表。

ProcessHandler 桥接机制

1
2
3
4
5
6
ChatX Webhook
-> ChatXChannel.build_agent_request_from_native()
-> AgentRequest
-> self._process(request) <- ProcessHandler
-> async for event in events:
send_content_parts() -> ChatX API

_run_process_loop()(BaseChannel 提供)已经实现了事件循环。Channel 只需实现消息的”进出”转换。

可选但有用的重写

方法什么时候需要
send_content_parts平台支持图片/文件等富文本
send_media平台支持媒体上传
resolve_session_id平台有会话 ID 概念
health_check需要验证 token 有效性
merge_native_items消息需要特殊合并逻辑

自检

  • 创建了 channels/chatx/channel.py,实现了 6 个核心方法
  • 知道 build_agent_request_from_native 把平台消息转成 AgentRequest
  • 知道 send 把 Agent 的回复发回平台
  • 知道 ProcessHandler 是 Channel 和 Runner 之间的桥梁

最后一章:从零到 PR——把改动提交为合规的 Pull Request。