mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
78fd080189
Add a non-blocking runtime publish path and switch hot-path publishers to it. Enforce subscription timeout boundaries, keep ordered subscriber snapshots up to date on subscribe changes, expose all runtime kinds to process hooks, add safe log attrs for non-agent events, and close the gateway message bus on full shutdown.
255 lines
5.5 KiB
Go
255 lines
5.5 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestSubscribeOnceClosesAfterFirstEvent(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
var handled atomic.Uint64
|
|
sub, err := bus.Channel().SubscribeOnce(
|
|
context.Background(),
|
|
SubscribeOptions{Name: "once", Buffer: 2},
|
|
func(context.Context, Event) error {
|
|
handled.Add(1)
|
|
return nil
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("SubscribeOnce failed: %v", err)
|
|
}
|
|
|
|
bus.Publish(context.Background(), Event{Kind: KindAgentTurnStart})
|
|
waitForSubscriptionDone(t, sub)
|
|
bus.Publish(context.Background(), Event{Kind: KindAgentTurnEnd})
|
|
|
|
if got := handled.Load(); got != 1 {
|
|
t.Fatalf("handled = %d, want 1", got)
|
|
}
|
|
if got := sub.Stats().Handled; got != 1 {
|
|
t.Fatalf("subscription handled = %d, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestUnsubscribeClosesChannel(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
sub, ch, err := bus.Channel().SubscribeChan(context.Background(), SubscribeOptions{Name: "chan"})
|
|
if err != nil {
|
|
t.Fatalf("SubscribeChan failed: %v", err)
|
|
}
|
|
if err := sub.Close(); err != nil {
|
|
t.Fatalf("Close failed: %v", err)
|
|
}
|
|
|
|
select {
|
|
case _, ok := <-ch:
|
|
if ok {
|
|
t.Fatal("channel is open, want closed")
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for channel close")
|
|
}
|
|
waitForSubscriptionDone(t, sub)
|
|
}
|
|
|
|
func TestBlockBackpressureCloseUnblocksPublisher(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
sub, _, err := bus.Channel().SubscribeChan(context.Background(), SubscribeOptions{
|
|
Name: "block-close",
|
|
Buffer: 1,
|
|
Backpressure: Block,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("SubscribeChan failed: %v", err)
|
|
}
|
|
|
|
first := bus.Publish(context.Background(), Event{Kind: Kind("test.first")})
|
|
if first.Delivered != 1 {
|
|
t.Fatalf("first Publish = %+v, want one delivered event", first)
|
|
}
|
|
|
|
publishStarted := make(chan struct{})
|
|
publishReturned := make(chan PublishResult, 1)
|
|
go func() {
|
|
close(publishStarted)
|
|
publishReturned <- bus.Publish(context.Background(), Event{Kind: Kind("test.second")})
|
|
}()
|
|
|
|
<-publishStarted
|
|
waitForStat(t, func() uint64 {
|
|
return sub.Stats().Received
|
|
}, 2)
|
|
select {
|
|
case result := <-publishReturned:
|
|
t.Fatalf("blocking Publish returned before close: %+v", result)
|
|
default:
|
|
}
|
|
|
|
closeReturned := make(chan error, 1)
|
|
go func() {
|
|
closeReturned <- sub.Close()
|
|
}()
|
|
|
|
select {
|
|
case err := <-closeReturned:
|
|
if err != nil {
|
|
t.Fatalf("Close failed: %v", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for Close to unblock")
|
|
}
|
|
|
|
select {
|
|
case <-publishReturned:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for blocking Publish to return after close")
|
|
}
|
|
waitForSubscriptionDone(t, sub)
|
|
}
|
|
|
|
func TestHandlerPanicRecovered(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
sub, err := bus.Channel().Subscribe(
|
|
context.Background(),
|
|
SubscribeOptions{Name: "panic", Buffer: 1},
|
|
func(context.Context, Event) error {
|
|
panic("boom")
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Subscribe failed: %v", err)
|
|
}
|
|
|
|
bus.Publish(context.Background(), Event{Kind: KindAgentError})
|
|
waitForStat(t, func() uint64 {
|
|
return sub.Stats().Panicked
|
|
}, 1)
|
|
}
|
|
|
|
func TestLockedHandlerProcessesSequentially(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
var active atomic.Int64
|
|
var maxActive atomic.Int64
|
|
sub, err := bus.Channel().Subscribe(
|
|
context.Background(),
|
|
SubscribeOptions{Name: "locked", Buffer: 8, Concurrency: Locked},
|
|
func(context.Context, Event) error {
|
|
current := active.Add(1)
|
|
for {
|
|
currentMax := maxActive.Load()
|
|
if current <= currentMax || maxActive.CompareAndSwap(currentMax, current) {
|
|
break
|
|
}
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
active.Add(-1)
|
|
return nil
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Subscribe failed: %v", err)
|
|
}
|
|
|
|
for i := 0; i < 5; i++ {
|
|
bus.Publish(context.Background(), Event{Kind: KindAgentLLMDelta})
|
|
}
|
|
waitForStat(t, func() uint64 {
|
|
return sub.Stats().Handled
|
|
}, 5)
|
|
|
|
if got := maxActive.Load(); got != 1 {
|
|
t.Fatalf("max active handlers = %d, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestHandlerTimeoutDoesNotWedgeLockedSubscription(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
bus := NewBus()
|
|
defer closeBus(t, bus)
|
|
|
|
releaseFirst := make(chan struct{})
|
|
defer close(releaseFirst)
|
|
|
|
var calls atomic.Uint64
|
|
sub, err := bus.Channel().Subscribe(
|
|
context.Background(),
|
|
SubscribeOptions{Name: "timeout", Buffer: 2, Concurrency: Locked, Timeout: 20 * time.Millisecond},
|
|
func(context.Context, Event) error {
|
|
if calls.Add(1) == 1 {
|
|
<-releaseFirst
|
|
}
|
|
return nil
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Subscribe failed: %v", err)
|
|
}
|
|
|
|
bus.Publish(context.Background(), Event{Kind: Kind("test.first")})
|
|
waitForStat(t, func() uint64 {
|
|
return sub.Stats().TimedOut
|
|
}, 1)
|
|
|
|
bus.Publish(context.Background(), Event{Kind: Kind("test.second")})
|
|
waitForStat(t, func() uint64 {
|
|
return sub.Stats().Handled
|
|
}, 1)
|
|
|
|
if got := sub.Stats().Failed; got != 1 {
|
|
t.Fatalf("subscription failed = %d, want timeout failure", got)
|
|
}
|
|
}
|
|
|
|
func waitForSubscriptionDone(t *testing.T, sub Subscription) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case <-sub.Done():
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for subscription to stop")
|
|
}
|
|
}
|
|
|
|
func waitForStat(t *testing.T, stat func() uint64, want uint64) {
|
|
t.Helper()
|
|
|
|
deadline := time.After(time.Second)
|
|
ticker := time.NewTicker(time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
if got := stat(); got >= want {
|
|
return
|
|
}
|
|
select {
|
|
case <-ticker.C:
|
|
case <-deadline:
|
|
t.Fatalf("timed out waiting for stat >= %d", want)
|
|
}
|
|
}
|
|
}
|