refactor(session): replace dm scope with dimensions policy

This commit is contained in:
Hoshina
2026-04-01 17:19:50 +08:00
parent 3957e2cc72
commit ca9652e120
20 changed files with 568 additions and 124 deletions
+1 -1
View File
@@ -149,7 +149,7 @@ func TestAgentLoop_EmitsMinimalTurnEvents(t *testing.T) {
Channel: "cli",
AccountID: routing.DefaultAccountID,
SessionPolicy: routing.SessionPolicy{
DMScope: routing.DMScopePerPeer,
Dimensions: []string{"sender"},
},
MatchedBy: "default",
},
+1 -1
View File
@@ -172,7 +172,7 @@ func TestAgentLoop_Hooks_ObserverAndLLMInterceptor(t *testing.T) {
Channel: "cli",
AccountID: routing.DefaultAccountID,
SessionPolicy: routing.SessionPolicy{
DMScope: routing.DMScopePerPeer,
Dimensions: []string{"sender"},
},
MatchedBy: "default",
},
+11 -7
View File
@@ -108,6 +108,7 @@ const (
toolLimitResponse = "I've reached `max_tool_iterations` without a final response. Increase `max_tool_iterations` in config.json if this task needs more tool steps."
handledToolResponseSummary = "Requested output delivered via tool attachment."
sessionKeyAgentPrefix = "agent:"
sessionKeyOpaquePrefix = "sk_"
metadataKeyAccountID = "account_id"
metadataKeyGuildID = "guild_id"
metadataKeyTeamID = "team_id"
@@ -1022,8 +1023,8 @@ func appendEventContextFields(fields map[string]any, turnCtx *TurnContext) {
if route.MatchedBy != "" {
fields["route_matched_by"] = route.MatchedBy
}
if route.SessionPolicy.DMScope != "" {
fields["route_dm_scope"] = string(route.SessionPolicy.DMScope)
if len(route.SessionPolicy.Dimensions) > 0 {
fields["route_dimensions"] = strings.Join(route.SessionPolicy.Dimensions, ",")
}
if count := len(route.SessionPolicy.IdentityLinks); count > 0 {
fields["route_identity_link_count"] = count
@@ -1476,7 +1477,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
opts := processOptions{
SessionKey: sessionKey,
SessionAliases: buildSessionAliases(sessionKey, allocation.SessionKey, msg.SessionKey),
SessionAliases: buildSessionAliases(sessionKey, append(allocation.SessionAliases, msg.SessionKey)...),
Channel: msg.Channel,
ChatID: msg.ChatID,
MessageID: msg.MessageID,
@@ -1543,12 +1544,17 @@ func normalizedInboundContext(msg bus.InboundMessage) bus.InboundContext {
}
func resolveScopeKey(routeSessionKey, msgSessionKey string) string {
if msgSessionKey != "" && strings.HasPrefix(msgSessionKey, sessionKeyAgentPrefix) {
if isExplicitSessionKey(msgSessionKey) {
return msgSessionKey
}
return routeSessionKey
}
func isExplicitSessionKey(sessionKey string) bool {
sessionKey = strings.TrimSpace(strings.ToLower(sessionKey))
return strings.HasPrefix(sessionKey, sessionKeyAgentPrefix) || strings.HasPrefix(sessionKey, sessionKeyOpaquePrefix)
}
func buildSessionAliases(canonicalKey string, keys ...string) []string {
if len(keys) == 0 {
return nil
@@ -1589,9 +1595,7 @@ func ensureSessionMetadata(store session.SessionStore, key string, scope *sessio
func (al *AgentLoop) allocateRouteSession(route routing.ResolvedRoute, msg bus.InboundMessage) session.Allocation {
return session.AllocateRouteSession(session.AllocationInput{
AgentID: route.AgentID,
Channel: route.Channel,
AccountID: route.AccountID,
Peer: extractPeer(msg),
Context: normalizedInboundContext(msg),
SessionPolicy: route.SessionPolicy,
})
}
+5 -5
View File
@@ -796,7 +796,7 @@ func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) {
AccountID: "workspace-a",
MatchedBy: "binding.team",
SessionPolicy: routing.SessionPolicy{
DMScope: routing.DMScopePerChannelPeer,
Dimensions: []string{"chat", "sender"},
IdentityLinks: map[string][]string{
"canonical-user": {"slack:U123"},
},
@@ -824,8 +824,8 @@ func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) {
if fields["route_matched_by"] != "binding.team" {
t.Fatalf("route_matched_by = %v, want binding.team", fields["route_matched_by"])
}
if fields["route_dm_scope"] != string(routing.DMScopePerChannelPeer) {
t.Fatalf("route_dm_scope = %v, want %q", fields["route_dm_scope"], routing.DMScopePerChannelPeer)
if fields["route_dimensions"] != "chat,sender" {
t.Fatalf("route_dimensions = %v, want chat,sender", fields["route_dimensions"])
}
if fields["route_identity_link_count"] != 1 {
t.Fatalf("route_identity_link_count = %v, want 1", fields["route_identity_link_count"])
@@ -865,7 +865,7 @@ func TestResolveMessageRoute_UsesInboundContextAccountAndSpace(t *testing.T) {
},
},
Session: config.SessionConfig{
DMScope: "per-peer",
Dimensions: []string{"sender"},
},
}
@@ -1600,7 +1600,7 @@ func TestProcessMessage_CommandOutcomes(t *testing.T) {
},
},
Session: config.SessionConfig{
DMScope: "per-channel-peer",
Dimensions: []string{"chat"},
},
}
+22
View File
@@ -9,6 +9,7 @@ import (
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/tools"
)
@@ -310,6 +311,27 @@ func (al *AgentLoop) agentForSession(sessionKey string) *AgentInstance {
return nil
}
for _, agentID := range registry.ListAgentIDs() {
agent, ok := registry.GetAgent(agentID)
if !ok || agent == nil {
continue
}
scopeReader, ok := agent.Sessions.(interface {
GetSessionScope(sessionKey string) *session.SessionScope
})
if !ok {
continue
}
scope := scopeReader.GetSessionScope(sessionKey)
if scope == nil || strings.TrimSpace(scope.AgentID) == "" {
continue
}
if scopedAgent, ok := registry.GetAgent(scope.AgentID); ok {
return scopedAgent
}
return agent
}
if parsed := routing.ParseAgentSessionKey(sessionKey); parsed != nil {
if agent, ok := registry.GetAgent(parsed.AgentID); ok {
return agent
+58 -1
View File
@@ -17,6 +17,7 @@ import (
"github.com/sipeed/picoclaw/pkg/media"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/routing"
"github.com/sipeed/picoclaw/pkg/session"
"github.com/sipeed/picoclaw/pkg/tools"
)
@@ -357,7 +358,7 @@ func TestDrainBusToSteering_RequeuesDifferentScopeMessage(t *testing.T) {
},
},
Session: config.SessionConfig{
DMScope: "per-peer",
Dimensions: []string{"sender"},
},
}
@@ -1013,6 +1014,62 @@ func TestAgentLoop_Steering_DirectResponseContinuesWithQueuedMessage(t *testing.
}
}
func TestAgentLoop_AgentForSession_UsesStoredScopeMetadata(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
ModelName: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
List: []config.AgentConfig{
{ID: "sales", Default: true},
{ID: "support"},
},
},
}
al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{})
support, ok := al.registry.GetAgent("support")
if !ok || support == nil {
t.Fatal("expected support agent")
}
metaStore, ok := support.Sessions.(session.MetadataAwareSessionStore)
if !ok {
t.Fatal("support session store does not support metadata")
}
alias := "agent:support:slack:channel:c001"
key := session.BuildOpaqueSessionKey(alias)
scope := &session.SessionScope{
Version: session.ScopeVersionV1,
AgentID: "support",
Channel: "slack",
Account: "default",
Dimensions: []string{"chat"},
Values: map[string]string{
"chat": "channel:c001",
},
}
metaStore.EnsureSessionMetadata(key, scope, []string{alias})
got := al.agentForSession(key)
if got == nil {
t.Fatal("agentForSession() returned nil")
}
if got.ID != "support" {
t.Fatalf("agentForSession() = %q, want %q", got.ID, "support")
}
}
func TestAgentLoop_Continue_PreservesSteeringMedia(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {
+1 -1
View File
@@ -72,7 +72,7 @@ func cloneResolvedRoute(route *routing.ResolvedRoute) *routing.ResolvedRoute {
}
cloned := *route
cloned.SessionPolicy = routing.SessionPolicy{
DMScope: route.SessionPolicy.DMScope,
Dimensions: append([]string(nil), route.SessionPolicy.Dimensions...),
IdentityLinks: cloneIdentityLinks(route.SessionPolicy.IdentityLinks),
}
return &cloned
+8 -4
View File
@@ -109,9 +109,12 @@ func (c *Config) MarshalJSON() ([]byte, error) {
Alias: (*Alias)(c),
}
// Only include session if not empty
if c.Session.DMScope != "" || len(c.Session.IdentityLinks) > 0 {
aux.Session = &c.Session
// Only include session if not empty. Deprecated dm_scope is intentionally
// omitted so persisted configs converge on dimensions-based session policy.
if len(c.Session.Dimensions) > 0 || len(c.Session.IdentityLinks) > 0 {
sessionCfg := c.Session
sessionCfg.DMScope = ""
aux.Session = &sessionCfg
}
return json.Marshal(aux)
@@ -195,7 +198,8 @@ type AgentBinding struct {
}
type SessionConfig struct {
DMScope string `json:"dm_scope,omitempty"`
Dimensions []string `json:"dimensions,omitempty"`
DMScope string `json:"dm_scope,omitempty"` // Deprecated: ignored by the new session policy path.
IdentityLinks map[string][]string `json:"identity_links,omitempty"`
}
+7 -7
View File
@@ -137,7 +137,7 @@ func TestAgentConfig_FullParse(t *testing.T) {
}
],
"session": {
"dm_scope": "per-peer",
"dimensions": ["sender"],
"identity_links": {
"john": ["telegram:123", "discord:john#1234"]
}
@@ -186,8 +186,8 @@ func TestAgentConfig_FullParse(t *testing.T) {
t.Errorf("binding.Match.Peer = %+v", binding.Match.Peer)
}
if cfg.Session.DMScope != "per-peer" {
t.Errorf("Session.DMScope = %q", cfg.Session.DMScope)
if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "sender" {
t.Errorf("Session.Dimensions = %v", cfg.Session.Dimensions)
}
if len(cfg.Session.IdentityLinks) != 1 {
t.Errorf("Session.IdentityLinks = %v", cfg.Session.IdentityLinks)
@@ -758,7 +758,7 @@ func TestLoadConfig_HooksProcessConfig(t *testing.T) {
}
}
// TestDefaultConfig_DMScope verifies the default dm_scope value
// TestDefaultConfig_SessionDimensions verifies the default session dimensions
// TestDefaultConfig_SummarizationThresholds verifies summarization defaults
func TestDefaultConfig_SummarizationThresholds(t *testing.T) {
cfg := DefaultConfig()
@@ -771,11 +771,11 @@ func TestDefaultConfig_SummarizationThresholds(t *testing.T) {
}
}
func TestDefaultConfig_DMScope(t *testing.T) {
func TestDefaultConfig_SessionDimensions(t *testing.T) {
cfg := DefaultConfig()
if cfg.Session.DMScope != "per-channel-peer" {
t.Errorf("Session.DMScope = %q, want 'per-channel-peer'", cfg.Session.DMScope)
if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "chat" {
t.Errorf("Session.Dimensions = %v, want [chat]", cfg.Session.Dimensions)
}
}
+1 -1
View File
@@ -37,7 +37,7 @@ func DefaultConfig() *Config {
},
Bindings: []AgentBinding{},
Session: SessionConfig{
DMScope: "per-channel-peer",
Dimensions: []string{"chat"},
},
Channels: ChannelsConfig{
WhatsApp: WhatsAppConfig{
+22 -7
View File
@@ -230,9 +230,6 @@ func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (st
if sessionKey == "" {
return "", false, nil
}
if s.sessionExists(sessionKey) {
return sessionKey, true, nil
}
entries, err := os.ReadDir(s.dir)
if err != nil {
@@ -254,16 +251,34 @@ func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (st
if meta.Key == "" {
continue
}
if meta.Key == sessionKey {
return meta.Key, true, nil
}
for _, alias := range meta.Aliases {
if alias == sessionKey {
if alias == sessionKey && meta.Key != sessionKey {
return meta.Key, true, nil
}
}
}
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta.json") {
continue
}
data, readErr := os.ReadFile(filepath.Join(s.dir, entry.Name()))
if readErr != nil {
return "", false, fmt.Errorf("memory: read meta: %w", readErr)
}
var meta SessionMeta
if err := json.Unmarshal(data, &meta); err != nil {
return "", false, fmt.Errorf("memory: decode meta: %w", err)
}
if meta.Key == sessionKey {
return meta.Key, true, nil
}
}
if s.sessionExists(sessionKey) {
return sessionKey, true, nil
}
return "", false, nil
}
+26
View File
@@ -296,6 +296,32 @@ func TestResolveSessionKeyByAlias(t *testing.T) {
}
}
func TestResolveSessionKeyByAlias_PrefersMetadataOverLegacyFile(t *testing.T) {
store := newTestStore(t)
ctx := context.Background()
if err := store.AddMessage(ctx, "legacy:key", "user", "legacy"); err != nil {
t.Fatalf("AddMessage(legacy) error = %v", err)
}
if err := store.AddMessage(ctx, "canonical", "user", "canonical"); err != nil {
t.Fatalf("AddMessage(canonical) error = %v", err)
}
if err := store.UpsertSessionMeta(ctx, "canonical", nil, []string{"legacy:key"}); err != nil {
t.Fatalf("UpsertSessionMeta() error = %v", err)
}
resolved, found, err := store.ResolveSessionKey(ctx, "legacy:key")
if err != nil {
t.Fatalf("ResolveSessionKey() error = %v", err)
}
if !found {
t.Fatal("ResolveSessionKey() did not find alias")
}
if resolved != "canonical" {
t.Fatalf("resolved = %q, want %q", resolved, "canonical")
}
}
func TestTruncateHistory_KeepLast(t *testing.T) {
store := newTestStore(t)
ctx := context.Background()
+28 -8
View File
@@ -17,10 +17,8 @@ type RouteInput struct {
}
// SessionPolicy describes how a routed message should be mapped to a session.
// The current implementation preserves the legacy dm_scope and identity_link
// semantics while moving session-key construction out of the router.
type SessionPolicy struct {
DMScope DMScope
Dimensions []string
IdentityLinks map[string][]string
}
@@ -246,16 +244,38 @@ func (r *RouteResolver) resolveDefaultAgentID() string {
}
func (r *RouteResolver) sessionPolicy() SessionPolicy {
dmScope := DMScope(r.cfg.Session.DMScope)
if dmScope == "" {
dmScope = DMScopeMain
}
return SessionPolicy{
DMScope: dmScope,
Dimensions: normalizeSessionDimensions(r.cfg.Session.Dimensions),
IdentityLinks: cloneIdentityLinks(r.cfg.Session.IdentityLinks),
}
}
func normalizeSessionDimensions(dimensions []string) []string {
if len(dimensions) == 0 {
return nil
}
normalized := make([]string, 0, len(dimensions))
seen := make(map[string]struct{}, len(dimensions))
for _, dimension := range dimensions {
dimension = strings.ToLower(strings.TrimSpace(dimension))
switch dimension {
case "space", "chat", "topic", "sender":
default:
continue
}
if _, ok := seen[dimension]; ok {
continue
}
seen[dimension] = struct{}{}
normalized = append(normalized, dimension)
}
if len(normalized) == 0 {
return nil
}
return normalized
}
func cloneIdentityLinks(src map[string][]string) map[string][]string {
if len(src) == 0 {
return nil
+3 -3
View File
@@ -17,7 +17,7 @@ func testConfig(agents []config.AgentConfig, bindings []config.AgentBinding) *co
},
Bindings: bindings,
Session: config.SessionConfig{
DMScope: "per-peer",
Dimensions: []string{"sender"},
},
}
}
@@ -37,8 +37,8 @@ func TestResolveRoute_DefaultAgent_NoBindings(t *testing.T) {
if route.MatchedBy != "default" {
t.Errorf("MatchedBy = %q, want 'default'", route.MatchedBy)
}
if route.SessionPolicy.DMScope != DMScopePerPeer {
t.Errorf("SessionPolicy.DMScope = %q, want %q", route.SessionPolicy.DMScope, DMScopePerPeer)
if len(route.SessionPolicy.Dimensions) != 1 || route.SessionPolicy.Dimensions[0] != "sender" {
t.Errorf("SessionPolicy.Dimensions = %v, want [sender]", route.SessionPolicy.Dimensions)
}
if route.SessionPolicy.IdentityLinks != nil {
t.Errorf("SessionPolicy.IdentityLinks = %v, want nil", route.SessionPolicy.IdentityLinks)
+13
View File
@@ -112,6 +112,19 @@ func CanonicalSessionPeerID(
return strings.ToLower(normalizedPeerID)
}
// CanonicalSessionIdentityID collapses an identity using identity_links when
// possible, then returns a normalized lowercase identifier.
func CanonicalSessionIdentityID(channel, rawID string, identityLinks map[string][]string) string {
normalizedID := strings.TrimSpace(rawID)
if normalizedID == "" {
return ""
}
if linked := resolveLinkedPeerID(identityLinks, channel, normalizedID); linked != "" {
normalizedID = linked
}
return strings.ToLower(normalizedID)
}
// ParseAgentSessionKey extracts agentId and rest from "agent:<agentId>:<rest>".
func ParseAgentSessionKey(sessionKey string) *ParsedSessionKey {
raw := strings.TrimSpace(sessionKey)
+136 -53
View File
@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/routing"
)
@@ -13,85 +14,167 @@ import (
type Allocation struct {
Scope SessionScope
SessionKey string
SessionAliases []string
MainSessionKey string
MainAliases []string
}
// AllocationInput contains the routing result and peer context needed to
// derive the session keys for a turn.
type AllocationInput struct {
AgentID string
Channel string
AccountID string
Peer *routing.RoutePeer
Context bus.InboundContext
SessionPolicy routing.SessionPolicy
}
// AllocateRouteSession maps a route decision onto the current legacy
// agent-scoped session-key format.
// AllocateRouteSession maps a route decision onto a structured scope and the
// current opaque session-key format.
func AllocateRouteSession(input AllocationInput) Allocation {
scope := buildSessionScope(input)
sessionKey := strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{
AgentID: input.AgentID,
Channel: input.Channel,
AccountID: input.AccountID,
Peer: input.Peer,
DMScope: input.SessionPolicy.DMScope,
IdentityLinks: input.SessionPolicy.IdentityLinks,
}))
mainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID))
legacySessionAliases := buildLegacySessionAliases(input)
legacyMainSessionKey := strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID))
return Allocation{
Scope: scope,
SessionKey: sessionKey,
MainSessionKey: mainSessionKey,
SessionKey: BuildSessionKey(scope),
SessionAliases: legacySessionAliases,
MainSessionKey: BuildOpaqueSessionKey(legacyMainSessionKey),
MainAliases: []string{legacyMainSessionKey},
}
}
func buildSessionScope(input AllocationInput) SessionScope {
inbound := input.Context
scope := SessionScope{
Version: ScopeVersionV1,
AgentID: routing.NormalizeAgentID(input.AgentID),
Channel: strings.ToLower(strings.TrimSpace(input.Channel)),
Account: routing.NormalizeAccountID(input.AccountID),
Channel: strings.ToLower(strings.TrimSpace(inbound.Channel)),
Account: routing.NormalizeAccountID(inbound.Account),
}
if scope.Channel == "" {
scope.Channel = "unknown"
}
peer := input.Peer
if peer == nil {
peer = &routing.RoutePeer{Kind: "direct"}
dimensions := make([]string, 0, len(input.SessionPolicy.Dimensions))
values := make(map[string]string, len(input.SessionPolicy.Dimensions))
for _, dimension := range input.SessionPolicy.Dimensions {
switch dimension {
case "space":
if spaceID := strings.TrimSpace(inbound.SpaceID); spaceID != "" {
spaceType := strings.ToLower(strings.TrimSpace(inbound.SpaceType))
if spaceType == "" {
spaceType = "space"
}
dimensions = append(dimensions, "space")
values["space"] = fmt.Sprintf("%s:%s", spaceType, strings.ToLower(spaceID))
}
case "chat":
chatID := strings.TrimSpace(inbound.ChatID)
if chatID == "" {
continue
}
chatType := strings.ToLower(strings.TrimSpace(inbound.ChatType))
if chatType == "" {
chatType = "direct"
}
dimensions = append(dimensions, "chat")
values["chat"] = fmt.Sprintf("%s:%s", chatType, strings.ToLower(chatID))
case "topic":
if topicID := strings.TrimSpace(inbound.TopicID); topicID != "" {
dimensions = append(dimensions, "topic")
values["topic"] = "topic:" + strings.ToLower(topicID)
}
case "sender":
senderID := routing.CanonicalSessionIdentityID(
inbound.Channel,
inbound.SenderID,
input.SessionPolicy.IdentityLinks,
)
if senderID == "" {
continue
}
dimensions = append(dimensions, "sender")
values["sender"] = senderID
}
}
peerKind := strings.ToLower(strings.TrimSpace(peer.Kind))
if peerKind == "" {
peerKind = "direct"
}
switch peerKind {
case "direct":
if input.SessionPolicy.DMScope == routing.DMScopeMain {
return scope
}
peerID := routing.CanonicalSessionPeerID(
input.Channel,
peer.ID,
input.SessionPolicy.DMScope,
input.SessionPolicy.IdentityLinks,
)
if peerID == "" {
return scope
}
scope.Dimensions = []string{"sender"}
scope.Values = map[string]string{
"sender": peerID,
}
default:
peerID := strings.ToLower(strings.TrimSpace(peer.ID))
if peerID == "" {
peerID = "unknown"
}
scope.Dimensions = []string{"chat"}
scope.Values = map[string]string{
"chat": fmt.Sprintf("%s:%s", peerKind, peerID),
}
if len(dimensions) > 0 {
scope.Dimensions = dimensions
scope.Values = values
}
return scope
}
func buildLegacySessionAliases(input AllocationInput) []string {
aliases := []string{strings.ToLower(routing.BuildAgentMainSessionKey(input.AgentID))}
inbound := input.Context
if strings.EqualFold(strings.TrimSpace(inbound.ChatType), "direct") {
senderID := routing.CanonicalSessionIdentityID(
inbound.Channel,
inbound.SenderID,
input.SessionPolicy.IdentityLinks,
)
if senderID == "" {
return uniqueAliases(aliases)
}
for _, dmScope := range []routing.DMScope{
routing.DMScopePerPeer,
routing.DMScopePerChannelPeer,
routing.DMScopePerAccountChannelPeer,
} {
aliases = append(aliases, strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{
AgentID: input.AgentID,
Channel: inbound.Channel,
AccountID: inbound.Account,
Peer: &routing.RoutePeer{Kind: "direct", ID: senderID},
DMScope: dmScope,
IdentityLinks: input.SessionPolicy.IdentityLinks,
})))
}
return uniqueAliases(aliases)
}
peerID := strings.TrimSpace(inbound.ChatID)
if peerID == "" {
return uniqueAliases(aliases)
}
if topicID := strings.TrimSpace(inbound.TopicID); topicID != "" {
peerID = peerID + "/" + topicID
}
aliases = append(aliases, strings.ToLower(routing.BuildAgentPeerSessionKey(routing.SessionKeyParams{
AgentID: input.AgentID,
Channel: inbound.Channel,
AccountID: inbound.Account,
Peer: &routing.RoutePeer{
Kind: strings.ToLower(strings.TrimSpace(inbound.ChatType)),
ID: peerID,
},
})))
return uniqueAliases(aliases)
}
func uniqueAliases(aliases []string) []string {
if len(aliases) == 0 {
return nil
}
normalized := make([]string, 0, len(aliases))
seen := make(map[string]struct{}, len(aliases))
for _, alias := range aliases {
alias = strings.TrimSpace(strings.ToLower(alias))
if alias == "" {
continue
}
if _, ok := seen[alias]; ok {
continue
}
seen[alias] = struct{}{}
normalized = append(normalized, alias)
}
if len(normalized) == 0 {
return nil
}
return normalized
}
+57 -22
View File
@@ -3,28 +3,36 @@ package session
import (
"testing"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/routing"
)
func TestAllocateRouteSession_PerPeerDM(t *testing.T) {
allocation := AllocateRouteSession(AllocationInput{
AgentID: "main",
Channel: "telegram",
AccountID: "default",
Peer: &routing.RoutePeer{
Kind: "direct",
ID: "User123",
AgentID: "main",
Context: bus.InboundContext{
Channel: "telegram",
Account: "default",
ChatID: "dm-123",
ChatType: "direct",
SenderID: "User123",
},
SessionPolicy: routing.SessionPolicy{
DMScope: routing.DMScopePerPeer,
Dimensions: []string{"sender"},
},
})
if allocation.SessionKey != "agent:main:direct:user123" {
t.Fatalf("SessionKey = %q, want %q", allocation.SessionKey, "agent:main:direct:user123")
if allocation.SessionKey == "" || !IsOpaqueSessionKey(allocation.SessionKey) {
t.Fatalf("SessionKey = %q, want opaque session key", allocation.SessionKey)
}
if allocation.MainSessionKey != "agent:main:main" {
t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main")
if !containsAlias(allocation.SessionAliases, "agent:main:direct:user123") {
t.Fatalf("SessionAliases = %v, want to contain agent:main:direct:user123", allocation.SessionAliases)
}
if allocation.MainSessionKey == "" || !IsOpaqueSessionKey(allocation.MainSessionKey) {
t.Fatalf("MainSessionKey = %q, want opaque session key", allocation.MainSessionKey)
}
if len(allocation.MainAliases) != 1 || allocation.MainAliases[0] != "agent:main:main" {
t.Fatalf("MainAliases = %v, want [agent:main:main]", allocation.MainAliases)
}
if allocation.Scope.Version != ScopeVersionV1 {
t.Fatalf("Scope.Version = %d, want %d", allocation.Scope.Version, ScopeVersionV1)
@@ -39,23 +47,30 @@ func TestAllocateRouteSession_PerPeerDM(t *testing.T) {
func TestAllocateRouteSession_GroupPeer(t *testing.T) {
allocation := AllocateRouteSession(AllocationInput{
AgentID: "main",
Channel: "slack",
AccountID: "workspace-a",
Peer: &routing.RoutePeer{
Kind: "channel",
ID: "C001",
AgentID: "main",
Context: bus.InboundContext{
Channel: "slack",
Account: "workspace-a",
ChatID: "C001",
ChatType: "channel",
SenderID: "U001",
},
SessionPolicy: routing.SessionPolicy{
DMScope: routing.DMScopePerAccountChannelPeer,
Dimensions: []string{"chat"},
},
})
if allocation.SessionKey != "agent:main:slack:channel:c001" {
t.Fatalf("SessionKey = %q, want %q", allocation.SessionKey, "agent:main:slack:channel:c001")
if allocation.SessionKey == "" || !IsOpaqueSessionKey(allocation.SessionKey) {
t.Fatalf("SessionKey = %q, want opaque session key", allocation.SessionKey)
}
if allocation.MainSessionKey != "agent:main:main" {
t.Fatalf("MainSessionKey = %q, want %q", allocation.MainSessionKey, "agent:main:main")
if !containsAlias(allocation.SessionAliases, "agent:main:slack:channel:c001") {
t.Fatalf("SessionAliases = %v, want to contain agent:main:slack:channel:c001", allocation.SessionAliases)
}
if allocation.MainSessionKey == "" || !IsOpaqueSessionKey(allocation.MainSessionKey) {
t.Fatalf("MainSessionKey = %q, want opaque session key", allocation.MainSessionKey)
}
if len(allocation.MainAliases) != 1 || allocation.MainAliases[0] != "agent:main:main" {
t.Fatalf("MainAliases = %v, want [agent:main:main]", allocation.MainAliases)
}
if len(allocation.Scope.Dimensions) != 1 || allocation.Scope.Dimensions[0] != "chat" {
t.Fatalf("Scope.Dimensions = %v, want [chat]", allocation.Scope.Dimensions)
@@ -64,3 +79,23 @@ func TestAllocateRouteSession_GroupPeer(t *testing.T) {
t.Fatalf("Scope.Values[chat] = %q, want channel:c001", allocation.Scope.Values["chat"])
}
}
func TestBuildOpaqueSessionKey_IsStable(t *testing.T) {
first := BuildOpaqueSessionKey("agent:main:direct:user123")
second := BuildOpaqueSessionKey("agent:main:direct:user123")
if first != second {
t.Fatalf("BuildOpaqueSessionKey() mismatch: %q != %q", first, second)
}
if !IsOpaqueSessionKey(first) {
t.Fatalf("expected opaque session key, got %q", first)
}
}
func containsAlias(aliases []string, want string) bool {
for _, alias := range aliases {
if alias == want {
return true
}
}
return false
}
+80 -1
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"log"
"strings"
"github.com/sipeed/picoclaw/pkg/memory"
"github.com/sipeed/picoclaw/pkg/providers"
@@ -26,6 +27,7 @@ type metaAwareStore interface {
type MetadataAwareSessionStore interface {
EnsureSessionMetadata(sessionKey string, scope *SessionScope, aliases []string)
ResolveSessionKey(sessionKey string) string
GetSessionScope(sessionKey string) *SessionScope
}
// NewJSONLBackend wraps a memory.Store for use as a SessionStore.
@@ -62,6 +64,11 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc
if !ok {
return
}
sessionKey = strings.TrimSpace(sessionKey)
if sessionKey == "" {
return
}
var rawScope json.RawMessage
if scope != nil {
data, err := json.Marshal(scope)
@@ -71,9 +78,81 @@ func (b *JSONLBackend) EnsureSessionMetadata(sessionKey string, scope *SessionSc
}
rawScope = data
}
if err := metaStore.UpsertSessionMeta(context.Background(), sessionKey, rawScope, aliases); err != nil {
ctx := context.Background()
if err := metaStore.UpsertSessionMeta(ctx, sessionKey, rawScope, aliases); err != nil {
log.Printf("session: upsert session metadata: %v", err)
return
}
canonicalHistory, historyErr := b.store.GetHistory(ctx, sessionKey)
if historyErr != nil {
log.Printf("session: get canonical history: %v", historyErr)
return
}
canonicalSummary, summaryErr := b.store.GetSummary(ctx, sessionKey)
if summaryErr != nil {
log.Printf("session: get canonical summary: %v", summaryErr)
return
}
if len(canonicalHistory) > 0 || strings.TrimSpace(canonicalSummary) != "" {
return
}
for _, alias := range aliases {
alias = strings.TrimSpace(alias)
if alias == "" || alias == sessionKey {
continue
}
aliasHistory, err := b.store.GetHistory(ctx, alias)
if err != nil {
log.Printf("session: get alias history: %v", err)
continue
}
aliasSummary, err := b.store.GetSummary(ctx, alias)
if err != nil {
log.Printf("session: get alias summary: %v", err)
continue
}
if len(aliasHistory) == 0 && strings.TrimSpace(aliasSummary) == "" {
continue
}
if err := b.store.SetHistory(ctx, sessionKey, aliasHistory); err != nil {
log.Printf("session: promote alias history: %v", err)
return
}
if strings.TrimSpace(aliasSummary) != "" {
if err := b.store.SetSummary(ctx, sessionKey, aliasSummary); err != nil {
log.Printf("session: promote alias summary: %v", err)
}
}
if err := metaStore.UpsertSessionMeta(ctx, sessionKey, rawScope, aliases); err != nil {
log.Printf("session: refresh session metadata after promotion: %v", err)
}
return
}
}
// GetSessionScope reads structured scope metadata for a session key or alias.
func (b *JSONLBackend) GetSessionScope(sessionKey string) *SessionScope {
metaStore, ok := b.store.(metaAwareStore)
if !ok {
return nil
}
sessionKey = b.resolveSessionKey(sessionKey)
meta, err := metaStore.GetSessionMeta(context.Background(), sessionKey)
if err != nil {
log.Printf("session: get session metadata: %v", err)
return nil
}
if len(meta.Scope) == 0 {
return nil
}
var scope SessionScope
if err := json.Unmarshal(meta.Scope, &scope); err != nil {
log.Printf("session: decode session scope: %v", err)
return nil
}
return CloneScope(&scope)
}
func (b *JSONLBackend) AddMessage(sessionKey, role, content string) {
+36 -2
View File
@@ -181,7 +181,7 @@ func TestJSONLBackend_SummarizeFlow(t *testing.T) {
func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) {
b := newBackend(t)
b.EnsureSessionMetadata("canonical", &session.SessionScope{
scope := &session.SessionScope{
Version: session.ScopeVersionV1,
AgentID: "main",
Channel: "telegram",
@@ -190,7 +190,8 @@ func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) {
Values: map[string]string{
"chat": "group:c1",
},
}, []string{"legacy"})
}
b.EnsureSessionMetadata("canonical", scope, []string{"legacy"})
if got := b.ResolveSessionKey("legacy"); got != "canonical" {
t.Fatalf("ResolveSessionKey() = %q, want %q", got, "canonical")
@@ -204,4 +205,37 @@ func TestJSONLBackend_ResolveAliasAndPersistMetadata(t *testing.T) {
if history[0].Content != "hello through alias" {
t.Fatalf("history[0].Content = %q, want %q", history[0].Content, "hello through alias")
}
resolvedScope := b.GetSessionScope("legacy")
if resolvedScope == nil {
t.Fatal("GetSessionScope() returned nil")
}
if resolvedScope.AgentID != scope.AgentID || resolvedScope.Values["chat"] != scope.Values["chat"] {
t.Fatalf("GetSessionScope() = %+v, want %+v", resolvedScope, scope)
}
}
func TestJSONLBackend_EnsureSessionMetadata_PromotesLegacyAliasHistory(t *testing.T) {
b := newBackend(t)
legacyKey := "agent:main:direct:legacy-user"
b.AddMessage(legacyKey, "user", "legacy history")
b.SetSummary(legacyKey, "legacy summary")
canonicalKey := session.BuildOpaqueSessionKey(legacyKey)
b.EnsureSessionMetadata(canonicalKey, &session.SessionScope{
Version: session.ScopeVersionV1,
AgentID: "main",
}, []string{legacyKey})
if got := b.ResolveSessionKey(legacyKey); got != canonicalKey {
t.Fatalf("ResolveSessionKey() = %q, want %q", got, canonicalKey)
}
history := b.GetHistory(canonicalKey)
if len(history) != 1 || history[0].Content != "legacy history" {
t.Fatalf("promoted history = %+v", history)
}
if summary := b.GetSummary(canonicalKey); summary != "legacy summary" {
t.Fatalf("promoted summary = %q, want %q", summary, "legacy summary")
}
}
+52
View File
@@ -0,0 +1,52 @@
package session
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"
)
const sessionKeyV1Prefix = "sk_v1_"
// BuildOpaqueSessionKey returns a stable opaque session key derived from a
// canonical alias string. The alias remains available through metadata for
// compatibility and migration purposes.
func BuildOpaqueSessionKey(alias string) string {
normalized := strings.TrimSpace(strings.ToLower(alias))
if normalized == "" {
return ""
}
sum := sha256.Sum256([]byte(normalized))
return sessionKeyV1Prefix + hex.EncodeToString(sum[:])
}
// IsOpaqueSessionKey returns true when the key matches the current opaque
// session-key format.
func IsOpaqueSessionKey(key string) bool {
return strings.HasPrefix(strings.ToLower(strings.TrimSpace(key)), sessionKeyV1Prefix)
}
// CanonicalScopeSignature returns a stable serialized representation of scope.
func CanonicalScopeSignature(scope SessionScope) string {
parts := []string{
fmt.Sprintf("v=%d", scope.Version),
fmt.Sprintf("agent=%s", strings.TrimSpace(strings.ToLower(scope.AgentID))),
fmt.Sprintf("channel=%s", strings.TrimSpace(strings.ToLower(scope.Channel))),
fmt.Sprintf("account=%s", strings.TrimSpace(strings.ToLower(scope.Account))),
}
for _, dimension := range scope.Dimensions {
dimension = strings.TrimSpace(strings.ToLower(dimension))
if dimension == "" {
continue
}
value := strings.TrimSpace(strings.ToLower(scope.Values[dimension]))
parts = append(parts, fmt.Sprintf("%s=%s", dimension, value))
}
return strings.Join(parts, "|")
}
// BuildSessionKey returns the current opaque key for a structured session scope.
func BuildSessionKey(scope SessionScope) string {
return BuildOpaqueSessionKey(CanonicalScopeSignature(scope))
}