diff --git a/pkg/agent/eventbus_test.go b/pkg/agent/eventbus_test.go index 6a75ab8d9..574d7bbcc 100644 --- a/pkg/agent/eventbus_test.go +++ b/pkg/agent/eventbus_test.go @@ -149,7 +149,7 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) { Channel: "cli", AccountID: routing.DefaultAccountID, SessionPolicy: routing.SessionPolicy{ - DMScope: routing.DMScopePerPeer, + Dimensions: []string{"sender"}, }, MatchedBy: "default", }, diff --git a/pkg/agent/hooks_test.go b/pkg/agent/hooks_test.go index 3287a2a1d..6f61da65a 100644 --- a/pkg/agent/hooks_test.go +++ b/pkg/agent/hooks_test.go @@ -172,7 +172,7 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) { Channel: "cli", AccountID: routing.DefaultAccountID, SessionPolicy: routing.SessionPolicy{ - DMScope: routing.DMScopePerPeer, + Dimensions: []string{"sender"}, }, MatchedBy: "default", }, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ef4680e45..70827598a 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -108,6 +108,7 @@ const ( toolLimitResponse = "I've reached `max_tool_iterations` without a final response. Increase `max_tool_iterations` in config.json if this task needs more tool steps." handledToolResponseSummary = "Requested output delivered via tool attachment." sessionKeyAgentPrefix = "agent:" + sessionKeyOpaquePrefix = "sk_" metadataKeyAccountID = "account_id" metadataKeyGuildID = "guild_id" metadataKeyTeamID = "team_id" @@ -1022,8 +1023,8 @@ func appendEventContextFields(fields map[string]any, turnCtx *TurnContext) { if route.MatchedBy != "" { fields["route_matched_by"] = route.MatchedBy } - if route.SessionPolicy.DMScope != "" { - fields["route_dm_scope"] = string(route.SessionPolicy.DMScope) + if len(route.SessionPolicy.Dimensions) > 0 { + fields["route_dimensions"] = strings.Join(route.SessionPolicy.Dimensions, ",") } if count := len(route.SessionPolicy.IdentityLinks); count > 0 { fields["route_identity_link_count"] = count @@ -1476,7 +1477,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) opts := processOptions{ SessionKey: sessionKey, - SessionAliases: buildSessionAliases(sessionKey, allocation.SessionKey, msg.SessionKey), + SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...), Channel: msg.Channel, ChatID: msg.ChatID, MessageID: msg.MessageID, @@ -1543,12 +1544,17 @@ func normalizedInboundContext(msg bus.InboundMessage) bus.InboundContext { } func resolveScopeKey(routeSessionKey, msgSessionKey string) string { - if msgSessionKey != "" && strings.HasPrefix(msgSessionKey, sessionKeyAgentPrefix) { + if isExplicitSessionKey(msgSessionKey) { return msgSessionKey } return routeSessionKey } +func isExplicitSessionKey(sessionKey string) bool { + sessionKey = strings.TrimSpace(strings.ToLower(sessionKey)) + return strings.HasPrefix(sessionKey, sessionKeyAgentPrefix) || strings.HasPrefix(sessionKey, sessionKeyOpaquePrefix) +} + func buildSessionAliases(canonicalKey string, keys ...string) []string { if len(keys) == 0 { return nil @@ -1589,9 +1595,7 @@ func ensureSessionMetadata(store session.SessionStore, key string, scope *sessio func (al *AgentLoop) allocateRouteSession(route routing.ResolvedRoute, msg bus.InboundMessage) session.Allocation { return session.AllocateRouteSession(session.AllocationInput{ AgentID: route.AgentID, - Channel: route.Channel, - AccountID: route.AccountID, - Peer: extractPeer(msg), + Context: normalizedInboundContext(msg), SessionPolicy: route.SessionPolicy, }) } diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index dbc1b674b..3efb7ddfd 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -796,7 +796,7 @@ func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) { AccountID: "workspace-a", MatchedBy: "binding.team", SessionPolicy: routing.SessionPolicy{ - DMScope: routing.DMScopePerChannelPeer, + Dimensions: []string{"chat", "sender"}, IdentityLinks: map[string][]string{ "canonical-user": {"slack:U123"}, }, @@ -824,8 +824,8 @@ func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) { if fields["route_matched_by"] != "binding.team" { t.Fatalf("route_matched_by = %v, want binding.team", fields["route_matched_by"]) } - if fields["route_dm_scope"] != string(routing.DMScopePerChannelPeer) { - t.Fatalf("route_dm_scope = %v, want %q", fields["route_dm_scope"], routing.DMScopePerChannelPeer) + if fields["route_dimensions"] != "chat,sender" { + t.Fatalf("route_dimensions = %v, want chat,sender", fields["route_dimensions"]) } if fields["route_identity_link_count"] != 1 { t.Fatalf("route_identity_link_count = %v, want 1", fields["route_identity_link_count"]) @@ -865,7 +865,7 @@ func TestResolveMessageRoute_UsesInboundContextAccountAndSpace(t *testing.T) { }, }, Session: config.SessionConfig{ - DMScope: "per-peer", + Dimensions: []string{"sender"}, }, } @@ -1600,7 +1600,7 @@ func TestProcessMessage_CommandOutcomes(t *testing.T) { }, }, Session: config.SessionConfig{ - DMScope: "per-channel-peer", + Dimensions: []string{"chat"}, }, } diff --git a/pkg/agent/steering.go b/pkg/agent/steering.go index ad6613e8c..b5cf049b3 100644 --- a/pkg/agent/steering.go +++ b/pkg/agent/steering.go @@ -9,6 +9,7 @@ import ( "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -310,6 +311,27 @@ func (al *AgentLoop) agentForSession(sessionKey string) *AgentInstance { return nil } + for _, agentID := range registry.ListAgentIDs() { + agent, ok := registry.GetAgent(agentID) + if !ok || agent == nil { + continue + } + scopeReader, ok := agent.Sessions.(interface { + GetSessionScope(sessionKey string) *session.SessionScope + }) + if !ok { + continue + } + scope := scopeReader.GetSessionScope(sessionKey) + if scope == nil || strings.TrimSpace(scope.AgentID) == "" { + continue + } + if scopedAgent, ok := registry.GetAgent(scope.AgentID); ok { + return scopedAgent + } + return agent + } + if parsed := routing.ParseAgentSessionKey(sessionKey); parsed != nil { if agent, ok := registry.GetAgent(parsed.AgentID); ok { return agent diff --git a/pkg/agent/steering_test.go b/pkg/agent/steering_test.go index 75ba9861d..b67ec006c 100644 --- a/pkg/agent/steering_test.go +++ b/pkg/agent/steering_test.go @@ -17,6 +17,7 @@ import ( "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/providers" "github.com/sipeed/picoclaw/pkg/routing" + "github.com/sipeed/picoclaw/pkg/session" "github.com/sipeed/picoclaw/pkg/tools" ) @@ -357,7 +358,7 @@ func TestDrainBusToSteering_RequeuesDifferentScopeMessage(t *testing.T) { }, }, Session: config.SessionConfig{ - DMScope: "per-peer", + Dimensions: []string{"sender"}, }, } @@ -1013,6 +1014,62 @@ func TestAgentLoop_Steering_DirectResponseContinuesWithQueuedMessage(t *testing. } } +func TestAgentLoop_AgentForSession_UsesStoredScopeMetadata(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agent-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + ModelName: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + List: []config.AgentConfig{ + {ID: "sales", Default: true}, + {ID: "support"}, + }, + }, + } + + al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{}) + support, ok := al.registry.GetAgent("support") + if !ok || support == nil { + t.Fatal("expected support agent") + } + + metaStore, ok := support.Sessions.(session.MetadataAwareSessionStore) + if !ok { + t.Fatal("support session store does not support metadata") + } + + alias := "agent:support:slack:channel:c001" + key := session.BuildOpaqueSessionKey(alias) + scope := &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "support", + Channel: "slack", + Account: "default", + Dimensions: []string{"chat"}, + Values: map[string]string{ + "chat": "channel:c001", + }, + } + metaStore.EnsureSessionMetadata(key, scope, []string{alias}) + + got := al.agentForSession(key) + if got == nil { + t.Fatal("agentForSession() returned nil") + } + if got.ID != "support" { + t.Fatalf("agentForSession() = %q, want %q", got.ID, "support") + } +} + func TestAgentLoop_Continue_PreservesSteeringMedia(t *testing.T) { tmpDir, err := os.MkdirTemp("", "agent-test-*") if err != nil { diff --git a/pkg/agent/turn_context.go b/pkg/agent/turn_context.go index 95ed5a0f3..8913993aa 100644 --- a/pkg/agent/turn_context.go +++ b/pkg/agent/turn_context.go @@ -72,7 +72,7 @@ func cloneResolvedRoute(route *routing.ResolvedRoute) *routing.ResolvedRoute { } cloned := *route cloned.SessionPolicy = routing.SessionPolicy{ - DMScope: route.SessionPolicy.DMScope, + Dimensions: append([]string(nil), route.SessionPolicy.Dimensions...), IdentityLinks: cloneIdentityLinks(route.SessionPolicy.IdentityLinks), } return &cloned diff --git a/pkg/config/config.go b/pkg/config/config.go index 397cd4ab8..10eb07339 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -109,9 +109,12 @@ func (c *Config) MarshalJSON() ([]byte, error) { Alias: (*Alias)(c), } - // Only include session if not empty - if c.Session.DMScope != "" || len(c.Session.IdentityLinks) > 0 { - aux.Session = &c.Session + // Only include session if not empty. Deprecated dm_scope is intentionally + // omitted so persisted configs converge on dimensions-based session policy. + if len(c.Session.Dimensions) > 0 || len(c.Session.IdentityLinks) > 0 { + sessionCfg := c.Session + sessionCfg.DMScope = "" + aux.Session = &sessionCfg } return json.Marshal(aux) @@ -195,7 +198,8 @@ type AgentBinding struct { } type SessionConfig struct { - DMScope string `json:"dm_scope,omitempty"` + Dimensions []string `json:"dimensions,omitempty"` + DMScope string `json:"dm_scope,omitempty"` // Deprecated: ignored by the new session policy path. IdentityLinks map[string][]string `json:"identity_links,omitempty"` } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 278dfa43a..e8ebf1cfe 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -137,7 +137,7 @@ func TestAgentConfig_FullParse(t *testing.T) { } ], "session": { - "dm_scope": "per-peer", + "dimensions": ["sender"], "identity_links": { "john": ["telegram:123", "discord:john#1234"] } @@ -186,8 +186,8 @@ func TestAgentConfig_FullParse(t *testing.T) { t.Errorf("binding.Match.Peer = %+v", binding.Match.Peer) } - if cfg.Session.DMScope != "per-peer" { - t.Errorf("Session.DMScope = %q", cfg.Session.DMScope) + if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "sender" { + t.Errorf("Session.Dimensions = %v", cfg.Session.Dimensions) } if len(cfg.Session.IdentityLinks) != 1 { t.Errorf("Session.IdentityLinks = %v", cfg.Session.IdentityLinks) @@ -758,7 +758,7 @@ func TestLoadConfig_HooksProcessConfig(t *testing.T) { } } -// TestDefaultConfig_DMScope verifies the default dm_scope value +// TestDefaultConfig_SessionDimensions verifies the default session dimensions // TestDefaultConfig_SummarizationThresholds verifies summarization defaults func TestDefaultConfig_SummarizationThresholds(t *testing.T) { cfg := DefaultConfig() @@ -771,11 +771,11 @@ func TestDefaultConfig_SummarizationThresholds(t *testing.T) { } } -func TestDefaultConfig_DMScope(t *testing.T) { +func TestDefaultConfig_SessionDimensions(t *testing.T) { cfg := DefaultConfig() - if cfg.Session.DMScope != "per-channel-peer" { - t.Errorf("Session.DMScope = %q, want 'per-channel-peer'", cfg.Session.DMScope) + if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "chat" { + t.Errorf("Session.Dimensions = %v, want [chat]", cfg.Session.Dimensions) } } diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index c3845e3e2..58cd05088 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -37,7 +37,7 @@ func DefaultConfig() *Config { }, Bindings: []AgentBinding{}, Session: SessionConfig{ - DMScope: "per-channel-peer", + Dimensions: []string{"chat"}, }, Channels: ChannelsConfig{ WhatsApp: WhatsAppConfig{ diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 70c55329f..7e2c6b892 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -230,9 +230,6 @@ func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (st if sessionKey == "" { return "", false, nil } - if s.sessionExists(sessionKey) { - return sessionKey, true, nil - } entries, err := os.ReadDir(s.dir) if err != nil { @@ -254,16 +251,34 @@ func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (st if meta.Key == "" { continue } - if meta.Key == sessionKey { - return meta.Key, true, nil - } for _, alias := range meta.Aliases { - if alias == sessionKey { + if alias == sessionKey && meta.Key != sessionKey { return meta.Key, true, nil } } } + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta.json") { + continue + } + data, readErr := os.ReadFile(filepath.Join(s.dir, entry.Name())) + if readErr != nil { + return "", false, fmt.Errorf("memory: read meta: %w", readErr) + } + var meta SessionMeta + if err := json.Unmarshal(data, &meta); err != nil { + return "", false, fmt.Errorf("memory: decode meta: %w", err) + } + if meta.Key == sessionKey { + return meta.Key, true, nil + } + } + + if s.sessionExists(sessionKey) { + return sessionKey, true, nil + } + return "", false, nil } diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index ef739e49b..71ce8d866 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -296,6 +296,32 @@ func TestResolveSessionKeyByAlias(t *testing.T) { } } +func TestResolveSessionKeyByAlias_PrefersMetadataOverLegacyFile(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + if err := store.AddMessage(ctx, "legacy:key", "user", "legacy"); err != nil { + t.Fatalf("AddMessage(legacy) error = %v", err) + } + if err := store.AddMessage(ctx, "canonical", "user", "canonical"); err != nil { + t.Fatalf("AddMessage(canonical) error = %v", err) + } + if err := store.UpsertSessionMeta(ctx, "canonical", nil, []string{"legacy:key"}); err != nil { + t.Fatalf("UpsertSessionMeta() error = %v", err) + } + + resolved, found, err := store.ResolveSessionKey(ctx, "legacy:key") + if err != nil { + t.Fatalf("ResolveSessionKey() error = %v", err) + } + if !found { + t.Fatal("ResolveSessionKey() did not find alias") + } + if resolved != "canonical" { + t.Fatalf("resolved = %q, want %q", resolved, "canonical") + } +} + func TestTruncateHistory_KeepLast(t *testing.T) { store := newTestStore(t) ctx := context.Background() diff --git a/pkg/routing/route.go b/pkg/routing/route.go index 494aefabb..e5a000067 100644 --- a/pkg/routing/route.go +++ b/pkg/routing/route.go @@ -17,10 +17,8 @@ type RouteInput struct { } // SessionPolicy describes how a routed message should be mapped to a session. -// The current implementation preserves the legacy dm_scope and identity_link -// semantics while moving session-key construction out of the router. type SessionPolicy struct { - DMScope DMScope + Dimensions []string IdentityLinks map[string][]string } @@ -246,16 +244,38 @@ func (r *RouteResolver) resolveDefaultAgentID() string { } func (r *RouteResolver) sessionPolicy() SessionPolicy { - dmScope := DMScope(r.cfg.Session.DMScope) - if dmScope == "" { - dmScope = DMScopeMain - } return SessionPolicy{ - DMScope: dmScope, + Dimensions: normalizeSessionDimensions(r.cfg.Session.Dimensions), IdentityLinks: cloneIdentityLinks(r.cfg.Session.IdentityLinks), } } +func normalizeSessionDimensions(dimensions []string) []string { + if len(dimensions) == 0 { + return nil + } + + normalized := make([]string, 0, len(dimensions)) + seen := make(map[string]struct{}, len(dimensions)) + for _, dimension := range dimensions { + dimension = strings.ToLower(strings.TrimSpace(dimension)) + switch dimension { + case "space", "chat", "topic", "sender": + default: + continue + } + if _, ok := seen[dimension]; ok { + continue + } + seen[dimension] = struct{}{} + normalized = append(normalized, dimension) + } + if len(normalized) == 0 { + return nil + } + return normalized +} + func cloneIdentityLinks(src map[string][]string) map[string][]string { if len(src) == 0 { return nil diff --git a/pkg/routing/route_test.go b/pkg/routing/route_test.go index ab1a7a4e2..3397bd8e8 100644 --- a/pkg/routing/route_test.go +++ b/pkg/routing/route_test.go @@ -17,7 +17,7 @@ func testConfig(agents []config.AgentConfig, bindings []config.AgentBinding) *co }, Bindings: bindings, Session: config.SessionConfig{ - DMScope: "per-peer", + Dimensions: []string{"sender"}, }, } } @@ -37,8 +37,8 @@ func TestResolveRoute_DefaultAgent_NoBindings(t *testing.T) { if route.MatchedBy != "default" { t.Errorf("MatchedBy = %q, want 'default'", route.MatchedBy) } - if route.SessionPolicy.DMScope != DMScopePerPeer { - t.Errorf("SessionPolicy.DMScope = %q, want %q", route.SessionPolicy.DMScope, DMScopePerPeer) + if len(route.SessionPolicy.Dimensions) != 1 || route.SessionPolicy.Dimensions[0] != "sender" { + t.Errorf("SessionPolicy.Dimensions = %v, want [sender]", route.SessionPolicy.Dimensions) } if route.SessionPolicy.IdentityLinks != nil { t.Errorf("SessionPolicy.IdentityLinks = %v, want nil", route.SessionPolicy.IdentityLinks) diff --git a/pkg/routing/session_key.go b/pkg/routing/session_key.go index 17b62f4b7..cc3ce43f3 100644 --- a/pkg/routing/session_key.go +++ b/pkg/routing/session_key.go @@ -112,6 +112,19 @@ func CanonicalSessionPeerID( return strings.ToLower(normalizedPeerID) } +// CanonicalSessionIdentityID collapses an identity using identity_links when +// possible, then returns a normalized lowercase identifier. +func CanonicalSessionIdentityID(channel, rawID string, identityLinks map[string][]string) string { + normalizedID := strings.TrimSpace(rawID) + if normalizedID == "" { + return "" + } + if linked := resolveLinkedPeerID(identityLinks, channel, normalizedID); linked != "" { + normalizedID = linked + } + return strings.ToLower(normalizedID) +} + // ParseAgentSessionKey extracts agentId and rest from "agent::". func ParseAgentSessionKey(sessionKey string) *ParsedSessionKey { raw := strings.TrimSpace(sessionKey) diff --git a/pkg/session/allocator.go b/pkg/session/allocator.go index a3b8e075d..6bf678deb 100644 --- a/pkg/session/allocator.go +++ b/pkg/session/allocator.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/routing" ) @@ -13,85 +14,167 @@ import ( type Allocation struct { Scope SessionScope SessionKey string + SessionAliases []string MainSessionKey string + MainAliases []string } // AllocationInput contains the routing result and peer context needed to // derive the session keys for a turn. type AllocationInput struct { AgentID string - Channel string - AccountID string - Peer *routing.RoutePeer + Context bus.InboundContext SessionPolicy routing.SessionPolicy } -// AllocateRouteSession maps a route decision onto the current legacy -// agent-scoped session-key format. +// AllocateRouteSession maps a route decision onto a structured scope and the +// current opaque session-key format. func AllocateRouteSession(input AllocationInput) Allocation { scope := buildSessionScope(input) - sessionKey := strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{ - AgentID: input.AgentID, - Channel: input.Channel, - AccountID: input.AccountID, - Peer: input.Peer, - DMScope: input.SessionPolicy.DMScope, - IdentityLinks: input.SessionPolicy.IdentityLinks, - })) - mainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID)) + legacySessionAliases := buildLegacySessionAliases(input) + legacyMainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID)) return Allocation{ Scope: scope, - SessionKey: sessionKey, - MainSessionKey: mainSessionKey, + SessionKey: BuildSessionKey(scope), + SessionAliases: legacySessionAliases, + MainSessionKey: BuildOpaqueSessionKey(legacyMainSessionKey), + MainAliases: []string{legacyMainSessionKey}, } } func buildSessionScope(input AllocationInput) SessionScope { + inbound := input.Context scope := SessionScope{ Version: ScopeVersionV1, AgentID: routing.NormalizeAgentID(input.AgentID), - Channel: strings.ToLower(strings.TrimSpace(input.Channel)), - Account: routing.NormalizeAccountID(input.AccountID), + Channel: strings.ToLower(strings.TrimSpace(inbound.Channel)), + Account: routing.NormalizeAccountID(inbound.Account), + } + if scope.Channel == "" { + scope.Channel = "unknown" } - peer := input.Peer - if peer == nil { - peer = &routing.RoutePeer{Kind: "direct"} + dimensions := make([]string, 0, len(input.SessionPolicy.Dimensions)) + values := make(map[string]string, len(input.SessionPolicy.Dimensions)) + + for _, dimension := range input.SessionPolicy.Dimensions { + switch dimension { + case "space": + if spaceID := strings.TrimSpace(inbound.SpaceID); spaceID != "" { + spaceType := strings.ToLower(strings.TrimSpace(inbound.SpaceType)) + if spaceType == "" { + spaceType = "space" + } + dimensions = append(dimensions, "space") + values["space"] = fmt.Sprintf("%s:%s", spaceType, strings.ToLower(spaceID)) + } + case "chat": + chatID := strings.TrimSpace(inbound.ChatID) + if chatID == "" { + continue + } + chatType := strings.ToLower(strings.TrimSpace(inbound.ChatType)) + if chatType == "" { + chatType = "direct" + } + dimensions = append(dimensions, "chat") + values["chat"] = fmt.Sprintf("%s:%s", chatType, strings.ToLower(chatID)) + case "topic": + if topicID := strings.TrimSpace(inbound.TopicID); topicID != "" { + dimensions = append(dimensions, "topic") + values["topic"] = "topic:" + strings.ToLower(topicID) + } + case "sender": + senderID := routing.CanonicalSessionIdentityID( + inbound.Channel, + inbound.SenderID, + input.SessionPolicy.IdentityLinks, + ) + if senderID == "" { + continue + } + dimensions = append(dimensions, "sender") + values["sender"] = senderID + } } - peerKind := strings.ToLower(strings.TrimSpace(peer.Kind)) - if peerKind == "" { - peerKind = "direct" - } - - switch peerKind { - case "direct": - if input.SessionPolicy.DMScope == routing.DMScopeMain { - return scope - } - peerID := routing.CanonicalSessionPeerID( - input.Channel, - peer.ID, - input.SessionPolicy.DMScope, - input.SessionPolicy.IdentityLinks, - ) - if peerID == "" { - return scope - } - scope.Dimensions = []string{"sender"} - scope.Values = map[string]string{ - "sender": peerID, - } - default: - peerID := strings.ToLower(strings.TrimSpace(peer.ID)) - if peerID == "" { - peerID = "unknown" - } - scope.Dimensions = []string{"chat"} - scope.Values = map[string]string{ - "chat": fmt.Sprintf("%s:%s", peerKind, peerID), - } + if len(dimensions) > 0 { + scope.Dimensions = dimensions + scope.Values = values } return scope } + +func buildLegacySessionAliases(input AllocationInput) []string { + aliases := []string{strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID))} + inbound := input.Context + + if strings.EqualFold(strings.TrimSpace(inbound.ChatType), "direct") { + senderID := routing.CanonicalSessionIdentityID( + inbound.Channel, + inbound.SenderID, + input.SessionPolicy.IdentityLinks, + ) + if senderID == "" { + return uniqueAliases(aliases) + } + for _, dmScope := range []routing.DMScope{ + routing.DMScopePerPeer, + routing.DMScopePerChannelPeer, + routing.DMScopePerAccountChannelPeer, + } { + aliases = append(aliases, strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{ + AgentID: input.AgentID, + Channel: inbound.Channel, + AccountID: inbound.Account, + Peer: &routing.RoutePeer{Kind: "direct", ID: senderID}, + DMScope: dmScope, + IdentityLinks: input.SessionPolicy.IdentityLinks, + }))) + } + return uniqueAliases(aliases) + } + + peerID := strings.TrimSpace(inbound.ChatID) + if peerID == "" { + return uniqueAliases(aliases) + } + if topicID := strings.TrimSpace(inbound.TopicID); topicID != "" { + peerID = peerID + "/" + topicID + } + aliases = append(aliases, strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{ + AgentID: input.AgentID, + Channel: inbound.Channel, + AccountID: inbound.Account, + Peer: &routing.RoutePeer{ + Kind: strings.ToLower(strings.TrimSpace(inbound.ChatType)), + ID: peerID, + }, + }))) + + return uniqueAliases(aliases) +} + +func uniqueAliases(aliases []string) []string { + if len(aliases) == 0 { + return nil + } + normalized := make([]string, 0, len(aliases)) + seen := make(map[string]struct{}, len(aliases)) + for _, alias := range aliases { + alias = strings.TrimSpace(strings.ToLower(alias)) + if alias == "" { + continue + } + if _, ok := seen[alias]; ok { + continue + } + seen[alias] = struct{}{} + normalized = append(normalized, alias) + } + if len(normalized) == 0 { + return nil + } + return normalized +} diff --git a/pkg/session/allocator_test.go b/pkg/session/allocator_test.go index 5eb442e98..c688fe0bf 100644 --- a/pkg/session/allocator_test.go +++ b/pkg/session/allocator_test.go @@ -3,28 +3,36 @@ package session import ( "testing" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/routing" ) func TestAllocateRouteSession_PerPeerDM(t *testing.T) { allocation := AllocateRouteSession(AllocationInput{ - AgentID: "main", - Channel: "telegram", - AccountID: "default", - Peer: &routing.RoutePeer{ - Kind: "direct", - ID: "User123", + AgentID: "main", + Context: bus.InboundContext{ + Channel: "telegram", + Account: "default", + ChatID: "dm-123", + ChatType: "direct", + SenderID: "User123", }, SessionPolicy: routing.SessionPolicy{ - DMScope: routing.DMScopePerPeer, + Dimensions: []string{"sender"}, }, }) - if allocation.SessionKey != "agent:main:direct:user123" { - t.Fatalf("SessionKey = %q, want %q", allocation.SessionKey, "agent:main:direct:user123") + if allocation.SessionKey == "" || !IsOpaqueSessionKey(allocation.SessionKey) { + t.Fatalf("SessionKey = %q, want opaque session key", allocation.SessionKey) } - if allocation.MainSessionKey != "agent:main:main" { - t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main") + if !containsAlias(allocation.SessionAliases, "agent:main:direct:user123") { + t.Fatalf("SessionAliases = %v, want to contain agent:main:direct:user123", allocation.SessionAliases) + } + if allocation.MainSessionKey == "" || !IsOpaqueSessionKey(allocation.MainSessionKey) { + t.Fatalf("MainSessionKey = %q, want opaque session key", allocation.MainSessionKey) + } + if len(allocation.MainAliases) != 1 || allocation.MainAliases[0] != "agent:main:main" { + t.Fatalf("MainAliases = %v, want [agent:main:main]", allocation.MainAliases) } if allocation.Scope.Version != ScopeVersionV1 { t.Fatalf("Scope.Version = %d, want %d", allocation.Scope.Version, ScopeVersionV1) @@ -39,23 +47,30 @@ func TestAllocateRouteSession_PerPeerDM(t *testing.T) { func TestAllocateRouteSession_GroupPeer(t *testing.T) { allocation := AllocateRouteSession(AllocationInput{ - AgentID: "main", - Channel: "slack", - AccountID: "workspace-a", - Peer: &routing.RoutePeer{ - Kind: "channel", - ID: "C001", + AgentID: "main", + Context: bus.InboundContext{ + Channel: "slack", + Account: "workspace-a", + ChatID: "C001", + ChatType: "channel", + SenderID: "U001", }, SessionPolicy: routing.SessionPolicy{ - DMScope: routing.DMScopePerAccountChannelPeer, + Dimensions: []string{"chat"}, }, }) - if allocation.SessionKey != "agent:main:slack:channel:c001" { - t.Fatalf("SessionKey = %q, want %q", allocation.SessionKey, "agent:main:slack:channel:c001") + if allocation.SessionKey == "" || !IsOpaqueSessionKey(allocation.SessionKey) { + t.Fatalf("SessionKey = %q, want opaque session key", allocation.SessionKey) } - if allocation.MainSessionKey != "agent:main:main" { - t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main") + if !containsAlias(allocation.SessionAliases, "agent:main:slack:channel:c001") { + t.Fatalf("SessionAliases = %v, want to contain agent:main:slack:channel:c001", allocation.SessionAliases) + } + if allocation.MainSessionKey == "" || !IsOpaqueSessionKey(allocation.MainSessionKey) { + t.Fatalf("MainSessionKey = %q, want opaque session key", allocation.MainSessionKey) + } + if len(allocation.MainAliases) != 1 || allocation.MainAliases[0] != "agent:main:main" { + t.Fatalf("MainAliases = %v, want [agent:main:main]", allocation.MainAliases) } if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "chat" { t.Fatalf("Scope.Dimensions = %v, want [chat]", allocation.Scope.Dimensions) @@ -64,3 +79,23 @@ func TestAllocateRouteSession_GroupPeer(t *testing.T) { t.Fatalf("Scope.Values[chat] = %q, want channel:c001", allocation.Scope.Values["chat"]) } } + +func TestBuildOpaqueSessionKey_IsStable(t *testing.T) { + first := BuildOpaqueSessionKey("agent:main:direct:user123") + second := BuildOpaqueSessionKey("agent:main:direct:user123") + if first != second { + t.Fatalf("BuildOpaqueSessionKey() mismatch: %q != %q", first, second) + } + if !IsOpaqueSessionKey(first) { + t.Fatalf("expected opaque session key, got %q", first) + } +} + +func containsAlias(aliases []string, want string) bool { + for _, alias := range aliases { + if alias == want { + return true + } + } + return false +} diff --git a/pkg/session/jsonl_backend.go b/pkg/session/jsonl_backend.go index 38a0c160e..caa18a624 100644 --- a/pkg/session/jsonl_backend.go +++ b/pkg/session/jsonl_backend.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "log" + "strings" "github.com/sipeed/picoclaw/pkg/memory" "github.com/sipeed/picoclaw/pkg/providers" @@ -26,6 +27,7 @@ type metaAwareStore interface { type MetadataAwareSessionStore interface { EnsureSessionMetadata(sessionKey string, scope *SessionScope, aliases []string) ResolveSessionKey(sessionKey string) string + GetSessionScope(sessionKey string) *SessionScope } // NewJSONLBackend wraps a memory.Store for use as a SessionStore. @@ -62,6 +64,11 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc if !ok { return } + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return + } + var rawScope json.RawMessage if scope != nil { data, err := json.Marshal(scope) @@ -71,9 +78,81 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc } rawScope = data } - if err := metaStore.UpsertSessionMeta(context.Background(), sessionKey, rawScope, aliases); err != nil { + ctx := context.Background() + if err := metaStore.UpsertSessionMeta(ctx, sessionKey, rawScope, aliases); err != nil { log.Printf("session: upsert session metadata: %v", err) + return } + + canonicalHistory, historyErr := b.store.GetHistory(ctx, sessionKey) + if historyErr != nil { + log.Printf("session: get canonical history: %v", historyErr) + return + } + canonicalSummary, summaryErr := b.store.GetSummary(ctx, sessionKey) + if summaryErr != nil { + log.Printf("session: get canonical summary: %v", summaryErr) + return + } + if len(canonicalHistory) > 0 || strings.TrimSpace(canonicalSummary) != "" { + return + } + + for _, alias := range aliases { + alias = strings.TrimSpace(alias) + if alias == "" || alias == sessionKey { + continue + } + aliasHistory, err := b.store.GetHistory(ctx, alias) + if err != nil { + log.Printf("session: get alias history: %v", err) + continue + } + aliasSummary, err := b.store.GetSummary(ctx, alias) + if err != nil { + log.Printf("session: get alias summary: %v", err) + continue + } + if len(aliasHistory) == 0 && strings.TrimSpace(aliasSummary) == "" { + continue + } + if err := b.store.SetHistory(ctx, sessionKey, aliasHistory); err != nil { + log.Printf("session: promote alias history: %v", err) + return + } + if strings.TrimSpace(aliasSummary) != "" { + if err := b.store.SetSummary(ctx, sessionKey, aliasSummary); err != nil { + log.Printf("session: promote alias summary: %v", err) + } + } + if err := metaStore.UpsertSessionMeta(ctx, sessionKey, rawScope, aliases); err != nil { + log.Printf("session: refresh session metadata after promotion: %v", err) + } + return + } +} + +// GetSessionScope reads structured scope metadata for a session key or alias. +func (b *JSONLBackend) GetSessionScope(sessionKey string) *SessionScope { + metaStore, ok := b.store.(metaAwareStore) + if !ok { + return nil + } + sessionKey = b.resolveSessionKey(sessionKey) + meta, err := metaStore.GetSessionMeta(context.Background(), sessionKey) + if err != nil { + log.Printf("session: get session metadata: %v", err) + return nil + } + if len(meta.Scope) == 0 { + return nil + } + var scope SessionScope + if err := json.Unmarshal(meta.Scope, &scope); err != nil { + log.Printf("session: decode session scope: %v", err) + return nil + } + return CloneScope(&scope) } func (b *JSONLBackend) AddMessage(sessionKey, role, content string) { diff --git a/pkg/session/jsonl_backend_test.go b/pkg/session/jsonl_backend_test.go index 32a69377b..411e3e8c5 100644 --- a/pkg/session/jsonl_backend_test.go +++ b/pkg/session/jsonl_backend_test.go @@ -181,7 +181,7 @@ func TestJSONLBackend_SummarizeFlow(t *testing.T) { func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) { b := newBackend(t) - b.EnsureSessionMetadata("canonical", &session.SessionScope{ + scope := &session.SessionScope{ Version: session.ScopeVersionV1, AgentID: "main", Channel: "telegram", @@ -190,7 +190,8 @@ func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) { Values: map[string]string{ "chat": "group:c1", }, - }, []string{"legacy"}) + } + b.EnsureSessionMetadata("canonical", scope, []string{"legacy"}) if got := b.ResolveSessionKey("legacy"); got != "canonical" { t.Fatalf("ResolveSessionKey() = %q, want %q", got, "canonical") @@ -204,4 +205,37 @@ func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) { if history[0].Content != "hello through alias" { t.Fatalf("history[0].Content = %q, want %q", history[0].Content, "hello through alias") } + + resolvedScope := b.GetSessionScope("legacy") + if resolvedScope == nil { + t.Fatal("GetSessionScope() returned nil") + } + if resolvedScope.AgentID != scope.AgentID || resolvedScope.Values["chat"] != scope.Values["chat"] { + t.Fatalf("GetSessionScope() = %+v, want %+v", resolvedScope, scope) + } +} + +func TestJSONLBackend_EnsureSessionMetadata_PromotesLegacyAliasHistory(t *testing.T) { + b := newBackend(t) + + legacyKey := "agent:main:direct:legacy-user" + b.AddMessage(legacyKey, "user", "legacy history") + b.SetSummary(legacyKey, "legacy summary") + + canonicalKey := session.BuildOpaqueSessionKey(legacyKey) + b.EnsureSessionMetadata(canonicalKey, &session.SessionScope{ + Version: session.ScopeVersionV1, + AgentID: "main", + }, []string{legacyKey}) + + if got := b.ResolveSessionKey(legacyKey); got != canonicalKey { + t.Fatalf("ResolveSessionKey() = %q, want %q", got, canonicalKey) + } + history := b.GetHistory(canonicalKey) + if len(history) != 1 || history[0].Content != "legacy history" { + t.Fatalf("promoted history = %+v", history) + } + if summary := b.GetSummary(canonicalKey); summary != "legacy summary" { + t.Fatalf("promoted summary = %q, want %q", summary, "legacy summary") + } } diff --git a/pkg/session/key.go b/pkg/session/key.go new file mode 100644 index 000000000..77dd115f5 --- /dev/null +++ b/pkg/session/key.go @@ -0,0 +1,52 @@ +package session + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "strings" +) + +const sessionKeyV1Prefix = "sk_v1_" + +// BuildOpaqueSessionKey returns a stable opaque session key derived from a +// canonical alias string. The alias remains available through metadata for +// compatibility and migration purposes. +func BuildOpaqueSessionKey(alias string) string { + normalized := strings.TrimSpace(strings.ToLower(alias)) + if normalized == "" { + return "" + } + sum := sha256.Sum256([]byte(normalized)) + return sessionKeyV1Prefix + hex.EncodeToString(sum[:]) +} + +// IsOpaqueSessionKey returns true when the key matches the current opaque +// session-key format. +func IsOpaqueSessionKey(key string) bool { + return strings.HasPrefix(strings.ToLower(strings.TrimSpace(key)), sessionKeyV1Prefix) +} + +// CanonicalScopeSignature returns a stable serialized representation of scope. +func CanonicalScopeSignature(scope SessionScope) string { + parts := []string{ + fmt.Sprintf("v=%d", scope.Version), + fmt.Sprintf("agent=%s", strings.TrimSpace(strings.ToLower(scope.AgentID))), + fmt.Sprintf("channel=%s", strings.TrimSpace(strings.ToLower(scope.Channel))), + fmt.Sprintf("account=%s", strings.TrimSpace(strings.ToLower(scope.Account))), + } + for _, dimension := range scope.Dimensions { + dimension = strings.TrimSpace(strings.ToLower(dimension)) + if dimension == "" { + continue + } + value := strings.TrimSpace(strings.ToLower(scope.Values[dimension])) + parts = append(parts, fmt.Sprintf("%s=%s", dimension, value)) + } + return strings.Join(parts, "|") +} + +// BuildSessionKey returns the current opaque key for a structured session scope. +func BuildSessionKey(scope SessionScope) string { + return BuildOpaqueSessionKey(CanonicalScopeSignature(scope)) +}