
PicoClaw Channel System: Complete Development Guide

Scope: pkg/channels/, pkg/bus/, pkg/media/, pkg/identity/, cmd/picoclaw/internal/gateway/



Part 1: Architecture Overview

1.1 Before and After Comparison

Before Refactor (main branch):

pkg/channels/
├── telegram.go          # Each channel directly in the channels package
├── discord.go
├── slack.go
├── manager.go           # Manager directly references each channel type
├── ...
  • All channel implementations lived at the top level of pkg/channels/
  • Manager constructed each channel via switch or if-else chains
  • Routing info like Peer and MessageID was buried in Metadata map[string]string
  • No rate limiting or retry on message sending
  • No unified media file lifecycle management
  • Each channel ran its own HTTP server
  • Group chat trigger filtering logic was scattered across channels

After Refactor (refactor/channel-system branch):

pkg/channels/
├── base.go              # BaseChannel shared abstraction layer
├── interfaces.go        # Optional capability interfaces (TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder)
├── README.md            # English documentation
├── README.zh.md         # Chinese documentation
├── media.go             # MediaSender optional interface
├── webhook.go           # WebhookHandler, HealthChecker optional interfaces
├── errors.go            # Sentinel errors (ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed)
├── errutil.go           # Error classification helpers
├── registry.go          # Factory registry (RegisterFactory / getFactory)
├── manager.go           # Unified orchestration: Worker queues, rate limiting, retries, Typing/Placeholder, shared HTTP
├── split.go             # Smart long-message splitting (preserves code block integrity)
├── telegram/            # Each channel in its own sub-package
│   ├── init.go          # Factory registration
│   ├── telegram.go      # Implementation
│   └── telegram_commands.go
├── discord/
│   ├── init.go
│   └── discord.go
├── slack/ line/ onebot/ dingtalk/ feishu/ wecom/ qq/ whatsapp/ whatsapp_native/ maixcam/ pico/
│   └── ...

pkg/bus/
├── bus.go               # MessageBus (buffer 64, safe close + drain)
├── types.go             # Structured message types (Peer, SenderInfo, MediaPart, InboundMessage, OutboundMessage, OutboundMediaMessage)

pkg/media/
├── store.go             # MediaStore interface + FileMediaStore implementation (two-phase release, TTL cleanup)

pkg/identity/
├── identity.go          # Unified user identity: canonical "platform:id" format + backward-compatible matching
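As an illustration of the canonical "platform:id" format, here is a minimal sketch of building and parsing such IDs. buildCanonicalID and parseCanonicalID are hypothetical stand-ins, not the pkg/identity API. Splitting on the first colon only is a choice made for this sketch, so that platform IDs which themselves contain colons (such as Matrix user IDs) survive a round trip.

```go
package main

import (
	"fmt"
	"strings"
)

// buildCanonicalID joins a platform name and a platform-specific user ID into
// the "platform:id" form. Hypothetical stand-in for identity.BuildCanonicalID.
func buildCanonicalID(platform, platformID string) string {
	return platform + ":" + platformID
}

// parseCanonicalID splits a canonical ID back into its parts. It splits on the
// first colon only, so platform IDs containing colons are preserved.
func parseCanonicalID(id string) (platform, platformID string, ok bool) {
	platform, platformID, ok = strings.Cut(id, ":")
	if !ok || platform == "" || platformID == "" {
		return "", "", false
	}
	return platform, platformID, true
}

func main() {
	fmt.Println(buildCanonicalID("telegram", "123456789")) // telegram:123456789

	p, pid, ok := parseCanonicalID("matrix:@alice:example.org")
	fmt.Println(p, pid, ok) // matrix @alice:example.org true
}
```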

1.2 Message Flow Overview

┌────────────┐      InboundMessage       ┌────────────┐      LLM + Tools       ┌────────────┐
│  Telegram  │──┐                        │            │                        │            │
│  Discord   │──┤   PublishInbound()     │            │   PublishOutbound()    │            │
│  Slack     │──┼──────────────────────▶ │ MessageBus │ ◀───────────────────── │ AgentLoop  │
│  LINE      │──┤   (buffered chan, 64)  │            │   (buffered chan, 64)  │            │
│  ...       │──┘                        │            │                        │            │
└────────────┘                           └─────┬──────┘                        └────────────┘
                                               │
                           SubscribeOutbound() │  SubscribeOutboundMedia()
                                               ▼
                                    ┌─────────────────────────────┐
                                    │ Manager                     │
                                    │ ├── dispatchOutbound()      │  Route to Worker queues
                                    │ ├── dispatchOutboundMedia() │
                                    │ ├── runWorker()             │  Message split + sendWithRetry()
                                    │ ├── runMediaWorker()        │  sendMediaWithRetry()
                                    │ ├── preSend()               │  Stop Typing + Undo Reaction + Edit Placeholder
                                    │ └── runTTLJanitor()         │  Clean up expired Typing/Reaction/Placeholder
                                    └──────────┬──────────────────┘
                                               │
                  channel.Send() / SendMedia() │
                                               ▼
                                       ┌────────────────┐
                                       │ Platform APIs  │
                                       └────────────────┘

1.3 Key Design Principles

| Principle | Description |
|---|---|
| Sub-package Isolation | Each channel is a standalone Go sub-package that depends on BaseChannel and the interfaces from the parent channels package |
| Factory Registration | Sub-packages self-register via init(); Manager looks up factories by name, eliminating import coupling |
| Capability Discovery | Optional capabilities are declared via interfaces (MediaSender, TypingCapable, ReactionCapable, PlaceholderCapable, MessageEditor, WebhookHandler, HealthChecker) and discovered by Manager through runtime type assertions |
| Structured Messages | Peer, MessageID, and SenderInfo are promoted from Metadata to first-class fields on InboundMessage |
| Error Classification | Channels return sentinel errors (ErrRateLimit, ErrTemporary, etc.), which Manager uses to choose a retry strategy |
| Centralized Orchestration | Rate limiting, message splitting, retries, and Typing/Reaction/Placeholder management are all handled by Manager and BaseChannel; beyond the Start/Stop lifecycle, channels only need to implement Send |

Part 2: Migration Guide — From main Branch to Refactored Branch

2.1 If You Have Unmerged Channel Changes

Step 1: Identify which files you modified

On the main branch, channel files were directly in pkg/channels/ top level, e.g.:

  • pkg/channels/telegram.go
  • pkg/channels/discord.go

After the refactor, these files were removed and their code moved into the corresponding sub-packages:

  • pkg/channels/telegram/telegram.go
  • pkg/channels/discord/discord.go

Step 2: Understand the structural change mapping

| main branch file | Refactored branch location | Changes |
|---|---|---|
| pkg/channels/telegram.go | pkg/channels/telegram/telegram.go + init.go | Package name changed from channels to telegram |
| pkg/channels/discord.go | pkg/channels/discord/discord.go + init.go | Same as above |
| pkg/channels/manager.go | pkg/channels/manager.go | Extensively rewritten |
| (did not exist) | pkg/channels/base.go | New shared abstraction layer |
| (did not exist) | pkg/channels/registry.go | New factory registry |
| (did not exist) | pkg/channels/errors.go + errutil.go | New error classification system |
| (did not exist) | pkg/channels/interfaces.go | New optional capability interfaces |
| (did not exist) | pkg/channels/media.go | New MediaSender interface |
| (did not exist) | pkg/channels/webhook.go | New WebhookHandler/HealthChecker |
| (did not exist) | pkg/channels/whatsapp_native/ | New WhatsApp native mode (whatsmeow) |
| (did not exist) | pkg/channels/split.go | New message splitting (migrated from utils) |
| (did not exist) | pkg/bus/types.go | New structured message types |
| (did not exist) | pkg/media/store.go | New media file lifecycle management |
| (did not exist) | pkg/identity/identity.go | New unified user identity |

Step 3: Migrate your channel code

Using Telegram as an example, the main changes are:

3a. Package declaration and imports

// Old code (main branch)
package channels

import (
    "github.com/sipeed/picoclaw/pkg/bus"
    "github.com/sipeed/picoclaw/pkg/config"
)

// New code (refactored branch)
package telegram

import (
    "github.com/sipeed/picoclaw/pkg/bus"
    "github.com/sipeed/picoclaw/pkg/channels"     // Reference parent package
    "github.com/sipeed/picoclaw/pkg/config"
    "github.com/sipeed/picoclaw/pkg/identity"      // New
    "github.com/sipeed/picoclaw/pkg/media"          // New (if media support needed)
)

3b. Struct embeds BaseChannel

// Old code: directly held bus, config, etc. fields
type TelegramChannel struct {
    bus       *bus.MessageBus
    config    *config.Config
    running   bool
    allowList []string
    // ...
}

// New code: embed BaseChannel, which provides bus, running, allowList, etc.
type TelegramChannel struct {
    *channels.BaseChannel          // Embed shared abstraction
    bot    *telego.Bot
    config *config.Config
    // ... only channel-specific fields
}

3c. Constructor

// Old code: direct assignment
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
    return &TelegramChannel{
        bus:       bus,
        config:    cfg,
        allowList: cfg.Channels.Telegram.AllowFrom,
        // ...
    }, nil
}

// New code: use NewBaseChannel + functional options
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
    // Create the platform client first (also requires importing "fmt" and telego)
    bot, err := telego.NewBot(cfg.Channels.Telegram.Token)
    if err != nil {
        return nil, fmt.Errorf("telegram: create bot: %w", err)
    }

    base := channels.NewBaseChannel(
        "telegram",                      // Name
        cfg.Channels.Telegram,           // Raw config (any type)
        bus,                             // Message bus
        cfg.Channels.Telegram.AllowFrom, // Allow list
        channels.WithMaxMessageLength(4096),                                       // Platform message length limit
        channels.WithGroupTrigger(cfg.Channels.Telegram.GroupTrigger),             // Group trigger config
        channels.WithReasoningChannelID(cfg.Channels.Telegram.ReasoningChannelID), // Reasoning chain routing
    )
    return &TelegramChannel{
        BaseChannel: base,
        bot:         bot,
        config:      cfg,
    }, nil
}
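The trailing With... arguments follow Go's functional-options pattern. The sketch below shows the general mechanism with a simplified constructor. baseChannel, its field names, and the zero default for the length limit are assumptions, not the actual BaseChannel internals.

```go
package main

import "fmt"

// baseChannel is a simplified stand-in for channels.BaseChannel.
type baseChannel struct {
	name               string
	maxMessageLength   int // 0 means "no limit" in this sketch
	reasoningChannelID string
}

// Option mutates a baseChannel during construction.
type Option func(*baseChannel)

func WithMaxMessageLength(n int) Option {
	return func(c *baseChannel) { c.maxMessageLength = n }
}

func WithReasoningChannelID(id string) Option {
	return func(c *baseChannel) { c.reasoningChannelID = id }
}

// newBaseChannel sets required fields, then applies options in order.
func newBaseChannel(name string, opts ...Option) *baseChannel {
	c := &baseChannel{name: name}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newBaseChannel("telegram", WithMaxMessageLength(4096))
	fmt.Println(c.name, c.maxMessageLength, c.reasoningChannelID == "") // telegram 4096 true
}
```

One benefit of this pattern is that adding an option later does not change the constructor signature that every existing channel calls.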

3d. Start/Stop lifecycle

// New code: use SetRunning atomic operation
func (c *TelegramChannel) Start(ctx context.Context) error {
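    // ... initialize bot, bot handler (bh), webhook, etc.
    c.SetRunning(true)    // Must be called only after the client is ready
    go bh.Start()         // bh: the telegohandler.BotHandler created during initialization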
    return nil
}

func (c *TelegramChannel) Stop(ctx context.Context) error {
    c.SetRunning(false)   // Must be called before cleanup
    // ... stop bot handler, cancel context
    return nil
}

3e. Send method error returns

// Old code: returns plain error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
    if !c.running { return fmt.Errorf("not running") }
    // ...
    if err != nil { return err }
    return nil
}

// New code: must return sentinel errors for Manager to determine retry strategy
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
    if !c.IsRunning() {
        return channels.ErrNotRunning    // ← Manager will not retry
    }
    // ...
    if err != nil {
        // Use ClassifySendError to wrap the error based on the HTTP status code
        // (statusCode is the status returned by the failed API call)
        return channels.ClassifySendError(statusCode, err)
        // Or manually wrap:
        // return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
        // return fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
        // return fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
    }
    return nil
}

3f. Message reception (Inbound)

// Old code: directly construct InboundMessage and publish
msg := bus.InboundMessage{
    Channel:  "telegram",
    SenderID: senderID,
    ChatID:   chatID,
    Content:  content,
    Metadata: map[string]string{
        "peer_kind": "group",     // Routing info buried in metadata
        "peer_id":   chatID,
        "message_id": msgID,
    },
}
c.bus.PublishInbound(ctx, msg)

// New code: use BaseChannel.HandleMessage with structured fields
sender := bus.SenderInfo{
    Platform:    "telegram",
    PlatformID:  strconv.FormatInt(from.ID, 10),
    CanonicalID: identity.BuildCanonicalID("telegram", strconv.FormatInt(from.ID, 10)),
    Username:    from.Username,
    DisplayName: from.FirstName,
}

peer := bus.Peer{
    Kind: "group",    // or "direct"
    ID:   chatID,
}

// HandleMessage internally calls IsAllowedSender for permission checks, builds MediaScope, and publishes to bus
c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, sender)

3g. Add factory registration (required)

Create init.go for your channel:

// pkg/channels/telegram/init.go
package telegram

import (
    "github.com/sipeed/picoclaw/pkg/bus"
    "github.com/sipeed/picoclaw/pkg/channels"
    "github.com/sipeed/picoclaw/pkg/config"
)

func init() {
    channels.RegisterFactory("telegram", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
        return NewTelegramChannel(cfg, b)
    })
}
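Behind RegisterFactory there is typically a mutex-guarded map from channel name to constructor. The following is a minimal sketch under that assumption. The Factory signature is trimmed (the real one also takes *bus.MessageBus), and panicking on a duplicate name is a choice made for this sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// Channel and Config are minimal stand-ins for channels.Channel and config.Config.
type Channel interface{ Name() string }
type Config struct{}

// Factory mirrors the shape of the real factory, minus the message bus.
type Factory func(cfg *Config) (Channel, error)

var (
	factoriesMu sync.RWMutex
	factories   = map[string]Factory{}
)

// RegisterFactory is called from each sub-package's init(). This sketch
// treats a duplicate name as a programming error and panics.
func RegisterFactory(name string, f Factory) {
	factoriesMu.Lock()
	defer factoriesMu.Unlock()
	if _, dup := factories[name]; dup {
		panic("channels: duplicate factory for " + name)
	}
	factories[name] = f
}

// getFactory is what the Manager would use when initializing enabled channels.
func getFactory(name string) (Factory, bool) {
	factoriesMu.RLock()
	defer factoriesMu.RUnlock()
	f, ok := factories[name]
	return f, ok
}

type matrixChannel struct{}

func (matrixChannel) Name() string { return "matrix" }

// In a real sub-package this init() would live in pkg/channels/matrix/init.go.
func init() {
	RegisterFactory("matrix", func(*Config) (Channel, error) { return matrixChannel{}, nil })
}

func main() {
	f, ok := getFactory("matrix")
	if !ok {
		fmt.Println("matrix factory not registered (missing blank import?)")
		return
	}
	ch, err := f(&Config{})
	fmt.Println(ch.Name(), err) // matrix <nil>
}
```

Because registration happens in init(), a sub-package that is never imported never registers. getFactory then simply reports the name as unknown, which is why the blank imports in the next step are required.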

3h. Import sub-package in Gateway

// cmd/picoclaw/internal/gateway/helpers.go
import (
    _ "github.com/sipeed/picoclaw/pkg/channels/telegram"   // Triggers init() registration
    _ "github.com/sipeed/picoclaw/pkg/channels/discord"
    _ "github.com/sipeed/picoclaw/pkg/channels/your_new_channel"  // New addition
)

Step 4: Migrate bus message usage

If your code directly reads routing fields from InboundMessage.Metadata:

// Old code
peerKind := msg.Metadata["peer_kind"]
peerID   := msg.Metadata["peer_id"]
msgID    := msg.Metadata["message_id"]

// New code
peerKind := msg.Peer.Kind      // First-class field
peerID   := msg.Peer.ID        // First-class field
msgID    := msg.MessageID       // First-class field
sender   := msg.Sender          // bus.SenderInfo struct
scope    := msg.MediaScope       // Media lifecycle scope
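If producers and consumers are migrated at different times, a small adapter can promote the old metadata keys into the structured fields during the transition. promoteLegacyMetadata is a hypothetical helper, not part of pkg/bus, and the types below are trimmed stand-ins. It only fills fields that are still empty, so it does nothing to messages that already use the new layout.

```go
package main

import "fmt"

// Peer and InboundMessage are trimmed stand-ins for the pkg/bus types.
type Peer struct{ Kind, ID string }

type InboundMessage struct {
	Peer      Peer
	MessageID string
	Metadata  map[string]string
}

// promoteLegacyMetadata copies the old routing keys into the structured
// fields when those fields are still empty. Reading from a nil map is safe
// in Go and yields "".
func promoteLegacyMetadata(msg *InboundMessage) {
	if msg.Peer.Kind == "" {
		msg.Peer.Kind = msg.Metadata["peer_kind"]
	}
	if msg.Peer.ID == "" {
		msg.Peer.ID = msg.Metadata["peer_id"]
	}
	if msg.MessageID == "" {
		msg.MessageID = msg.Metadata["message_id"]
	}
}

func main() {
	msg := InboundMessage{Metadata: map[string]string{
		"peer_kind": "group", "peer_id": "-1001", "message_id": "42",
	}}
	promoteLegacyMetadata(&msg)
	fmt.Println(msg.Peer.Kind, msg.Peer.ID, msg.MessageID) // group -1001 42
}
```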

Step 5: Migrate allow-list checks

// Old code
if !c.isAllowed(senderID) { return }

// New code: prefer structured check
if !c.IsAllowedSender(sender) { return }
// Or fall back to string check:
if !c.IsAllowed(senderID) { return }

BaseChannel.HandleMessage already handles this logic internally — no need to duplicate the check in your channel.
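For reference, matching a structured sender against an allow list can be sketched as below. The rules are assumptions for illustration: an empty list allows everyone, and an entry may be a bare platform ID (the backward-compatible form), a canonical "platform:id", or an "@username". SenderInfo is a trimmed stand-in and isAllowedSender is a hypothetical name, so check BaseChannel.IsAllowedSender for the actual rules.

```go
package main

import (
	"fmt"
	"strings"
)

// SenderInfo is a trimmed stand-in for bus.SenderInfo.
type SenderInfo struct {
	Platform, PlatformID, CanonicalID, Username string
}

// isAllowedSender reports whether s matches any allow-list entry, using the
// assumed rules described above.
func isAllowedSender(allow []string, s SenderInfo) bool {
	if len(allow) == 0 {
		return true
	}
	for _, entry := range allow {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue // ignore blank entries rather than matching empty IDs
		}
		switch {
		case entry == s.PlatformID, entry == s.CanonicalID:
			return true
		case strings.HasPrefix(entry, "@") && s.Username != "" && entry[1:] == s.Username:
			return true
		}
	}
	return false
}

func main() {
	alice := SenderInfo{Platform: "telegram", PlatformID: "123", CanonicalID: "telegram:123", Username: "alice"}
	fmt.Println(isAllowedSender([]string{"telegram:123"}, alice)) // true
	fmt.Println(isAllowedSender([]string{"@bob"}, alice))         // false
}
```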

2.2 If You Have Manager Modifications

The Manager has been completely rewritten. Your modifications will need to account for the new architecture:

| Old Manager Responsibility | New Manager Responsibility |
|---|---|
| Directly construct channels (switch/if-else) | Look up and construct via factory registry |
| Directly call channel.Send | Per-channel Worker queues + rate limiting + retries |
| No message splitting | Automatic splitting based on MaxMessageLength |
| Each channel runs its own HTTP server | Unified shared HTTP server |
| No Typing/Placeholder management | Unified preSend handles Typing stop + Reaction undo + Placeholder edit; on the inbound side, BaseChannel.HandleMessage automatically orchestrates Typing/Reaction/Placeholder |
| No TTL cleanup | runTTLJanitor periodically cleans up expired Typing/Reaction/Placeholder entries |

2.3 If You Have Agent Loop Modifications

Main changes to the Agent Loop:

  1. MediaStore injection: agentLoop.SetMediaStore(mediaStore) — Agent resolves media references produced by tools via MediaStore
  2. ChannelManager injection: agentLoop.SetChannelManager(channelManager) — Agent can query channel state
  3. OutboundMediaMessage: Agent now sends media messages via bus.PublishOutboundMedia() instead of embedding them in text replies
  4. extractPeer: Routing uses msg.Peer structured fields instead of Metadata lookups

Part 3: New Channel Development Guide — Implementing a Channel from Scratch

3.1 Minimum Implementation Checklist

To add a new chat platform (e.g., matrix), you need to:

  1. Create sub-package directory pkg/channels/matrix/
  2. Create init.go — factory registration
  3. Create matrix.go — channel implementation
  4. Add blank import in Gateway helpers
  5. Add config check in Manager.initChannels()
  6. Add config struct in pkg/config/

3.2 Complete Template

pkg/channels/matrix/init.go

package matrix

import (
    "github.com/sipeed/picoclaw/pkg/bus"
    "github.com/sipeed/picoclaw/pkg/channels"
    "github.com/sipeed/picoclaw/pkg/config"
)

func init() {
    channels.RegisterFactory("matrix", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
        return NewMatrixChannel(cfg, b)
    })
}

pkg/channels/matrix/matrix.go

package matrix

import (
    "context"
    "fmt"

    "github.com/sipeed/picoclaw/pkg/bus"
    "github.com/sipeed/picoclaw/pkg/channels"
    "github.com/sipeed/picoclaw/pkg/config"
    "github.com/sipeed/picoclaw/pkg/identity"
    "github.com/sipeed/picoclaw/pkg/logger"
)

// MatrixChannel implements channels.Channel for the Matrix protocol.
type MatrixChannel struct {
    *channels.BaseChannel            // Must embed
    config *config.Config
    ctx    context.Context
    cancel context.CancelFunc
    // ... Matrix SDK client, etc.
}

func NewMatrixChannel(cfg *config.Config, msgBus *bus.MessageBus) (*MatrixChannel, error) {
    matrixCfg := cfg.Channels.Matrix // Assumes this field exists in config

    base := channels.NewBaseChannel(
        "matrix",                           // Channel name (globally unique)
        matrixCfg,                          // Raw config
        msgBus,                             // Message bus
        matrixCfg.AllowFrom,                // Allow list
        channels.WithMaxMessageLength(65536), // Matrix message length limit
        channels.WithGroupTrigger(matrixCfg.GroupTrigger),
        channels.WithReasoningChannelID(matrixCfg.ReasoningChannelID), // Reasoning chain routing (optional)
    )

    return &MatrixChannel{
        BaseChannel: base,
        config:      cfg,
    }, nil
}

// ========== Required Channel Interface Methods ==========

func (c *MatrixChannel) Start(ctx context.Context) error {
    c.ctx, c.cancel = context.WithCancel(ctx)

    // 1. Initialize Matrix client
    // 2. Start listening for messages
    // 3. Mark as running
    c.SetRunning(true)

    logger.InfoC("matrix", "Matrix channel started")
    return nil
}

func (c *MatrixChannel) Stop(ctx context.Context) error {
    c.SetRunning(false)

    if c.cancel != nil {
        c.cancel()
    }

    logger.InfoC("matrix", "Matrix channel stopped")
    return nil
}

func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
    // 1. Check running state
    if !c.IsRunning() {
        return channels.ErrNotRunning
    }

    // 2. Send message to Matrix
    err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
    if err != nil {
        // 3. Must use error classification wrapping
        //    If you have an HTTP status code:
        //    return channels.ClassifySendError(statusCode, err)
        //    If it's a network error:
        //    return channels.ClassifyNetError(err)
        //    If manual classification is needed:
        return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
    }

    return nil
}

// ========== Incoming Message Handling ==========

func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content string, msgID string) {
    // 1. Construct structured sender identity
    sender := bus.SenderInfo{
        Platform:    "matrix",
        PlatformID:  senderID,
        CanonicalID: identity.BuildCanonicalID("matrix", senderID),
        Username:    senderID,
        DisplayName: displayName,
    }

    // 2. Determine Peer type (direct vs group)
    peer := bus.Peer{
        Kind: "group",    // or "direct"
        ID:   roomID,
    }

    // 3. Group chat filtering (if applicable)
    isGroup := peer.Kind == "group"
    if isGroup {
        isMentioned := false // Detect @mentions based on platform specifics
        shouldRespond, cleanContent := c.ShouldRespondInGroup(isMentioned, content)
        if !shouldRespond {
            return
        }
        content = cleanContent
    }

    // 4. Handle media attachments (if any)
    var mediaRefs []string
    store := c.GetMediaStore()
    if store != nil {
        // Download attachment locally → store.Store() → get ref
        // mediaRefs = append(mediaRefs, ref)
    }

    // 5. Call HandleMessage to publish to bus
    //    HandleMessage internally will:
    //    - Check IsAllowedSender/IsAllowed
    //    - Build MediaScope
    //    - Publish InboundMessage
    c.HandleMessage(
        c.ctx,
        peer,
        msgID,                   // Platform message ID
        senderID,                // Raw sender ID
        roomID,                  // Chat/room ID
        content,                 // Message content
        mediaRefs,               // Media reference list
        nil,                     // Extra metadata (usually nil)
        sender,                  // SenderInfo (variadic parameter)
    )
}

// ========== Internal Methods ==========

func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) error {
    // Actual Matrix SDK call
    return nil
}

3.3 Optional Capability Interfaces

Depending on platform capabilities, your channel can optionally implement the following interfaces:

MediaSender — Send Media Attachments

// If the platform supports sending images/files/audio/video
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
    if !c.IsRunning() {
        return channels.ErrNotRunning
    }

    store := c.GetMediaStore()
    if store == nil {
        return fmt.Errorf("no media store: %w", channels.ErrSendFailed)
    }

    for _, part := range msg.Parts {
        localPath, err := store.Resolve(part.Ref)
        if err != nil {
            logger.ErrorCF("matrix", "Failed to resolve media", map[string]any{
                "ref": part.Ref, "error": err.Error(),
            })
            continue
        }

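        // Call the appropriate API based on part.Type ("image"|"audio"|"video"|"file").
        // uploadImage/uploadFile are hypothetical wrappers around the Matrix SDK.
        switch part.Type {
        case "image":
            err = c.uploadImage(ctx, msg.ChatID, localPath)
        default:
            err = c.uploadFile(ctx, msg.ChatID, localPath)
        }
        if err != nil {
            return fmt.Errorf("%w: %v", channels.ErrTemporary, err)
        }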
    }
    return nil
}

TypingCapable — Typing Indicator

// If the platform supports "typing..." indicators
func (c *MatrixChannel) StartTyping(ctx context.Context, chatID string) (stop func(), err error) {
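    // Call Matrix API to send typing indicator.
    // The returned stop function must be idempotent: it may be invoked more than
    // once (e.g., by preSend and by the TTL janitor), so guard it with sync.Once.
    var once sync.Once
    return func() {
        once.Do(func() {
            // Call Matrix API to stop typing
        })
    }, nil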
}

ReactionCapable — Message Reaction Indicator

// If the platform supports adding emoji reactions to inbound messages (e.g., Slack's 👀, OneBot's emoji 289)
func (c *MatrixChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) {
    // Call Matrix API to add reaction to message.
    // The returned undo function removes the reaction and must be idempotent.
    // addReaction/removeReaction are hypothetical wrappers around the Matrix SDK.
    err = c.addReaction(chatID, messageID, "eyes")
    if err != nil {
        return func() {}, err
    }
    var once sync.Once
    return func() {
        once.Do(func() { c.removeReaction(chatID, messageID, "eyes") })
    }, nil
}

MessageEditor — Message Editing

// If the platform supports editing sent messages (used for Placeholder replacement)
func (c *MatrixChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
    // Call Matrix API to edit message
    return nil
}

PlaceholderCapable — Placeholder Messages

// If the platform supports sending placeholder messages (e.g. "Thinking... 💭"),
// and the channel also implements MessageEditor, then Manager's preSend will
// automatically edit the placeholder into the final response on outbound.
// SendPlaceholder checks PlaceholderConfig.Enabled internally;
// returning ("", nil) means skip.
func (c *MatrixChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
    cfg := c.config.Channels.Matrix.Placeholder
    if !cfg.Enabled {
        return "", nil
    }
    text := cfg.Text
    if text == "" {
        text = "Thinking... 💭"
    }
    // Call Matrix API to send placeholder message
    msg, err := c.sendText(ctx, chatID, text)
    if err != nil {
        return "", err
    }
    return msg.ID, nil
}

WebhookHandler — HTTP Webhook Reception

// If the channel receives messages via webhook (rather than long-polling/WebSocket)
func (c *MatrixChannel) WebhookPath() string {
    return "/webhook/matrix"   // Path will be registered on the shared HTTP server
}

func (c *MatrixChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Handle webhook request
}

HealthChecker — Health Check Endpoint

func (c *MatrixChannel) HealthPath() string {
    return "/health/matrix"
}

func (c *MatrixChannel) HealthHandler(w http.ResponseWriter, r *http.Request) {
    if c.IsRunning() {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
    }
}

3.4 Inbound-side Typing/Reaction/Placeholder Auto-orchestration

BaseChannel.HandleMessage automatically detects whether the channel implements TypingCapable, ReactionCapable, and/or PlaceholderCapable before publishing the inbound message, and triggers the corresponding indicators. The three pipelines are completely independent and do not interfere with each other:

// Automatically executed inside BaseChannel.HandleMessage (no manual calls needed):
if c.owner != nil && c.placeholderRecorder != nil {
    // Typing — independent pipeline
    if tc, ok := c.owner.(TypingCapable); ok {
        if stop, err := tc.StartTyping(ctx, chatID); err == nil {
            c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
        }
    }
    // Reaction — independent pipeline
    if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
        if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
            c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
        }
    }
    // Placeholder — independent pipeline
    if pc, ok := c.owner.(PlaceholderCapable); ok {
        if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
            c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
        }
    }
}

This means:

  • Channels implementing TypingCapable (Telegram, Discord, LINE, Pico) do not need to manually call StartTyping + RecordTypingStop in handleMessage
  • Channels implementing ReactionCapable (Slack, OneBot) do not need to manually call ReactToMessage + RecordReactionUndo in handleMessage
  • Channels implementing PlaceholderCapable (Telegram, Discord, Pico) do not need to manually send placeholder messages and call RecordPlaceholder in handleMessage
  • Channels only need to implement the corresponding interface; HandleMessage handles orchestration automatically
  • Channels that don't implement these interfaces are unaffected (the type assertions fail and the corresponding step is skipped)
  • PlaceholderCapable's SendPlaceholder method internally decides whether to send based on the configured PlaceholderConfig.Enabled; returning ("", nil) skips registration

Owner Injection: Manager automatically calls SetOwner(ch) in initChannel to inject the concrete channel into BaseChannel — no manual setup required from developers.

When the Agent finishes processing a message, Manager's preSend automatically:

  1. Calls the recorded stop() to stop Typing
  2. Calls the recorded undo() to undo Reaction
  3. If there is a Placeholder and the channel implements MessageEditor, attempts to edit the Placeholder into the final reply; if the edit succeeds, Send is skipped, otherwise the reply is sent normally

3.5 Register Configuration and Gateway Integration

Add configuration in pkg/config/config.go

type ChannelsConfig struct {
    // ... existing channels
    Matrix MatrixChannelConfig `json:"matrix"`
}

type MatrixChannelConfig struct {
    Enabled            bool               `json:"enabled"`
    HomeServer         string             `json:"home_server"`
    Token              string             `json:"token"`
    AllowFrom          []string           `json:"allow_from"`
    GroupTrigger       GroupTriggerConfig `json:"group_trigger"`
    Placeholder        PlaceholderConfig  `json:"placeholder"`
    ReasoningChannelID string             `json:"reasoning_channel_id"`
}

Add entry in Manager.initChannels()

// In the initChannels() method of pkg/channels/manager.go
if m.config.Channels.Matrix.Enabled && m.config.Channels.Matrix.Token != "" {
    m.initChannel("matrix", "Matrix")
}

Note: If your channel has multiple modes (like WhatsApp Bridge vs. Native), branch in initChannels based on config:

if cfg.UseNative {
    m.initChannel("whatsapp_native", "WhatsApp Native")
} else {
    m.initChannel("whatsapp", "WhatsApp")
}

Add blank import in Gateway

// cmd/picoclaw/internal/gateway/helpers.go
import (
    _ "github.com/sipeed/picoclaw/pkg/channels/matrix"
)

Part 4: Core Subsystem Details

4.1 MessageBus

Files: pkg/bus/bus.go, pkg/bus/types.go

type MessageBus struct {
    inbound       chan InboundMessage       // buffer = 64
    outbound      chan OutboundMessage      // buffer = 64
    outboundMedia chan OutboundMediaMessage // buffer = 64
    done          chan struct{}             // Close signal
    closed        atomic.Bool               // Prevents double-close
}

Key Behaviors:

| Method | Behavior |
|---|---|
| PublishInbound(ctx, msg) | Returns early if the bus is closed; otherwise sends to the inbound channel, blocking until the message is buffered, ctx is done, or the bus closes |
| ConsumeInbound(ctx) | Reads from inbound; blocks until a message arrives, the bus closes, or ctx is cancelled |
| PublishOutbound(ctx, msg) | Sends to the outbound channel |
| SubscribeOutbound(ctx) | Reads from outbound (called by the Manager dispatcher) |
| PublishOutboundMedia(ctx, msg) | Sends to the outboundMedia channel |
| SubscribeOutboundMedia(ctx) | Reads from outboundMedia (called by the Manager media dispatcher) |
| Close() | CAS close → close(done) → drain all channels (does not close the data channels themselves, to avoid a concurrent send-on-closed-channel panic) |

Design Notes:

  • Buffer size increased from 16 to 64 to reduce blocking under burst load
  • Close() does not close the underlying channels (only closes the done signal channel), because there may be concurrent Publish goroutines
  • Drain loop ensures buffered messages are not silently dropped

4.2 Structured Message Types

File: pkg/bus/types.go

// Routing peer
type Peer struct {
    Kind string `json:"kind"`  // "direct" | "group" | "channel" | ""
    ID   string `json:"id"`
}

// Sender identity information
type SenderInfo struct {
    Platform    string `json:"platform,omitempty"`     // "telegram", "discord", ...
    PlatformID  string `json:"platform_id,omitempty"`  // Platform-native ID
    CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" canonical format
    Username    string `json:"username,omitempty"`
    DisplayName string `json:"display_name,omitempty"`
}

// Inbound message
type InboundMessage struct {
    Channel    string            // Source channel name
    SenderID   string            // Sender ID (prefer CanonicalID)
    Sender     SenderInfo        // Structured sender info
    ChatID     string            // Chat/room ID
    Content    string            // Message text
    Media      []string          // Media reference list (media://...)
    Peer       Peer              // Routing peer (first-class field)
    MessageID  string            // Platform message ID (first-class field)
    MediaScope string            // Media lifecycle scope
    SessionKey string            // Session key
    Metadata   map[string]string // Only for channel-specific extensions
}

// Outbound text message
type OutboundMessage struct {
    Channel string
    ChatID  string
    Content string
}

// Outbound media message
type OutboundMediaMessage struct {
    Channel string
    ChatID  string
    Parts   []MediaPart
}

// Media part
type MediaPart struct {
    Type        string // "image" | "audio" | "video" | "file"
    Ref         string // "media://uuid"
    Caption     string
    Filename    string
    ContentType string
}

4.3 BaseChannel

File: pkg/channels/base.go

BaseChannel is the shared abstraction layer for all channels, providing the following capabilities:

| Method/Feature | Description |
|---|---|
| Name() string | Channel name |
| IsRunning() bool | Atomically reads the running state |
| SetRunning(bool) | Atomically sets the running state |
| MaxMessageLength() int | Message length limit in runes; 0 = unlimited |
| ReasoningChannelID() string | Target channel ID for reasoning-chain routing (empty = no routing) |
| IsAllowed(senderID string) bool | Legacy allow-list check (supports "id\|username" and "@username" formats) |
| IsAllowedSender(sender SenderInfo) bool | New allow-list check (delegates to identity.MatchAllowed) |
| ShouldRespondInGroup(isMentioned, content) (bool, string) | Unified group-chat trigger filtering logic |
| HandleMessage(...) | Unified inbound handling: permission check → build MediaScope → auto-trigger Typing/Reaction/Placeholder → publish to Bus |
| SetMediaStore(s) / GetMediaStore() | MediaStore injected by Manager |
| SetPlaceholderRecorder(r) / GetPlaceholderRecorder() | PlaceholderRecorder injected by Manager |
| SetOwner(ch) | Concrete channel reference injected by Manager (used for the Typing/Reaction/Placeholder type assertions in HandleMessage) |

Functional Options:

channels.WithMaxMessageLength(4096)        // Set platform message length limit
channels.WithGroupTrigger(groupTriggerCfg) // Set group trigger configuration
channels.WithReasoningChannelID(id)        // Set reasoning chain routing target channel
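These options follow Go's functional-options pattern. A minimal sketch of how such options might be applied; `baseChannel`, `option`, and the lowercase constructors are hypothetical stand-ins, not the real BaseChannel API.

```go
package main

// baseChannel is a hypothetical, trimmed-down BaseChannel.
type baseChannel struct {
	name               string
	maxMessageLength   int // rune count, 0 = unlimited
	reasoningChannelID string
}

// option plays the role of BaseChannelOption.
type option func(*baseChannel)

func withMaxMessageLength(n int) option {
	return func(c *baseChannel) { c.maxMessageLength = n }
}

func withReasoningChannelID(id string) option {
	return func(c *baseChannel) { c.reasoningChannelID = id }
}

// newBaseChannel applies options in order over zero-value defaults.
func newBaseChannel(name string, opts ...option) *baseChannel {
	c := &baseChannel{name: name}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
```

The zero values double as defaults (no length limit, no reasoning routing), so a channel only passes the options it needs.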

4.4 Factory Registry

File: pkg/channels/registry.go

type ChannelFactory func(cfg *config.Config, bus *bus.MessageBus) (Channel, error)

func RegisterFactory(name string, f ChannelFactory)   // Called in sub-package init()
func getFactory(name string) (ChannelFactory, bool)    // Called internally by Manager

The factory registry is protected by a sync.RWMutex. Registrations happen in each sub-package's init(), so they complete before main runs. Manager looks up factories by name in initChannel() and calls them.
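A sketch of the registry pattern. It assumes a simplified `ChannelFactory` that takes no arguments (the real one receives `*config.Config` and `*bus.MessageBus`); `matrixChannel` and `initChannel` are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// Channel is reduced to Name() for this sketch.
type Channel interface{ Name() string }

// ChannelFactory is simplified; the real signature takes config and bus.
type ChannelFactory func() (Channel, error)

var (
	factoriesMu sync.RWMutex
	factories   = map[string]ChannelFactory{}
)

// RegisterFactory is meant to be called from a sub-package init().
func RegisterFactory(name string, f ChannelFactory) {
	factoriesMu.Lock()
	defer factoriesMu.Unlock()
	factories[name] = f
}

func getFactory(name string) (ChannelFactory, bool) {
	factoriesMu.RLock()
	defer factoriesMu.RUnlock()
	f, ok := factories[name]
	return f, ok
}

type matrixChannel struct{}

func (matrixChannel) Name() string { return "matrix" }

// In a real sub-package this would live in init.go.
func init() {
	RegisterFactory("matrix", func() (Channel, error) { return matrixChannel{}, nil })
}

// initChannel shows how Manager might resolve a factory by name.
func initChannel(name string) (Channel, error) {
	f, ok := getFactory(name)
	if !ok {
		return nil, fmt.Errorf("no factory registered for %q", name)
	}
	return f()
}
```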

4.5 Error Classification and Retries

Files: pkg/channels/errors.go, pkg/channels/errutil.go

Sentinel Errors

var (
    ErrNotRunning = errors.New("channel not running")   // Permanent: do not retry
    ErrRateLimit  = errors.New("rate limited")           // Fixed delay: retry after 1s
    ErrTemporary  = errors.New("temporary failure")      // Exponential backoff: 500ms * 2^attempt, max 8s
    ErrSendFailed = errors.New("send failed")            // Permanent: do not retry
)

Error Classification Helpers

// Automatically classify based on HTTP status code
func ClassifySendError(statusCode int, rawErr error) error {
    // 429 → ErrRateLimit
    // 5xx → ErrTemporary
    // 4xx → ErrSendFailed
}

// Wrap network errors as temporary
func ClassifyNetError(err error) error {
    // → ErrTemporary
}

Manager Retry Strategy (sendWithRetry)

Max retries:       3
Rate limit delay:  1 second
Base backoff:      500 milliseconds
Max backoff:       8 seconds

Retry logic:
  ErrNotRunning → Fail immediately, no retry
  ErrSendFailed → Fail immediately, no retry
  ErrRateLimit  → Wait 1s → retry
  ErrTemporary  → Wait 500ms * 2^attempt (max 8s) → retry
  Other unknown → Wait 500ms * 2^attempt (max 8s) → retry

4.6 Manager Orchestration

File: pkg/channels/manager.go

Per-channel Worker Architecture

type channelWorker struct {
    ch         Channel                      // Channel instance
    queue      chan bus.OutboundMessage      // Outbound text queue (buffered 16)
    mediaQueue chan bus.OutboundMediaMessage // Outbound media queue (buffered 16)
    done       chan struct{}                // Text worker completion signal
    mediaDone  chan struct{}                // Media worker completion signal
    limiter    *rate.Limiter                // Per-channel rate limiter
}

Per-channel Rate Limit Configuration

var channelRateConfig = map[string]float64{
    "telegram": 20,   // 20 msg/s
    "discord":  1,    // 1 msg/s
    "slack":    1,    // 1 msg/s
    "line":     10,   // 10 msg/s
}
// Default: 10 msg/s
// burst = max(1, ceil(rate/2))
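The burst formula can be checked against the table in A.5 with a small helper. `limitFor` is hypothetical. In the Manager these two values would presumably be passed to `rate.NewLimiter(rate.Limit(r), burst)` from golang.org/x/time/rate, which is left out here to keep the sketch dependency-free.

```go
package main

import "math"

var channelRateConfig = map[string]float64{
	"telegram": 20,
	"discord":  1,
	"slack":    1,
	"line":     10,
}

const defaultRate = 10.0 // msg/s for channels not listed above

// limitFor returns (rate, burst) with burst = max(1, ceil(rate/2)).
func limitFor(channel string) (float64, int) {
	r, ok := channelRateConfig[channel]
	if !ok {
		r = defaultRate
	}
	burst := int(math.Ceil(r / 2))
	if burst < 1 {
		burst = 1
	}
	return r, burst
}
```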

Lifecycle Management

StartAll:
  1. Iterate registered channels → channel.Start(ctx)
  2. Create channelWorker for each successfully started channel
  3. Start goroutines:
     - runWorker (per-channel outbound text)
     - runMediaWorker (per-channel outbound media)
     - dispatchOutbound (route from bus to worker queues)
     - dispatchOutboundMedia (route from bus to media worker queues)
     - runTTLJanitor (every 10s clean up expired typing/reaction/placeholder)
  4. Start shared HTTP server (if configured)

StopAll:
  1. Shut down shared HTTP server (5s timeout)
  2. Cancel dispatcher context
  3. Close text worker queues → wait for drain to complete
  4. Close media worker queues → wait for drain to complete
  5. Stop each channel (channel.Stop)

Typing/Reaction/Placeholder Management

// Manager implements PlaceholderRecorder interface
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string)
func (m *Manager) RecordTypingStop(channel, chatID string, stop func())
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func())

// Inbound side: BaseChannel.HandleMessage auto-orchestrates
// BaseChannel.HandleMessage, before PublishInbound, auto-triggers via owner type assertions:
//   - TypingCapable.StartTyping       → RecordTypingStop
//   - ReactionCapable.ReactToMessage  → RecordReactionUndo
//   - PlaceholderCapable.SendPlaceholder → RecordPlaceholder
// All three are independent and do not interfere with each other. Channels don't need to call these manually.

// Outbound side: pre-send processing
func (m *Manager) preSend(ctx, name, msg, ch) bool {
    key := name + ":" + msg.ChatID
    // 1. Stop Typing (call stored stop function)
    // 2. Undo Reaction (call stored undo function)
    // 3. Attempt to edit Placeholder (if channel implements MessageEditor)
    //    Success → return true (skip Send)
    //    Failure → return false (proceed with Send)
}

Manager keeps each pipeline's state in a separate map, so the three pipelines do not interfere:

Manager {
    typingStops   sync.Map  // "channel:chatID" → typingEntry       ← manages TypingCapable
    reactionUndos sync.Map  // "channel:chatID" → reactionEntry     ← manages ReactionCapable
    placeholders  sync.Map  // "channel:chatID" → placeholderEntry  ← manages PlaceholderCapable
}

TTL Cleanup:

  • Typing stop functions: 5-minute TTL (auto-calls stop and deletes on expiry)
  • Reaction undo functions: 5-minute TTL (auto-calls undo and deletes on expiry)
  • Placeholder IDs: 10-minute TTL (deletes on expiry)
  • Cleanup interval: 10 seconds

4.7 Message Splitting

File: pkg/channels/split.go

SplitMessage(content string, maxLen int) []string

Smart splitting strategy:

  1. Compute the effective split limit as maxLen minus a 10% buffer, reserving room to close a code block
  2. Prefer splitting at newlines
  3. Otherwise split at spaces/tabs
  4. Detect unclosed code blocks (```)
  5. If a code block is unclosed:
    • Attempt to extend to maxLen to include the closing fence
    • If the code block is too long, inject close/reopen fences (\n```\n + header)
    • Last resort: split before the code block starts
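A deliberately simplified splitter that shows only steps 2 and 3 (newline first, then space or tab) with rune-based lengths. It omits the 10% buffer and all code-fence handling, so it is not a substitute for SplitMessage; `splitMessage` is a hypothetical name.

```go
package main

import (
	"strings"
	"unicode/utf8"
)

// splitMessage measures length in runes and, within each maxLen window, cuts
// at the last newline, else at the last space or tab, else hard-cuts at maxLen.
// The separator at a cut is dropped. No buffer, no code-fence repair.
func splitMessage(content string, maxLen int) []string {
	if maxLen <= 0 {
		return []string{content} // 0 = unlimited
	}
	runes := []rune(content)
	var parts []string
	for len(runes) > maxLen {
		window := string(runes[:maxLen])
		cut := strings.LastIndex(window, "\n")
		if cut <= 0 {
			cut = strings.LastIndexAny(window, " \t")
		}
		if cut > 0 {
			// cut is a byte offset into window; convert it to a rune offset.
			n := utf8.RuneCountInString(window[:cut])
			parts = append(parts, string(runes[:n]))
			runes = runes[n+1:] // skip the separator itself
		} else {
			parts = append(parts, string(runes[:maxLen]))
			runes = runes[maxLen:]
		}
	}
	if len(runes) > 0 {
		parts = append(parts, string(runes))
	}
	return parts
}
```

Counting runes rather than bytes is what keeps multi-byte text (accents, CJK, emoji) from being cut mid-character.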

4.8 MediaStore

File: pkg/media/store.go

type MediaStore interface {
    Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
    Resolve(ref string) (localPath string, err error)
    ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error)
    ReleaseAll(scope string) error
}

FileMediaStore Implementation:

  • Pure in-memory mapping (ref → local path); files are neither copied nor moved
  • Reference format: media://<uuid>
  • Scope format: channel:chatID:messageID (generated by BuildMediaScope)
  • Two-phase deletion:
    • Phase 1 (holding the lock): collect matching entries and delete them from the map
    • Phase 2 (lock released): delete the files from disk
    • Purpose: minimize lock contention
  • TTL cleanup: NewFileMediaStoreWithCleanup() creates the store, and Start() launches the background cleanup goroutine (Stop() ends it)
  • Cleanup interval and max TTL are controlled by configuration

4.9 Identity

File: pkg/identity/identity.go

// Build canonical ID
func BuildCanonicalID(platform, platformID string) string
// → "telegram:123456"

// Parse canonical ID
func ParseCanonicalID(canonical string) (platform, id string, ok bool)

// Match against allow list (backward-compatible)
func MatchAllowed(sender bus.SenderInfo, allowed string) bool

Allow-list formats supported by MatchAllowed:

| Format | Matching |
|---|---|
| "123456" | Matches sender.PlatformID |
| "@alice" | Matches sender.Username |
| "123456\|alice" | Matches PlatformID or Username (legacy format compatibility) |
| "telegram:123456" | Exact match on sender.CanonicalID (new format) |

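A sketch of the identity helpers and matching rules in the table above, assuming Go 1.18+ for `strings.Cut`. The lowercase functions are hypothetical re-implementations, and the order in which the formats are tested is an assumption rather than the documented behavior of identity.MatchAllowed.

```go
package main

import "strings"

type SenderInfo struct {
	Platform    string
	PlatformID  string
	CanonicalID string
	Username    string
	DisplayName string
}

func buildCanonicalID(platform, platformID string) string {
	return platform + ":" + platformID
}

func parseCanonicalID(canonical string) (platform, id string, ok bool) {
	platform, id, ok = strings.Cut(canonical, ":")
	if !ok || platform == "" || id == "" {
		return "", "", false
	}
	return platform, id, true
}

// matchAllowed checks one allow-list entry against a sender.
func matchAllowed(s SenderInfo, allowed string) bool {
	switch {
	case strings.HasPrefix(allowed, "@"): // "@alice"
		return s.Username != "" && strings.TrimPrefix(allowed, "@") == s.Username
	case strings.Contains(allowed, "|"): // legacy "123456|alice"
		id, user, _ := strings.Cut(allowed, "|")
		return (id != "" && id == s.PlatformID) || (user != "" && user == s.Username)
	case strings.Contains(allowed, ":"): // canonical "telegram:123456"
		return allowed == s.CanonicalID
	default: // bare "123456"
		return allowed == s.PlatformID
	}
}
```

The canonical form is the only one that is unambiguous across platforms: a bare "123456" would match a Telegram user and a Discord user with the same numeric ID.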
4.10 Shared HTTP Server

File: pkg/channels/manager.go (SetupHTTPServer)

Manager creates a single http.Server and auto-discovers and registers:

  • Channels implementing WebhookHandler → mounted at wh.WebhookPath()
  • Channels implementing HealthChecker → mounted at hc.HealthPath()
  • Global health endpoint registered by health.Server.RegisterOnMux

Timeout configuration: ReadTimeout = 30s, WriteTimeout = 30s


Part 5: Key Design Decisions and Conventions

5.1 Mandatory Conventions

  1. Error classification is a contract: A channel's Send method must return sentinel errors (or wrap them). Manager's retry strategy relies entirely on errors.Is checks. Returning unclassified errors will cause Manager to treat them as "unknown errors" (exponential backoff retry).

  2. SetRunning is a lifecycle signal: Call c.SetRunning(true) after Start succeeds and c.SetRunning(false) at the beginning of Stop. Send must check c.IsRunning() and return ErrNotRunning if the channel is not running.

  3. HandleMessage includes permission checks: Do not perform your own permission checks before calling HandleMessage (unless you need platform-specific preprocessing before the check). HandleMessage already calls IsAllowedSender/IsAllowed internally.

  4. Message splitting is handled by Manager: A channel's Send method does not need to handle long message splitting. Manager automatically splits based on MaxMessageLength() before calling Send. Channels only need to declare the limit via WithMaxMessageLength.

  5. Typing/Reaction/Placeholder is handled by BaseChannel + Manager automatically: A channel's Send method does not need to manage Typing stop, Reaction undo, or Placeholder editing. BaseChannel.HandleMessage auto-triggers TypingCapable, ReactionCapable, and PlaceholderCapable on the inbound side (via owner type assertions); Manager's preSend auto-stops Typing, undoes Reaction, and edits Placeholder on the outbound side. Channels only need to implement the corresponding interfaces.

  6. Factory registration belongs in init(): Each sub-package must have an init.go file that calls channels.RegisterFactory, and the Gateway must trigger registration via a blank import (_ "github.com/sipeed/picoclaw/pkg/channels/xxx").

5.2 Metadata Field Usage Conventions

Do NOT put the following information in Metadata anymore:

  • peer_kind / peer_id → Use InboundMessage.Peer
  • message_id → Use InboundMessage.MessageID
  • sender_platform / sender_username → Use InboundMessage.Sender

Metadata should only be used for:

  • Channel-specific extension information (e.g., Telegram's reply_to_message_id)
  • Temporary information that doesn't fit into structured fields

5.3 Concurrency Safety Conventions

  • BaseChannel.running: Uses atomic.Bool, thread-safe
  • Manager.channels / Manager.workers: Protected by sync.RWMutex
  • Manager.placeholders / Manager.typingStops / Manager.reactionUndos: Uses sync.Map
  • MessageBus.closed: Uses atomic.Bool
  • FileMediaStore: Uses sync.RWMutex, two-phase operation to minimize lock-hold time
  • Channel Worker queue: Go channel, inherently concurrent-safe

5.4 Testing Conventions

Existing test files:

  • pkg/channels/base_test.go — BaseChannel unit tests
  • pkg/channels/manager_test.go — Manager unit tests
  • pkg/channels/split_test.go — Message splitting tests
  • pkg/channels/errors_test.go — Error type tests
  • pkg/channels/errutil_test.go — Error classification tests

To add tests for a new channel:

go test ./pkg/channels/matrix/ -v              # Sub-package tests
go test ./pkg/channels/ -run TestSpecific -v    # Framework tests
make test                                       # Full test suite

Appendix: Complete File Listing and Interface Quick Reference

A.1 Framework Layer Files

| File | Responsibility |
|---|---|
| pkg/channels/base.go | BaseChannel struct, Channel interface, MessageLengthProvider, BaseChannelOption, HandleMessage |
| pkg/channels/interfaces.go | TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder interfaces |
| pkg/channels/media.go | MediaSender interface |
| pkg/channels/webhook.go | WebhookHandler, HealthChecker interfaces |
| pkg/channels/errors.go | ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed sentinels |
| pkg/channels/errutil.go | ClassifySendError, ClassifyNetError helpers |
| pkg/channels/registry.go | RegisterFactory, getFactory factory registry |
| pkg/channels/manager.go | Manager: worker queues, rate limiting, retries, preSend, shared HTTP, TTL janitor |
| pkg/channels/split.go | SplitMessage long-message splitting |
| pkg/bus/bus.go | MessageBus implementation |
| pkg/bus/types.go | Peer, SenderInfo, InboundMessage, OutboundMessage, OutboundMediaMessage, MediaPart |
| pkg/media/store.go | MediaStore interface, FileMediaStore implementation |
| pkg/identity/identity.go | BuildCanonicalID, ParseCanonicalID, MatchAllowed |

A.2 Channel Sub-packages

| Sub-package | Registered Name | Optional Interfaces |
|---|---|---|
| pkg/channels/telegram/ | "telegram" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
| pkg/channels/discord/ | "discord" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
| pkg/channels/slack/ | "slack" | ReactionCapable, MediaSender |
| pkg/channels/line/ | "line" | TypingCapable, MediaSender, WebhookHandler |
| pkg/channels/onebot/ | "onebot" | ReactionCapable, MediaSender |
| pkg/channels/dingtalk/ | "dingtalk" | |
| pkg/channels/feishu/ | "feishu" | — (architecture-specific build tags: feishu_32.go / feishu_64.go) |
| pkg/channels/wecom/ | "wecom" | WebhookHandler, HealthChecker |
| pkg/channels/wecom/ | "wecom_app" | MediaSender, WebhookHandler, HealthChecker |
| pkg/channels/qq/ | "qq" | |
| pkg/channels/whatsapp/ | "whatsapp" | — (Bridge mode) |
| pkg/channels/whatsapp_native/ | "whatsapp_native" | — (Native whatsmeow mode) |
| pkg/channels/maixcam/ | "maixcam" | |
| pkg/channels/pico/ | "pico" | TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |

A.3 Interface Quick Reference

// ===== Required =====
type Channel interface {
    Name() string
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
    Send(ctx context.Context, msg bus.OutboundMessage) error
    IsRunning() bool
    IsAllowed(senderID string) bool
    IsAllowedSender(sender bus.SenderInfo) bool
    ReasoningChannelID() string
}

// ===== Optional =====
type MediaSender interface {
    SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
}

type TypingCapable interface {
    StartTyping(ctx context.Context, chatID string) (stop func(), err error)
}

type ReactionCapable interface {
    ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error)
}

type PlaceholderCapable interface {
    SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}

type MessageEditor interface {
    EditMessage(ctx context.Context, chatID, messageID, content string) error
}

type WebhookHandler interface {
    WebhookPath() string
    http.Handler
}

type HealthChecker interface {
    HealthPath() string
    HealthHandler(w http.ResponseWriter, r *http.Request)
}

type MessageLengthProvider interface {
    MaxMessageLength() int
}

// ===== Injected by Manager =====
type PlaceholderRecorder interface {
    RecordPlaceholder(channel, chatID, placeholderID string)
    RecordTypingStop(channel, chatID string, stop func())
    RecordReactionUndo(channel, chatID string, undo func())
}

A.4 Gateway Startup Sequence (Complete Bootstrap Flow)

// 1. Create core components
msgBus     := bus.NewMessageBus()
provider   := providers.CreateProvider(cfg)
agentLoop  := agent.NewAgentLoop(cfg, msgBus, provider)

// 2. Create media store (with TTL cleanup)
mediaStore := media.NewFileMediaStoreWithCleanup(cleanerConfig)
mediaStore.Start()

// 3. Create Channel Manager (triggers initChannels → factory lookup → construct → inject MediaStore/PlaceholderRecorder/Owner)
channelManager := channels.NewManager(cfg, msgBus, mediaStore)

// 4. Inject references
agentLoop.SetChannelManager(channelManager)
agentLoop.SetMediaStore(mediaStore)

// 5. Configure shared HTTP server
channelManager.SetupHTTPServer(addr, healthServer)

// 6. Start
channelManager.StartAll(ctx)  // Start channels + workers + dispatchers + HTTP server
go agentLoop.Run(ctx)          // Start Agent message loop

// 7. Shutdown (signal-triggered)
cancel()                       // Cancel context
msgBus.Close()                 // Signal close + drain
channelManager.StopAll(shutdownCtx)  // Stop HTTP + workers + channels
mediaStore.Stop()              // Stop TTL cleanup
agentLoop.Stop()               // Stop Agent

A.5 Per-channel Rate Limit Reference

| Channel | Rate (msg/s) | Burst |
|---|---|---|
| telegram | 20 | 10 |
| discord | 1 | 1 |
| slack | 1 | 1 |
| line | 10 | 5 |
| others | 10 (default) | 5 |

A.6 Known Limitations and Caveats

  1. Media cleanup temporarily disabled: The ReleaseAll call in the Agent loop is commented out (commit "refactor(loop): disable media cleanup to prevent premature file deletion") because session boundaries are not yet clearly defined. TTL cleanup remains active.

  2. Feishu architecture-specific compilation: The Feishu channel uses build tags to distinguish 32-bit and 64-bit architectures (feishu_32.go / feishu_64.go). Feishu uses the SDK's WebSocket mode (not HTTP webhook), so it does not implement WebhookHandler.

  3. WeCom has two factories: "wecom" (Bot mode, webhook only) and "wecom_app" (App mode, supports MediaSender) are registered separately. Both implement WebhookHandler and HealthChecker.

  4. Pico Protocol: pkg/channels/pico/ implements a custom PicoClaw-native protocol channel that receives messages over a WebSocket endpoint (/pico/ws) exposed through its WebhookHandler.

  5. WhatsApp has two modes: "whatsapp" (Bridge mode, communicates via external bridge URL) and "whatsapp_native" (native whatsmeow mode, connects directly to WhatsApp). Manager selects which to initialize based on WhatsAppConfig.UseNative.

  6. DingTalk uses Stream mode: DingTalk uses the SDK's Stream/WebSocket mode (not HTTP webhook), so it does not implement WebhookHandler.

  7. PlaceholderConfig vs. implementation: PlaceholderConfig appears in 6 channel configs (Telegram, Discord, Slack, LINE, OneBot, Pico), but only channels that implement both PlaceholderCapable and MessageEditor (Telegram, Discord, Pico) can actually use placeholder message editing. In the other configs the field is reserved.

  8. ReasoningChannelID: Most channel configs include a reasoning_channel_id field to route LLM reasoning/thinking output to a designated channel (WhatsApp, Telegram, Feishu, Discord, MaixCam, QQ, DingTalk, Slack, LINE, OneBot, WeCom, WeComApp). Note: PicoConfig does not currently expose this field. BaseChannel exposes this via the WithReasoningChannelID option and ReasoningChannelID() method.