diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 28e549ce0..4860b9e2a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -255,6 +255,11 @@ func (al *AgentLoop) Run(ctx context.Context) error { // Process message func() { + defer func() { + if al.channelManager != nil { + al.channelManager.InvokeTypingStop(msg.Channel, msg.ChatID) + } + }() // TODO: Re-enable media cleanup after inbound media is properly consumed by the agent. // Currently disabled because files are deleted before the LLM can access their content. // defer func() { diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 472895a7a..2c06feb38 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -130,6 +130,19 @@ func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) { m.typingStops.Store(key, typingEntry{stop: stop, createdAt: time.Now()}) } +// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID. +// It is safe to call even when no typing indicator is active (no-op). +// Used by the agent loop to stop typing when processing completes (success, error, or panic), +// regardless of whether an outbound message is published. +func (m *Manager) InvokeTypingStop(channel, chatID string) { + key := channel + ":" + chatID + if v, loaded := m.typingStops.LoadAndDelete(key); loaded { + if entry, ok := v.(typingEntry); ok { + entry.stop() + } + } +} + // RecordReactionUndo registers a reaction undo function for later invocation. // Implements PlaceholderRecorder. func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) { diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index 1f3a628c2..f92e4abb3 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -511,6 +511,43 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) { } } +func TestInvokeTypingStop_CallsRegisteredStop(t *testing.T) { + m := newTestManager() + var stopCalled bool + + m.RecordTypingStop("telegram", "chat123", func() { + stopCalled = true + }) + + m.InvokeTypingStop("telegram", "chat123") + + if !stopCalled { + t.Fatal("expected typing stop func to be called") + } +} + +func TestInvokeTypingStop_NoOpWhenNoEntry(t *testing.T) { + m := newTestManager() + // Should not panic + m.InvokeTypingStop("telegram", "nonexistent") +} + +func TestInvokeTypingStop_Idempotent(t *testing.T) { + m := newTestManager() + var callCount int + + m.RecordTypingStop("telegram", "chat123", func() { + callCount++ + }) + + m.InvokeTypingStop("telegram", "chat123") + m.InvokeTypingStop("telegram", "chat123") // Second call: entry already removed, no-op + + if callCount != 1 { + t.Fatalf("expected stop to be called once, got %d", callCount) + } +} + func TestPreSend_TypingStopCalled(t *testing.T) { m := newTestManager() var stopCalled bool diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go index 34ee46b7b..5f86d24c9 100644 --- a/pkg/channels/telegram/telegram.go +++ b/pkg/channels/telegram/telegram.go @@ -242,10 +242,17 @@ func (c *TelegramChannel) sendHTMLChunk( return nil } +// maxTypingDuration limits how long the typing indicator can run. +// Prevents endless typing when the LLM fails/hangs and preSend never invokes cancel. +// Matches channels.Manager's typingStopTTL (5 min) so behavior is consistent. +const maxTypingDuration = 5 * time.Minute + // StartTyping implements channels.TypingCapable. // It sends ChatAction(typing) immediately and then repeats every 4 seconds // (Telegram's typing indicator expires after ~5s) in a background goroutine. // The returned stop function is idempotent and cancels the goroutine. +// The goroutine also exits automatically after maxTypingDuration if cancel is +// never called (e.g. when the LLM fails or times out without publishing). func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { cid, threadID, err := parseTelegramChatID(chatID) if err != nil { @@ -259,12 +266,15 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func( _ = c.bot.SendChatAction(ctx, action) typingCtx, cancel := context.WithCancel(ctx) + // Cap lifetime so the goroutine cannot run indefinitely if cancel is never called + maxCtx, maxCancel := context.WithTimeout(typingCtx, maxTypingDuration) go func() { + defer maxCancel() ticker := time.NewTicker(4 * time.Second) defer ticker.Stop() for { select { - case <-typingCtx.Done(): + case <-maxCtx.Done(): return case <-ticker.C: a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)