From 4d6337fd262a8915fad272b5369cffaf39f3819c Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 19:28:26 +0800 Subject: [PATCH] fix runtime event logger reload and shutdown --- pkg/agent/agent.go | 7 +- pkg/agent/agent_init.go | 2 +- pkg/agent/agent_test.go | 5 +- pkg/agent/runtime_event_logger.go | 114 +++++++++++++++++++-- pkg/agent/runtime_event_logger_test.go | 134 +++++++++++++++++++++++++ pkg/events/subscription.go | 24 ++++- pkg/events/subscription_test.go | 59 +++++++++++ 7 files changed, 327 insertions(+), 18 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index a394a6e97..7b3b8bc11 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -41,6 +41,8 @@ type AgentLoop struct { // Runtime event system runtimeEvents runtimeevents.Bus ownsRuntimeEvents bool + runtimeEventLogMu sync.RWMutex + runtimeEventLogger *runtimeEventLogger runtimeEventLogSub runtimeevents.Subscription hooks *HookManager @@ -286,9 +288,7 @@ func (al *AgentLoop) Close() { if al.hooks != nil { al.hooks.Close() } - if al.runtimeEventLogSub != nil { - _ = al.runtimeEventLogSub.Close() - } + al.closeRuntimeEventLogger() if al.runtimeEvents != nil && al.ownsRuntimeEvents { if err := al.runtimeEvents.Close(); err != nil { logger.ErrorCF("agent", "Failed to close runtime event bus", @@ -387,6 +387,7 @@ func (al *AgentLoop) ReloadProviderAndConfig( al.fallback = providers.NewFallbackChain(providers.NewCooldownTracker(), newRL) al.mu.Unlock() + al.refreshRuntimeEventLogger(cfg) oldMCPManager := al.mcp.reset() al.hookRuntime.reset(al) diff --git a/pkg/agent/agent_init.go b/pkg/agent/agent_init.go index b8e535c36..035432e2e 100644 --- a/pkg/agent/agent_init.go +++ b/pkg/agent/agent_init.go @@ -75,7 +75,7 @@ func NewAgentLoop( al.runtimeEvents = runtimeevents.NewBus() al.ownsRuntimeEvents = true } - al.runtimeEventLogSub = subscribeRuntimeEventLogger(cfg, al.runtimeEvents) + al.refreshRuntimeEventLogger(cfg) al.providerFactory = providers.CreateProviderFromConfig al.hooks = NewHookManager(al.runtimeEvents.Channel()) configureHookManagerFromConfig(al.hooks, cfg) diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 13c5b21a0..1005be512 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -5190,6 +5190,7 @@ func TestParallelMessageProcessing_SameSessionProcessedSequentially(t *testing.T var mu sync.Mutex turnIDs := make(map[string]bool) var wg sync.WaitGroup + var firstResponse sync.Once wg.Add(1) // Only 1 turn should be created for same session cfg := &config.Config{ @@ -5212,7 +5213,9 @@ func TestParallelMessageProcessing_SameSessionProcessedSequentially(t *testing.T al := NewAgentLoop(cfg, msgBus, &concurrentMockProvider{ responseFunc: func(callID int) string { - wg.Done() + firstResponse.Do(func() { + wg.Done() + }) return "ok" }, }) diff --git a/pkg/agent/runtime_event_logger.go b/pkg/agent/runtime_event_logger.go index 4922c9021..1035ffe35 100644 --- a/pkg/agent/runtime_event_logger.go +++ b/pkg/agent/runtime_event_logger.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "strings" + "sync" "time" "github.com/sipeed/picoclaw/pkg/config" @@ -12,20 +13,85 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" ) -const runtimeEventLoggerBuffer = 256 +const ( + runtimeEventLoggerBuffer = 256 + runtimeEventLoggerDrainTimeout = 2 * time.Second +) type runtimeEventLogger struct { + mu sync.RWMutex cfg config.EventLoggingConfig } -func subscribeRuntimeEventLogger(cfg *config.Config, eventBus runtimeevents.Bus) runtimeevents.Subscription { - eventLogger := newRuntimeEventLogger(cfg) - sub, err := eventLogger.subscribe(context.Background(), eventBus) +func (al *AgentLoop) refreshRuntimeEventLogger(cfg *config.Config) { + if al == nil { + return + } + logCfg := config.EffectiveEventLoggingConfig(cfg) + + al.runtimeEventLogMu.Lock() + if !logCfg.Enabled { + oldSub := al.runtimeEventLogSub + al.runtimeEventLogger = nil + al.runtimeEventLogSub = nil + al.runtimeEventLogMu.Unlock() + closeRuntimeEventLoggerSubscription(oldSub) + return + } + + if al.runtimeEventLogger != nil && al.runtimeEventLogSub != nil { + al.runtimeEventLogger.updateConfig(logCfg) + al.runtimeEventLogMu.Unlock() + return + } + al.runtimeEventLogMu.Unlock() + + eventLogger := newRuntimeEventLoggerFromConfig(logCfg) + sub, err := eventLogger.subscribe(context.Background(), al.runtimeEvents) if err != nil { logger.WarnCF("events", "Failed to subscribe runtime event logger", map[string]any{"error": err.Error()}) - return nil + return + } + + al.runtimeEventLogMu.Lock() + oldSub := al.runtimeEventLogSub + al.runtimeEventLogger = eventLogger + al.runtimeEventLogSub = sub + al.runtimeEventLogMu.Unlock() + closeRuntimeEventLoggerSubscription(oldSub) +} + +func (al *AgentLoop) closeRuntimeEventLogger() { + if al == nil { + return + } + al.runtimeEventLogMu.Lock() + oldSub := al.runtimeEventLogSub + al.runtimeEventLogger = nil + al.runtimeEventLogSub = nil + al.runtimeEventLogMu.Unlock() + closeRuntimeEventLoggerSubscription(oldSub) +} + +func closeRuntimeEventLoggerSubscription(sub runtimeevents.Subscription) { + if sub == nil { + return + } + if err := sub.Close(); err != nil { + logger.WarnCF("events", "Failed to close runtime event logger subscription", map[string]any{ + "error": err.Error(), + }) + } + + timer := time.NewTimer(runtimeEventLoggerDrainTimeout) + defer timer.Stop() + select { + case <-sub.Done(): + case <-timer.C: + logger.WarnCF("events", "Timed out waiting for runtime event logger to drain", map[string]any{ + "timeout": runtimeEventLoggerDrainTimeout.String(), + }) } - return sub } func newRuntimeEventLogger(cfg *config.Config) *runtimeEventLogger { @@ -33,9 +99,31 @@ func newRuntimeEventLogger(cfg *config.Config) *runtimeEventLogger { if !logCfg.Enabled { return nil } + return newRuntimeEventLoggerFromConfig(logCfg) +} + +func newRuntimeEventLoggerFromConfig(logCfg config.EventLoggingConfig) *runtimeEventLogger { return &runtimeEventLogger{cfg: logCfg} } +func (l *runtimeEventLogger) updateConfig(cfg config.EventLoggingConfig) { + if l == nil { + return + } + l.mu.Lock() + l.cfg = cfg + l.mu.Unlock() +} + +func (l *runtimeEventLogger) configSnapshot() config.EventLoggingConfig { + if l == nil { + return config.EventLoggingConfig{} + } + l.mu.RLock() + defer l.mu.RUnlock() + return l.cfg +} + func (l *runtimeEventLogger) subscribe( ctx context.Context, eventBus runtimeevents.Bus, @@ -58,7 +146,7 @@ func (l *runtimeEventLogger) handle(_ context.Context, evt runtimeevents.Event) } fields := runtimeEventLogFields(evt) - if l.cfg.IncludePayload && evt.Payload != nil { + if l.configSnapshot().IncludePayload && evt.Payload != nil { fields["payload"] = evt.Payload } @@ -67,18 +155,22 @@ func (l *runtimeEventLogger) handle(_ context.Context, evt runtimeevents.Event) } func (l *runtimeEventLogger) shouldLog(evt runtimeevents.Event) bool { - if l == nil || !l.cfg.Enabled { + if l == nil { return false } - if runtimeEventSeverityRank(evt.Severity) < runtimeEventSeverityRank(parseRuntimeEventSeverity(l.cfg.MinSeverity)) { + cfg := l.configSnapshot() + if !cfg.Enabled { + return false + } + if runtimeEventSeverityRank(evt.Severity) < runtimeEventSeverityRank(parseRuntimeEventSeverity(cfg.MinSeverity)) { return false } kind := evt.Kind.String() - if !matchAnyRuntimeEventPattern(l.cfg.Include, kind, true) { + if !matchAnyRuntimeEventPattern(cfg.Include, kind, true) { return false } - return !matchAnyRuntimeEventPattern(l.cfg.Exclude, kind, false) + return !matchAnyRuntimeEventPattern(cfg.Exclude, kind, false) } func logRuntimeEvent(evt runtimeevents.Event, fields map[string]any) { diff --git a/pkg/agent/runtime_event_logger_test.go b/pkg/agent/runtime_event_logger_test.go index d056eb15c..a64529314 100644 --- a/pkg/agent/runtime_event_logger_test.go +++ b/pkg/agent/runtime_event_logger_test.go @@ -1,8 +1,12 @@ package agent import ( + "context" + "sync/atomic" "testing" + "time" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" runtimeevents "github.com/sipeed/picoclaw/pkg/events" ) @@ -97,3 +101,133 @@ func TestRuntimeEventLogFieldsSummarizeAgentPayload(t *testing.T) { t.Fatalf("raw payload should not be included by runtimeEventLogFields: %#v", fields) } } + +func runtimeEventLoggerStateForTest( + al *AgentLoop, +) (*runtimeEventLogger, runtimeevents.Subscription) { + al.runtimeEventLogMu.RLock() + defer al.runtimeEventLogMu.RUnlock() + return al.runtimeEventLogger, al.runtimeEventLogSub +} + +func TestReloadProviderAndConfigRefreshesRuntimeEventLogger(t *testing.T) { + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = t.TempDir() + cfg.Events.Logging.Include = []string{"agent.*"} + + al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{}) + defer al.Close() + + eventLogger, logSub := runtimeEventLoggerStateForTest(al) + if eventLogger == nil || logSub == nil { + t.Fatal("expected initial runtime event logger subscription") + } + if eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindGatewayReloadCompleted, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("initial agent-only logging should not log gateway reload events") + } + + reloaded := config.DefaultConfig() + reloaded.Agents.Defaults.Workspace = cfg.Agents.Defaults.Workspace + reloaded.Events.Logging.Include = []string{"gateway.*"} + if err := al.ReloadProviderAndConfig(context.Background(), &mockProvider{}, reloaded); err != nil { + t.Fatalf("ReloadProviderAndConfig() error = %v", err) + } + + eventLogger, logSub = runtimeEventLoggerStateForTest(al) + if eventLogger == nil || logSub == nil { + t.Fatal("expected runtime event logger subscription after reload") + } + if !eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindGatewayReloadCompleted, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("reloaded gateway logging should log gateway reload events") + } + if eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindAgentTurnStart, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("reloaded gateway-only logging should not log agent events") + } + + disabled := config.DefaultConfig() + disabled.Agents.Defaults.Workspace = cfg.Agents.Defaults.Workspace + disabled.Events.Logging.Enabled = false + if err := al.ReloadProviderAndConfig(context.Background(), &mockProvider{}, disabled); err != nil { + t.Fatalf("ReloadProviderAndConfig() with disabled logging error = %v", err) + } + eventLogger, logSub = runtimeEventLoggerStateForTest(al) + if eventLogger != nil || logSub != nil { + t.Fatal("expected runtime event logger to be disabled after reload") + } +} + +func TestCloseRuntimeEventLoggerSubscriptionWaitsForDrain(t *testing.T) { + eventBus := runtimeevents.NewBus() + defer func() { + if err := eventBus.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + }() + + var handled atomic.Uint64 + firstStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + sub, err := eventBus.Channel().Subscribe( + context.Background(), + runtimeevents.SubscribeOptions{ + Name: "runtime-event-logger", + Buffer: 2, + Concurrency: runtimeevents.Locked, + }, + func(context.Context, runtimeevents.Event) error { + if handled.Add(1) == 1 { + close(firstStarted) + <-releaseFirst + } + return nil + }, + ) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + first := eventBus.Publish(context.Background(), runtimeevents.Event{Kind: runtimeevents.Kind("test.first")}) + if first.Delivered != 1 { + t.Fatalf("first Publish = %+v, want one delivered event", first) + } + select { + case <-firstStarted: + case <-time.After(time.Second): + t.Fatal("timed out waiting for first handler to start") + } + second := eventBus.Publish(context.Background(), runtimeevents.Event{Kind: runtimeevents.Kind("test.second")}) + if second.Delivered != 1 { + t.Fatalf("second Publish = %+v, want one delivered event", second) + } + + closeReturned := make(chan struct{}) + go func() { + closeRuntimeEventLoggerSubscription(sub) + close(closeReturned) + }() + + select { + case <-closeReturned: + t.Fatal("runtime event logger close returned before buffered events drained") + case <-time.After(50 * time.Millisecond): + } + + close(releaseFirst) + select { + case <-closeReturned: + case <-time.After(time.Second): + t.Fatal("timed out waiting for runtime event logger close to return") + } + if got := handled.Load(); got != 2 { + t.Fatalf("handled = %d, want 2", got) + } +} diff --git a/pkg/events/subscription.go b/pkg/events/subscription.go index 5ff47eddd..1b3977300 100644 --- a/pkg/events/subscription.go +++ b/pkg/events/subscription.go @@ -102,6 +102,7 @@ type eventSubscription struct { mu sync.RWMutex closed bool wg sync.WaitGroup + blockWG sync.WaitGroup counters subscriberCounters } @@ -278,6 +279,9 @@ func (s *eventSubscription) closeInput() { close(s.closing) s.mu.Lock() s.closed = true + s.mu.Unlock() + s.blockWG.Wait() + s.mu.Lock() close(s.ch) s.mu.Unlock() if s.handler == nil { @@ -304,6 +308,10 @@ func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResu ctx = context.Background() } + if s.opts.Backpressure == Block { + return s.enqueueBlocking(ctx, evt) + } + s.mu.RLock() defer s.mu.RUnlock() @@ -316,13 +324,25 @@ func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResu switch s.opts.Backpressure { case DropOldest: return s.enqueueDropOldest(evt) - case Block: - return s.enqueueBlock(ctx, evt) default: return s.enqueueDropNewest(evt) } } +func (s *eventSubscription) enqueueBlocking(ctx context.Context, evt Event) deliveryResult { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return deliveryResult{closed: true} + } + s.blockWG.Add(1) + s.counters.received.Add(1) + s.mu.Unlock() + + defer s.blockWG.Done() + return s.enqueueBlock(ctx, evt) +} + func (s *eventSubscription) enqueueDropNewest(evt Event) deliveryResult { select { case <-s.closing: diff --git a/pkg/events/subscription_test.go b/pkg/events/subscription_test.go index 8cb5f58dc..44d4be64b 100644 --- a/pkg/events/subscription_test.go +++ b/pkg/events/subscription_test.go @@ -63,6 +63,65 @@ func TestUnsubscribeClosesChannel(t *testing.T) { waitForSubscriptionDone(t, sub) } +func TestBlockBackpressureCloseUnblocksPublisher(t *testing.T) { + t.Parallel() + + bus := NewBus() + defer closeBus(t, bus) + + sub, _, err := bus.Channel().SubscribeChan(context.Background(), SubscribeOptions{ + Name: "block-close", + Buffer: 1, + Backpressure: Block, + }) + if err != nil { + t.Fatalf("SubscribeChan failed: %v", err) + } + + first := bus.Publish(context.Background(), Event{Kind: Kind("test.first")}) + if first.Delivered != 1 { + t.Fatalf("first Publish = %+v, want one delivered event", first) + } + + publishStarted := make(chan struct{}) + publishReturned := make(chan PublishResult, 1) + go func() { + close(publishStarted) + publishReturned <- bus.Publish(context.Background(), Event{Kind: Kind("test.second")}) + }() + + <-publishStarted + waitForStat(t, func() uint64 { + return sub.Stats().Received + }, 2) + select { + case result := <-publishReturned: + t.Fatalf("blocking Publish returned before close: %+v", result) + default: + } + + closeReturned := make(chan error, 1) + go func() { + closeReturned <- sub.Close() + }() + + select { + case err := <-closeReturned: + if err != nil { + t.Fatalf("Close failed: %v", err) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for Close to unblock") + } + + select { + case <-publishReturned: + case <-time.After(time.Second): + t.Fatal("timed out waiting for blocking Publish to return after close") + } + waitForSubscriptionDone(t, sub) +} + func TestHandlerPanicRecovered(t *testing.T) { t.Parallel()