diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index aade18014..ef2b9e28f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -278,58 +278,64 @@ func (al *AgentLoop) Run(ctx context.Context) error { return nil } // Process message - // TODO: Re-enable media cleanup after inbound media is properly consumed by the agent. - // Currently disabled because files are deleted before the LLM can access their content. - // defer func() { - // if al.mediaStore != nil && msg.MediaScope != "" { - // if releaseErr := al.mediaStore.ReleaseAll(msg.MediaScope); releaseErr != nil { - // logger.WarnCF("agent", "Failed to release media", map[string]any{ - // "scope": msg.MediaScope, - // "error": releaseErr.Error(), - // }) - // } - // } - // }() + func() { + defer func() { + if al.channelManager != nil { + al.channelManager.InvokeTypingStop(msg.Channel, msg.ChatID) + } + }() + // TODO: Re-enable media cleanup after inbound media is properly consumed by the agent. + // Currently disabled because files are deleted before the LLM can access their content. + // defer func() { + // if al.mediaStore != nil && msg.MediaScope != "" { + // if releaseErr := al.mediaStore.ReleaseAll(msg.MediaScope); releaseErr != nil { + // logger.WarnCF("agent", "Failed to release media", map[string]any{ + // "scope": msg.MediaScope, + // "error": releaseErr.Error(), + // }) + // } + // } + // }() - response, err := al.processMessage(ctx, msg) - if err != nil { - response = fmt.Sprintf("Error processing message: %v", err) - } + response, err := al.processMessage(ctx, msg) + if err != nil { + response = fmt.Sprintf("Error processing message: %v", err) + } - if response != "" { - // Check if the message tool already sent a response during this round. - // If so, skip publishing to avoid duplicate messages to the user. - // Use default agent's tools to check (message tool is shared). 
- alreadySent := false - defaultAgent := al.GetRegistry().GetDefaultAgent() - if defaultAgent != nil { - if tool, ok := defaultAgent.Tools.Get("message"); ok { - if mt, ok := tool.(*tools.MessageTool); ok { - alreadySent = mt.HasSentInRound() + if response != "" { + // Check if the message tool already sent a response during this round. + // If so, skip publishing to avoid duplicate messages to the user. + // Use default agent's tools to check (message tool is shared). + alreadySent := false + defaultAgent := al.GetRegistry().GetDefaultAgent() + if defaultAgent != nil { + if tool, ok := defaultAgent.Tools.Get("message"); ok { + if mt, ok := tool.(*tools.MessageTool); ok { + alreadySent = mt.HasSentInRound() + } } } - } - - if !alreadySent { - al.bus.PublishOutbound(ctx, bus.OutboundMessage{ - Channel: msg.Channel, - ChatID: msg.ChatID, - Content: response, - }) - logger.InfoCF("agent", "Published outbound response", - map[string]any{ - "channel": msg.Channel, - "chat_id": msg.ChatID, - "content_len": len(response), + if !alreadySent { + al.bus.PublishOutbound(ctx, bus.OutboundMessage{ + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: response, }) - } else { - logger.DebugCF( - "agent", - "Skipped outbound (message tool already sent)", - map[string]any{"channel": msg.Channel}, - ) + logger.InfoCF("agent", "Published outbound response", + map[string]any{ + "channel": msg.Channel, + "chat_id": msg.ChatID, + "content_len": len(response), + }) + } else { + logger.DebugCF( + "agent", + "Skipped outbound (message tool already sent)", + map[string]any{"channel": msg.Channel}, + ) + } } - } + }() default: time.Sleep(time.Microsecond * 200) } diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 2e1e12ded..c980daf66 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -136,6 +136,19 @@ func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) { } } +// InvokeTypingStop invokes the registered typing stop function for the 
given channel and chatID. +// It is safe to call even when no typing indicator is active (no-op): the entry is removed with LoadAndDelete before stop runs, so repeated calls invoke stop at most once per recorded entry. +// Used by the agent loop to stop typing when processing completes (success, error, or panic), +// regardless of whether an outbound message is published. +func (m *Manager) InvokeTypingStop(channel, chatID string) { + key := channel + ":" + chatID + if v, loaded := m.typingStops.LoadAndDelete(key); loaded { + if entry, ok := v.(typingEntry); ok { + entry.stop() + } + } +} + // RecordReactionUndo registers a reaction undo function for later invocation. // Implements PlaceholderRecorder. func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) { diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index e0f55288a..7dfec9ebf 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -511,6 +511,43 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) { } } +func TestInvokeTypingStop_CallsRegisteredStop(t *testing.T) { + m := newTestManager() + var stopCalled bool + + m.RecordTypingStop("telegram", "chat123", func() { + stopCalled = true + }) + + m.InvokeTypingStop("telegram", "chat123") + + if !stopCalled { + t.Fatal("expected typing stop func to be called") + } +} + +func TestInvokeTypingStop_NoOpWhenNoEntry(t *testing.T) { + m := newTestManager() + // Must not panic when no stop function was recorded for this channel/chat key. + m.InvokeTypingStop("telegram", "nonexistent") +} + +func TestInvokeTypingStop_Idempotent(t *testing.T) { + m := newTestManager() + var callCount int + + m.RecordTypingStop("telegram", "chat123", func() { + callCount++ + }) + + m.InvokeTypingStop("telegram", "chat123") + m.InvokeTypingStop("telegram", "chat123") // Second call: the first call already removed the entry, so this is a no-op + + if callCount != 1 { + t.Fatalf("expected stop to be called once, got %d", callCount) + } +} + func TestPreSend_TypingStopCalled(t *testing.T) { m := newTestManager() var stopCalled bool diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 
9d0325093..2797bdf4a 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -302,10 +302,17 @@ func (c *TelegramChannel) sendChunk( return nil } +// maxTypingDuration limits how long the typing indicator can run. +// Prevents endless typing when the LLM fails/hangs and preSend never invokes cancel. +// Matches channels.Manager's typingStopTTL (5 min) so behavior is consistent (TODO confirm: typingStopTTL is not visible in this patch; keep the two values in sync). +const maxTypingDuration = 5 * time.Minute + // StartTyping implements channels.TypingCapable. // It sends ChatAction(typing) immediately and then repeats every 4 seconds // (Telegram's typing indicator expires after ~5s) in a background goroutine. // The returned stop function is idempotent and cancels the goroutine. +// The goroutine also exits automatically after maxTypingDuration if cancel is +// never called (e.g. when the LLM fails or times out without publishing). func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { cid, threadID, err := parseTelegramChatID(chatID) if err != nil { @@ -319,12 +326,15 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( _ = c.bot.SendChatAction(ctx, action) typingCtx, cancel := context.WithCancel(ctx) + // Cap lifetime so the goroutine cannot run indefinitely if cancel is never called. maxCtx is derived from typingCtx, so cancelling typingCtx also ends the loop below. NOTE(review): on timeout only maxCancel runs here; typingCtx is released only by its own cancel or by the parent ctx - confirm cancel is still invoked elsewhere. + maxCtx, maxCancel := context.WithTimeout(typingCtx, maxTypingDuration) go func() { + defer maxCancel() ticker := time.NewTicker(4 * time.Second) defer ticker.Stop() for { select { - case <-typingCtx.Done(): + case <-maxCtx.Done(): return case <-ticker.C: a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)