diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 01e7ce4c4..a3a23fb3d 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -296,16 +296,21 @@ func (al *AgentLoop) Run(ctx context.Context) error { // } // }() - defer drainCancel() + drainCanceled := false + cancelDrain := func() { + if drainCanceled { + return + } + drainCancel() + drainCanceled = true + } + defer cancelDrain() response, err := al.processMessage(ctx, msg) if err != nil { response = fmt.Sprintf("Error processing message: %v", err) } - - if response != "" { - al.publishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, response) - } + finalResponse := response target, targetErr := al.buildContinuationTarget(msg) if targetErr != nil { @@ -317,6 +322,10 @@ func (al *AgentLoop) Run(ctx context.Context) error { return } if target == nil { + cancelDrain() + if finalResponse != "" { + al.publishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse) + } return } @@ -343,7 +352,39 @@ func (al *AgentLoop) Run(ctx context.Context) error { return } - al.publishResponseIfNeeded(ctx, target.Channel, target.ChatID, continued) + finalResponse = continued + } + + cancelDrain() + + for al.pendingSteeringCountForScope(target.SessionKey) > 0 { + logger.InfoCF("agent", "Draining steering queued during turn shutdown", + map[string]any{ + "channel": target.Channel, + "chat_id": target.ChatID, + "session_key": target.SessionKey, + "queue_depth": al.pendingSteeringCountForScope(target.SessionKey), + }) + + continued, continueErr := al.Continue(ctx, target.SessionKey, target.Channel, target.ChatID) + if continueErr != nil { + logger.WarnCF("agent", "Failed to continue queued steering after shutdown drain", + map[string]any{ + "channel": target.Channel, + "chat_id": target.ChatID, + "error": continueErr.Error(), + }) + return + } + if continued == "" { + break + } + + finalResponse = continued + } + + if finalResponse != "" { + al.publishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse) } }() }