feat(channels): auto-orchestrate Placeholder/Typing/Reaction via capability interfaces

Define PlaceholderCapable, TypingCapable, and ReactionCapable interfaces
and have BaseChannel.HandleMessage auto-detect and trigger all three as
independent pipelines on inbound messages. This replaces the scattered
manual orchestration code in each channel's handleMessage with a single
unified dispatch in the framework layer.

Changes:
- Add PlaceholderCapable interface to interfaces.go
- Add ReactionCapable + RecordReactionUndo to interfaces.go
- BaseChannel.HandleMessage auto-triggers Typing → Reaction → Placeholder
- Manager gains reactionUndos sync.Map with TTL janitor cleanup
- Telegram: extract SendPlaceholder from manual code, add StartTyping
- Discord: add SendPlaceholder + StartTyping
- Pico: add SendPlaceholder (uses Pico Protocol message.create)
- Slack: extract ReactToMessage from manual code
- OneBot: extract ReactToMessage, remove leaked pendingEmojiMsg sync.Map
- LINE: move group-chat guard into StartTyping, remove manual orchestration
- Config: add Placeholder to PicoConfig; remove from Slack/LINE/OneBot
  (no MessageEditor, so placeholder config was dead code)
This commit is contained in:
Hoshina
2026-02-27 03:02:40 +08:00
committed by 美電球
parent ba98069a00
commit 29ed650107
10 changed files with 268 additions and 142 deletions
+30
View File
@@ -82,6 +82,7 @@ type BaseChannel struct {
groupTrigger config.GroupTriggerConfig
mediaStore media.MediaStore
placeholderRecorder PlaceholderRecorder
owner Channel // the concrete channel that embeds this BaseChannel
}
func NewBaseChannel(
@@ -257,6 +258,29 @@ func (c *BaseChannel) HandleMessage(
Metadata: metadata,
}
// Auto-trigger typing indicator, message reaction, and placeholder before publishing.
// Each capability is independent — all three may fire for the same message.
if c.owner != nil && c.placeholderRecorder != nil {
// Typing — independent pipeline
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
}
}
// Reaction — independent pipeline
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — independent pipeline
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
if err := c.bus.PublishInbound(ctx, msg); err != nil {
logger.ErrorCF("channels", "Failed to publish inbound message", map[string]any{
"channel": c.name,
@@ -286,6 +310,12 @@ func (c *BaseChannel) GetPlaceholderRecorder() PlaceholderRecorder {
return c.placeholderRecorder
}
// SetOwner injects the concrete channel that embeds this BaseChannel.
// This allows HandleMessage to auto-trigger TypingCapable / ReactionCapable / PlaceholderCapable.
func (c *BaseChannel) SetOwner(ch Channel) {
c.owner = ch
}
// BuildMediaScope constructs a scope key for media lifecycle tracking.
func BuildMediaScope(channel, chatID, messageID string) string {
id := messageID
+28 -7
View File
@@ -224,6 +224,27 @@ func (c *DiscordChannel) EditMessage(ctx context.Context, chatID string, message
return err
}
// SendPlaceholder implements channels.PlaceholderCapable.
// It sends a placeholder message that will later be edited to the actual
// response via EditMessage (channels.MessageEditor).
func (c *DiscordChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
if !c.config.Placeholder.Enabled {
return "", nil
}
text := c.config.Placeholder.Text
if text == "" {
text = "Thinking... 💭"
}
msg, err := c.session.ChannelMessageSend(chatID, text)
if err != nil {
return "", err
}
return msg.ID, nil
}
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
// Use the passed ctx for timeout control
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
@@ -360,13 +381,6 @@ func (c *DiscordChannel) handleMessage(s *discordgo.Session, m *discordgo.Messag
content = "[media only]"
}
// Start typing after all early returns — guaranteed to have a matching Send()
c.startTyping(m.ChannelID)
// Register typing stop with Manager for outbound orchestration
if rec := c.GetPlaceholderRecorder(); rec != nil {
rec.RecordTypingStop("discord", m.ChannelID, func() { c.stopTyping(m.ChannelID) })
}
logger.DebugCF("discord", "Received message", map[string]any{
"sender_name": sender.DisplayName,
"sender_id": senderID,
@@ -440,6 +454,13 @@ func (c *DiscordChannel) stopTyping(chatID string) {
}
}
// StartTyping implements channels.TypingCapable.
// It starts a continuous typing indicator and returns an idempotent stop function.
func (c *DiscordChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
c.startTyping(chatID)
return func() { c.stopTyping(chatID) }, nil
}
func (c *DiscordChannel) downloadAttachment(url, filename string) string {
return utils.DownloadFile(url, filename, utils.DownloadOptions{
LoggerPrefix: "discord",
+17
View File
@@ -15,10 +15,27 @@ type MessageEditor interface {
EditMessage(ctx context.Context, chatID string, messageID string, content string) error
}
// ReactionCapable — channels that can add a reaction (e.g. 👀) to an inbound message.
// ReactToMessage adds a reaction and returns an undo function to remove it.
// The undo function MUST be idempotent and safe to call multiple times.
type ReactionCapable interface {
ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error)
}
// PlaceholderCapable — channels that can send a placeholder message
// (e.g. "Thinking... 💭") that will later be edited to the actual response.
// The channel MUST also implement MessageEditor for the placeholder to be useful.
// SendPlaceholder returns the platform message ID of the placeholder so that
// Manager.preSend can later edit it via MessageEditor.EditMessage.
type PlaceholderCapable interface {
SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}
// PlaceholderRecorder is injected into channels by Manager.
// Channels call these methods on inbound to register typing/placeholder state.
// Manager uses the registered state on outbound to stop typing and edit placeholders.
type PlaceholderRecorder interface {
RecordPlaceholder(channel, chatID, placeholderID string)
RecordTypingStop(channel, chatID string, stop func())
RecordReactionUndo(channel, chatID string, undo func())
}
+8 -26
View File
@@ -378,28 +378,6 @@ func (c *LINEChannel) processEvent(event lineEvent) {
return
}
// Thinking indicator (LINE loading animation is 1:1 only).
// For group/room chats, LINE provides no equivalent API.
// Only start if PlaceholderRecorder is available to avoid wasted API calls.
if !isGroup {
if rec := c.GetPlaceholderRecorder(); rec != nil {
typingCtx, typingCancel := context.WithTimeout(c.ctx, 5*time.Minute)
stop, err := c.StartTyping(typingCtx, chatID)
if err == nil {
var stopOnce sync.Once
stopFn := func() {
stopOnce.Do(func() {
stop()
typingCancel()
})
}
rec.RecordTypingStop("line", chatID, stopFn)
} else {
typingCancel()
}
}
}
c.HandleMessage(c.ctx, peer, msg.ID, senderID, chatID, content, mediaPaths, metadata, sender)
}
@@ -598,15 +576,19 @@ func (c *LINEChannel) sendPush(ctx context.Context, to, content, quoteToken stri
// StartTyping implements channels.TypingCapable using LINE's loading animation.
//
// NOTE: The LINE loading animation API only works for 1:1 chats. Callers must ensure
// the provided chatID is a user chat ID (not a group/room ID).
// There is no explicit "stop" API; we periodically re-send start requests to keep
// the indicator alive, and stop by canceling the context.
// NOTE: The LINE loading animation API only works for 1:1 chats.
// Group/room chat IDs (starting with "C" or "R") are detected automatically;
// for these, a no-op stop function is returned without calling the API.
func (c *LINEChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
if chatID == "" {
return func() {}, nil
}
// Group/room chats: LINE loading animation is 1:1 only.
if strings.HasPrefix(chatID, "C") || strings.HasPrefix(chatID, "R") {
return func() {}, nil
}
typingCtx, cancel := context.WithCancel(ctx)
var once sync.Once
stop := func() { once.Do(cancel) }
+48 -13
View File
@@ -44,6 +44,12 @@ type typingEntry struct {
createdAt time.Time
}
// reactionEntry wraps a reaction undo function with a creation timestamp for TTL eviction.
type reactionEntry struct {
undo func()
createdAt time.Time
}
// placeholderEntry wraps a placeholder ID with a creation timestamp for TTL eviction.
type placeholderEntry struct {
id string
@@ -68,17 +74,18 @@ type channelWorker struct {
}
type Manager struct {
channels map[string]Channel
workers map[string]*channelWorker
bus *bus.MessageBus
config *config.Config
mediaStore media.MediaStore
dispatchTask *asyncTask
mux *http.ServeMux
httpServer *http.Server
mu sync.RWMutex
placeholders sync.Map // "channel:chatID" → placeholderID (string)
typingStops sync.Map // "channel:chatID" → func()
channels map[string]Channel
workers map[string]*channelWorker
bus *bus.MessageBus
config *config.Config
mediaStore media.MediaStore
dispatchTask *asyncTask
mux *http.ServeMux
httpServer *http.Server
mu sync.RWMutex
placeholders sync.Map // "channel:chatID" → placeholderID (string)
typingStops sync.Map // "channel:chatID" → func()
reactionUndos sync.Map // "channel:chatID" → reactionEntry
}
type asyncTask struct {
@@ -99,7 +106,14 @@ func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
m.typingStops.Store(key, typingEntry{stop: stop, createdAt: time.Now()})
}
// preSend handles typing stop and placeholder editing before sending a message.
// RecordReactionUndo registers a reaction undo function for later invocation.
// Implements PlaceholderRecorder.
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
key := channel + ":" + chatID
m.reactionUndos.Store(key, reactionEntry{undo: undo, createdAt: time.Now()})
}
// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
// Returns true if the message was edited into a placeholder (skip Send).
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
key := name + ":" + msg.ChatID
@@ -111,7 +125,14 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess
}
}
// 2. Try editing placeholder
// 2. Undo reaction
if v, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
if entry, ok := v.(reactionEntry); ok {
entry.undo() // idempotent, safe
}
}
// 3. Try editing placeholder
if v, loaded := m.placeholders.LoadAndDelete(key); loaded {
if entry, ok := v.(placeholderEntry); ok && entry.id != "" {
if editor, ok := ch.(MessageEditor); ok {
@@ -171,6 +192,10 @@ func (m *Manager) initChannel(name, displayName string) {
if setter, ok := ch.(interface{ SetPlaceholderRecorder(r PlaceholderRecorder) }); ok {
setter.SetPlaceholderRecorder(m)
}
// Inject owner reference so BaseChannel.HandleMessage can auto-trigger typing/reaction
if setter, ok := ch.(interface{ SetOwner(ch Channel) }); ok {
setter.SetOwner(ch)
}
m.channels[name] = ch
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
"channel": displayName,
@@ -690,6 +715,16 @@ func (m *Manager) runTTLJanitor(ctx context.Context) {
}
return true
})
m.reactionUndos.Range(func(key, value any) bool {
if entry, ok := value.(reactionEntry); ok {
if now.Sub(entry.createdAt) > typingStopTTL {
if _, loaded := m.reactionUndos.LoadAndDelete(key); loaded {
entry.undo() // idempotent, safe
}
}
}
return true
})
m.placeholders.Range(func(key, value any) bool {
if entry, ok := value.(placeholderEntry); ok {
if now.Sub(entry.createdAt) > placeholderTTL {
+30 -27
View File
@@ -23,21 +23,20 @@ import (
type OneBotChannel struct {
*channels.BaseChannel
config config.OneBotConfig
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
dedup map[string]struct{}
dedupRing []string
dedupIdx int
mu sync.Mutex
writeMu sync.Mutex
echoCounter int64
selfID int64
pending map[string]chan json.RawMessage
pendingMu sync.Mutex
lastMessageID sync.Map
pendingEmojiMsg sync.Map
config config.OneBotConfig
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
dedup map[string]struct{}
dedupRing []string
dedupIdx int
mu sync.Mutex
writeMu sync.Mutex
echoCounter int64
selfID int64
pending map[string]chan json.RawMessage
pendingMu sync.Mutex
lastMessageID sync.Map
}
type oneBotRawEvent struct {
@@ -129,6 +128,22 @@ func (c *OneBotChannel) setMsgEmojiLike(messageID string, emojiID int, set bool)
}()
}
// ReactToMessage implements channels.ReactionCapable.
// It adds an emoji reaction (ID 289) to group messages and returns an undo function.
// Private messages return a no-op since reactions are only meaningful in groups.
func (c *OneBotChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (func(), error) {
// Only react in group chats
if !strings.HasPrefix(chatID, "group:") {
return func() {}, nil
}
c.setMsgEmojiLike(messageID, 289, true)
return func() {
c.setMsgEmojiLike(messageID, 289, false)
}, nil
}
func (c *OneBotChannel) Start(ctx context.Context) error {
if c.config.WSUrl == "" {
return fmt.Errorf("OneBot ws_url not configured")
@@ -1044,18 +1059,6 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) {
c.lastMessageID.Store(chatID, messageID)
if raw.MessageType == "group" && messageID != "" && messageID != "0" {
c.setMsgEmojiLike(messageID, 289, true)
c.pendingEmojiMsg.Store(chatID, messageID)
// Register emoji stop with Manager for outbound orchestration
if rec := c.GetPlaceholderRecorder(); rec != nil {
capturedMsgID := messageID
rec.RecordTypingStop("onebot", chatID, func() {
c.setMsgEmojiLike(capturedMsgID, 289, false)
})
}
}
senderInfo := bus.SenderInfo{
Platform: "onebot",
PlatformID: senderID,
+26 -8
View File
@@ -171,6 +171,32 @@ func (c *PicoChannel) StartTyping(ctx context.Context, chatID string) (func(), e
}, nil
}
// SendPlaceholder implements channels.PlaceholderCapable.
// It sends a placeholder message via the Pico Protocol that will later be
// edited to the actual response via EditMessage (channels.MessageEditor).
func (c *PicoChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
if !c.config.Placeholder.Enabled {
return "", nil
}
text := c.config.Placeholder.Text
if text == "" {
text = "Thinking... 💭"
}
msgID := uuid.New().String()
outMsg := newMessage(TypeMessageCreate, map[string]any{
"content": text,
"message_id": msgID,
})
if err := c.broadcastToSession(chatID, outMsg); err != nil {
return "", err
}
return msgID, nil
}
// broadcastToSession sends a message to all connections with a matching session.
func (c *PicoChannel) broadcastToSession(chatID string, msg PicoMessage) error {
// chatID format: "pico:<sessionID>"
@@ -413,14 +439,6 @@ func (c *PicoChannel) handleMessageSend(pc *picoConn, msg PicoMessage) {
"preview": truncate(content, 50),
})
// Register typing with Manager
if rec := c.GetPlaceholderRecorder(); rec != nil {
stop, err := c.StartTyping(c.ctx, chatID)
if err == nil {
rec.RecordTypingStop("pico", chatID, stop)
}
}
sender := bus.SenderInfo{
Platform: "pico",
PlatformID: senderID,
+22 -34
View File
@@ -200,6 +200,28 @@ func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessa
return nil
}
// ReactToMessage implements channels.ReactionCapable.
// It adds an "eyes" (👀) reaction to the inbound message and returns an undo function
// that removes the reaction.
func (c *SlackChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (func(), error) {
channelID, _ := parseSlackChatID(chatID)
if channelID == "" {
return func() {}, nil
}
c.api.AddReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageID,
})
return func() {
c.api.RemoveReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageID,
})
}, nil
}
func (c *SlackChannel) eventLoop() {
for {
select {
@@ -275,23 +297,6 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
chatID = channelID + "/" + threadTS
}
c.api.AddReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageTS,
})
// Register typing stop (remove "eyes" reaction) with Manager
if rec := c.GetPlaceholderRecorder(); rec != nil {
capturedChannelID := channelID
capturedMessageTS := messageTS
rec.RecordTypingStop("slack", chatID, func() {
c.api.RemoveReaction("eyes", slack.ItemRef{
Channel: capturedChannelID,
Timestamp: capturedMessageTS,
})
})
}
c.pendingAcks.Store(chatID, slackMessageRef{
ChannelID: channelID,
Timestamp: messageTS,
@@ -402,23 +407,6 @@ func (c *SlackChannel) handleAppMention(ev *slackevents.AppMentionEvent) {
chatID = channelID + "/" + messageTS
}
c.api.AddReaction("eyes", slack.ItemRef{
Channel: channelID,
Timestamp: messageTS,
})
// Register typing stop (remove "eyes" reaction) with Manager
if rec := c.GetPlaceholderRecorder(); rec != nil {
capturedChannelID := channelID
capturedMessageTS := messageTS
rec.RecordTypingStop("slack", chatID, func() {
c.api.RemoveReaction("eyes", slack.ItemRef{
Channel: capturedChannelID,
Timestamp: capturedMessageTS,
})
})
}
c.pendingAcks.Store(chatID, slackMessageRef{
ChannelID: channelID,
Timestamp: messageTS,
+58 -24
View File
@@ -191,6 +191,36 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
return nil
}
// StartTyping implements channels.TypingCapable.
// It sends ChatAction(typing) immediately and then repeats every 4 seconds
// (Telegram's typing indicator expires after ~5s) in a background goroutine.
// The returned stop function is idempotent and cancels the goroutine.
func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
cid, err := parseChatID(chatID)
if err != nil {
return func() {}, err
}
// Send the first typing action immediately
_ = c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
typingCtx, cancel := context.WithCancel(ctx)
go func() {
ticker := time.NewTicker(4 * time.Second)
defer ticker.Stop()
for {
select {
case <-typingCtx.Done():
return
case <-ticker.C:
_ = c.bot.SendChatAction(typingCtx, tu.ChatAction(tu.ID(cid), telego.ChatActionTyping))
}
}
}()
return cancel, nil
}
// EditMessage implements channels.MessageEditor.
func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messageID string, content string) error {
cid, err := parseChatID(chatID)
@@ -208,6 +238,33 @@ func (c *TelegramChannel) EditMessage(ctx context.Context, chatID string, messag
return err
}
// SendPlaceholder implements channels.PlaceholderCapable.
// It sends a placeholder message (e.g. "Thinking... 💭") that will later be
// edited to the actual response via EditMessage (channels.MessageEditor).
func (c *TelegramChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
phCfg := c.config.Channels.Telegram.Placeholder
if !phCfg.Enabled {
return "", nil
}
text := phCfg.Text
if text == "" {
text = "Thinking... 💭"
}
cid, err := parseChatID(chatID)
if err != nil {
return "", err
}
pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(cid), text))
if err != nil {
return "", err
}
return fmt.Sprintf("%d", pMsg.MessageID), nil
}
// SendMedia implements the channels.MediaSender interface.
func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
if !c.IsRunning() {
@@ -419,30 +476,7 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
"preview": utils.Truncate(content, 50),
})
// Thinking indicator
err := c.bot.SendChatAction(ctx, tu.ChatAction(tu.ID(chatID), telego.ChatActionTyping))
if err != nil {
logger.ErrorCF("telegram", "Failed to send chat action", map[string]any{
"error": err.Error(),
})
}
// Create cancel function for thinking state and register with Manager
_, thinkCancel := context.WithTimeout(ctx, 5*time.Minute)
if rec := c.GetPlaceholderRecorder(); rec != nil {
rec.RecordTypingStop("telegram", chatIDStr, thinkCancel)
} else {
// No recorder — cancel immediately to avoid context leak
thinkCancel()
}
pMsg, err := c.bot.SendMessage(ctx, tu.Message(tu.ID(chatID), "Thinking... 💭"))
if err == nil {
pID := pMsg.MessageID
if rec := c.GetPlaceholderRecorder(); rec != nil {
rec.RecordPlaceholder("telegram", chatIDStr, fmt.Sprintf("%d", pID))
}
}
// Placeholder is now auto-triggered by BaseChannel.HandleMessage via PlaceholderCapable
peerKind := "direct"
peerID := fmt.Sprintf("%d", user.ID)
+1 -3
View File
@@ -288,7 +288,6 @@ type SlackConfig struct {
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_SLACK_ALLOW_FROM"`
GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"`
Typing TypingConfig `json:"typing,omitempty"`
Placeholder PlaceholderConfig `json:"placeholder,omitempty"`
}
type LINEConfig struct {
@@ -301,7 +300,6 @@ type LINEConfig struct {
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_LINE_ALLOW_FROM"`
GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"`
Typing TypingConfig `json:"typing,omitempty"`
Placeholder PlaceholderConfig `json:"placeholder,omitempty"`
}
type OneBotConfig struct {
@@ -313,7 +311,6 @@ type OneBotConfig struct {
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_ONEBOT_ALLOW_FROM"`
GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"`
Typing TypingConfig `json:"typing,omitempty"`
Placeholder PlaceholderConfig `json:"placeholder,omitempty"`
}
type WeComConfig struct {
@@ -354,6 +351,7 @@ type PicoConfig struct {
WriteTimeout int `json:"write_timeout,omitempty"`
MaxConnections int `json:"max_connections,omitempty"`
AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_PICO_ALLOW_FROM"`
Placeholder PlaceholderConfig `json:"placeholder,omitempty"`
}
type HeartbeatConfig struct {