docs: sync all documentation to V3 config format
53 KiB
PicoClaw Channel System:完整开发指南
影响范围:
pkg/channels/,pkg/bus/,pkg/media/,pkg/identity/,cmd/picoclaw/internal/gateway/
目录
- 第一部分:架构总览
- 第二部分:迁移指南——从 main 分支迁移到重构分支
- 第三部分:新 Channel 开发指南——从零实现一个新 Channel
- 第四部分:核心子系统详解
- 第五部分:关键设计决策与约定
- 附录:完整文件清单与接口速查表
第一部分:架构总览
1.1 重构前后对比
重构前(main 分支):
pkg/channels/
├── telegram.go # 每个 channel 直接放在 channels 包内
├── discord.go
├── slack.go
├── manager.go # Manager 直接引用各 channel 类型
├── ...
- Channel 实现全部在
pkg/channels/包的顶层 - Manager 通过
switch或if-else链条直接构造各 channel - Peer、MessageID 等路由信息埋在
Metadata map[string]string中 - 消息发送没有速率限制和重试
- 没有统一的媒体文件生命周期管理
- 各 channel 各自启动 HTTP 服务器
- 群聊触发过滤逻辑分散在各 channel 中
重构后(refactor/channel-system 分支):
pkg/channels/
├── base.go # BaseChannel 共享抽象层
├── interfaces.go # 可选能力接口(TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder)
├── README.md # 英文文档
├── README.zh.md # 中文文档
├── media.go # MediaSender 可选接口
├── webhook.go # WebhookHandler, HealthChecker 可选接口
├── errors.go # 错误哨兵值(ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed)
├── errutil.go # 错误分类帮助函数
├── registry.go # 工厂注册表(RegisterFactory / getFactory)
├── manager.go # 统一编排:Worker 队列、速率限制、重试、Typing/Placeholder、共享 HTTP
├── split.go # 长消息智能分割(保留代码块完整性)
├── telegram/ # 每个 channel 独立子包
│ ├── init.go # 工厂注册
│ ├── telegram.go # 实现
│ └── telegram_commands.go
├── discord/
│ ├── init.go
│ └── discord.go
├── slack/ line/ onebot/ dingtalk/ feishu/ wecom/ qq/ whatsapp/ whatsapp_native/ maixcam/ pico/
│ └── ...
pkg/bus/
├── bus.go # MessageBus(缓冲区 64,安全关闭+排水)
├── types.go # 结构化消息类型(Peer, SenderInfo, MediaPart, InboundMessage, OutboundMessage, OutboundMediaMessage)
pkg/media/
├── store.go # MediaStore 接口 + FileMediaStore 实现(两阶段释放,TTL 清理)
pkg/identity/
├── identity.go # 统一用户身份:规范 "platform:id" 格式 + 向后兼容匹配
1.2 消息流转全景图
┌────────────┐ InboundMessage ┌───────────┐ LLM + Tools ┌────────────┐
│ Telegram │──┐ │ │ │ │
│ Discord │──┤ PublishInbound() │ │ PublishOutbound() │ │
│ Slack │──┼──────────────────────▶ │ MessageBus │ ◀─────────────────── │ AgentLoop │
│ LINE │──┤ (buffered chan, 64) │ │ (buffered chan, 64) │ │
│ ... │──┘ │ │ │ │
└────────────┘ └─────┬─────┘ └────────────┘
│
SubscribeOutbound() │ SubscribeOutboundMedia()
▼
┌───────────────────┐
│ Manager │
│ ├── dispatchOutbound() 路由到 Worker 队列
│ ├── dispatchOutboundMedia()
│ ├── runWorker() 消息分割 + sendWithRetry()
│ ├── runMediaWorker() sendMediaWithRetry()
│ ├── preSend() 停止 Typing + 撤销 Reaction + 编辑 Placeholder
│ └── runTTLJanitor() 清理过期 Typing/Placeholder
└────────┬──────────┘
│
channel.Send() / SendMedia()
│
▼
┌────────────────┐
│ 各平台 API/SDK │
└────────────────┘
1.3 关键设计原则
| 原则 | 说明 |
|---|---|
| 子包隔离 | 每个 channel 一个独立 Go 子包,依赖 channels 父包提供的 BaseChannel 和接口 |
| 工厂注册 | 各子包通过 init() 自注册,Manager 通过名字查找工厂,消除 import 耦合 |
| 能力发现 | 可选能力通过接口(MediaSender, TypingCapable, ReactionCapable, PlaceholderCapable, MessageEditor, WebhookHandler, HealthChecker)声明,Manager 运行时类型断言发现 |
| 结构化消息 | Peer、MessageID、SenderInfo 从 Metadata 提升为 InboundMessage 的一等字段 |
| 错误分类 | Channel 返回哨兵错误(ErrRateLimit, ErrTemporary 等),Manager 据此决定重试策略 |
| 集中编排 | 速率限制、消息分割、重试、Typing/Reaction/Placeholder 全部由 Manager 和 BaseChannel 统一处理,Channel 只负责 Send |
第二部分:迁移指南——从 main 分支迁移到重构分支
2.1 如果你有未合并的 Channel 修改
步骤 1:确认你修改了哪些文件
在 main 分支上,Channel 文件直接位于 pkg/channels/ 顶层,例如:
pkg/channels/telegram.gopkg/channels/discord.go
重构后,这些文件已被删除,代码移动到了对应子包:
pkg/channels/telegram/telegram.gopkg/channels/discord/discord.go
步骤 2:理解结构变化映射
| main 分支文件 | 重构分支位置 | 变化 |
|---|---|---|
pkg/channels/telegram.go |
pkg/channels/telegram/telegram.go + init.go |
包名从 channels 变为 telegram |
pkg/channels/discord.go |
pkg/channels/discord/discord.go + init.go |
同上 |
pkg/channels/manager.go |
pkg/channels/manager.go |
大幅重写 |
| (不存在) | pkg/channels/base.go |
新增共享抽象层 |
| (不存在) | pkg/channels/registry.go |
新增工厂注册表 |
| (不存在) | pkg/channels/errors.go + errutil.go |
新增错误分类体系 |
| (不存在) | pkg/channels/interfaces.go |
新增可选能力接口 |
| (不存在) | pkg/channels/media.go |
新增 MediaSender 接口 |
| (不存在) | pkg/channels/webhook.go |
新增 WebhookHandler/HealthChecker |
| (不存在) | pkg/channels/whatsapp_native/ |
新增 WhatsApp 原生模式(whatsmeow) |
| (不存在) | pkg/channels/split.go |
新增消息分割(从 utils 迁入) |
| (不存在) | pkg/bus/types.go |
新增结构化消息类型 |
| (不存在) | pkg/media/store.go |
新增媒体文件生命周期管理 |
| (不存在) | pkg/identity/identity.go |
新增统一用户身份 |
步骤 3:迁移你的 Channel 代码
以 Telegram 为例,主要改动项:
3a. 包声明和导入
// 旧代码(main 分支)
package channels
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
)
// 新代码(重构分支)
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels" // 引用父包
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity" // 新增
"github.com/sipeed/picoclaw/pkg/media" // 新增(如需媒体)
)
3b. 结构体嵌入 BaseChannel
// 旧代码:直接持有 bus、config 等字段
type TelegramChannel struct {
bus *bus.MessageBus
config *config.Config
running bool
allowList []string
// ...
}
// 新代码:嵌入 BaseChannel,它提供 bus、running、allowList 等
type TelegramChannel struct {
*channels.BaseChannel // 嵌入共享抽象
bot *telego.Bot
config *config.Config
// ... 只保留 channel 特有字段
}
3c. 构造函数
// 旧代码:直接赋值
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
return &TelegramChannel{
bus: bus,
config: cfg,
allowList: cfg.Channels.Telegram.AllowFrom,
// ...
}, nil
}
// 新代码:使用 NewBaseChannel + 功能选项
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
base := channels.NewBaseChannel(
"telegram", // 名称
cfg.Channels.Telegram, // 原始配置(any 类型)
bus, // 消息总线
cfg.Channels.Telegram.AllowFrom, // 允许列表
channels.WithMaxMessageLength(4096), // 平台消息长度上限
channels.WithGroupTrigger(cfg.Channels.Telegram.GroupTrigger), // 群聊触发配置
channels.WithReasoningChannelID(cfg.Channels.Telegram.ReasoningChannelID), // 思维链路由
)
return &TelegramChannel{
BaseChannel: base,
bot: bot,
config: cfg,
}, nil
}
3d. Start/Stop 生命周期
// 新代码:使用 SetRunning 原子操作
func (c *TelegramChannel) Start(ctx context.Context) error {
// ... 初始化 bot、webhook 等
c.SetRunning(true) // 必须在就绪后调用
go bh.Start()
return nil
}
func (c *TelegramChannel) Stop(ctx context.Context) error {
c.SetRunning(false) // 必须在清理前调用
// ... 停止 bot handler、取消 context
return nil
}
3e. Send 方法的错误返回
// 旧代码:只返回 error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.running { return fmt.Errorf("not running") }
// ...
if err != nil { return err }
}
// 新代码:返回投递后的消息 ID,以及供 Manager 判断重试策略的哨兵错误
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning // ← Manager 不会重试
}
// ...
if err != nil {
// 使用 ClassifySendError 根据 HTTP 状态码包装错误
return nil, channels.ClassifySendError(statusCode, err)
// 或手动包装:
// return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
}
return []string{deliveredID}, nil // 如果拿不到 ID,也可以返回 nil, nil
}
3f. 消息接收(Inbound)
// 旧代码:直接构造 InboundMessage 并发布
msg := bus.InboundMessage{
Channel: "telegram",
SenderID: senderID,
ChatID: chatID,
Content: content,
Metadata: map[string]string{
"peer_kind": "group", // 路由信息埋在 metadata
"peer_id": chatID,
"message_id": msgID,
},
}
c.bus.PublishInbound(ctx, msg)
// 新代码:使用 BaseChannel.HandleMessage,传入结构化字段
sender := bus.SenderInfo{
Platform: "telegram",
PlatformID: strconv.FormatInt(from.ID, 10),
CanonicalID: identity.BuildCanonicalID("telegram", strconv.FormatInt(from.ID, 10)),
Username: from.Username,
DisplayName: from.FirstName,
}
peer := bus.Peer{
Kind: "group", // 或 "direct"
ID: chatID,
}
// HandleMessage 内部调用 IsAllowedSender 检查权限,构建 MediaScope,发布到 bus
c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, sender)
3g. 添加工厂注册(必需)
为你的 channel 创建 init.go:
// pkg/channels/telegram/init.go
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelTelegram, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
}
3h. 在 Gateway 中导入子包
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/telegram" // 触发 init() 注册
_ "github.com/sipeed/picoclaw/pkg/channels/discord"
_ "github.com/sipeed/picoclaw/pkg/channels/your_new_channel" // 新增
)
步骤 4:迁移 Bus 消息使用方式
如果你的代码直接读取 InboundMessage.Metadata 中的路由字段:
// 旧代码
peerKind := msg.Metadata["peer_kind"]
peerID := msg.Metadata["peer_id"]
msgID := msg.Metadata["message_id"]
// 新代码
peerKind := msg.Peer.Kind // 一等字段
peerID := msg.Peer.ID // 一等字段
msgID := msg.MessageID // 一等字段
sender := msg.Sender // bus.SenderInfo 结构体
scope := msg.MediaScope // 媒体生命周期作用域
步骤 5:迁移允许列表检查
// 旧代码
if !c.isAllowed(senderID) { return }
// 新代码:优先使用结构化检查
if !c.IsAllowedSender(sender) { return }
// 或回退到字符串检查:
if !c.IsAllowed(senderID) { return }
BaseChannel.HandleMessage 方法内部已经处理了这个逻辑,无需在 channel 中重复检查。
2.2 如果你有 Manager 的修改
Manager 已被完全重写。你的修改需要理解新架构:
| 旧 Manager 职责 | 新 Manager 职责 |
|---|---|
| 直接构造 channel(switch/if-else) | 通过工厂注册表查找并构造 |
| 直接调用 channel.Send | 通过 per-channel Worker 队列 + 速率限制 + 重试 |
| 无消息分割 | 自动根据 MaxMessageLength 分割长消息 |
| 各 channel 自建 HTTP 服务器 | 统一共享 HTTP 服务器 |
| 无 Typing/Placeholder 管理 | 统一 preSend 处理 Typing 停止 + Reaction 撤销 + Placeholder 编辑;入站侧 BaseChannel.HandleMessage 自动编排 Typing/Reaction/Placeholder |
| 无 TTL 清理 | runTTLJanitor 定期清理过期 Typing/Reaction/Placeholder 条目 |
2.3 如果你有 Agent Loop 的修改
Agent Loop 的主要变化:
- MediaStore 注入:
agentLoop.SetMediaStore(mediaStore)— Agent 通过 MediaStore 解析工具产生的媒体引用 - ChannelManager 注入:
agentLoop.SetChannelManager(channelManager)— Agent 可查询 channel 状态 - OutboundMediaMessage:Agent 现在通过
bus.PublishOutboundMedia()发送媒体消息,而非嵌入文本回复 - extractPeer:路由使用
msg.Peer结构化字段而非 Metadata 查找
第三部分:新 Channel 开发指南——从零实现一个新 Channel
3.1 最小实现清单
要添加一个新的聊天平台(例如 matrix),你需要:
- ✅ 创建子包目录
pkg/channels/matrix/ - ✅ 创建
init.go— 工厂注册 - ✅ 创建
matrix.go— Channel 实现 - ✅ 在 Gateway helpers 中添加 blank import
- ✅ 在 Manager.initChannels() 中添加配置检查
- ✅ 在
pkg/config/中添加配置结构体
3.2 完整模板
pkg/channels/matrix/init.go
package matrix
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelMatrix, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.MatrixSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewMatrixChannel(bc, c, b)
})
}
pkg/channels/matrix/matrix.go
package matrix
import (
"context"
"fmt"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity"
"github.com/sipeed/picoclaw/pkg/logger"
)
// MatrixChannel implements channels.Channel for the Matrix protocol.
type MatrixChannel struct {
*channels.BaseChannel // 必须嵌入
config *config.Config
ctx context.Context
cancel context.CancelFunc
// ... Matrix SDK 客户端等
}
func NewMatrixChannel(cfg *config.Config, msgBus *bus.MessageBus) (*MatrixChannel, error) {
matrixCfg := cfg.Channels.Matrix // 假设配置中有此字段
base := channels.NewBaseChannel(
"matrix", // channel 名称(全局唯一)
matrixCfg, // 原始配置
msgBus, // 消息总线
matrixCfg.AllowFrom, // 允许列表
channels.WithMaxMessageLength(65536), // Matrix 消息长度限制
channels.WithGroupTrigger(matrixCfg.GroupTrigger),
channels.WithReasoningChannelID(matrixCfg.ReasoningChannelID), // 思维链路由(可选)
)
return &MatrixChannel{
BaseChannel: base,
config: cfg,
}, nil
}
// ========== 必须实现的 Channel 接口方法 ==========
func (c *MatrixChannel) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
// 1. 初始化 Matrix 客户端
// 2. 开始监听消息
// 3. 标记为运行中
c.SetRunning(true)
logger.InfoC("matrix", "Matrix channel started")
return nil
}
func (c *MatrixChannel) Stop(ctx context.Context) error {
c.SetRunning(false)
if c.cancel != nil {
c.cancel()
}
logger.InfoC("matrix", "Matrix channel stopped")
return nil
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
// 1. 检查运行状态
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
// 2. 发送消息到 Matrix
eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
if err != nil {
// 3. 必须使用错误分类包装
// 如果你有 HTTP 状态码:
// return nil, channels.ClassifySendError(statusCode, err)
// 如果是网络错误:
// return nil, channels.ClassifyNetError(err)
// 如果需要手动分类:
return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
}
return []string{eventID}, nil
}
// ========== 消息接收处理 ==========
func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content string, msgID string) {
// 1. 构造结构化发送者身份
sender := bus.SenderInfo{
Platform: "matrix",
PlatformID: senderID,
CanonicalID: identity.BuildCanonicalID("matrix", senderID),
Username: senderID,
DisplayName: displayName,
}
// 2. 确定 Peer 类型(直聊 vs 群聊)
peer := bus.Peer{
Kind: "group", // 或 "direct"
ID: roomID,
}
// 3. 群聊过滤(如适用)
isGroup := peer.Kind == "group"
if isGroup {
isMentioned := false // 根据平台特性检测 @提及
shouldRespond, cleanContent := c.ShouldRespondInGroup(isMentioned, content)
if !shouldRespond {
return
}
content = cleanContent
}
// 4. 处理媒体附件(如有)
var mediaRefs []string
store := c.GetMediaStore()
if store != nil {
// 下载附件到本地 → store.Store() → 获取 ref
// mediaRefs = append(mediaRefs, ref)
}
// 5. 调用 HandleMessage 发布到 bus
// HandleMessage 内部会:
// - 检查 IsAllowedSender/IsAllowed
// - 构建 MediaScope
// - 发布 InboundMessage
c.HandleMessage(
c.ctx,
peer,
msgID, // 平台消息 ID
senderID, // 原始发送者 ID
roomID, // 聊天/房间 ID
content, // 消息内容
mediaRefs, // 媒体引用列表
nil, // 额外 metadata(通常 nil)
sender, // SenderInfo(variadic 参数)
)
}
// ========== 内部方法 ==========
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) {
// 实际的 Matrix SDK 调用
return "event-id", nil
}
3.3 可选能力接口
根据平台能力,你的 Channel 可以选择性实现以下接口:
MediaSender — 发送媒体附件
// 如果平台支持发送图片/文件/音频/视频
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
logger.ErrorCF("matrix", "Failed to resolve media", map[string]any{
"ref": part.Ref, "error": err.Error(),
})
continue
}
// 根据 part.Type ("image"|"audio"|"video"|"file") 调用对应 API
switch part.Type {
case "image":
// 上传图片到 Matrix
default:
// 上传文件到 Matrix
}
// 如果 API 能返回平台消息 ID,就在这里追加。
// messageIDs = append(messageIDs, uploadedMessageID)
}
return messageIDs, nil
}
TypingCapable — Typing 指示器
// 如果平台支持 "正在输入..." 提示
func (c *MatrixChannel) StartTyping(ctx context.Context, chatID string) (stop func(), err error) {
// 调用 Matrix API 发送 typing 指示器
// 返回的 stop 函数必须是幂等的
stopped := false
return func() {
if !stopped {
stopped = true
// 调用 Matrix API 停止 typing
}
}, nil
}
ReactionCapable — 消息反应指示器
// 如果平台支持对入站消息添加 emoji 反应(如 Slack 的 👀、OneBot 的表情 289)
func (c *MatrixChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) {
// 调用 Matrix API 添加反应到消息
// 返回的 undo 函数移除反应,必须是幂等的
err = c.addReaction(chatID, messageID, "eyes")
if err != nil {
return func() {}, err
}
return func() {
c.removeReaction(chatID, messageID, "eyes")
}, nil
}
MessageEditor — 消息编辑
// 如果平台支持编辑已发送的消息(用于 Placeholder 替换)
func (c *MatrixChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
// 调用 Matrix API 编辑消息
return nil
}
PlaceholderCapable — 占位消息
// 如果平台支持发送占位消息(如 "Thinking... 💭"),并且实现了 MessageEditor,
// 则 Manager 的 preSend 会在出站时自动将占位消息编辑为最终回复。
// SendPlaceholder 内部根据 PlaceholderConfig.Enabled 决定是否发送;
// 返回 ("", nil) 表示跳过。
func (c *MatrixChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
cfg := c.config.Channels.Matrix.Placeholder
if !cfg.Enabled {
return "", nil
}
text := cfg.Text
if text == "" {
text = "Thinking... 💭"
}
// 调用 Matrix API 发送占位消息
msg, err := c.sendText(ctx, chatID, text)
if err != nil {
return "", err
}
return msg.ID, nil
}
WebhookHandler — HTTP Webhook 接收
// 如果 channel 通过 webhook 接收消息(而非长轮询/WebSocket)
func (c *MatrixChannel) WebhookPath() string {
return "/webhook/matrix" // 路径会被注册到共享 HTTP 服务器
}
func (c *MatrixChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 处理 webhook 请求
}
HealthChecker — 健康检查端点
func (c *MatrixChannel) HealthPath() string {
return "/health/matrix"
}
func (c *MatrixChannel) HealthHandler(w http.ResponseWriter, r *http.Request) {
if c.IsRunning() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
}
3.4 入站侧 Typing/Reaction/Placeholder 自动编排
BaseChannel.HandleMessage 在发布入站消息之前,自动检测 channel 是否实现了 TypingCapable、ReactionCapable 和/或 PlaceholderCapable,并触发相应的指示器。三条管道完全独立,互不干扰:
// BaseChannel.HandleMessage 内部自动执行(无需 channel 手动调用):
if c.owner != nil && c.placeholderRecorder != nil {
// Typing — 独立管道
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
}
}
// Reaction — 独立管道
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — 独立管道
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
这意味着:
- 实现
TypingCapable的 channel(Telegram、Discord、LINE、Pico)无需在handleMessage中手动调用StartTyping+RecordTypingStop - 实现
ReactionCapable的 channel(Slack、OneBot)无需在handleMessage中手动调用AddReaction+RecordTypingStop - 实现
PlaceholderCapable的 channel(Telegram、Discord、Pico)无需在handleMessage中手动发送占位消息并调用RecordPlaceholder - Channel 只需实现对应接口,
HandleMessage会自动完成编排 - 不实现这些接口的 channel 不受影响(类型断言会失败,跳过)
PlaceholderCapable的SendPlaceholder方法内部根据配置的PlaceholderConfig.Enabled决定是否发送;返回("", nil)时跳过注册
Owner 注入:Manager 在 initChannel 中自动调用 SetOwner(ch) 将具体 channel 注入 BaseChannel,无需开发者手动设置。
当 Agent 处理完消息后,Manager 的 preSend 会自动:
- 调用已记录的
stop()停止 Typing - 调用已记录的
undo()撤销 Reaction - 如果有 Placeholder,且 channel 实现了
MessageEditor,尝试编辑 Placeholder 为最终回复(跳过 Send)
3.5 注册配置和 Gateway 接入
添加配置入口
Channels 现在使用统一的 map 类型配置(map[string]*config.Channel)。
每个 channel 条目将通用字段(enabled、type、allow_from 等)放在顶层,
channel 特定的设置放在 settings 子键中:
{
"channel_list": {
"matrix": {
"enabled": true,
"type": "matrix",
"allow_from": ["@user:example.com"],
"settings": {
"home_server": "https://matrix.org",
"user_id": "@bot:example.com",
"access_token": "enc://..."
}
}
}
}
安全字段(token、密码、API 密钥)放入 .security.yml:
channels:
matrix:
access_token: "your-matrix-access-token"
Channel 类型必须在 pkg/config/config_channel.go 的 channelSettingsFactory 中注册:
var channelSettingsFactory = map[string]any{
// ... 现有 channels
ChannelMatrix: (MatrixSettings{}),
}
无需修改 Manager
Manager 使用 InitChannelList() 来验证类型和解码设置,
然后通过 bc.Type 查找工厂。不需要在 Manager 中添加每个 channel 的条目——
只需注册工厂和配置条目即可。
注意:如果你的 channel 有多种模式(如 WhatsApp Bridge vs Native), 在
channelSettingsFactory中注册两种类型,并根据配置分支:// 在 config_channel.go 中: ChannelWhatsApp: (WhatsAppSettings{}), ChannelWhatsAppNative: (WhatsAppSettings{}),
在 Gateway 中添加 blank import
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/matrix"
)
第四部分:核心子系统详解
4.1 MessageBus
文件:pkg/bus/bus.go、pkg/bus/types.go
type MessageBus struct {
inbound chan InboundMessage // 缓冲区 = 64
outbound chan OutboundMessage // 缓冲区 = 64
outboundMedia chan OutboundMediaMessage // 缓冲区 = 64
done chan struct{} // 关闭信号
closed atomic.Bool // 防止重复关闭
}
关键行为:
| 方法 | 行为 |
|---|---|
PublishInbound(ctx, msg) |
检查 closed → 发送到 inbound channel → 阻塞/超时/关闭 |
ConsumeInbound(ctx) |
从 inbound 读取 → 阻塞/关闭/取消 |
PublishOutbound(ctx, msg) |
发送到 outbound channel |
SubscribeOutbound(ctx) |
从 outbound 读取(Manager dispatcher 调用) |
PublishOutboundMedia(ctx, msg) |
发送到 outboundMedia channel |
SubscribeOutboundMedia(ctx) |
从 outboundMedia 读取(Manager media dispatcher 调用) |
Close() |
CAS 关闭 → close(done) → 排水所有 channel(不关闭 channel 本身,避免并发 send-on-closed panic) |
设计要点:
- 缓冲区从 16 增至 64,减少突发负载下的阻塞
Close()不关闭底层 channel(只关闭done信号通道),因为可能有正在并发Publish的 goroutine- 排水循环确保 buffered 消息不被静默丢弃
4.2 结构化消息类型
文件:pkg/bus/types.go
// 路由对等体
type Peer struct {
Kind string `json:"kind"` // "direct" | "group" | "channel" | ""
ID string `json:"id"`
}
// 发送者身份信息
type SenderInfo struct {
Platform string `json:"platform,omitempty"` // "telegram", "discord", ...
PlatformID string `json:"platform_id,omitempty"` // 平台原始 ID
CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" 规范格式
Username string `json:"username,omitempty"`
DisplayName string `json:"display_name,omitempty"`
}
// 入站消息
type InboundMessage struct {
Channel string // 来源 channel 名称
SenderID string // 发送者 ID(优先使用 CanonicalID)
Sender SenderInfo // 结构化发送者信息
ChatID string // 聊天/房间 ID
Content string // 消息文本
Media []string // 媒体引用列表(media://...)
Peer Peer // 路由对等体(一等字段)
MessageID string // 平台消息 ID(一等字段)
MediaScope string // 媒体生命周期作用域
SessionKey string // 会话键
Metadata map[string]string // 仅用于 channel 特有扩展
}
// 出站文本消息
type OutboundMessage struct {
Channel string
ChatID string
Content string
}
// 出站媒体消息
type OutboundMediaMessage struct {
Channel string
ChatID string
Parts []MediaPart
}
// 媒体片段
type MediaPart struct {
Type string // "image" | "audio" | "video" | "file"
Ref string // "media://uuid"
Caption string
Filename string
ContentType string
}
4.3 BaseChannel
文件:pkg/channels/base.go
BaseChannel 是所有 channel 的共享抽象层,提供以下能力:
| 方法/特性 | 说明 |
|---|---|
Name() string |
Channel 名称 |
IsRunning() bool |
原子读取运行状态 |
SetRunning(bool) |
原子设置运行状态 |
MaxMessageLength() int |
消息长度限制(rune 计数),0 = 无限制 |
ReasoningChannelID() string |
思维链路由目标 channel ID(空 = 不路由) |
IsAllowed(senderID string) bool |
旧格式允许列表检查(支持 "id|username" 和 "@username" 格式) |
IsAllowedSender(sender SenderInfo) bool |
新格式允许列表检查(委托给 identity.MatchAllowed) |
ShouldRespondInGroup(isMentioned, content) (bool, string) |
统一群聊触发过滤逻辑 |
HandleMessage(...) |
统一入站消息处理:权限检查 → 构建 MediaScope → 自动触发 Typing/Reaction/Placeholder → 发布到 Bus |
SetMediaStore(s) / GetMediaStore() |
Manager 注入的媒体存储 |
SetPlaceholderRecorder(r) / GetPlaceholderRecorder() |
Manager 注入的占位符记录器 |
SetOwner(ch) |
Manager 注入的具体 channel 引用(用于 HandleMessage 内部的 Typing/Reaction/Placeholder 类型断言) |
功能选项:
channels.WithMaxMessageLength(4096) // 设置平台消息长度限制
channels.WithGroupTrigger(groupTriggerCfg) // 设置群聊触发配置
channels.WithReasoningChannelID(id) // 设置思维链路由目标 channel
4.4 工厂注册表
文件:pkg/channels/registry.go
type ChannelFactory func(channelName, channelType string, cfg *config.Config, bus *bus.MessageBus) (Channel, error)
func RegisterFactory(name string, f ChannelFactory) // 子包 init() 中调用
func getFactory(name string) (ChannelFactory, bool) // Manager 内部调用
func GetRegisteredFactoryNames() []string // 返回所有已注册的工厂名称
为方便使用,RegisterSafeFactory[S any] 提供自动类型安全的设置解码:
// 不使用 RegisterSafeFactory(手动 GetDecoded() + 类型断言):
channels.RegisterFactory(config.ChannelTelegram,
func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
// 使用 RegisterSafeFactory(同等安全,减少样板代码):
channels.RegisterSafeFactory(config.ChannelTelegram, NewTelegramChannel)
工厂注册表使用 sync.RWMutex 保护,在 init() 阶段注册(进程启动时完成)。Manager 在 initChannel() 中通过名字查找工厂并调用它。
4.5 错误分类与重试
文件:pkg/channels/errors.go、pkg/channels/errutil.go
哨兵错误
var (
ErrNotRunning = errors.New("channel not running") // 永久:不重试
ErrRateLimit = errors.New("rate limited") // 固定延迟:1s 后重试
ErrTemporary = errors.New("temporary failure") // 指数退避:500ms * 2^attempt,最大 8s
ErrSendFailed = errors.New("send failed") // 永久:不重试
)
错误分类帮助函数
// 根据 HTTP 状态码自动分类
func ClassifySendError(statusCode int, rawErr error) error {
// 429 → ErrRateLimit
// 5xx → ErrTemporary
// 4xx → ErrSendFailed
}
// 网络错误统一包装为临时错误
func ClassifyNetError(err error) error {
// → ErrTemporary
}
Manager 重试策略(sendWithRetry)
最大重试次数: 3
速率限制延迟: 1 秒
基础退避: 500 毫秒
最大退避: 8 秒
重试逻辑:
ErrNotRunning → 立即失败,不重试
ErrSendFailed → 立即失败,不重试
ErrRateLimit → 等待 1s → 重试
ErrTemporary → 等待 500ms * 2^attempt(最大 8s) → 重试
其他未知错误 → 等待 500ms * 2^attempt(最大 8s) → 重试
4.6 Manager 编排
文件:pkg/channels/manager.go
Per-channel Worker 架构
type channelWorker struct {
ch Channel // channel 实例
queue chan bus.OutboundMessage // 出站文本队列(缓冲 16)
mediaQueue chan bus.OutboundMediaMessage // 出站媒体队列(缓冲 16)
done chan struct{} // 文本 worker 完成信号
mediaDone chan struct{} // 媒体 worker 完成信号
limiter *rate.Limiter // per-channel 速率限制器
}
Per-channel 速率限制配置
var channelRateConfig = map[string]float64{
"telegram": 20, // 20 msg/s
"discord": 1, // 1 msg/s
"slack": 1, // 1 msg/s
"line": 10, // 10 msg/s
}
// 默认: 10 msg/s
// burst = max(1, ceil(rate/2))
生命周期管理
StartAll:
1. 遍历已注册 channels → channel.Start(ctx)
2. 为每个启动成功的 channel 创建 channelWorker
3. 启动 goroutines:
- runWorker (per-channel 出站文本)
- runMediaWorker (per-channel 出站媒体)
- dispatchOutbound (从 bus 路由到 worker 队列)
- dispatchOutboundMedia (从 bus 路由到 media worker 队列)
- runTTLJanitor (每 10s 清理过期 typing/reaction/placeholder)
4. 启动共享 HTTP 服务器(如已配置)
StopAll:
1. 关闭共享 HTTP 服务器(5s 超时)
2. 取消 dispatcher context
3. 关闭 text worker 队列 → 等待排水完成
4. 关闭 media worker 队列 → 等待排水完成
5. 停止每个 channel(channel.Stop)
Typing/Reaction/Placeholder 管理
// Manager 实现 PlaceholderRecorder 接口
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string)
func (m *Manager) RecordTypingStop(channel, chatID string, stop func())
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func())
// 入站侧:BaseChannel.HandleMessage 自动编排
// BaseChannel.HandleMessage 在 PublishInbound 之前,通过 owner 类型断言自动触发:
// - TypingCapable.StartTyping → RecordTypingStop
// - ReactionCapable.ReactToMessage → RecordReactionUndo
// - PlaceholderCapable.SendPlaceholder → RecordPlaceholder
// 三者独立,互不干扰。Channel 无需手动调用。
// 出站侧:发送前处理
func (m *Manager) preSend(ctx, name, msg, ch) bool {
key := name + ":" + msg.ChatID
// 1. 停止 Typing(调用存储的 stop 函数)
// 2. 撤销 Reaction(调用存储的 undo 函数)
// 3. 尝试编辑 Placeholder(如果 channel 实现了 MessageEditor)
// 成功 → return true(跳过 Send)
// 失败 → return false(继续 Send)
}
Manager 存储完全分离,三条管道互不干扰:
Manager {
typingStops sync.Map // "channel:chatID" → typingEntry ← 管 TypingCapable
reactionUndos sync.Map // "channel:chatID" → reactionEntry ← 管 ReactionCapable
placeholders sync.Map // "channel:chatID" → placeholderEntry
}
TTL 清理:
- Typing 停止函数:5 分钟 TTL(到期后自动调用 stop 并删除)
- Reaction 撤销函数:5 分钟 TTL(到期后自动调用 undo 并删除)
- Placeholder ID:10 分钟 TTL(到期后删除)
- 清理间隔:10 秒
4.7 消息分割
文件:pkg/channels/split.go
SplitMessage(content string, maxLen int) []string
智能分割策略:
- 计算有效分割点 = maxLen - 10% 缓冲区(为代码块闭合留空间)
- 优先在换行符处分割
- 其次在空格/制表符处分割
- 检测未闭合的代码块(
```) - 如果代码块未闭合:
- 尝试扩展到 maxLen 以包含闭合围栏
- 如果代码块太长,注入闭合/重开围栏(
\n```\n+ header) - 最后手段:在代码块开始前分割
4.8 MediaStore
文件:pkg/media/store.go
type MediaStore interface {
Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
Resolve(ref string) (localPath string, err error)
ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error)
ReleaseAll(scope string) error
}
FileMediaStore 实现:
- 纯内存映射,不复制/移动文件
- 引用格式:
media://<uuid> - Scope 格式:
channel:chatID:messageID(由BuildMediaScope生成) - 两阶段操作:
- Phase 1(持锁):从 map 中收集并删除条目
- Phase 2(无锁):从磁盘删除文件
- 目的:最小化锁争用
- TTL 清理:
NewFileMediaStoreWithCleanup→Start()启动后台清理协程 - 清理间隔和最大存活时间由配置控制
4.9 Identity
文件:pkg/identity/identity.go
// 构建规范 ID
func BuildCanonicalID(platform, platformID string) string
// → "telegram:123456"
// 解析规范 ID
func ParseCanonicalID(canonical string) (platform, id string, ok bool)
// 匹配允许列表(向后兼容)
func MatchAllowed(sender bus.SenderInfo, allowed string) bool
MatchAllowed 支持的允许列表格式:
| 格式 | 匹配方式 |
|---|---|
"123456" |
匹配 sender.PlatformID |
"@alice" |
匹配 sender.Username |
"123456|alice" |
匹配 PlatformID 或 Username(旧格式兼容) |
"telegram:123456" |
精确匹配 sender.CanonicalID(新格式) |
4.10 共享 HTTP 服务器
文件:pkg/channels/manager.go 的 SetupHTTPServer
Manager 创建单一 http.Server,自动发现和注册:
- 实现
WebhookHandler的 channel → 挂载到wh.WebhookPath() - 实现
HealthChecker的 channel → 挂载到hc.HealthPath() - Health 全局端点由
health.Server.RegisterOnMux注册
超时配置:ReadTimeout = 30s, WriteTimeout = 30s
第五部分:关键设计决策与约定
5.1 必须遵守的约定
-
错误分类是合约:Channel 的
Send方法必须返回哨兵错误(或包装它们)。Manager 的重试策略完全依赖errors.Is检查。如果返回未分类的错误,Manager 会按"未知错误"处理(指数退避重试)。 -
SetRunning 是生命周期信号:
Start成功后必须调用c.SetRunning(true),Stop开始时必须调用c.SetRunning(false)。Send中必须检查c.IsRunning()并返回ErrNotRunning。 -
HandleMessage 包含权限检查:不要在调用
HandleMessage之前自行进行权限检查(除非你需要在检查前做平台特定的预处理)。HandleMessage内部已经调用IsAllowedSender/IsAllowed。 -
消息分割由 Manager 处理:Channel 的
Send方法不需要处理长消息分割。Manager 会在调用Send之前根据MaxMessageLength()自动分割。Channel 只需通过WithMaxMessageLength声明限制。 -
Typing/Reaction/Placeholder 由 BaseChannel + Manager 自动处理:Channel 的
Send方法不需要管理 Typing 停止、Reaction 撤销或 Placeholder 编辑。BaseChannel.HandleMessage在入站侧自动触发TypingCapable、ReactionCapable和PlaceholderCapable(通过owner类型断言);Manager 的preSend在出站侧自动停止 Typing、撤销 Reaction、编辑 Placeholder。Channel 只需实现对应接口即可。 -
工厂注册在 init() 中:每个子包必须有
init.go文件调用channels.RegisterFactory。Gateway 必须通过 blank import(_ "pkg/channels/xxx")触发注册。
5.2 Metadata 字段使用约定
不要再把以下信息放入 Metadata:
peer_kind/peer_id→ 使用InboundMessage.Peermessage_id→ 使用InboundMessage.MessageIDsender_platform/sender_username→ 使用InboundMessage.Sender
Metadata 仅用于:
- Channel 特有的扩展信息(如 Telegram 的
reply_to_message_id) - 不适合放入结构化字段的临时信息
5.3 并发安全约定
BaseChannel.running:使用atomic.Bool,线程安全Manager.channels/Manager.workers:使用sync.RWMutex保护Manager.placeholders/Manager.typingStops/Manager.reactionUndos:使用sync.MapMessageBus.closed:使用atomic.BoolFileMediaStore:使用sync.RWMutex,两阶段操作减少持锁时间- Channel Worker queue:Go channel,天然并发安全
5.4 测试约定
已有测试文件:
pkg/channels/base_test.go— BaseChannel 单元测试pkg/channels/manager_test.go— Manager 单元测试pkg/channels/split_test.go— 消息分割测试pkg/channels/errors_test.go— 错误类型测试pkg/channels/errutil_test.go— 错误分类测试
为新 channel 添加测试时:
go test ./pkg/channels/matrix/ -v # 子包测试
go test ./pkg/channels/ -run TestSpecific -v # 框架测试
make test # 全量测试
附录:完整文件清单与接口速查表
A.1 框架层文件
| 文件 | 职责 |
|---|---|
pkg/channels/base.go |
BaseChannel 结构体、Channel 接口、MessageLengthProvider、BaseChannelOption、HandleMessage |
pkg/channels/interfaces.go |
TypingCapable、MessageEditor、ReactionCapable、PlaceholderCapable、PlaceholderRecorder 接口 |
pkg/channels/media.go |
MediaSender 接口 |
pkg/channels/webhook.go |
WebhookHandler、HealthChecker 接口 |
pkg/channels/errors.go |
ErrNotRunning、ErrRateLimit、ErrTemporary、ErrSendFailed 哨兵 |
pkg/channels/errutil.go |
ClassifySendError、ClassifyNetError 帮助函数 |
pkg/channels/registry.go |
RegisterFactory、getFactory 工厂注册表 |
pkg/channels/manager.go |
Manager:Worker 队列、速率限制、重试、preSend、共享 HTTP、TTL janitor |
pkg/channels/split.go |
SplitMessage 长消息分割 |
pkg/bus/bus.go |
MessageBus 实现 |
pkg/bus/types.go |
Peer、SenderInfo、InboundMessage、OutboundMessage、OutboundMediaMessage、MediaPart |
pkg/media/store.go |
MediaStore 接口、FileMediaStore 实现 |
pkg/identity/identity.go |
BuildCanonicalID、ParseCanonicalID、MatchAllowed |
A.2 Channel 子包
| 子包 | 注册名 | 可选接口 |
|---|---|---|
pkg/channels/telegram/ |
"telegram" |
TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
pkg/channels/discord/ |
"discord" |
TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
pkg/channels/slack/ |
"slack" |
ReactionCapable, MediaSender |
pkg/channels/line/ |
"line" |
TypingCapable, MediaSender, WebhookHandler |
pkg/channels/onebot/ |
"onebot" |
ReactionCapable, MediaSender |
pkg/channels/dingtalk/ |
"dingtalk" |
— |
pkg/channels/feishu/ |
"feishu" |
— (架构特定 build tags: feishu_32.go / feishu_64.go) |
pkg/channels/wecom/ |
"wecom" |
MediaSender |
pkg/channels/qq/ |
"qq" |
— |
pkg/channels/whatsapp/ |
"whatsapp" |
— (Bridge 模式) |
pkg/channels/whatsapp_native/ |
"whatsapp_native" |
— (原生 whatsmeow 模式) |
pkg/channels/maixcam/ |
"maixcam" |
— |
pkg/channels/mqtt/ |
"mqtt" |
— |
pkg/channels/pico/ |
"pico" |
TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |
A.3 接口速查表
// ===== 必须实现 =====
type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
ReasoningChannelID() string
}
// ===== 可选实现 =====
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
type TypingCapable interface {
StartTyping(ctx context.Context, chatID string) (stop func(), err error)
}
type ReactionCapable interface {
ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error)
}
type PlaceholderCapable interface {
SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}
type MessageEditor interface {
EditMessage(ctx context.Context, chatID, messageID, content string) error
}
type WebhookHandler interface {
WebhookPath() string
http.Handler
}
type HealthChecker interface {
HealthPath() string
HealthHandler(w http.ResponseWriter, r *http.Request)
}
type MessageLengthProvider interface {
MaxMessageLength() int
}
// ===== 由 Manager 注入 =====
type PlaceholderRecorder interface {
RecordPlaceholder(channel, chatID, placeholderID string)
RecordTypingStop(channel, chatID string, stop func())
RecordReactionUndo(channel, chatID string, undo func())
}
A.4 Gateway 启动序列(完整引导流程)
// 1. 创建核心组件
msgBus := bus.NewMessageBus()
provider := providers.CreateProvider(cfg)
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
// 2. 创建媒体存储(带 TTL 清理)
mediaStore := media.NewFileMediaStoreWithCleanup(cleanerConfig)
mediaStore.Start()
// 3. 创建 Channel Manager(触发 initChannels → 工厂查找 → 构造 → 注入 MediaStore/PlaceholderRecorder/Owner)
channelManager := channels.NewManager(cfg, msgBus, mediaStore)
// 4. 注入引用
agentLoop.SetChannelManager(channelManager)
agentLoop.SetMediaStore(mediaStore)
// 5. 配置共享 HTTP 服务器
channelManager.SetupHTTPServer(addr, healthServer)
// 6. 启动
channelManager.StartAll(ctx) // 启动 channels + workers + dispatchers + HTTP server
go agentLoop.Run(ctx) // 启动 Agent 消息循环
// 7. 关闭(信号触发)
cancel() // 取消 context
msgBus.Close() // 信号关闭 + 排水
channelManager.StopAll(shutdownCtx) // 停止 HTTP + workers + channels
mediaStore.Stop() // 停止 TTL 清理
agentLoop.Stop() // 停止 Agent
A.5 Per-channel 速率限制参考
| Channel | 速率 (msg/s) | Burst |
|---|---|---|
| telegram | 20 | 10 |
| discord | 1 | 1 |
| slack | 1 | 1 |
| line | 10 | 5 |
| 其他 | 10 (默认) | 5 |
A.6 已知限制和注意事项
-
媒体清理暂时禁用:Agent loop 中的
ReleaseAll调用被注释掉了(refactor(loop): disable media cleanup to prevent premature file deletion),因为会话边界尚未明确定义。TTL 清理仍然有效。 -
Feishu 架构特定编译:Feishu channel 使用 build tags 区分 32 位和 64 位架构(
feishu_32.go/feishu_64.go)。Feishu 使用 SDK 的 WebSocket 模式(非 HTTP webhook),因此不实现WebhookHandler。 -
WeCom 现在只有一个 channel:
"wecom"采用 WebSocket AI Bot 实现,带路由持久化;访问控制走统一的 channel 白名单机制,不再保留旧的 webhook/app 双分支。 -
Pico Protocol:
pkg/channels/pico/实现了一个自定义的 PicoClaw 原生协议 channel,通过 WebSocket webhook (/pico/ws) 接收消息。 -
WhatsApp 有两种模式:
"whatsapp"(Bridge 模式,通过外部 bridge URL 通信)和"whatsapp_native"(原生 whatsmeow 模式,直接连接 WhatsApp)。Manager 根据WhatsAppConfig.UseNative决定初始化哪个。 -
DingTalk 使用 Stream 模式:DingTalk 使用 SDK 的 Stream/WebSocket 模式(非 HTTP webhook),因此不实现
WebhookHandler。 -
PlaceholderConfig 的配置与实现:
PlaceholderConfig出现在 6 个 channel config 中(Telegram、Discord、Slack、LINE、OneBot、Pico),但只有实现了PlaceholderCapable+MessageEditor的 channel(Telegram、Discord、Pico)能真正使用占位消息编辑功能。其余 channel 的PlaceholderConfig为预留字段。 -
ReasoningChannelID:大多数 channel config 都包含
reasoning_channel_id字段,用于将 LLM 的思维链(reasoning/thinking)路由到指定 channel(WhatsApp、Telegram、Feishu、Discord、MaixCam、QQ、DingTalk、Slack、LINE、OneBot、WeCom)。注意:PicoConfig目前不包含该字段。BaseChannel通过WithReasoningChannelID选项和ReasoningChannelID()方法暴露此配置。