fix(runtime): address session promotion and steering regressions

This commit is contained in:
Hoshina
2026-04-13 12:35:27 +08:00
parent 296077eabf
commit 0f23535165
6 changed files with 190 additions and 13 deletions
+15 -11
View File
@@ -607,6 +607,19 @@ func (al *AgentLoop) Run(ctx context.Context) error {
// immediately available messages, blocking for the first one until ctx is done.
func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, activeAgentID string) {
blocking := true
var requeue []bus.InboundMessage
defer func() {
for _, msg := range requeue {
if err := al.requeueInboundMessage(msg); err != nil {
logger.WarnCF("agent", "Failed to flush requeued inbound message", map[string]any{
"error": err.Error(),
"channel": msg.Channel,
"sender_id": msg.SenderID,
})
}
}
}()
for {
var msg bus.InboundMessage
@@ -637,13 +650,7 @@ func (al *AgentLoop) drainBusToSteering(ctx context.Context, activeScope, active
msgScope, _, scopeOK := al.resolveSteeringTarget(msg)
if !scopeOK || msgScope != activeScope {
if err := al.requeueInboundMessage(msg); err != nil {
logger.WarnCF("agent", "Failed to requeue non-steering inbound message", map[string]any{
"error": err.Error(),
"channel": msg.Channel,
"sender_id": msg.SenderID,
})
}
requeue = append(requeue, msg)
continue
}
@@ -1706,10 +1713,7 @@ func (al *AgentLoop) requeueInboundMessage(msg bus.InboundMessage) error {
}
pubCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Context: msg.Context,
Content: msg.Content,
})
return al.bus.PublishInbound(pubCtx, msg)
}
func (al *AgentLoop) processSystemMessage(
+2 -2
View File
@@ -421,8 +421,8 @@ func TestDrainBusToSteering_RequeuesDifferentScopeMessage(t *testing.T) {
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for requeued message on outbound bus")
case requeued := <-msgBus.OutboundChan():
t.Fatalf("timeout waiting for requeued message on inbound bus")
case requeued := <-msgBus.InboundChan():
if requeued.Context.Channel != otherMsg.Context.Channel || requeued.Context.ChatID != otherMsg.Context.ChatID ||
requeued.Content != otherMsg.Content {
t.Fatalf("requeued message mismatch: got %+v want %+v", requeued, otherMsg)
+26
View File
@@ -221,6 +221,32 @@ func TestPublishOutbound_MirrorsContextToLegacyFields(t *testing.T) {
}
}
// TestPublishOutbound_PreservesExplicitReplyToMessageID publishes an outbound
// message whose ReplyToMessageID is set only on the top-level field and checks
// that the message read back from the outbound channel carries that ID both
// on the top-level field and inside its Context.
func TestPublishOutbound_PreservesExplicitReplyToMessageID(t *testing.T) {
	const wantReplyID = "msg-9"

	msgBus := NewMessageBus()
	defer msgBus.Close()

	outbound := OutboundMessage{
		Context:          InboundContext{Channel: "telegram", ChatID: "chat-42"},
		ReplyToMessageID: wantReplyID,
		Content:          "reply",
	}
	err := msgBus.PublishOutbound(context.Background(), outbound)
	if err != nil {
		t.Fatalf("PublishOutbound failed: %v", err)
	}

	delivered := <-msgBus.OutboundChan()

	// The explicit top-level value must survive publishing.
	if topLevel := delivered.ReplyToMessageID; topLevel != wantReplyID {
		t.Fatalf("expected mirrored reply_to_message_id msg-9, got %q", topLevel)
	}
	// The same value must also be visible through the context.
	if fromCtx := delivered.Context.ReplyToMessageID; fromCtx != wantReplyID {
		t.Fatalf("expected context reply_to_message_id msg-9, got %q", fromCtx)
	}
}
func TestPublishOutboundMedia_MirrorsContextToLegacyFields(t *testing.T) {
mb := NewMessageBus()
defer mb.Close()
+116
View File
@@ -223,6 +223,32 @@ func (s *JSONLStore) UpsertSessionMeta(
return s.writeMeta(sessionKey, meta)
}
// PromoteAliasHistory promotes the first non-empty alias session into the
// canonical session when the canonical session is still empty.
//
// sessionKey is trimmed first; an empty key is a no-op that returns
// (false, nil). The aliases are passed through normalizeAliases and then tried
// in the resulting order. Each attempt runs while holding the lock pair for
// the canonical key and that alias.
//
// It returns (true, nil) as soon as one alias has been promoted, the first
// error encountered otherwise, and (false, nil) when no alias qualified.
// The context argument is currently unused.
func (s *JSONLStore) PromoteAliasHistory(
	_ context.Context,
	sessionKey string,
	scope json.RawMessage,
	aliases []string,
) (bool, error) {
	sessionKey = strings.TrimSpace(sessionKey)
	if sessionKey == "" {
		return false, nil
	}

	aliases = normalizeAliases(sessionKey, aliases)

	// tryAlias scopes the lock pair to a single attempt. Releasing via defer
	// guarantees the locks are dropped even if the promotion panics, instead
	// of leaving both sessions locked for the rest of the process lifetime.
	tryAlias := func(alias string) (bool, error) {
		unlock := s.lockSessionPair(sessionKey, alias)
		defer unlock()
		return s.promoteAliasHistoryLocked(sessionKey, alias, scope, aliases)
	}

	for _, alias := range aliases {
		promoted, err := tryAlias(alias)
		if err != nil || promoted {
			return promoted, err
		}
	}
	return false, nil
}
// ResolveSessionKey returns the canonical session key for a candidate key.
// It short-circuits direct canonical keys when possible, then scans metadata
// once to resolve aliases or canonical metadata keys.
@@ -294,6 +320,96 @@ func shouldShortCircuitSessionResolve(sessionKey string) bool {
return !strings.ContainsAny(sessionKey, ":/\\")
}
// lockSessionPair acquires the session locks for keyA and keyB and returns a
// function that releases whatever was acquired.
//
// When both keys resolve to the same lock it is taken only once. Otherwise
// the lock belonging to the lexicographically smaller key is taken first and
// the two are released in the reverse order of acquisition.
//
// NOTE(review): acquisition order is decided by comparing the key strings. If
// sessionLock can hand out one lock for several different keys, key order may
// not translate into a single global lock order — confirm against sessionLock.
func (s *JSONLStore) lockSessionPair(keyA, keyB string) func() {
	first, second := s.sessionLock(keyA), s.sessionLock(keyB)

	// Same underlying lock: locking it twice would self-deadlock.
	if first == second {
		first.Lock()
		return func() { first.Unlock() }
	}

	// Put the lock of the smaller key in front.
	if keyA > keyB {
		first, second = second, first
	}

	first.Lock()
	second.Lock()
	return func() {
		second.Unlock()
		first.Unlock()
	}
}
// promoteAliasHistoryLocked copies the visible history and summary of one
// alias session into the canonical session, provided the canonical session
// has no visible content yet.
//
// PromoteAliasHistory calls this while holding the lock pair for sessionKey
// and alias; this function does no locking of its own.
//
// Parameters:
//   - sessionKey: canonical session to populate.
//   - alias: candidate source session.
//   - scope: raw scope JSON stored (as a clone) on the canonical metadata.
//   - aliases: alias list stored on the canonical metadata after normalization.
//
// Returns (true, nil) when the alias was promoted, (false, nil) when the
// canonical session already has content or the alias has neither messages nor
// a summary, and (false, err) when reading or writing fails.
func (s *JSONLStore) promoteAliasHistoryLocked(
	sessionKey string,
	alias string,
	scope json.RawMessage,
	aliases []string,
) (bool, error) {
	canonicalMeta, err := s.readMeta(sessionKey)
	if err != nil {
		return false, err
	}
	canonicalHasContent, err := s.sessionHasVisibleContentLocked(sessionKey, canonicalMeta)
	if err != nil {
		return false, err
	}
	if canonicalHasContent {
		// Never overwrite a canonical session that already holds data.
		return false, nil
	}

	aliasMeta, err := s.readMeta(alias)
	if err != nil {
		return false, err
	}
	// Only the alias messages past its Skip offset are carried over.
	aliasHistory, err := readMessages(s.jsonlPath(alias), aliasMeta.Skip)
	if err != nil {
		return false, err
	}
	aliasSummary := strings.TrimSpace(aliasMeta.Summary)
	if len(aliasHistory) == 0 && aliasSummary == "" {
		// Nothing worth promoting from this alias.
		return false, nil
	}

	now := time.Now()
	if canonicalMeta.CreatedAt.IsZero() {
		canonicalMeta.CreatedAt = now
	}
	canonicalMeta.Scope = cloneRawJSON(scope)
	canonicalMeta.Aliases = normalizeAliases(sessionKey, aliases)
	canonicalMeta.Skip = 0
	canonicalMeta.Count = len(aliasHistory)
	canonicalMeta.UpdatedAt = now
	if aliasSummary != "" {
		canonicalMeta.Summary = aliasSummary
	}

	// Persist the messages before the metadata. sessionHasVisibleContentLocked
	// trusts a positive Count-Skip without looking at the file, so storing the
	// new Count first and then failing the rewrite would mark the canonical
	// session as populated while its history is missing, and every later
	// promotion attempt would be refused.
	if err := s.rewriteJSONL(sessionKey, aliasHistory); err != nil {
		return false, err
	}
	if err := s.writeMeta(sessionKey, canonicalMeta); err != nil {
		return false, err
	}
	return true, nil
}
// sessionHasVisibleContentLocked reports whether a session currently exposes
// any content.
//
// The metadata is consulted first: a positive Count-Skip or a non-blank
// Summary means "yes"; any other metadata with a non-zero Count or Skip means
// "no". Only when both counters are zero is the JSONL file read to see whether
// it holds any messages.
//
// NOTE(review): the "Locked" suffix suggests the caller is expected to hold
// the session lock; nothing in this function acquires one.
func (s *JSONLStore) sessionHasVisibleContentLocked(sessionKey string, meta SessionMeta) (bool, error) {
	switch {
	case meta.Count-meta.Skip > 0, strings.TrimSpace(meta.Summary) != "":
		return true, nil
	case meta.Count != 0, meta.Skip != 0:
		// Counters are populated but leave nothing visible.
		return false, nil
	}

	// Counters are both zero: fall back to what is actually on disk.
	msgs, readErr := readMessages(s.jsonlPath(sessionKey), meta.Skip)
	if readErr != nil {
		return false, readErr
	}
	return len(msgs) > 0, nil
}
// readMessages reads valid JSON lines from a .jsonl file, skipping
// the first `skip` lines without unmarshaling them. This avoids the
// cost of json.Unmarshal on logically truncated messages.
+11
View File
@@ -23,6 +23,10 @@ type metaAwareStore interface {
ResolveSessionKey(ctx context.Context, sessionKey string) (string, bool, error)
}
// aliasPromotingStore is the optional capability of a store that can promote
// alias history into a canonical session by itself.
type aliasPromotingStore interface {
	// PromoteAliasHistory takes the canonical session key, its raw scope JSON
	// and the alias keys, and reports whether a promotion took place.
	PromoteAliasHistory(
		ctx context.Context,
		sessionKey string,
		scope json.RawMessage,
		aliases []string,
	) (bool, error)
}
// MetadataAwareSessionStore exposes structured session metadata operations.
type MetadataAwareSessionStore interface {
EnsureSessionMetadata(sessionKey string, scope *SessionScope, aliases []string)
@@ -84,6 +88,13 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc
return
}
if promotingStore, ok := b.store.(aliasPromotingStore); ok {
if _, err := promotingStore.PromoteAliasHistory(ctx, sessionKey, rawScope, aliases); err != nil {
log.Printf("session: promote alias history: %v", err)
}
return
}
canonicalMeta, metaErr := metaStore.GetSessionMeta(ctx, sessionKey)
if metaErr != nil {
log.Printf("session: get canonical session metadata: %v", metaErr)
+20
View File
@@ -282,3 +282,23 @@ func TestJSONLBackend_EnsureSessionMetadata_PromotesLegacyPicoDirectAliasHistory
t.Fatalf("promoted history = %+v", history)
}
}
// TestJSONLBackend_EnsureSessionMetadata_DoesNotOverwriteNonEmptyCanonicalHistory
// seeds one message in the canonical session and one in a legacy alias, calls
// EnsureSessionMetadata with that alias, and checks that the canonical history
// still consists of exactly its original message.
func TestJSONLBackend_EnsureSessionMetadata_DoesNotOverwriteNonEmptyCanonicalHistory(t *testing.T) {
	const (
		aliasKey      = "agent:main:direct:legacy-user"
		canonicalText = "current canonical history"
	)

	backend := newBackend(t)
	canonicalKey := session.BuildOpaqueSessionKey("agent:main:direct:current-user")

	// Both sessions hold history before metadata is ensured.
	backend.AddMessage(canonicalKey, "user", canonicalText)
	backend.AddMessage(aliasKey, "user", "legacy history")

	scope := &session.SessionScope{
		Version: session.ScopeVersionV1,
		AgentID: "main",
	}
	backend.EnsureSessionMetadata(canonicalKey, scope, []string{aliasKey})

	got := backend.GetHistory(canonicalKey)
	if len(got) == 1 && got[0].Content == canonicalText {
		return
	}
	t.Fatalf("canonical history overwritten: %+v", got)
}