Merge upstream/main into fix/1323-telegram-endless-typing

Made-with: Cursor
This commit is contained in:
kiannidev
2026-03-19 10:53:50 +02:00
270 changed files with 31493 additions and 9825 deletions
+278 -60
View File
@@ -48,19 +48,25 @@ type AgentLoop struct {
transcriber voice.Transcriber
cmdRegistry *commands.Registry
mcp mcpRuntime
mu sync.RWMutex
reloadFunc func() error
// Track active requests for safe provider cleanup
activeRequests sync.WaitGroup
}
// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
UserMessage string // User message content (may include prefix)
Media []string // media:// refs from inbound message
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
SenderID string // Current sender ID for dynamic context
SenderDisplayName string // Current sender display name for dynamic context
UserMessage string // User message content (may include prefix)
Media []string // media:// refs from inbound message
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
}
const (
@@ -114,6 +120,8 @@ func registerSharedTools(
registry *AgentRegistry,
provider providers.LLMProvider,
) {
allowReadPaths := buildAllowReadPatterns(cfg)
for _, agentID := range registry.ListAgentIDs() {
agent, ok := registry.GetAgent(agentID)
if !ok {
@@ -154,7 +162,12 @@ func registerSharedTools(
}
}
if cfg.Tools.IsToolEnabled("web_fetch") {
fetchTool, err := tools.NewWebFetchToolWithProxy(50000, cfg.Tools.Web.Proxy, cfg.Tools.Web.FetchLimitBytes)
fetchTool, err := tools.NewWebFetchToolWithProxy(
50000,
cfg.Tools.Web.Proxy,
cfg.Tools.Web.Format,
cfg.Tools.Web.FetchLimitBytes,
cfg.Tools.Web.PrivateHostWhitelist)
if err != nil {
logger.ErrorCF("agent", "Failed to create web fetch tool", map[string]any{"error": err.Error()})
} else {
@@ -192,6 +205,7 @@ func registerSharedTools(
cfg.Agents.Defaults.RestrictToWorkspace,
cfg.Agents.Defaults.GetMaxMediaSize(),
nil,
allowReadPaths,
)
agent.Tools.Register(sendFileTool)
}
@@ -219,26 +233,38 @@ func registerSharedTools(
}
}
// Spawn tool with allowlist checker
if cfg.Tools.IsToolEnabled("spawn") {
if cfg.Tools.IsToolEnabled("subagent") {
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace)
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)
// Spawn and spawn_status tools share a SubagentManager.
// Construct it when either tool is enabled (both require subagent).
spawnEnabled := cfg.Tools.IsToolEnabled("spawn")
spawnStatusEnabled := cfg.Tools.IsToolEnabled("spawn_status")
if (spawnEnabled || spawnStatusEnabled) && cfg.Tools.IsToolEnabled("subagent") {
subagentManager := tools.NewSubagentManager(provider, agent.Model, agent.Workspace)
subagentManager.SetLLMOptions(agent.MaxTokens, agent.Temperature)
// Clone the parent's tool registry so subagents can use all
// tools registered so far (file, web, etc.) but NOT spawn/
// spawn_status which are added below — preventing recursive
// subagent spawning.
subagentManager.SetTools(agent.Tools.Clone())
if spawnEnabled {
spawnTool := tools.NewSpawnTool(subagentManager)
currentAgentID := agentID
spawnTool.SetAllowlistChecker(func(targetAgentID string) bool {
return registry.CanSpawnSubagent(currentAgentID, targetAgentID)
})
agent.Tools.Register(spawnTool)
} else {
logger.WarnCF("agent", "spawn tool requires subagent to be enabled", nil)
}
if spawnStatusEnabled {
agent.Tools.Register(tools.NewSpawnStatusTool(subagentManager))
}
} else if (spawnEnabled || spawnStatusEnabled) && !cfg.Tools.IsToolEnabled("subagent") {
logger.WarnCF("agent", "spawn/spawn_status tools require subagent to be enabled", nil)
}
}
}
func (al *AgentLoop) Run(ctx context.Context) error {
al.running.Store(true)
if err := al.ensureMCPInitialized(ctx); err != nil {
return err
}
@@ -247,12 +273,10 @@ func (al *AgentLoop) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
default:
msg, ok := al.bus.ConsumeInbound(ctx)
case msg, ok := <-al.bus.InboundChan():
if !ok {
continue
return nil
}
// Process message
func() {
defer func() {
@@ -283,7 +307,7 @@ func (al *AgentLoop) Run(ctx context.Context) error {
// If so, skip publishing to avoid duplicate messages to the user.
// Use default agent's tools to check (message tool is shared).
alreadySent := false
defaultAgent := al.registry.GetDefaultAgent()
defaultAgent := al.GetRegistry().GetDefaultAgent()
if defaultAgent != nil {
if tool, ok := defaultAgent.Tools.Get("message"); ok {
if mt, ok := tool.(*tools.MessageTool); ok {
@@ -291,7 +315,6 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}
}
}
if !alreadySent {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: msg.Channel,
@@ -313,6 +336,8 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}
}
}()
default:
time.Sleep(time.Microsecond * 200)
}
}
@@ -336,12 +361,13 @@ func (al *AgentLoop) Close() {
}
}
al.registry.Close()
al.GetRegistry().Close()
}
func (al *AgentLoop) RegisterTool(tool tools.Tool) {
for _, agentID := range al.registry.ListAgentIDs() {
if agent, ok := al.registry.GetAgent(agentID); ok {
registry := al.GetRegistry()
for _, agentID := range registry.ListAgentIDs() {
if agent, ok := registry.GetAgent(agentID); ok {
agent.Tools.Register(tool)
}
}
@@ -351,12 +377,123 @@ func (al *AgentLoop) SetChannelManager(cm *channels.Manager) {
al.channelManager = cm
}
// ReloadProviderAndConfig builds a fresh AgentRegistry from cfg and provider,
// registers the shared tools on it, and then replaces al.cfg, al.registry and
// al.fallback together while holding the al.mu write lock. The provider itself
// is not stored on the loop; it is carried by the agents of the new registry.
//
// ctx bounds the registry construction and shortens the grace period before
// the previous provider is closed.
//
// It returns an error when provider or cfg is nil, when registry construction
// panics or yields nil, or when ctx is canceled before the swap. Once the swap
// has happened the function always returns nil.
//
// NOTE(review): al.activeRequests is not consulted here; the old provider is
// closed after a fixed 100ms delay regardless of in-flight calls — confirm
// this is intended.
func (al *AgentLoop) ReloadProviderAndConfig(
ctx context.Context,
provider providers.LLMProvider,
cfg *config.Config,
) error {
// Reject nil inputs up front so nothing below has to nil-check them.
if provider == nil {
return fmt.Errorf("provider cannot be nil")
}
if cfg == nil {
return fmt.Errorf("config cannot be nil")
}
// Build the new registry in a separate goroutine so the caller's context
// can interrupt the wait. A panic inside NewAgentRegistry is recovered,
// logged, and surfaced through panicErr instead of crashing the process.
var registry *AgentRegistry
var panicErr error
// done is only ever closed (never sent on); closing it publishes the
// writes to registry and panicErr to the receiver below.
done := make(chan struct{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicErr = fmt.Errorf("panic during registry creation: %v", r)
logger.ErrorCF("agent", "Panic during registry creation",
map[string]any{"panic": r})
}
close(done)
}()
registry = NewAgentRegistry(cfg, provider)
}()
// Wait for the builder goroutine or for the caller to give up.
select {
case <-done:
if registry == nil {
if panicErr != nil {
return fmt.Errorf("registry creation failed: %w", panicErr)
}
return fmt.Errorf("registry creation failed (nil result)")
}
case <-ctx.Done():
// The builder goroutine is not stopped; it keeps running and its
// result is discarded.
// NOTE(review): a registry that finishes building after this point
// is never closed — confirm whether it holds resources.
return fmt.Errorf("context canceled during registry creation: %w", ctx.Err())
}
// select picks randomly when both cases are ready, so done may have won
// even though ctx is already canceled; re-check before mutating state.
if err := ctx.Err(); err != nil {
return fmt.Errorf("context canceled after registry creation: %w", err)
}
// Register shared tools on the new registry before it becomes visible to
// readers, so they never observe a registry without them.
registerSharedTools(cfg, al.bus, registry, provider)
// Swap config, registry and fallback chain under the write lock so that
// GetRegistry/GetConfig never observe a half-updated loop.
al.mu.Lock()
oldRegistry := al.registry
// Publish the new values.
al.cfg = cfg
al.registry = registry
// Replace the fallback chain with a fresh one backed by a new cooldown
// tracker; cfg is not passed to it.
al.fallback = providers.NewFallbackChain(providers.NewCooldownTracker())
al.mu.Unlock()
// Clean up the previous provider only after the lock is released, so
// readers are not blocked while it closes. The provider is taken from the
// old registry's default agent (see extractProvider).
if oldProvider, ok := extractProvider(oldRegistry); ok {
if stateful, ok := oldProvider.(providers.StatefulProvider); ok {
// Fixed grace period for in-flight requests before closing.
// NOTE(review): if the caller passes the same provider instance
// that the old registry used, this closes the provider that was
// just installed — confirm callers always pass a new instance.
select {
case <-time.After(100 * time.Millisecond):
stateful.Close()
case <-ctx.Done():
// Caller gave up waiting: close right away and log a warning.
logger.WarnCF("agent", "Context canceled during provider cleanup, forcing close",
map[string]any{"error": ctx.Err()})
stateful.Close()
}
}
}
logger.InfoCF("agent", "Provider and config reloaded successfully",
map[string]any{
"model": cfg.Agents.Defaults.GetModelName(),
})
return nil
}
// GetRegistry returns the agent registry currently installed on the loop.
// The field is read while holding the al.mu read lock, so the call is safe
// to make concurrently with ReloadProviderAndConfig.
func (al *AgentLoop) GetRegistry() *AgentRegistry {
	al.mu.RLock()
	current := al.registry
	al.mu.RUnlock()
	return current
}
// GetConfig returns the configuration currently installed on the loop.
// The field is read while holding the al.mu read lock, so the call is safe
// to make concurrently with ReloadProviderAndConfig.
func (al *AgentLoop) GetConfig() *config.Config {
	al.mu.RLock()
	current := al.cfg
	al.mu.RUnlock()
	return current
}
// SetMediaStore injects a MediaStore for media lifecycle management.
func (al *AgentLoop) SetMediaStore(s media.MediaStore) {
al.mediaStore = s
// Propagate store to send_file tools in all agents.
al.registry.ForEachTool("send_file", func(t tools.Tool) {
registry := al.GetRegistry()
registry.ForEachTool("send_file", func(t tools.Tool) {
if sf, ok := t.(*tools.SendFileTool); ok {
sf.SetMediaStore(s)
}
@@ -368,6 +505,11 @@ func (al *AgentLoop) SetTranscriber(t voice.Transcriber) {
al.transcriber = t
}
// SetReloadFunc installs the callback that is invoked to trigger a config
// reload. Passing nil leaves reload unconfigured.
// The field is assigned without taking al.mu.
// NOTE(review): presumably meant to be called once during setup, before any
// command can read al.reloadFunc — confirm against callers.
func (al *AgentLoop) SetReloadFunc(fn func() error) {
al.reloadFunc = fn
}
var audioAnnotationRe = regexp.MustCompile(`\[(voice|audio)(?::[^\]]*)?\]`)
// transcribeAudioInMessage resolves audio media refs, transcribes them, and
@@ -545,7 +687,7 @@ func (al *AgentLoop) ProcessHeartbeat(
ctx context.Context,
content, channel, chatID string,
) (string, error) {
agent := al.registry.GetDefaultAgent()
agent := al.GetRegistry().GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for heartbeat")
}
@@ -621,14 +763,16 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
})
opts := processOptions{
SessionKey: sessionKey,
Channel: msg.Channel,
ChatID: msg.ChatID,
UserMessage: msg.Content,
Media: msg.Media,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
SessionKey: sessionKey,
Channel: msg.Channel,
ChatID: msg.ChatID,
SenderID: msg.SenderID,
SenderDisplayName: msg.Sender.DisplayName,
UserMessage: msg.Content,
Media: msg.Media,
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
}
// context-dependent commands check their own Runtime fields and report
@@ -641,7 +785,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
}
func (al *AgentLoop) resolveMessageRoute(msg bus.InboundMessage) (routing.ResolvedRoute, *AgentInstance, error) {
route := al.registry.ResolveRoute(routing.RouteInput{
registry := al.GetRegistry()
route := registry.ResolveRoute(routing.RouteInput{
Channel: msg.Channel,
AccountID: inboundMetadata(msg, metadataKeyAccountID),
Peer: extractPeer(msg),
@@ -650,9 +795,9 @@ func (al *AgentLoop) resolveMessageRoute(msg bus.InboundMessage) (routing.Resolv
TeamID: inboundMetadata(msg, metadataKeyTeamID),
})
agent, ok := al.registry.GetAgent(route.AgentID)
agent, ok := registry.GetAgent(route.AgentID)
if !ok {
agent = al.registry.GetDefaultAgent()
agent = registry.GetDefaultAgent()
}
if agent == nil {
return routing.ResolvedRoute{}, nil, fmt.Errorf("no agent available for route (agent_id=%s)", route.AgentID)
@@ -714,7 +859,7 @@ func (al *AgentLoop) processSystemMessage(
}
// Use default agent for system messages
agent := al.registry.GetDefaultAgent()
agent := al.GetRegistry().GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for system message")
}
@@ -767,10 +912,13 @@ func (al *AgentLoop) runAgentLoop(
opts.Media,
opts.Channel,
opts.ChatID,
opts.SenderID,
opts.SenderDisplayName,
)
// Resolve media:// refs to base64 data URLs (streaming)
maxMediaSize := al.cfg.Agents.Defaults.GetMaxMediaSize()
// Resolve media:// refs: images→base64 data URLs, non-images→local paths in content
cfg := al.GetConfig()
maxMediaSize := cfg.Agents.Defaults.GetMaxMediaSize()
messages = resolveMediaRefs(messages, al.mediaStore, maxMediaSize)
// 2. Save user message to session
@@ -906,6 +1054,19 @@ func (al *AgentLoop) runLLMIteration(
// Build tool definitions
providerToolDefs := agent.Tools.ToProviderDefs()
// Determine whether the provider's native web search should replace
// the client-side web_search tool for this request. Only enable when web
// search is actually enabled and registered (so users who disabled web
// access do not get provider-side search or billing).
_, hasWebSearch := agent.Tools.Get("web_search")
useNativeSearch := al.cfg.Tools.Web.PreferNative &&
isNativeSearchProvider(agent.Provider) &&
hasWebSearch
if useNativeSearch {
providerToolDefs = filterClientWebSearch(providerToolDefs)
}
// Log LLM request details
logger.DebugCF("agent", "LLM request",
map[string]any{
@@ -914,6 +1075,7 @@ func (al *AgentLoop) runLLMIteration(
"model": activeModel,
"messages_count": len(messages),
"tools_count": len(providerToolDefs),
"native_search": useNativeSearch,
"max_tokens": agent.MaxTokens,
"temperature": agent.Temperature,
"system_prompt_len": len(messages[0].Content),
@@ -936,6 +1098,9 @@ func (al *AgentLoop) runLLMIteration(
"temperature": agent.Temperature,
"prompt_cache_key": agent.ID,
}
if useNativeSearch {
llmOpts["native_search"] = true
}
// parseThinkingLevel guarantees ThinkingOff for empty/unknown values,
// so checking != ThinkingOff is sufficient.
if agent.ThinkingLevel != ThinkingOff {
@@ -948,6 +1113,9 @@ func (al *AgentLoop) runLLMIteration(
}
callLLM := func() (*providers.LLMResponse, error) {
al.activeRequests.Add(1)
defer al.activeRequests.Done()
if len(activeCandidates) > 1 && al.fallback != nil {
fbResult, fbErr := al.fallback.Execute(
ctx,
@@ -1034,7 +1202,7 @@ func (al *AgentLoop) runLLMIteration(
newSummary := agent.Sessions.GetSummary(opts.SessionKey)
messages = agent.ContextBuilder.BuildMessages(
newHistory, newSummary, "",
nil, opts.Channel, opts.ChatID,
nil, opts.Channel, opts.ChatID, opts.SenderID, opts.SenderDisplayName,
)
continue
}
@@ -1046,6 +1214,7 @@ func (al *AgentLoop) runLLMIteration(
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"model": activeModel,
"error": err.Error(),
})
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
@@ -1397,7 +1566,8 @@ func (al *AgentLoop) forceCompression(agent *AgentInstance, sessionKey string) {
func (al *AgentLoop) GetStartupInfo() map[string]any {
info := make(map[string]any)
agent := al.registry.GetDefaultAgent()
registry := al.GetRegistry()
agent := registry.GetDefaultAgent()
if agent == nil {
return info
}
@@ -1414,8 +1584,8 @@ func (al *AgentLoop) GetStartupInfo() map[string]any {
// Agents info
info["agents"] = map[string]any{
"count": len(al.registry.ListAgentIDs()),
"ids": al.registry.ListAgentIDs(),
"count": len(registry.ListAgentIDs()),
"ids": registry.ListAgentIDs(),
}
return info
@@ -1603,17 +1773,22 @@ func (al *AgentLoop) retryLLMCall(
var err error
for attempt := 0; attempt < maxRetries; attempt++ {
resp, err = agent.Provider.Chat(
ctx,
[]providers.Message{{Role: "user", Content: prompt}},
nil,
agent.Model,
map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": llmTemperature,
"prompt_cache_key": agent.ID,
},
)
al.activeRequests.Add(1)
resp, err = func() (*providers.LLMResponse, error) {
defer al.activeRequests.Done()
return agent.Provider.Chat(
ctx,
[]providers.Message{{Role: "user", Content: prompt}},
nil,
agent.Model,
map[string]any{
"max_tokens": agent.MaxTokens,
"temperature": llmTemperature,
"prompt_cache_key": agent.ID,
},
)
}()
if err == nil && resp != nil && resp.Content != "" {
return resp, nil
}
@@ -1746,9 +1921,11 @@ func (al *AgentLoop) handleCommand(
}
func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOptions) *commands.Runtime {
registry := al.GetRegistry()
cfg := al.GetConfig()
rt := &commands.Runtime{
Config: al.cfg,
ListAgentIDs: al.registry.ListAgentIDs,
Config: cfg,
ListAgentIDs: registry.ListAgentIDs,
ListDefinitions: al.cmdRegistry.Definitions,
GetEnabledChannels: func() []string {
if al.channelManager == nil {
@@ -1766,9 +1943,15 @@ func (al *AgentLoop) buildCommandsRuntime(agent *AgentInstance, opts *processOpt
return nil
},
}
rt.ReloadConfig = func() error {
if al.reloadFunc == nil {
return fmt.Errorf("reload not configured")
}
return al.reloadFunc()
}
if agent != nil {
rt.GetModelInfo = func() (string, string) {
return agent.Model, al.cfg.Agents.Defaults.Provider
return agent.Model, cfg.Agents.Defaults.Provider
}
rt.SwitchModel = func(value string) (string, error) {
oldModel := agent.Model
@@ -1832,3 +2015,38 @@ func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer {
}
return &routing.RoutePeer{Kind: parentKind, ID: parentID}
}
// isNativeSearchProvider reports whether p both implements
// providers.NativeSearchCapable and answers true from SupportsNativeSearch.
// Providers that do not implement the interface yield false.
func isNativeSearchProvider(p providers.LLMProvider) bool {
	capable, implemented := p.(providers.NativeSearchCapable)
	return implemented && capable.SupportsNativeSearch()
}
// filterClientWebSearch builds a new slice holding every definition from
// tools except those whose function name equals "web_search" (compared
// case-insensitively). The input slice is left untouched. Used when native
// provider search is preferred over the client-side tool.
func filterClientWebSearch(tools []providers.ToolDefinition) []providers.ToolDefinition {
	kept := make([]providers.ToolDefinition, 0, len(tools))
	for i := range tools {
		if !strings.EqualFold(tools[i].Function.Name, "web_search") {
			kept = append(kept, tools[i])
		}
	}
	return kept
}
// extractProvider returns the provider held by the default agent of registry,
// for use during cleanup. The boolean is false when registry is nil or has no
// default agent; otherwise it is true and the agent's Provider field is
// returned as-is.
func extractProvider(registry *AgentRegistry) (providers.LLMProvider, bool) {
	if registry != nil {
		// The default agent is used as the handle to reach the provider.
		if agent := registry.GetDefaultAgent(); agent != nil {
			return agent.Provider, true
		}
	}
	return nil, false
}