From 0f2353516582b1562477b09ea6e5bfbacb5e77c1 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Mon, 13 Apr 2026 12:35:27 +0800 Subject: [PATCH] fix(runtime): address session promotion and steering regressions --- pkg/agent/loop.go | 26 ++++--- pkg/agent/steering_test.go | 4 +- pkg/bus/bus_test.go | 26 +++++++ pkg/memory/jsonl.go | 116 ++++++++++++++++++++++++++++++ pkg/session/jsonl_backend.go | 11 +++ pkg/session/jsonl_backend_test.go | 20 ++++++ 6 files changed, 190 insertions(+), 13 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 1512ff824..1d9e61970 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -607,6 +607,19 @@ func (al *AgentLoop) Run(ctx context.Context) error { // immediately available messages, blocking for the first one until ctx is done. func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, activeAgentID string) { blocking := true + var requeue []bus.InboundMessage + defer func() { + for _, msg := range requeue { + if err := al.requeueInboundMessage(msg); err != nil { + logger.WarnCF("agent", "Failed to flush requeued inbound message", map[string]any{ + "error": err.Error(), + "channel": msg.Channel, + "sender_id": msg.SenderID, + }) + } + } + }() + for { var msg bus.InboundMessage @@ -637,13 +650,7 @@ func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, active msgScope, _, scopeOK := al.resolveSteeringTarget(msg) if !scopeOK || msgScope != activeScope { - if err := al.requeueInboundMessage(msg); err != nil { - logger.WarnCF("agent", "Failed to requeue non-steering inbound message", map[string]any{ - "error": err.Error(), - "channel": msg.Channel, - "sender_id": msg.SenderID, - }) - } + requeue = append(requeue, msg) continue } @@ -1706,10 +1713,7 @@ func (al *AgentLoop) requeueInboundMessage(msg bus.InboundMessage) error { } pubCtx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - return al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ - Context: msg.Context, - Content: msg.Content, - }) + return al.bus.PublishInbound(pubCtx, msg) } func (al *AgentLoop) processSystemMessage( diff --git a/pkg/agent/steering_test.go b/pkg/agent/steering_test.go index 9ecd8472a..8e6063f08 100644 --- a/pkg/agent/steering_test.go +++ b/pkg/agent/steering_test.go @@ -421,8 +421,8 @@ func TestDrainBusToSteering_RequeuesDifferentScopeMessage(t *testing.T) { select { case <-ctx.Done(): - t.Fatalf("timeout waiting for requeued message on outbound bus") - case requeued := <-msgBus.OutboundChan(): + t.Fatalf("timeout waiting for requeued message on inbound bus") + case requeued := <-msgBus.InboundChan(): if requeued.Context.Channel != otherMsg.Context.Channel || requeued.Context.ChatID != otherMsg.Context.ChatID || requeued.Content != otherMsg.Content { t.Fatalf("requeued message mismatch: got %+v want %+v", requeued, otherMsg) diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index b261a2df3..e55e9c7a4 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -221,6 +221,32 @@ func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) { } } +func TestPublishOutbound_PreservesExplicitReplyToMessageID(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + msg := OutboundMessage{ + Context: InboundContext{ + Channel: "telegram", + ChatID: "chat-42", + }, + ReplyToMessageID: "msg-9", + Content: "reply", + } + + if err := mb.PublishOutbound(context.Background(), msg); err != nil { + t.Fatalf("PublishOutbound failed: %v", err) + } + + got := <-mb.OutboundChan() + if got.ReplyToMessageID != "msg-9" { + t.Fatalf("expected mirrored reply_to_message_id msg-9, got %q", got.ReplyToMessageID) + } + if got.Context.ReplyToMessageID != "msg-9" { + t.Fatalf("expected context reply_to_message_id msg-9, got %q", got.Context.ReplyToMessageID) + } +} + func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) { mb := NewMessageBus() defer mb.Close() diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index f6f9c50f0..a1b794b97 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -223,6 +223,32 @@ func (s *JSONLStore) UpsertSessionMeta( return s.writeMeta(sessionKey, meta) } +// PromoteAliasHistory atomically promotes the first non-empty alias session +// into the canonical session when the canonical session is still empty. +func (s *JSONLStore) PromoteAliasHistory( + _ context.Context, + sessionKey string, + scope json.RawMessage, + aliases []string, +) (bool, error) { + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return false, nil + } + + aliases = normalizeAliases(sessionKey, aliases) + for _, alias := range aliases { + unlock := s.lockSessionPair(sessionKey, alias) + promoted, err := s.promoteAliasHistoryLocked(sessionKey, alias, scope, aliases) + unlock() + if err != nil || promoted { + return promoted, err + } + } + + return false, nil +} + // ResolveSessionKey returns the canonical session key for a candidate key. // It short-circuits direct canonical keys when possible, then scans metadata // once to resolve aliases or canonical metadata keys. @@ -294,6 +320,96 @@ func shouldShortCircuitSessionResolve(sessionKey string) bool { return !strings.ContainsAny(sessionKey, ":/\\") } +func (s *JSONLStore) lockSessionPair(keyA, keyB string) func() { + lockA := s.sessionLock(keyA) + lockB := s.sessionLock(keyB) + if lockA == lockB { + lockA.Lock() + return func() { lockA.Unlock() } + } + if keyA <= keyB { + lockA.Lock() + lockB.Lock() + return func() { + lockB.Unlock() + lockA.Unlock() + } + } + lockB.Lock() + lockA.Lock() + return func() { + lockA.Unlock() + lockB.Unlock() + } +} + +func (s *JSONLStore) promoteAliasHistoryLocked( + sessionKey string, + alias string, + scope json.RawMessage, + aliases []string, +) (bool, error) { + canonicalMeta, err := s.readMeta(sessionKey) + if err != nil { + return false, err + } + canonicalHasContent, err := s.sessionHasVisibleContentLocked(sessionKey, canonicalMeta) + if err != nil { + return false, err + } + if canonicalHasContent { + return false, nil + } + + aliasMeta, err := s.readMeta(alias) + if err != nil { + return false, err + } + aliasHistory, err := readMessages(s.jsonlPath(alias), aliasMeta.Skip) + if err != nil { + return false, err + } + aliasSummary := strings.TrimSpace(aliasMeta.Summary) + if len(aliasHistory) == 0 && aliasSummary == "" { + return false, nil + } + + now := time.Now() + if canonicalMeta.CreatedAt.IsZero() { + canonicalMeta.CreatedAt = now + } + canonicalMeta.Scope = cloneRawJSON(scope) + canonicalMeta.Aliases = normalizeAliases(sessionKey, aliases) + canonicalMeta.Skip = 0 + canonicalMeta.Count = len(aliasHistory) + canonicalMeta.UpdatedAt = now + if aliasSummary != "" { + canonicalMeta.Summary = aliasSummary + } + + if err := s.writeMeta(sessionKey, canonicalMeta); err != nil { + return false, err + } + if err := s.rewriteJSONL(sessionKey, aliasHistory); err != nil { + return false, err + } + return true, nil +} + +func (s *JSONLStore) sessionHasVisibleContentLocked(sessionKey string, meta SessionMeta) (bool, error) { + if meta.Count-meta.Skip > 0 || strings.TrimSpace(meta.Summary) != "" { + return true, nil + } + if meta.Count != 0 || meta.Skip != 0 { + return false, nil + } + history, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) + if err != nil { + return false, err + } + return len(history) > 0, nil +} + // readMessages reads valid JSON lines from a .jsonl file, skipping // the first `skip` lines without unmarshaling them. This avoids the // cost of json.Unmarshal on logically truncated messages. diff --git a/pkg/session/jsonl_backend.go b/pkg/session/jsonl_backend.go index 4e4f96029..2c4eb4e5a 100644 --- a/pkg/session/jsonl_backend.go +++ b/pkg/session/jsonl_backend.go @@ -23,6 +23,10 @@ type metaAwareStore interface { ResolveSessionKey(ctx context.Context, sessionKey string) (string, bool, error) } +type aliasPromotingStore interface { + PromoteAliasHistory(ctx context.Context, sessionKey string, scope json.RawMessage, aliases []string) (bool, error) +} + // MetadataAwareSessionStore exposes structured session metadata operations. type MetadataAwareSessionStore interface { EnsureSessionMetadata(sessionKey string, scope *SessionScope, aliases []string) @@ -84,6 +88,13 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc return } + if promotingStore, ok := b.store.(aliasPromotingStore); ok { + if _, err := promotingStore.PromoteAliasHistory(ctx, sessionKey, rawScope, aliases); err != nil { + log.Printf("session: promote alias history: %v", err) + } + return + } + canonicalMeta, metaErr := metaStore.GetSessionMeta(ctx, sessionKey) if metaErr != nil { log.Printf("session: get canonical session metadata: %v", metaErr) diff --git a/pkg/session/jsonl_backend_test.go b/pkg/session/jsonl_backend_test.go index 362619125..0b79ad84d 100644 --- a/pkg/session/jsonl_backend_test.go +++ b/pkg/session/jsonl_backend_test.go @@ -282,3 +282,23 @@ func TestJSONLBackend_EnsureSessionMetadata_PromotesLegacyPicoDirectAliasHistory t.Fatalf("promoted history = %+v", history) } } + +func TestJSONLBackend_EnsureSessionMetadata_DoesNotOverwriteNonEmptyCanonicalHistory(t *testing.T) { + b := newBackend(t) + + canonicalKey := session.BuildOpaqueSessionKey("agent:main:direct:current-user") + legacyKey := "agent:main:direct:legacy-user" + + b.AddMessage(canonicalKey, "user", "current canonical history") + b.AddMessage(legacyKey, "user", "legacy history") + + b.EnsureSessionMetadata(canonicalKey, &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "main", + }, []string{legacyKey}) + + history := b.GetHistory(canonicalKey) + if len(history) != 1 || history[0].Content != "current canonical history" { + t.Fatalf("canonical history overwritten: %+v", history) + } +}