mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
fix(telegram): stop typing indicator when LLM fails or hangs
This commit is contained in:
@@ -255,6 +255,11 @@ func (al *AgentLoop) Run(ctx context.Context) error {
|
||||
|
||||
// Process message
|
||||
func() {
|
||||
defer func() {
|
||||
if al.channelManager != nil {
|
||||
al.channelManager.InvokeTypingStop(msg.Channel, msg.ChatID)
|
||||
}
|
||||
}()
|
||||
// TODO: Re-enable media cleanup after inbound media is properly consumed by the agent.
|
||||
// Currently disabled because files are deleted before the LLM can access their content.
|
||||
// defer func() {
|
||||
|
||||
@@ -130,6 +130,19 @@ func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) {
|
||||
m.typingStops.Store(key, typingEntry{stop: stop, createdAt: time.Now()})
|
||||
}
|
||||
|
||||
// InvokeTypingStop invokes the registered typing stop function for the given channel and chatID.
|
||||
// It is safe to call even when no typing indicator is active (no-op).
|
||||
// Used by the agent loop to stop typing when processing completes (success, error, or panic),
|
||||
// regardless of whether an outbound message is published.
|
||||
func (m *Manager) InvokeTypingStop(channel, chatID string) {
|
||||
key := channel + ":" + chatID
|
||||
if v, loaded := m.typingStops.LoadAndDelete(key); loaded {
|
||||
if entry, ok := v.(typingEntry); ok {
|
||||
entry.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RecordReactionUndo registers a reaction undo function for later invocation.
|
||||
// Implements PlaceholderRecorder.
|
||||
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
|
||||
|
||||
@@ -511,6 +511,43 @@ func TestPreSend_PlaceholderEditFails_FallsThrough(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvokeTypingStop_CallsRegisteredStop(t *testing.T) {
|
||||
m := newTestManager()
|
||||
var stopCalled bool
|
||||
|
||||
m.RecordTypingStop("telegram", "chat123", func() {
|
||||
stopCalled = true
|
||||
})
|
||||
|
||||
m.InvokeTypingStop("telegram", "chat123")
|
||||
|
||||
if !stopCalled {
|
||||
t.Fatal("expected typing stop func to be called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvokeTypingStop_NoOpWhenNoEntry(t *testing.T) {
|
||||
m := newTestManager()
|
||||
// Should not panic
|
||||
m.InvokeTypingStop("telegram", "nonexistent")
|
||||
}
|
||||
|
||||
func TestInvokeTypingStop_Idempotent(t *testing.T) {
|
||||
m := newTestManager()
|
||||
var callCount int
|
||||
|
||||
m.RecordTypingStop("telegram", "chat123", func() {
|
||||
callCount++
|
||||
})
|
||||
|
||||
m.InvokeTypingStop("telegram", "chat123")
|
||||
m.InvokeTypingStop("telegram", "chat123") // Second call: entry already removed, no-op
|
||||
|
||||
if callCount != 1 {
|
||||
t.Fatalf("expected stop to be called once, got %d", callCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreSend_TypingStopCalled(t *testing.T) {
|
||||
m := newTestManager()
|
||||
var stopCalled bool
|
||||
|
||||
@@ -242,10 +242,17 @@ func (c *TelegramChannel) sendHTMLChunk(
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxTypingDuration limits how long the typing indicator can run.
|
||||
// Prevents endless typing when the LLM fails/hangs and preSend never invokes cancel.
|
||||
// Matches channels.Manager's typingStopTTL (5 min) so behavior is consistent.
|
||||
const maxTypingDuration = 5 * time.Minute
|
||||
|
||||
// StartTyping implements channels.TypingCapable.
|
||||
// It sends ChatAction(typing) immediately and then repeats every 4 seconds
|
||||
// (Telegram's typing indicator expires after ~5s) in a background goroutine.
|
||||
// The returned stop function is idempotent and cancels the goroutine.
|
||||
// The goroutine also exits automatically after maxTypingDuration if cancel is
|
||||
// never called (e.g. when the LLM fails or times out without publishing).
|
||||
func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(), error) {
|
||||
cid, threadID, err := parseTelegramChatID(chatID)
|
||||
if err != nil {
|
||||
@@ -259,12 +266,15 @@ func (c *TelegramChannel) StartTyping(ctx context.Context, chatID string) (func(
|
||||
_ = c.bot.SendChatAction(ctx, action)
|
||||
|
||||
typingCtx, cancel := context.WithCancel(ctx)
|
||||
// Cap lifetime so the goroutine cannot run indefinitely if cancel is never called
|
||||
maxCtx, maxCancel := context.WithTimeout(typingCtx, maxTypingDuration)
|
||||
go func() {
|
||||
defer maxCancel()
|
||||
ticker := time.NewTicker(4 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-typingCtx.Done():
|
||||
case <-maxCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
a := tu.ChatAction(tu.ID(cid), telego.ChatActionTyping)
|
||||
|
||||
Reference in New Issue
Block a user