fix runtime event logger reload and shutdown

This commit is contained in:
Hoshina
2026-04-26 19:28:26 +08:00
parent b3d9f86a01
commit 4d6337fd26
7 changed files with 327 additions and 18 deletions
+4 -3
View File
@@ -41,6 +41,8 @@ type AgentLoop struct {
// Runtime event system
runtimeEvents runtimeevents.Bus
ownsRuntimeEvents bool
runtimeEventLogMu sync.RWMutex
runtimeEventLogger *runtimeEventLogger
runtimeEventLogSub runtimeevents.Subscription
hooks *HookManager
@@ -286,9 +288,7 @@ func (al *AgentLoop) Close() {
if al.hooks != nil {
al.hooks.Close()
}
if al.runtimeEventLogSub != nil {
_ = al.runtimeEventLogSub.Close()
}
al.closeRuntimeEventLogger()
if al.runtimeEvents != nil && al.ownsRuntimeEvents {
if err := al.runtimeEvents.Close(); err != nil {
logger.ErrorCF("agent", "Failed to close runtime event bus",
@@ -387,6 +387,7 @@ func (al *AgentLoop) ReloadProviderAndConfig(
al.fallback = providers.NewFallbackChain(providers.NewCooldownTracker(), newRL)
al.mu.Unlock()
al.refreshRuntimeEventLogger(cfg)
oldMCPManager := al.mcp.reset()
al.hookRuntime.reset(al)
+1 -1
View File
@@ -75,7 +75,7 @@ func NewAgentLoop(
al.runtimeEvents = runtimeevents.NewBus()
al.ownsRuntimeEvents = true
}
al.runtimeEventLogSub = subscribeRuntimeEventLogger(cfg, al.runtimeEvents)
al.refreshRuntimeEventLogger(cfg)
al.providerFactory = providers.CreateProviderFromConfig
al.hooks = NewHookManager(al.runtimeEvents.Channel())
configureHookManagerFromConfig(al.hooks, cfg)
+4 -1
View File
@@ -5190,6 +5190,7 @@ func TestParallelMessageProcessing_SameSessionProcessedSequentially(t *testing.T
var mu sync.Mutex
turnIDs := make(map[string]bool)
var wg sync.WaitGroup
var firstResponse sync.Once
wg.Add(1) // Only 1 turn should be created for same session
cfg := &config.Config{
@@ -5212,7 +5213,9 @@ func TestParallelMessageProcessing_SameSessionProcessedSequentially(t *testing.T
al := NewAgentLoop(cfg, msgBus, &concurrentMockProvider{
responseFunc: func(callID int) string {
wg.Done()
firstResponse.Do(func() {
wg.Done()
})
return "ok"
},
})
+103 -11
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"path"
"strings"
"sync"
"time"
"github.com/sipeed/picoclaw/pkg/config"
@@ -12,20 +13,85 @@ import (
"github.com/sipeed/picoclaw/pkg/logger"
)
const runtimeEventLoggerBuffer = 256
const (
runtimeEventLoggerBuffer = 256
runtimeEventLoggerDrainTimeout = 2 * time.Second
)
type runtimeEventLogger struct {
mu sync.RWMutex
cfg config.EventLoggingConfig
}
func subscribeRuntimeEventLogger(cfg *config.Config, eventBus runtimeevents.Bus) runtimeevents.Subscription {
eventLogger := newRuntimeEventLogger(cfg)
sub, err := eventLogger.subscribe(context.Background(), eventBus)
func (al *AgentLoop) refreshRuntimeEventLogger(cfg *config.Config) {
if al == nil {
return
}
logCfg := config.EffectiveEventLoggingConfig(cfg)
al.runtimeEventLogMu.Lock()
if !logCfg.Enabled {
oldSub := al.runtimeEventLogSub
al.runtimeEventLogger = nil
al.runtimeEventLogSub = nil
al.runtimeEventLogMu.Unlock()
closeRuntimeEventLoggerSubscription(oldSub)
return
}
if al.runtimeEventLogger != nil && al.runtimeEventLogSub != nil {
al.runtimeEventLogger.updateConfig(logCfg)
al.runtimeEventLogMu.Unlock()
return
}
al.runtimeEventLogMu.Unlock()
eventLogger := newRuntimeEventLoggerFromConfig(logCfg)
sub, err := eventLogger.subscribe(context.Background(), al.runtimeEvents)
if err != nil {
logger.WarnCF("events", "Failed to subscribe runtime event logger", map[string]any{"error": err.Error()})
return nil
return
}
al.runtimeEventLogMu.Lock()
oldSub := al.runtimeEventLogSub
al.runtimeEventLogger = eventLogger
al.runtimeEventLogSub = sub
al.runtimeEventLogMu.Unlock()
closeRuntimeEventLoggerSubscription(oldSub)
}
func (al *AgentLoop) closeRuntimeEventLogger() {
if al == nil {
return
}
al.runtimeEventLogMu.Lock()
oldSub := al.runtimeEventLogSub
al.runtimeEventLogger = nil
al.runtimeEventLogSub = nil
al.runtimeEventLogMu.Unlock()
closeRuntimeEventLoggerSubscription(oldSub)
}
func closeRuntimeEventLoggerSubscription(sub runtimeevents.Subscription) {
if sub == nil {
return
}
if err := sub.Close(); err != nil {
logger.WarnCF("events", "Failed to close runtime event logger subscription", map[string]any{
"error": err.Error(),
})
}
timer := time.NewTimer(runtimeEventLoggerDrainTimeout)
defer timer.Stop()
select {
case <-sub.Done():
case <-timer.C:
logger.WarnCF("events", "Timed out waiting for runtime event logger to drain", map[string]any{
"timeout": runtimeEventLoggerDrainTimeout.String(),
})
}
return sub
}
func newRuntimeEventLogger(cfg *config.Config) *runtimeEventLogger {
@@ -33,9 +99,31 @@ func newRuntimeEventLogger(cfg *config.Config) *runtimeEventLogger {
if !logCfg.Enabled {
return nil
}
return newRuntimeEventLoggerFromConfig(logCfg)
}
func newRuntimeEventLoggerFromConfig(logCfg config.EventLoggingConfig) *runtimeEventLogger {
return &runtimeEventLogger{cfg: logCfg}
}
func (l *runtimeEventLogger) updateConfig(cfg config.EventLoggingConfig) {
if l == nil {
return
}
l.mu.Lock()
l.cfg = cfg
l.mu.Unlock()
}
func (l *runtimeEventLogger) configSnapshot() config.EventLoggingConfig {
if l == nil {
return config.EventLoggingConfig{}
}
l.mu.RLock()
defer l.mu.RUnlock()
return l.cfg
}
func (l *runtimeEventLogger) subscribe(
ctx context.Context,
eventBus runtimeevents.Bus,
@@ -58,7 +146,7 @@ func (l *runtimeEventLogger) handle(_ context.Context, evt runtimeevents.Event)
}
fields := runtimeEventLogFields(evt)
if l.cfg.IncludePayload && evt.Payload != nil {
if l.configSnapshot().IncludePayload && evt.Payload != nil {
fields["payload"] = evt.Payload
}
@@ -67,18 +155,22 @@ func (l *runtimeEventLogger) handle(_ context.Context, evt runtimeevents.Event)
}
func (l *runtimeEventLogger) shouldLog(evt runtimeevents.Event) bool {
if l == nil || !l.cfg.Enabled {
if l == nil {
return false
}
if runtimeEventSeverityRank(evt.Severity) < runtimeEventSeverityRank(parseRuntimeEventSeverity(l.cfg.MinSeverity)) {
cfg := l.configSnapshot()
if !cfg.Enabled {
return false
}
if runtimeEventSeverityRank(evt.Severity) < runtimeEventSeverityRank(parseRuntimeEventSeverity(cfg.MinSeverity)) {
return false
}
kind := evt.Kind.String()
if !matchAnyRuntimeEventPattern(l.cfg.Include, kind, true) {
if !matchAnyRuntimeEventPattern(cfg.Include, kind, true) {
return false
}
return !matchAnyRuntimeEventPattern(l.cfg.Exclude, kind, false)
return !matchAnyRuntimeEventPattern(cfg.Exclude, kind, false)
}
func logRuntimeEvent(evt runtimeevents.Event, fields map[string]any) {
+134
View File
@@ -1,8 +1,12 @@
package agent
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
runtimeevents "github.com/sipeed/picoclaw/pkg/events"
)
@@ -97,3 +101,133 @@ func TestRuntimeEventLogFieldsSummarizeAgentPayload(t *testing.T) {
t.Fatalf("raw payload should not be included by runtimeEventLogFields: %#v", fields)
}
}
func runtimeEventLoggerStateForTest(
al *AgentLoop,
) (*runtimeEventLogger, runtimeevents.Subscription) {
al.runtimeEventLogMu.RLock()
defer al.runtimeEventLogMu.RUnlock()
return al.runtimeEventLogger, al.runtimeEventLogSub
}
func TestReloadProviderAndConfigRefreshesRuntimeEventLogger(t *testing.T) {
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = t.TempDir()
cfg.Events.Logging.Include = []string{"agent.*"}
al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{})
defer al.Close()
eventLogger, logSub := runtimeEventLoggerStateForTest(al)
if eventLogger == nil || logSub == nil {
t.Fatal("expected initial runtime event logger subscription")
}
if eventLogger.shouldLog(runtimeevents.Event{
Kind: runtimeevents.KindGatewayReloadCompleted,
Severity: runtimeevents.SeverityInfo,
}) {
t.Fatal("initial agent-only logging should not log gateway reload events")
}
reloaded := config.DefaultConfig()
reloaded.Agents.Defaults.Workspace = cfg.Agents.Defaults.Workspace
reloaded.Events.Logging.Include = []string{"gateway.*"}
if err := al.ReloadProviderAndConfig(context.Background(), &mockProvider{}, reloaded); err != nil {
t.Fatalf("ReloadProviderAndConfig() error = %v", err)
}
eventLogger, logSub = runtimeEventLoggerStateForTest(al)
if eventLogger == nil || logSub == nil {
t.Fatal("expected runtime event logger subscription after reload")
}
if !eventLogger.shouldLog(runtimeevents.Event{
Kind: runtimeevents.KindGatewayReloadCompleted,
Severity: runtimeevents.SeverityInfo,
}) {
t.Fatal("reloaded gateway logging should log gateway reload events")
}
if eventLogger.shouldLog(runtimeevents.Event{
Kind: runtimeevents.KindAgentTurnStart,
Severity: runtimeevents.SeverityInfo,
}) {
t.Fatal("reloaded gateway-only logging should not log agent events")
}
disabled := config.DefaultConfig()
disabled.Agents.Defaults.Workspace = cfg.Agents.Defaults.Workspace
disabled.Events.Logging.Enabled = false
if err := al.ReloadProviderAndConfig(context.Background(), &mockProvider{}, disabled); err != nil {
t.Fatalf("ReloadProviderAndConfig() with disabled logging error = %v", err)
}
eventLogger, logSub = runtimeEventLoggerStateForTest(al)
if eventLogger != nil || logSub != nil {
t.Fatal("expected runtime event logger to be disabled after reload")
}
}
func TestCloseRuntimeEventLoggerSubscriptionWaitsForDrain(t *testing.T) {
eventBus := runtimeevents.NewBus()
defer func() {
if err := eventBus.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
}
}()
var handled atomic.Uint64
firstStarted := make(chan struct{})
releaseFirst := make(chan struct{})
sub, err := eventBus.Channel().Subscribe(
context.Background(),
runtimeevents.SubscribeOptions{
Name: "runtime-event-logger",
Buffer: 2,
Concurrency: runtimeevents.Locked,
},
func(context.Context, runtimeevents.Event) error {
if handled.Add(1) == 1 {
close(firstStarted)
<-releaseFirst
}
return nil
},
)
if err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
first := eventBus.Publish(context.Background(), runtimeevents.Event{Kind: runtimeevents.Kind("test.first")})
if first.Delivered != 1 {
t.Fatalf("first Publish = %+v, want one delivered event", first)
}
select {
case <-firstStarted:
case <-time.After(time.Second):
t.Fatal("timed out waiting for first handler to start")
}
second := eventBus.Publish(context.Background(), runtimeevents.Event{Kind: runtimeevents.Kind("test.second")})
if second.Delivered != 1 {
t.Fatalf("second Publish = %+v, want one delivered event", second)
}
closeReturned := make(chan struct{})
go func() {
closeRuntimeEventLoggerSubscription(sub)
close(closeReturned)
}()
select {
case <-closeReturned:
t.Fatal("runtime event logger close returned before buffered events drained")
case <-time.After(50 * time.Millisecond):
}
close(releaseFirst)
select {
case <-closeReturned:
case <-time.After(time.Second):
t.Fatal("timed out waiting for runtime event logger close to return")
}
if got := handled.Load(); got != 2 {
t.Fatalf("handled = %d, want 2", got)
}
}
+22 -2
View File
@@ -102,6 +102,7 @@ type eventSubscription struct {
mu sync.RWMutex
closed bool
wg sync.WaitGroup
blockWG sync.WaitGroup
counters subscriberCounters
}
@@ -278,6 +279,9 @@ func (s *eventSubscription) closeInput() {
close(s.closing)
s.mu.Lock()
s.closed = true
s.mu.Unlock()
s.blockWG.Wait()
s.mu.Lock()
close(s.ch)
s.mu.Unlock()
if s.handler == nil {
@@ -304,6 +308,10 @@ func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResu
ctx = context.Background()
}
if s.opts.Backpressure == Block {
return s.enqueueBlocking(ctx, evt)
}
s.mu.RLock()
defer s.mu.RUnlock()
@@ -316,13 +324,25 @@ func (s *eventSubscription) enqueue(ctx context.Context, evt Event) deliveryResu
switch s.opts.Backpressure {
case DropOldest:
return s.enqueueDropOldest(evt)
case Block:
return s.enqueueBlock(ctx, evt)
default:
return s.enqueueDropNewest(evt)
}
}
func (s *eventSubscription) enqueueBlocking(ctx context.Context, evt Event) deliveryResult {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return deliveryResult{closed: true}
}
s.blockWG.Add(1)
s.counters.received.Add(1)
s.mu.Unlock()
defer s.blockWG.Done()
return s.enqueueBlock(ctx, evt)
}
func (s *eventSubscription) enqueueDropNewest(evt Event) deliveryResult {
select {
case <-s.closing:
+59
View File
@@ -63,6 +63,65 @@ func TestUnsubscribeClosesChannel(t *testing.T) {
waitForSubscriptionDone(t, sub)
}
func TestBlockBackpressureCloseUnblocksPublisher(t *testing.T) {
t.Parallel()
bus := NewBus()
defer closeBus(t, bus)
sub, _, err := bus.Channel().SubscribeChan(context.Background(), SubscribeOptions{
Name: "block-close",
Buffer: 1,
Backpressure: Block,
})
if err != nil {
t.Fatalf("SubscribeChan failed: %v", err)
}
first := bus.Publish(context.Background(), Event{Kind: Kind("test.first")})
if first.Delivered != 1 {
t.Fatalf("first Publish = %+v, want one delivered event", first)
}
publishStarted := make(chan struct{})
publishReturned := make(chan PublishResult, 1)
go func() {
close(publishStarted)
publishReturned <- bus.Publish(context.Background(), Event{Kind: Kind("test.second")})
}()
<-publishStarted
waitForStat(t, func() uint64 {
return sub.Stats().Received
}, 2)
select {
case result := <-publishReturned:
t.Fatalf("blocking Publish returned before close: %+v", result)
default:
}
closeReturned := make(chan error, 1)
go func() {
closeReturned <- sub.Close()
}()
select {
case err := <-closeReturned:
if err != nil {
t.Fatalf("Close failed: %v", err)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for Close to unblock")
}
select {
case <-publishReturned:
case <-time.After(time.Second):
t.Fatal("timed out waiting for blocking Publish to return after close")
}
waitForSubscriptionDone(t, sub)
}
func TestHandlerPanicRecovered(t *testing.T) {
t.Parallel()