merge: integrate main into refactor-inbound-context-routing-session

This commit is contained in:
Hoshina
2026-04-13 13:25:07 +08:00
134 changed files with 13291 additions and 1321 deletions
+322 -17
View File
@@ -91,6 +91,7 @@ type processOptions struct {
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
AllowInterimPicoPublish bool // Whether pico tool-call interim text can be published when SendResponse is false
SuppressToolFeedback bool // Whether to suppress inline tool feedback messages
NoHistory bool // If true, don't load session history (for heartbeat)
SkipInitialSteeringPoll bool // If true, skip the steering poll at loop start (used by Continue)
@@ -109,6 +110,15 @@ const (
defaultResponse = "The model returned an empty response. This may indicate a provider error or token limit."
toolLimitResponse = "I've reached `max_tool_iterations` without a final response. Increase `max_tool_iterations` in config.json if this task needs more tool steps."
handledToolResponseSummary = "Requested output delivered via tool attachment."
sessionKeyAgentPrefix = "agent:"
metadataKeyMessageKind = "message_kind"
messageKindThought = "thought"
metadataKeyAccountID = "account_id"
metadataKeyGuildID = "guild_id"
metadataKeyTeamID = "team_id"
metadataKeyReplyToMessage = "reply_to_message_id"
metadataKeyParentPeerKind = "parent_peer_kind"
metadataKeyParentPeerID = "parent_peer_id"
)
func NewAgentLoop(
@@ -688,21 +698,21 @@ func (al *AgentLoop) PublishResponseIfNeeded(ctx context.Context, channel, chatI
return
}
alreadySent := false
alreadySentToSameChat := false
defaultAgent := al.GetRegistry().GetDefaultAgent()
if defaultAgent != nil {
if tool, ok := defaultAgent.Tools.Get("message"); ok {
if mt, ok := tool.(*tools.MessageTool); ok {
alreadySent = mt.HasSentInRound()
alreadySentToSameChat = mt.HasSentTo(channel, chatID)
}
}
}
if alreadySent {
if alreadySentToSameChat {
logger.DebugCF(
"agent",
"Skipped outbound (message tool already sent)",
map[string]any{"channel": channel},
"Skipped outbound (message tool already sent to same chat)",
map[string]any{"channel": channel, "chat_id": chatID},
)
return
}
@@ -1593,10 +1603,12 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
UserMessage: msg.Content,
Media: append([]string(nil), msg.Media...),
},
SenderDisplayName: msg.Sender.DisplayName,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
SenderID: msg.SenderID,
SenderDisplayName: msg.Sender.DisplayName,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
AllowInterimPicoPublish: true,
}
// context-dependent commands check their own Runtime fields and report
@@ -1888,6 +1900,43 @@ func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string
return ""
}
func (al *AgentLoop) publishPicoReasoning(ctx context.Context, reasoningContent, chatID string) {
if reasoningContent == "" || chatID == "" {
return
}
if ctx.Err() != nil {
return
}
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()
if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Context: bus.InboundContext{
Channel: "pico",
ChatID: chatID,
Raw: map[string]string{
metadataKeyMessageKind: messageKindThought,
},
},
Content: reasoningContent,
}); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Pico reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": "pico",
"error": err.Error(),
})
} else {
logger.WarnCF("agent", "Failed to publish pico reasoning (best-effort)", map[string]any{
"channel": "pico",
"error": err.Error(),
})
}
}
}
func (al *AgentLoop) handleReasoning(
ctx context.Context,
reasoningContent, channelName, channelID string,
@@ -2483,12 +2532,16 @@ turnLoop:
if reasoningContent == "" {
reasoningContent = response.ReasoningContent
}
go al.handleReasoning(
ctx,
reasoningContent,
ts.channel,
al.targetReasoningChannelID(ts.channel),
)
if ts.channel == "pico" {
go al.publishPicoReasoning(turnCtx, reasoningContent, ts.chatID)
} else {
go al.handleReasoning(
turnCtx,
reasoningContent,
ts.channel,
al.targetReasoningChannelID(ts.channel),
)
}
al.emitEvent(
EventKindLLMResponse,
ts.eventMeta("runTurn", "turn.llm.response"),
@@ -2515,9 +2568,29 @@ turnLoop:
}
logger.DebugCF("agent", "LLM response", llmResponseFields)
if al.bus != nil && ts.channel == "pico" && len(response.ToolCalls) > 0 && ts.opts.AllowInterimPicoPublish {
if strings.TrimSpace(response.Content) != "" {
outCtx, outCancel := context.WithTimeout(turnCtx, 3*time.Second)
err := al.bus.PublishOutbound(outCtx, bus.OutboundMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Content: response.Content,
})
outCancel()
if err != nil {
logger.WarnCF("agent", "Failed to publish pico interim tool-call content", map[string]any{
"error": err.Error(),
"channel": ts.channel,
"chat_id": ts.chatID,
"iteration": iteration,
})
}
}
}
if len(response.ToolCalls) == 0 || gracefulTerminal {
responseContent := response.Content
if responseContent == "" && response.ReasoningContent != "" {
if responseContent == "" && response.ReasoningContent != "" && ts.channel != "pico" {
responseContent = response.ReasoningContent
}
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
@@ -2613,6 +2686,238 @@ turnLoop:
toolName = toolReq.Tool
toolArgs = toolReq.Arguments
}
case HookActionRespond:
// Hook returns result directly, skip tool execution.
// SECURITY: This bypasses ApproveTool, allowing hooks to respond
// for any tool name without approval. This is intentional for
// plugin tools but means a before_tool hook can override even
// sensitive tools like bash. Hook configuration should be
// carefully reviewed to prevent unauthorized tool execution.
if toolReq != nil && toolReq.HookResult != nil {
hookResult := toolReq.HookResult
argsJSON, _ := json.Marshal(toolArgs)
argsPreview := utils.Truncate(string(argsJSON), 200)
logger.InfoCF("agent", fmt.Sprintf("Tool call (hook respond): %s(%s)", toolName, argsPreview),
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"iteration": iteration,
})
// Emit ToolExecStart event (same as normal tool execution)
al.emitEvent(
EventKindToolExecStart,
ts.eventMeta("runTurn", "turn.tool.start"),
ToolExecStartPayload{
Tool: toolName,
Arguments: cloneEventArguments(toolArgs),
},
)
// Send tool feedback to chat channel if enabled (same as normal tool execution)
if al.cfg.Agents.Defaults.IsToolFeedbackEnabled() &&
ts.channel != "" &&
!ts.opts.SuppressToolFeedback {
argsJSON, _ := json.Marshal(toolArgs)
feedbackPreview := utils.Truncate(
string(argsJSON),
al.cfg.Agents.Defaults.GetToolFeedbackMaxArgsLength(),
)
feedbackMsg := utils.FormatToolFeedbackMessage(toolName, feedbackPreview)
fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second)
_ = al.bus.PublishOutbound(fbCtx, bus.OutboundMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Content: feedbackMsg,
})
fbCancel()
}
toolDuration := time.Duration(0) // Hook execution time unknown
// Send ForUser content to user
// For ResponseHandled results, send regardless of SendResponse setting,
// same as normal tool execution path.
shouldSendForUser := !hookResult.Silent && hookResult.ForUser != "" &&
(ts.opts.SendResponse || hookResult.ResponseHandled)
if shouldSendForUser {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Context: bus.InboundContext{
Channel: ts.channel,
ChatID: ts.chatID,
Raw: map[string]string{
"is_tool_call": "true",
},
},
Content: hookResult.ForUser,
})
}
// Handle media from hook result (same as normal tool execution)
if len(hookResult.Media) > 0 && hookResult.ResponseHandled {
parts := make([]bus.MediaPart, 0, len(hookResult.Media))
for _, ref := range hookResult.Media {
part := bus.MediaPart{Ref: ref}
if al.mediaStore != nil {
if _, meta, err := al.mediaStore.ResolveWithMeta(ref); err == nil {
part.Filename = meta.Filename
part.ContentType = meta.ContentType
part.Type = inferMediaType(meta.Filename, meta.ContentType)
}
}
parts = append(parts, part)
}
outboundMedia := bus.OutboundMediaMessage{
Channel: ts.channel,
ChatID: ts.chatID,
Parts: parts,
}
if al.channelManager != nil && ts.channel != "" && !constants.IsInternalChannel(ts.channel) {
if err := al.channelManager.SendMedia(ctx, outboundMedia); err != nil {
logger.WarnCF("agent", "Failed to deliver hook media",
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"channel": ts.channel,
"chat_id": ts.chatID,
"error": err.Error(),
})
// Same as normal tool execution: notify LLM about delivery failure
hookResult.IsError = true
hookResult.ForLLM = fmt.Sprintf("failed to deliver attachment: %v", err)
}
} else if al.bus != nil {
al.bus.PublishOutboundMedia(ctx, outboundMedia)
// Same as normal tool execution: bus only queues, media not yet delivered
hookResult.ResponseHandled = false
}
}
// Track response handling status (same as normal tool execution)
if !hookResult.ResponseHandled {
allResponsesHandled = false
}
// Build tool message
contentForLLM := hookResult.ContentForLLM()
if al.cfg.Tools.IsFilterSensitiveDataEnabled() {
contentForLLM = al.cfg.FilterSensitiveData(contentForLLM)
}
toolResultMsg := providers.Message{
Role: "tool",
Content: contentForLLM,
ToolCallID: tc.ID,
}
// Handle media for LLM vision (same as normal tool execution)
if len(hookResult.Media) > 0 && !hookResult.ResponseHandled {
hookResult.ArtifactTags = buildArtifactTags(al.mediaStore, hookResult.Media)
// Recalculate contentForLLM after adding ArtifactTags
contentForLLM = hookResult.ContentForLLM()
if al.cfg.Tools.IsFilterSensitiveDataEnabled() {
contentForLLM = al.cfg.FilterSensitiveData(contentForLLM)
}
toolResultMsg.Content = contentForLLM
toolResultMsg.Media = append(toolResultMsg.Media, hookResult.Media...)
}
// Emit ToolExecEnd event (after filtering, same as normal tool execution)
al.emitEvent(
EventKindToolExecEnd,
ts.eventMeta("runTurn", "turn.tool.end"),
ToolExecEndPayload{
Tool: toolName,
Duration: toolDuration,
ForLLMLen: len(contentForLLM),
ForUserLen: len(hookResult.ForUser),
IsError: hookResult.IsError,
Async: hookResult.Async,
},
)
messages = append(messages, toolResultMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, toolResultMsg)
ts.recordPersistedMessage(toolResultMsg)
ts.ingestMessage(turnCtx, al, toolResultMsg)
}
// Same as normal tool execution: check for steering/interrupt/SubTurn after each tool
if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 {
pendingMessages = append(pendingMessages, steerMsgs...)
}
skipReason := ""
skipMessage := ""
if len(pendingMessages) > 0 {
skipReason = "queued user steering message"
skipMessage = "Skipped due to queued user message."
} else if gracefulPending, _ := ts.gracefulInterruptRequested(); gracefulPending {
skipReason = "graceful interrupt requested"
skipMessage = "Skipped due to graceful interrupt."
}
if skipReason != "" {
remaining := len(normalizedToolCalls) - i - 1
if remaining > 0 {
logger.InfoCF("agent", "Turn checkpoint: skipping remaining tools after hook respond",
map[string]any{
"agent_id": ts.agent.ID,
"completed": i + 1,
"skipped": remaining,
"reason": skipReason,
})
for j := i + 1; j < len(normalizedToolCalls); j++ {
skippedTC := normalizedToolCalls[j]
al.emitEvent(
EventKindToolExecSkipped,
ts.eventMeta("runTurn", "turn.tool.skipped"),
ToolExecSkippedPayload{
Tool: skippedTC.Name,
Reason: skipReason,
},
)
skippedMsg := providers.Message{
Role: "tool",
Content: skipMessage,
ToolCallID: skippedTC.ID,
}
messages = append(messages, skippedMsg)
if !ts.opts.NoHistory {
ts.agent.Sessions.AddFullMessage(ts.sessionKey, skippedMsg)
ts.recordPersistedMessage(skippedMsg)
}
}
}
break
}
// Also poll for any SubTurn results that arrived during tool execution.
if ts.pendingResults != nil {
select {
case result, ok := <-ts.pendingResults:
if ok && result != nil && result.ForLLM != "" {
content := al.cfg.FilterSensitiveData(result.ForLLM)
msg := providers.Message{Role: "user", Content: fmt.Sprintf("[SubTurn Result] %s", content)}
messages = append(messages, msg)
ts.agent.Sessions.AddFullMessage(ts.sessionKey, msg)
}
default:
// No results available
}
}
continue
}
// If no HookResult, fall back to continue with warning
logger.WarnCF("agent", "Hook returned respond action but no HookResult provided",
map[string]any{
"agent_id": ts.agent.ID,
"tool": toolName,
"action": "respond",
})
case HookActionDenyTool:
allResponsesHandled = false
denyContent := hookDeniedToolContent("Tool execution denied by hook", decision.Reason)
@@ -2702,7 +3007,7 @@ turnLoop:
string(argsJSON),
al.cfg.Agents.Defaults.GetToolFeedbackMaxArgsLength(),
)
feedbackMsg := fmt.Sprintf("\U0001f527 `%s`\n```\n%s\n```", tc.Name, feedbackPreview)
feedbackMsg := utils.FormatToolFeedbackMessage(tc.Name, feedbackPreview)
fbCtx, fbCancel := context.WithTimeout(turnCtx, 3*time.Second)
_ = al.bus.PublishOutbound(fbCtx, outboundMessageForTurn(ts, feedbackMsg))
fbCancel()