From 779e4dfc381f4b556cad545d2e650f81216bb1d0 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Fri, 27 Feb 2026 03:30:58 +0800 Subject: [PATCH] docs(channels): add English README for channel system architecture --- pkg/channels/README.md | 1331 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1331 insertions(+) create mode 100644 pkg/channels/README.md diff --git a/pkg/channels/README.md b/pkg/channels/README.md new file mode 100644 index 000000000..52b9f98f4 --- /dev/null +++ b/pkg/channels/README.md @@ -0,0 +1,1331 @@ +# PicoClaw Channel System Refactor: Complete Development Guide + +> **Branch**: `refactor/channel-system` +> **Status**: Active development (~40 commits) +> **Scope**: `pkg/channels/`, `pkg/bus/`, `pkg/media/`, `pkg/identity/`, `cmd/picoclaw/internal/gateway/` + +--- + +## Table of Contents + +- [Part 1: Architecture Overview](#part-1-architecture-overview) +- [Part 2: Migration Guide — From main Branch to Refactored Branch](#part-2-migration-guide--from-main-branch-to-refactored-branch) +- [Part 3: New Channel Development Guide — Implementing a Channel from Scratch](#part-3-new-channel-development-guide--implementing-a-channel-from-scratch) +- [Part 4: Core Subsystem Details](#part-4-core-subsystem-details) +- [Part 5: Key Design Decisions and Conventions](#part-5-key-design-decisions-and-conventions) +- [Appendix: Complete File Listing and Interface Quick Reference](#appendix-complete-file-listing-and-interface-quick-reference) + +--- + +## Part 1: Architecture Overview + +### 1.1 Before and After Comparison + +**Before Refactor (main branch)**: + +``` +pkg/channels/ +├── telegram.go # Each channel directly in the channels package +├── discord.go +├── slack.go +├── manager.go # Manager directly references each channel type +├── ... +``` + +- All channel implementations lived at the top level of `pkg/channels/` +- Manager constructed each channel via `switch` or `if-else` chains +- Routing info like Peer and MessageID was buried in `Metadata map[string]string` +- No rate limiting or retry on message sending +- No unified media file lifecycle management +- Each channel ran its own HTTP server +- Group chat trigger filtering logic was scattered across channels + +**After Refactor (refactor/channel-system branch)**: + +``` +pkg/channels/ +├── base.go # BaseChannel shared abstraction layer +├── interfaces.go # Optional capability interfaces (TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder) +├── media.go # MediaSender optional interface +├── webhook.go # WebhookHandler, HealthChecker optional interfaces +├── errors.go # Sentinel errors (ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed) +├── errutil.go # Error classification helpers +├── registry.go # Factory registry (RegisterFactory / getFactory) +├── manager.go # Unified orchestration: Worker queues, rate limiting, retries, Typing/Placeholder, shared HTTP +├── split.go # Smart long-message splitting (preserves code block integrity) +├── telegram/ # Each channel in its own sub-package +│ ├── init.go # Factory registration +│ ├── telegram.go # Implementation +│ └── telegram_commands.go +├── discord/ +│ ├── init.go +│ └── discord.go +├── slack/ line/ onebot/ dingtalk/ feishu/ wecom/ qq/ whatsapp/ maixcam/ pico/ +│ └── ... + +pkg/bus/ +├── bus.go # MessageBus (buffer 64, safe close + drain) +├── types.go # Structured message types (Peer, SenderInfo, MediaPart, InboundMessage, OutboundMessage, OutboundMediaMessage) + +pkg/media/ +├── store.go # MediaStore interface + FileMediaStore implementation (two-phase release, TTL cleanup) + +pkg/identity/ +├── identity.go # Unified user identity: canonical "platform:id" format + backward-compatible matching +``` + +### 1.2 Message Flow Overview + +``` +┌────────────┐ InboundMessage ┌───────────┐ LLM + Tools ┌────────────┐ +│ Telegram │──┐ │ │ │ │ +│ Discord │──┤ PublishInbound() │ │ PublishOutbound() │ │ +│ Slack │──┼──────────────────────▶ │ MessageBus │ ◀─────────────────── │ AgentLoop │ +│ LINE │──┤ (buffered chan, 64) │ │ (buffered chan, 64) │ │ +│ ... │──┘ │ │ │ │ +└────────────┘ └─────┬─────┘ └────────────┘ + │ + SubscribeOutbound() │ SubscribeOutboundMedia() + ▼ + ┌───────────────────┐ + │ Manager │ + │ ├── dispatchOutbound() Route to Worker queues + │ ├── dispatchOutboundMedia() + │ ├── runWorker() Message split + sendWithRetry() + │ ├── runMediaWorker() sendMediaWithRetry() + │ ├── preSend() Stop Typing + Undo Reaction + Edit Placeholder + │ └── runTTLJanitor() Clean up expired Typing/Placeholder + └────────┬──────────┘ + │ + channel.Send() / SendMedia() + │ + ▼ + ┌────────────────┐ + │ Platform APIs │ + └────────────────┘ +``` + +### 1.3 Key Design Principles + +| Principle | Description | +|-----------|-------------| +| **Sub-package Isolation** | Each channel is a standalone Go sub-package, depending on `BaseChannel` and interfaces from the `channels` parent package | +| **Factory Registration** | Sub-packages self-register via `init()`, Manager looks up factories by name, eliminating import coupling | +| **Capability Discovery** | Optional capabilities are declared via interfaces (`MediaSender`, `TypingCapable`, `ReactionCapable`, `PlaceholderCapable`, `MessageEditor`, `WebhookHandler`), discovered by Manager via runtime type assertions | +| **Structured Messages** | Peer, MessageID, and SenderInfo promoted from Metadata to first-class fields on InboundMessage | +| **Error Classification** | Channels return sentinel errors (`ErrRateLimit`, `ErrTemporary`, etc.), Manager uses these to determine retry strategy | +| **Centralized Orchestration** | Rate limiting, message splitting, retries, and Typing/Reaction/Placeholder management are all handled by Manager and BaseChannel; channels only need to implement Send | + +--- + +## Part 2: Migration Guide — From main Branch to Refactored Branch + +### 2.1 If You Have Unmerged Channel Changes + +#### Step 1: Identify which files you modified + +On the main branch, channel files were directly in `pkg/channels/` top level, e.g.: +- `pkg/channels/telegram.go` +- `pkg/channels/discord.go` + +After refactoring, these files have been removed and code moved to corresponding sub-packages: +- `pkg/channels/telegram/telegram.go` +- `pkg/channels/discord/discord.go` + +#### Step 2: Understand the structural change mapping + +| main branch file | Refactored branch location | Changes | +|---|---|---| +| `pkg/channels/telegram.go` | `pkg/channels/telegram/telegram.go` + `init.go` | Package name changed from `channels` to `telegram` | +| `pkg/channels/discord.go` | `pkg/channels/discord/discord.go` + `init.go` | Same as above | +| `pkg/channels/manager.go` | `pkg/channels/manager.go` | Extensively rewritten | +| _(did not exist)_ | `pkg/channels/base.go` | New shared abstraction layer | +| _(did not exist)_ | `pkg/channels/registry.go` | New factory registry | +| _(did not exist)_ | `pkg/channels/errors.go` + `errutil.go` | New error classification system | +| _(did not exist)_ | `pkg/channels/interfaces.go` | New optional capability interfaces | +| _(did not exist)_ | `pkg/channels/media.go` | New MediaSender interface | +| _(did not exist)_ | `pkg/channels/webhook.go` | New WebhookHandler/HealthChecker | +| _(did not exist)_ | `pkg/channels/split.go` | New message splitting (migrated from utils) | +| _(did not exist)_ | `pkg/bus/types.go` | New structured message types | +| _(did not exist)_ | `pkg/media/store.go` | New media file lifecycle management | +| _(did not exist)_ | `pkg/identity/identity.go` | New unified user identity | + +#### Step 3: Migrate your channel code + +Using Telegram as an example, the main changes are: + +**3a. Package declaration and imports** + +```go +// Old code (main branch) +package channels + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/config" +) + +// New code (refactored branch) +package telegram + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" // Reference parent package + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/identity" // New + "github.com/sipeed/picoclaw/pkg/media" // New (if media support needed) +) +``` + +**3b. Struct embeds BaseChannel** + +```go +// Old code: directly held bus, config, etc. fields +type TelegramChannel struct { + bus *bus.MessageBus + config *config.Config + running bool + allowList []string + // ... +} + +// New code: embed BaseChannel, which provides bus, running, allowList, etc. +type TelegramChannel struct { + *channels.BaseChannel // Embed shared abstraction + bot *telego.Bot + config *config.Config + // ... only channel-specific fields +} +``` + +**3c. Constructor** + +```go +// Old code: direct assignment +func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) { + return &TelegramChannel{ + bus: bus, + config: cfg, + allowList: cfg.Channels.Telegram.AllowFrom, + // ... + }, nil +} + +// New code: use NewBaseChannel + functional options +func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) { + base := channels.NewBaseChannel( + "telegram", // Name + cfg.Channels.Telegram, // Raw config (any type) + bus, // Message bus + cfg.Channels.Telegram.AllowFrom, // Allow list + channels.WithMaxMessageLength(4096), // Platform message length limit + channels.WithGroupTrigger(cfg.Channels.Telegram.GroupTrigger), // Group trigger config + ) + return &TelegramChannel{ + BaseChannel: base, + bot: bot, + config: cfg, + }, nil +} +``` + +**3d. Start/Stop lifecycle** + +```go +// New code: use SetRunning atomic operation +func (c *TelegramChannel) Start(ctx context.Context) error { + // ... initialize bot, webhook, etc. + c.SetRunning(true) // Must be called after ready + go bh.Start() + return nil +} + +func (c *TelegramChannel) Stop(ctx context.Context) error { + c.SetRunning(false) // Must be called before cleanup + // ... stop bot handler, cancel context + return nil +} +``` + +**3e. Send method error returns** + +```go +// Old code: returns plain error +func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + if !c.running { return fmt.Errorf("not running") } + // ... + if err != nil { return err } +} + +// New code: must return sentinel errors for Manager to determine retry strategy +func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + if !c.IsRunning() { + return channels.ErrNotRunning // ← Manager will not retry + } + // ... + if err != nil { + // Use ClassifySendError to wrap error based on HTTP status code + return channels.ClassifySendError(statusCode, err) + // Or manually wrap: + // return fmt.Errorf("%w: %v", channels.ErrTemporary, err) + // return fmt.Errorf("%w: %v", channels.ErrRateLimit, err) + // return fmt.Errorf("%w: %v", channels.ErrSendFailed, err) + } + return nil +} +``` + +**3f. Message reception (Inbound)** + +```go +// Old code: directly construct InboundMessage and publish +msg := bus.InboundMessage{ + Channel: "telegram", + SenderID: senderID, + ChatID: chatID, + Content: content, + Metadata: map[string]string{ + "peer_kind": "group", // Routing info buried in metadata + "peer_id": chatID, + "message_id": msgID, + }, +} +c.bus.PublishInbound(ctx, msg) + +// New code: use BaseChannel.HandleMessage with structured fields +sender := bus.SenderInfo{ + Platform: "telegram", + PlatformID: strconv.FormatInt(from.ID, 10), + CanonicalID: identity.BuildCanonicalID("telegram", strconv.FormatInt(from.ID, 10)), + Username: from.Username, + DisplayName: from.FirstName, +} + +peer := bus.Peer{ + Kind: "group", // or "direct" + ID: chatID, +} + +// HandleMessage internally calls IsAllowedSender for permission checks, builds MediaScope, and publishes to bus +c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, sender) +``` + +**3g. Add factory registration (required)** + +Create `init.go` for your channel: + +```go +// pkg/channels/telegram/init.go +package telegram + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + channels.RegisterFactory("telegram", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { + return NewTelegramChannel(cfg, b) + }) +} +``` + +**3h. Import sub-package in Gateway** + +```go +// cmd/picoclaw/internal/gateway/helpers.go +import ( + _ "github.com/sipeed/picoclaw/pkg/channels/telegram" // Triggers init() registration + _ "github.com/sipeed/picoclaw/pkg/channels/discord" + _ "github.com/sipeed/picoclaw/pkg/channels/your_new_channel" // New addition +) +``` + +#### Step 4: Migrate bus message usage + +If your code directly reads routing fields from `InboundMessage.Metadata`: + +```go +// Old code +peerKind := msg.Metadata["peer_kind"] +peerID := msg.Metadata["peer_id"] +msgID := msg.Metadata["message_id"] + +// New code +peerKind := msg.Peer.Kind // First-class field +peerID := msg.Peer.ID // First-class field +msgID := msg.MessageID // First-class field +sender := msg.Sender // bus.SenderInfo struct +scope := msg.MediaScope // Media lifecycle scope +``` + +#### Step 5: Migrate allow-list checks + +```go +// Old code +if !c.isAllowed(senderID) { return } + +// New code: prefer structured check +if !c.IsAllowedSender(sender) { return } +// Or fall back to string check: +if !c.IsAllowed(senderID) { return } +``` + +`BaseChannel.HandleMessage` already handles this logic internally — no need to duplicate the check in your channel. + +### 2.2 If You Have Manager Modifications + +The Manager has been completely rewritten. Your modifications will need to account for the new architecture: + +| Old Manager Responsibility | New Manager Responsibility | +|---|---| +| Directly construct channels (switch/if-else) | Look up and construct via factory registry | +| Directly call channel.Send | Per-channel Worker queues + rate limiting + retries | +| No message splitting | Automatic splitting based on MaxMessageLength | +| Each channel runs its own HTTP server | Unified shared HTTP server | +| No Typing/Placeholder management | Unified preSend handles Typing stop + Reaction undo + Placeholder edit; inbound-side BaseChannel.HandleMessage auto-orchestrates Typing/Reaction/Placeholder | +| No TTL cleanup | runTTLJanitor periodically cleans up expired Typing/Reaction/Placeholder entries | + +### 2.3 If You Have Agent Loop Modifications + +Main changes to the Agent Loop: + +1. **MediaStore injection**: `agentLoop.SetMediaStore(mediaStore)` — Agent resolves media references produced by tools via MediaStore +2. **ChannelManager injection**: `agentLoop.SetChannelManager(channelManager)` — Agent can query channel state +3. **OutboundMediaMessage**: Agent now sends media messages via `bus.PublishOutboundMedia()` instead of embedding them in text replies +4. **extractPeer**: Routing uses `msg.Peer` structured fields instead of Metadata lookups + +--- + +## Part 3: New Channel Development Guide — Implementing a Channel from Scratch + +### 3.1 Minimum Implementation Checklist + +To add a new chat platform (e.g., `matrix`), you need to: + +1. ✅ Create sub-package directory `pkg/channels/matrix/` +2. ✅ Create `init.go` — factory registration +3. ✅ Create `matrix.go` — channel implementation +4. ✅ Add blank import in Gateway helpers +5. ✅ Add config check in Manager.initChannels() +6. ✅ Add config struct in `pkg/config/` + +### 3.2 Complete Template + +#### `pkg/channels/matrix/init.go` + +```go +package matrix + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + channels.RegisterFactory("matrix", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { + return NewMatrixChannel(cfg, b) + }) +} +``` + +#### `pkg/channels/matrix/matrix.go` + +```go +package matrix + +import ( + "context" + "fmt" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/identity" + "github.com/sipeed/picoclaw/pkg/logger" +) + +// MatrixChannel implements channels.Channel for the Matrix protocol. +type MatrixChannel struct { + *channels.BaseChannel // Must embed + config *config.Config + ctx context.Context + cancel context.CancelFunc + // ... Matrix SDK client, etc. +} + +func NewMatrixChannel(cfg *config.Config, msgBus *bus.MessageBus) (*MatrixChannel, error) { + matrixCfg := cfg.Channels.Matrix // Assumes this field exists in config + + base := channels.NewBaseChannel( + "matrix", // Channel name (globally unique) + matrixCfg, // Raw config + msgBus, // Message bus + matrixCfg.AllowFrom, // Allow list + channels.WithMaxMessageLength(65536), // Matrix message length limit + channels.WithGroupTrigger(matrixCfg.GroupTrigger), + ) + + return &MatrixChannel{ + BaseChannel: base, + config: cfg, + }, nil +} + +// ========== Required Channel Interface Methods ========== + +func (c *MatrixChannel) Start(ctx context.Context) error { + c.ctx, c.cancel = context.WithCancel(ctx) + + // 1. Initialize Matrix client + // 2. Start listening for messages + // 3. Mark as running + c.SetRunning(true) + + logger.InfoC("matrix", "Matrix channel started") + return nil +} + +func (c *MatrixChannel) Stop(ctx context.Context) error { + c.SetRunning(false) + + if c.cancel != nil { + c.cancel() + } + + logger.InfoC("matrix", "Matrix channel stopped") + return nil +} + +func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + // 1. Check running state + if !c.IsRunning() { + return channels.ErrNotRunning + } + + // 2. Send message to Matrix + err := c.sendToMatrix(ctx, msg.ChatID, msg.Content) + if err != nil { + // 3. Must use error classification wrapping + // If you have an HTTP status code: + // return channels.ClassifySendError(statusCode, err) + // If it's a network error: + // return channels.ClassifyNetError(err) + // If manual classification is needed: + return fmt.Errorf("%w: %v", channels.ErrTemporary, err) + } + + return nil +} + +// ========== Incoming Message Handling ========== + +func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content string, msgID string) { + // 1. Construct structured sender identity + sender := bus.SenderInfo{ + Platform: "matrix", + PlatformID: senderID, + CanonicalID: identity.BuildCanonicalID("matrix", senderID), + Username: senderID, + DisplayName: displayName, + } + + // 2. Determine Peer type (direct vs group) + peer := bus.Peer{ + Kind: "group", // or "direct" + ID: roomID, + } + + // 3. Group chat filtering (if applicable) + isGroup := peer.Kind == "group" + if isGroup { + isMentioned := false // Detect @mentions based on platform specifics + shouldRespond, cleanContent := c.ShouldRespondInGroup(isMentioned, content) + if !shouldRespond { + return + } + content = cleanContent + } + + // 4. Handle media attachments (if any) + var mediaRefs []string + store := c.GetMediaStore() + if store != nil { + // Download attachment locally → store.Store() → get ref + // mediaRefs = append(mediaRefs, ref) + } + + // 5. Call HandleMessage to publish to bus + // HandleMessage internally will: + // - Check IsAllowedSender/IsAllowed + // - Build MediaScope + // - Publish InboundMessage + c.HandleMessage( + c.ctx, + peer, + msgID, // Platform message ID + senderID, // Raw sender ID + roomID, // Chat/room ID + content, // Message content + mediaRefs, // Media reference list + nil, // Extra metadata (usually nil) + sender, // SenderInfo (variadic parameter) + ) +} + +// ========== Internal Methods ========== + +func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error { + // Actual Matrix SDK call + return nil +} +``` + +### 3.3 Optional Capability Interfaces + +Depending on platform capabilities, your channel can optionally implement the following interfaces: + +#### MediaSender — Send Media Attachments + +```go +// If the platform supports sending images/files/audio/video +func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error { + if !c.IsRunning() { + return channels.ErrNotRunning + } + + store := c.GetMediaStore() + if store == nil { + return fmt.Errorf("no media store: %w", channels.ErrSendFailed) + } + + for _, part := range msg.Parts { + localPath, err := store.Resolve(part.Ref) + if err != nil { + logger.ErrorCF("matrix", "Failed to resolve media", map[string]any{ + "ref": part.Ref, "error": err.Error(), + }) + continue + } + + // Call the appropriate API based on part.Type ("image"|"audio"|"video"|"file") + switch part.Type { + case "image": + // Upload image to Matrix + default: + // Upload file to Matrix + } + } + return nil +} +``` + +#### TypingCapable — Typing Indicator + +```go +// If the platform supports "typing..." indicators +func (c *MatrixChannel) StartTyping(ctx context.Context, chatID string) (stop func(), err error) { + // Call Matrix API to send typing indicator + // The returned stop function must be idempotent + stopped := false + return func() { + if !stopped { + stopped = true + // Call Matrix API to stop typing + } + }, nil +} +``` + +#### ReactionCapable — Message Reaction Indicator + +```go +// If the platform supports adding emoji reactions to inbound messages (e.g., Slack's 👀, OneBot's emoji 289) +func (c *MatrixChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) { + // Call Matrix API to add reaction to message + // The returned undo function removes the reaction, must be idempotent + err = c.addReaction(chatID, messageID, "eyes") + if err != nil { + return func() {}, err + } + return func() { + c.removeReaction(chatID, messageID, "eyes") + }, nil +} +``` + +#### MessageEditor — Message Editing + +```go +// If the platform supports editing sent messages (used for Placeholder replacement) +func (c *MatrixChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error { + // Call Matrix API to edit message + return nil +} +``` + +#### WebhookHandler — HTTP Webhook Reception + +```go +// If the channel receives messages via webhook (rather than long-polling/WebSocket) +func (c *MatrixChannel) WebhookPath() string { + return "/webhook/matrix" // Path will be registered on the shared HTTP server +} + +func (c *MatrixChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Handle webhook request +} +``` + +#### HealthChecker — Health Check Endpoint + +```go +func (c *MatrixChannel) HealthPath() string { + return "/health/matrix" +} + +func (c *MatrixChannel) HealthHandler(w http.ResponseWriter, r *http.Request) { + if c.IsRunning() { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } +} +``` + +### 3.4 Inbound-side Typing/Reaction/Placeholder Auto-orchestration + +`BaseChannel.HandleMessage` automatically detects whether the channel implements `TypingCapable`, `ReactionCapable`, and/or `PlaceholderCapable` **before** publishing the inbound message, and triggers the corresponding indicators. The three pipelines are completely independent and do not interfere with each other: + +```go +// Automatically executed inside BaseChannel.HandleMessage (no manual calls needed): +if c.owner != nil && c.placeholderRecorder != nil { + // Typing — independent pipeline + if tc, ok := c.owner.(TypingCapable); ok { + if stop, err := tc.StartTyping(ctx, chatID); err == nil { + c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop) + } + } + // Reaction — independent pipeline + if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" { + if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil { + c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo) + } + } + // Placeholder — independent pipeline + if pc, ok := c.owner.(PlaceholderCapable); ok { + if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" { + c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID) + } + } +} +``` + +**This means**: +- Channels implementing `TypingCapable` (Telegram, Discord, LINE, Pico) do not need to manually call `StartTyping` + `RecordTypingStop` in `handleMessage` +- Channels implementing `ReactionCapable` (Slack, OneBot) do not need to manually call `AddReaction` + `RecordTypingStop` in `handleMessage` +- Channels implementing `PlaceholderCapable` (Telegram, Discord, Pico) do not need to manually send placeholder messages and call `RecordPlaceholder` in `handleMessage` +- Channels only need to implement the corresponding interface; `HandleMessage` handles orchestration automatically +- Channels that don't implement these interfaces are unaffected (type assertions will fail and be skipped) +- `PlaceholderCapable`'s `SendPlaceholder` method internally decides whether to send based on the configured `PlaceholderConfig.Enabled`; returning `("", nil)` skips registration + +**Owner Injection**: Manager automatically calls `SetOwner(ch)` in `initChannel` to inject the concrete channel into BaseChannel — no manual setup required from developers. + +When the Agent finishes processing a message, Manager's `preSend` automatically: +1. Calls the recorded `stop()` to stop Typing +2. Calls the recorded `undo()` to undo Reaction +3. If there is a Placeholder and the channel implements `MessageEditor`, attempts to edit the Placeholder with the final reply (skipping Send) + +### 3.5 Register Configuration and Gateway Integration + +#### Add configuration in `pkg/config/config.go` + +```go +type ChannelsConfig struct { + // ... existing channels + Matrix MatrixChannelConfig `yaml:"matrix" json:"matrix"` +} + +type MatrixChannelConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + HomeServer string `yaml:"home_server" json:"home_server"` + Token string `yaml:"token" json:"token"` + AllowFrom []string `yaml:"allow_from" json:"allow_from"` + GroupTrigger GroupTriggerConfig `yaml:"group_trigger" json:"group_trigger"` +} +``` + +#### Add entry in Manager.initChannels() + +```go +// In the initChannels() method of pkg/channels/manager.go +if m.config.Channels.Matrix.Enabled && m.config.Channels.Matrix.Token != "" { + m.initChannel("matrix", "Matrix") +} +``` + +#### Add blank import in Gateway + +```go +// cmd/picoclaw/internal/gateway/helpers.go +import ( + _ "github.com/sipeed/picoclaw/pkg/channels/matrix" +) +``` + +--- + +## Part 4: Core Subsystem Details + +### 4.1 MessageBus + +**Files**: `pkg/bus/bus.go`, `pkg/bus/types.go` + +```go +type MessageBus struct { + inbound chan InboundMessage // buffer = 64 + outbound chan OutboundMessage // buffer = 64 + outboundMedia chan OutboundMediaMessage // buffer = 64 + done chan struct{} // Close signal + closed atomic.Bool // Prevents double-close +} +``` + +**Key Behaviors**: + +| Method | Behavior | +|--------|----------| +| `PublishInbound(ctx, msg)` | Check closed → send to inbound channel → block/timeout/close | +| `ConsumeInbound(ctx)` | Read from inbound → block/close/cancel | +| `PublishOutbound(ctx, msg)` | Send to outbound channel | +| `SubscribeOutbound(ctx)` | Read from outbound (called by Manager dispatcher) | +| `PublishOutboundMedia(ctx, msg)` | Send to outboundMedia channel | +| `SubscribeOutboundMedia(ctx)` | Read from outboundMedia (called by Manager media dispatcher) | +| `Close()` | CAS close → close(done) → drain all channels (**does not close the channels themselves** to avoid concurrent send-on-closed panic) | + +**Design Notes**: +- Buffer size increased from 16 to 64 to reduce blocking under burst load +- `Close()` does not close the underlying channels (only closes the `done` signal channel), because there may be concurrent `Publish` goroutines +- Drain loop ensures buffered messages are not silently dropped + +### 4.2 Structured Message Types + +**File**: `pkg/bus/types.go` + +```go +// Routing peer +type Peer struct { + Kind string `json:"kind"` // "direct" | "group" | "channel" | "" + ID string `json:"id"` +} + +// Sender identity information +type SenderInfo struct { + Platform string `json:"platform,omitempty"` // "telegram", "discord", ... + PlatformID string `json:"platform_id,omitempty"` // Platform-native ID + CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" canonical format + Username string `json:"username,omitempty"` + DisplayName string `json:"display_name,omitempty"` +} + +// Inbound message +type InboundMessage struct { + Channel string // Source channel name + SenderID string // Sender ID (prefer CanonicalID) + Sender SenderInfo // Structured sender info + ChatID string // Chat/room ID + Content string // Message text + Media []string // Media reference list (media://...) + Peer Peer // Routing peer (first-class field) + MessageID string // Platform message ID (first-class field) + MediaScope string // Media lifecycle scope + SessionKey string // Session key + Metadata map[string]string // Only for channel-specific extensions +} + +// Outbound text message +type OutboundMessage struct { + Channel string + ChatID string + Content string +} + +// Outbound media message +type OutboundMediaMessage struct { + Channel string + ChatID string + Parts []MediaPart +} + +// Media part +type MediaPart struct { + Type string // "image" | "audio" | "video" | "file" + Ref string // "media://uuid" + Caption string + Filename string + ContentType string +} +``` + +### 4.3 BaseChannel + +**File**: `pkg/channels/base.go` + +BaseChannel is the shared abstraction layer for all channels, providing the following capabilities: + +| Method/Feature | Description | +|---|---| +| `Name() string` | Channel name | +| `IsRunning() bool` | Atomically read running state | +| `SetRunning(bool)` | Atomically set running state | +| `MaxMessageLength() int` | Message length limit (rune count), 0 = unlimited | +| `IsAllowed(senderID string) bool` | Legacy allow-list check (supports `"id\|username"` and `"@username"` formats) | +| `IsAllowedSender(sender SenderInfo) bool` | New allow-list check (delegates to `identity.MatchAllowed`) | +| `ShouldRespondInGroup(isMentioned, content) (bool, string)` | Unified group chat trigger filtering logic | +| `HandleMessage(...)` | Unified inbound message handling: permission check → build MediaScope → auto-trigger Typing/Reaction → publish to Bus | +| `SetMediaStore(s) / GetMediaStore()` | MediaStore injected by Manager | +| `SetPlaceholderRecorder(r) / GetPlaceholderRecorder()` | PlaceholderRecorder injected by Manager | +| `SetOwner(ch)` | Concrete channel reference injected by Manager (used for Typing/Reaction type assertions in HandleMessage) | + +**Functional Options**: + +```go +channels.WithMaxMessageLength(4096) // Set platform message length limit +channels.WithGroupTrigger(groupTriggerCfg) // Set group trigger configuration +``` + +### 4.4 Factory Registry + +**File**: `pkg/channels/registry.go` + +```go +type ChannelFactory func(cfg *config.Config, bus *bus.MessageBus) (Channel, error) + +func RegisterFactory(name string, f ChannelFactory) // Called in sub-package init() +func getFactory(name string) (ChannelFactory, bool) // Called internally by Manager +``` + +The factory registry is protected by `sync.RWMutex` and registrations occur during `init()` phase (completed at process startup). Manager looks up factories by name in `initChannel()` and calls them. + +### 4.5 Error Classification and Retries + +**Files**: `pkg/channels/errors.go`, `pkg/channels/errutil.go` + +#### Sentinel Errors + +```go +var ( + ErrNotRunning = errors.New("channel not running") // Permanent: do not retry + ErrRateLimit = errors.New("rate limited") // Fixed delay: retry after 1s + ErrTemporary = errors.New("temporary failure") // Exponential backoff: 500ms * 2^attempt, max 8s + ErrSendFailed = errors.New("send failed") // Permanent: do not retry +) +``` + +#### Error Classification Helpers + +```go +// Automatically classify based on HTTP status code +func ClassifySendError(statusCode int, rawErr error) error { + // 429 → ErrRateLimit + // 5xx → ErrTemporary + // 4xx → ErrSendFailed +} + +// Wrap network errors as temporary +func ClassifyNetError(err error) error { + // → ErrTemporary +} +``` + +#### Manager Retry Strategy (`sendWithRetry`) + +``` +Max retries: 3 +Rate limit delay: 1 second +Base backoff: 500 milliseconds +Max backoff: 8 seconds + +Retry logic: + ErrNotRunning → Fail immediately, no retry + ErrSendFailed → Fail immediately, no retry + ErrRateLimit → Wait 1s → retry + ErrTemporary → Wait 500ms * 2^attempt (max 8s) → retry + Other unknown → Wait 500ms * 2^attempt (max 8s) → retry +``` + +### 4.6 Manager Orchestration + +**File**: `pkg/channels/manager.go` + +#### Per-channel Worker Architecture + +```go +type channelWorker struct { + ch Channel // Channel instance + queue chan bus.OutboundMessage // Outbound text queue (buffered 16) + mediaQueue chan bus.OutboundMediaMessage // Outbound media queue (buffered 16) + done chan struct{} // Text worker completion signal + mediaDone chan struct{} // Media worker completion signal + limiter *rate.Limiter // Per-channel rate limiter +} +``` + +#### Per-channel Rate Limit Configuration + +```go +var channelRateConfig = map[string]float64{ + "telegram": 20, // 20 msg/s + "discord": 1, // 1 msg/s + "slack": 1, // 1 msg/s + "line": 10, // 10 msg/s +} +// Default: 10 msg/s +// burst = max(1, ceil(rate/2)) +``` + +#### Lifecycle Management + +``` +StartAll: + 1. Iterate registered channels → channel.Start(ctx) + 2. Create channelWorker for each successfully started channel + 3. Start goroutines: + - runWorker (per-channel outbound text) + - runMediaWorker (per-channel outbound media) + - dispatchOutbound (route from bus to worker queues) + - dispatchOutboundMedia (route from bus to media worker queues) + - runTTLJanitor (every 10s clean up expired typing/placeholder) + 4. Start shared HTTP server (if configured) + +StopAll: + 1. Shut down shared HTTP server (5s timeout) + 2. Cancel dispatcher context + 3. Close text worker queues → wait for drain to complete + 4. Close media worker queues → wait for drain to complete + 5. Stop each channel (channel.Stop) +``` + +#### Typing/Reaction/Placeholder Management + +```go +// Manager implements PlaceholderRecorder interface +func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) +func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) +func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) + +// Inbound side: BaseChannel.HandleMessage auto-orchestrates +// BaseChannel.HandleMessage, before PublishInbound, auto-triggers via owner type assertions: +// - TypingCapable.StartTyping → RecordTypingStop +// - ReactionCapable.ReactToMessage → RecordReactionUndo +// - PlaceholderCapable.SendPlaceholder → RecordPlaceholder +// All three are independent and do not interfere with each other. Channels don't need to call these manually. + +// Outbound side: pre-send processing +func (m *Manager) preSend(ctx, name, msg, ch) bool { + key := name + ":" + msg.ChatID + // 1. Stop Typing (call stored stop function) + // 2. Undo Reaction (call stored undo function) + // 3. Attempt to edit Placeholder (if channel implements MessageEditor) + // Success → return true (skip Send) + // Failure → return false (proceed with Send) +} +``` + +Manager storage is fully separated; three pipelines do not interfere: + +```go +Manager { + typingStops sync.Map // "channel:chatID" → typingEntry ← manages TypingCapable + reactionUndos sync.Map // "channel:chatID" → reactionEntry ← manages ReactionCapable + placeholders sync.Map // "channel:chatID" → placeholderEntry +} +``` + +TTL Cleanup: +- Typing stop functions: 5-minute TTL (auto-calls stop and deletes on expiry) +- Reaction undo functions: 5-minute TTL (auto-calls undo and deletes on expiry) +- Placeholder IDs: 10-minute TTL (deletes on expiry) +- Cleanup interval: 10 seconds + +### 4.7 Message Splitting + +**File**: `pkg/channels/split.go` + +`SplitMessage(content string, maxLen int) []string` + +Smart splitting strategy: +1. Calculate effective split point = maxLen - 10% buffer (to reserve space for code block closure) +2. Prefer splitting at newlines +3. Otherwise split at spaces/tabs +4. Detect unclosed code blocks (` ``` `) +5. If a code block is unclosed: + - Attempt to extend to maxLen to include the closing fence + - If the code block is too long, inject close/reopen fences (`\n```\n` + header) + - Last resort: split before the code block starts + +### 4.8 MediaStore + +**File**: `pkg/media/store.go` + +```go +type MediaStore interface { + Store(localPath string, meta MediaMeta, scope string) (ref string, err error) + Resolve(ref string) (localPath string, err error) + ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error) + ReleaseAll(scope string) error +} +``` + +**FileMediaStore Implementation**: +- Pure in-memory mapping, no file copy/move +- Reference format: `media://` +- Scope format: `channel:chatID:messageID` (generated by `BuildMediaScope`) +- **Two-phase operation**: + - Phase 1 (holding lock): collect and delete entries from map + - Phase 2 (no lock): delete files from disk + - Purpose: minimize lock contention +- **TTL Cleanup**: `NewFileMediaStoreWithCleanup` → `Start()` launches background cleanup goroutine +- Cleanup interval and max TTL are controlled by configuration + +### 4.9 Identity + +**File**: `pkg/identity/identity.go` + +```go +// Build canonical ID +func BuildCanonicalID(platform, platformID string) string +// → "telegram:123456" + +// Parse canonical ID +func ParseCanonicalID(canonical string) (platform, id string, ok bool) + +// Match against allow list (backward-compatible) +func MatchAllowed(sender bus.SenderInfo, allowed string) bool +``` + +`MatchAllowed` supported allow-list formats: +| Format | Matching | +|--------|----------| +| `"123456"` | Matches `sender.PlatformID` | +| `"@alice"` | Matches `sender.Username` | +| `"123456\|alice"` | Matches PlatformID or Username (legacy format compatibility) | +| `"telegram:123456"` | Exact match on `sender.CanonicalID` (new format) | + +### 4.10 Shared HTTP Server + +**File**: `pkg/channels/manager.go`'s `SetupHTTPServer` + +Manager creates a single `http.Server` and auto-discovers and registers: +- Channels implementing `WebhookHandler` → mounted at `wh.WebhookPath()` +- Channels implementing `HealthChecker` → mounted at `hc.HealthPath()` +- Global health endpoint registered by `health.Server.RegisterOnMux` + +Timeout configuration: ReadTimeout = 30s, WriteTimeout = 30s + +--- + +## Part 5: Key Design Decisions and Conventions + +### 5.1 Mandatory Conventions + +1. **Error classification is a contract**: A channel's `Send` method **must** return sentinel errors (or wrap them). Manager's retry strategy relies entirely on `errors.Is` checks. Returning unclassified errors will cause Manager to treat them as "unknown errors" (exponential backoff retry). + +2. **SetRunning is a lifecycle signal**: **Must** call `c.SetRunning(true)` after successful `Start`, and **must** call `c.SetRunning(false)` at the beginning of `Stop`. **Must** check `c.IsRunning()` in `Send` and return `ErrNotRunning`. + +3. **HandleMessage includes permission checks**: Do not perform your own permission checks before calling `HandleMessage` (unless you need platform-specific preprocessing before the check). `HandleMessage` already calls `IsAllowedSender`/`IsAllowed` internally. + +4. **Message splitting is handled by Manager**: A channel's `Send` method does not need to handle long message splitting. Manager automatically splits based on `MaxMessageLength()` before calling `Send`. Channels only need to declare the limit via `WithMaxMessageLength`. + +5. **Typing/Reaction/Placeholder is handled by BaseChannel + Manager automatically**: A channel's `Send` method does not need to manage Typing stop, Reaction undo, or Placeholder editing. `BaseChannel.HandleMessage` auto-triggers `TypingCapable`, `ReactionCapable`, and `PlaceholderCapable` on the inbound side (via `owner` type assertions); Manager's `preSend` auto-stops Typing, undoes Reaction, and edits Placeholder on the outbound side. Channels only need to implement the corresponding interfaces. + +6. **Factory registration belongs in init()**: Each sub-package must have an `init.go` file calling `channels.RegisterFactory`. Gateway must trigger registration via blank imports (`_ "pkg/channels/xxx"`). + +### 5.2 Metadata Field Usage Conventions + +**Do NOT put the following information in Metadata anymore**: +- `peer_kind` / `peer_id` → Use `InboundMessage.Peer` +- `message_id` → Use `InboundMessage.MessageID` +- `sender_platform` / `sender_username` → Use `InboundMessage.Sender` + +**Metadata should only be used for**: +- Channel-specific extension information (e.g., Telegram's `reply_to_message_id`) +- Temporary information that doesn't fit into structured fields + +### 5.3 Concurrency Safety Conventions + +- `BaseChannel.running`: Uses `atomic.Bool`, thread-safe +- `Manager.channels` / `Manager.workers`: Protected by `sync.RWMutex` +- `Manager.placeholders` / `Manager.typingStops` / `Manager.reactionUndos`: Uses `sync.Map` +- `MessageBus.closed`: Uses `atomic.Bool` +- `FileMediaStore`: Uses `sync.RWMutex`, two-phase operation to minimize lock-hold time +- Channel Worker queue: Go channel, inherently concurrent-safe + +### 5.4 Testing Conventions + +Existing test files: +- `pkg/channels/base_test.go` — BaseChannel unit tests +- `pkg/channels/manager_test.go` — Manager unit tests +- `pkg/channels/split_test.go` — Message splitting tests +- `pkg/channels/errors_test.go` — Error type tests +- `pkg/channels/errutil_test.go` — Error classification tests + +To add tests for a new channel: +```bash +go test ./pkg/channels/matrix/ -v # Sub-package tests +go test ./pkg/channels/ -run TestSpecific -v # Framework tests +make test # Full test suite +``` + +--- + +## Appendix: Complete File Listing and Interface Quick Reference + +### A.1 Framework Layer Files + +| File | Responsibility | +|------|---------------| +| `pkg/channels/base.go` | BaseChannel struct, Channel interface, MessageLengthProvider, BaseChannelOption, HandleMessage | +| `pkg/channels/interfaces.go` | TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder interfaces | +| `pkg/channels/media.go` | MediaSender interface | +| `pkg/channels/webhook.go` | WebhookHandler, HealthChecker interfaces | +| `pkg/channels/errors.go` | ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed sentinels | +| `pkg/channels/errutil.go` | ClassifySendError, ClassifyNetError helpers | +| `pkg/channels/registry.go` | RegisterFactory, getFactory factory registry | +| `pkg/channels/manager.go` | Manager: Worker queues, rate limiting, retries, preSend, shared HTTP, TTL janitor | +| `pkg/channels/split.go` | SplitMessage long-message splitting | +| `pkg/bus/bus.go` | MessageBus implementation | +| `pkg/bus/types.go` | Peer, SenderInfo, InboundMessage, OutboundMessage, OutboundMediaMessage, MediaPart | +| `pkg/media/store.go` | MediaStore interface, FileMediaStore implementation | +| `pkg/identity/identity.go` | BuildCanonicalID, ParseCanonicalID, MatchAllowed | + +### A.2 Channel Sub-packages + +| Sub-package | Registered Name | Optional Interfaces | +|-------------|----------------|-------------------| +| `pkg/channels/telegram/` | `"telegram"` | MessageEditor, MediaSender, TypingCapable, PlaceholderCapable | +| `pkg/channels/discord/` | `"discord"` | MessageEditor, TypingCapable, PlaceholderCapable | +| `pkg/channels/slack/` | `"slack"` | ReactionCapable | +| `pkg/channels/line/` | `"line"` | WebhookHandler, HealthChecker, TypingCapable | +| `pkg/channels/onebot/` | `"onebot"` | ReactionCapable | +| `pkg/channels/dingtalk/` | `"dingtalk"` | WebhookHandler | +| `pkg/channels/feishu/` | `"feishu"` | WebhookHandler (architecture-specific build tags) | +| `pkg/channels/wecom/` | `"wecom"` + `"wecom_app"` | WebhookHandler | +| `pkg/channels/qq/` | `"qq"` | — | +| `pkg/channels/whatsapp/` | `"whatsapp"` | — | +| `pkg/channels/maixcam/` | `"maixcam"` | — | +| `pkg/channels/pico/` | `"pico"` | WebhookHandler (Pico Protocol), TypingCapable, PlaceholderCapable | + +### A.3 Interface Quick Reference + +```go +// ===== Required ===== +type Channel interface { + Name() string + Start(ctx context.Context) error + Stop(ctx context.Context) error + Send(ctx context.Context, msg bus.OutboundMessage) error + IsRunning() bool + IsAllowed(senderID string) bool + IsAllowedSender(sender bus.SenderInfo) bool +} + +// ===== Optional ===== +type MediaSender interface { + SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error +} + +type TypingCapable interface { + StartTyping(ctx context.Context, chatID string) (stop func(), err error) +} + +type ReactionCapable interface { + ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) +} + +type PlaceholderCapable interface { + SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error) +} + +type MessageEditor interface { + EditMessage(ctx context.Context, chatID, messageID, content string) error +} + +type WebhookHandler interface { + WebhookPath() string + http.Handler +} + +type HealthChecker interface { + HealthPath() string + HealthHandler(w http.ResponseWriter, r *http.Request) +} + +type MessageLengthProvider interface { + MaxMessageLength() int +} + +// ===== Injected by Manager ===== +type PlaceholderRecorder interface { + RecordPlaceholder(channel, chatID, placeholderID string) + RecordTypingStop(channel, chatID string, stop func()) + RecordReactionUndo(channel, chatID string, undo func()) +} +``` + +### A.4 Gateway Startup Sequence (Complete Bootstrap Flow) + +```go +// 1. Create core components +msgBus := bus.NewMessageBus() +provider := providers.CreateProvider(cfg) +agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) + +// 2. Create media store (with TTL cleanup) +mediaStore := media.NewFileMediaStoreWithCleanup(cleanerConfig) +mediaStore.Start() + +// 3. Create Channel Manager (triggers initChannels → factory lookup → construct → inject MediaStore/PlaceholderRecorder/Owner) +channelManager := channels.NewManager(cfg, msgBus, mediaStore) + +// 4. Inject references +agentLoop.SetChannelManager(channelManager) +agentLoop.SetMediaStore(mediaStore) + +// 5. Configure shared HTTP server +channelManager.SetupHTTPServer(addr, healthServer) + +// 6. Start +channelManager.StartAll(ctx) // Start channels + workers + dispatchers + HTTP server +go agentLoop.Run(ctx) // Start Agent message loop + +// 7. Shutdown (signal-triggered) +cancel() // Cancel context +msgBus.Close() // Signal close + drain +channelManager.StopAll(shutdownCtx) // Stop HTTP + workers + channels +mediaStore.Stop() // Stop TTL cleanup +agentLoop.Stop() // Stop Agent +``` + +### A.5 Per-channel Rate Limit Reference + +| Channel | Rate (msg/s) | Burst | +|---------|-------------|-------| +| telegram | 20 | 10 | +| discord | 1 | 1 | +| slack | 1 | 1 | +| line | 10 | 5 | +| _others_ | 10 (default) | 5 | + +### A.6 Known Limitations and Caveats + +1. **Media cleanup temporarily disabled**: The `ReleaseAll` call in the Agent loop is commented out (`refactor(loop): disable media cleanup to prevent premature file deletion`) because session boundaries are not yet clearly defined. TTL cleanup remains active. + +2. **Feishu architecture-specific compilation**: The Feishu channel uses build tags to distinguish 32-bit and 64-bit architectures (`feishu_32.go` / `feishu_64.go`). + +3. **WeCom has two factories**: `"wecom"` (Bot mode) and `"wecom_app"` (App mode) are registered separately. + +4. **Pico Protocol**: `pkg/channels/pico/` implements a custom PicoClaw native protocol channel that receives messages via webhook. \ No newline at end of file