diff --git a/pkg/config/config.go b/pkg/config/config.go index 4767fcfec..4970047cf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1105,6 +1105,8 @@ func LoadConfig(path string) (*Config, error) { return nil, fmt.Errorf("unsupported config version: %d", versionInfo.Version) } + applyLegacyBindingsMigration(data, cfg) + if err = env.Parse(cfg); err != nil { return nil, err } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index bb90fb2c4..74e5cc9fe 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -288,6 +288,143 @@ func TestAgentConfig_ParsesDispatchRules(t *testing.T) { } } +func TestLoadConfig_MigratesLegacyBindingsToDispatchRules(t *testing.T) { + dir := t.TempDir() + configPath := filepath.Join(dir, "config.json") + raw := `{ + "version": 2, + "agents": { + "defaults": { + "workspace": "~/.picoclaw/workspace", + "model": "glm-4.7" + }, + "list": [ + { "id": "main", "default": true }, + { "id": "support" }, + { "id": "ops" }, + { "id": "slack" } + ] + }, + "bindings": [ + { + "agent_id": "support", + "match": { + "channel": "telegram", + "peer": { "kind": "group", "id": "-100123" } + } + }, + { + "agent_id": "ops", + "match": { + "channel": "discord", + "guild_id": "guild-1" + } + }, + { + "agent_id": "slack", + "match": { + "channel": "slack", + "account_id": "*" + } + } + ] + }` + if err := os.WriteFile(configPath, []byte(raw), 0o644); err != nil { + t.Fatalf("WriteFile(configPath): %v", err) + } + + cfg, err := LoadConfig(configPath) + if err != nil { + t.Fatalf("LoadConfig() error: %v", err) + } + if cfg.Agents.Dispatch == nil { + t.Fatal("Agents.Dispatch should not be nil") + } + if len(cfg.Agents.Dispatch.Rules) != 3 { + t.Fatalf("Dispatch.Rules len = %d, want 3", len(cfg.Agents.Dispatch.Rules)) + } + + first := cfg.Agents.Dispatch.Rules[0] + if first.Agent != "support" { + t.Fatalf("first.Agent = %q, want %q", first.Agent, "support") + } + if first.When.Channel != "telegram" || first.When.Chat != "group:-100123" { + t.Fatalf("first.When = %+v", first.When) + } + if first.When.Account != legacyDefaultAccountID { + t.Fatalf("first.When.Account = %q, want %q", first.When.Account, legacyDefaultAccountID) + } + + second := cfg.Agents.Dispatch.Rules[1] + if second.Agent != "ops" || second.When.Space != "guild:guild-1" { + t.Fatalf("second = %+v", second) + } + + third := cfg.Agents.Dispatch.Rules[2] + if third.Agent != "slack" { + t.Fatalf("third.Agent = %q, want %q", third.Agent, "slack") + } + if third.When.Channel != "slack" || third.When.Account != "" { + t.Fatalf("third.When = %+v", third.When) + } +} + +func TestLoadConfig_PrefersDispatchRulesOverLegacyBindings(t *testing.T) { + dir := t.TempDir() + configPath := filepath.Join(dir, "config.json") + raw := `{ + "version": 2, + "agents": { + "defaults": { + "workspace": "~/.picoclaw/workspace", + "model": "glm-4.7" + }, + "list": [ + { "id": "main", "default": true }, + { "id": "support" } + ], + "dispatch": { + "rules": [ + { + "name": "explicit", + "agent": "support", + "when": { + "channel": "telegram", + "chat": "group:-100123" + } + } + ] + } + }, + "bindings": [ + { + "agent_id": "main", + "match": { + "channel": "telegram", + "account_id": "*" + } + } + ] + }` + if err := os.WriteFile(configPath, []byte(raw), 0o644); err != nil { + t.Fatalf("WriteFile(configPath): %v", err) + } + + cfg, err := LoadConfig(configPath) + if err != nil { + t.Fatalf("LoadConfig() error: %v", err) + } + if cfg.Agents.Dispatch == nil { + t.Fatal("Agents.Dispatch should not be nil") + } + if len(cfg.Agents.Dispatch.Rules) != 1 { + t.Fatalf("Dispatch.Rules len = %d, want 1", len(cfg.Agents.Dispatch.Rules)) + } + if cfg.Agents.Dispatch.Rules[0].Name != "explicit" { + t.Fatalf("Dispatch.Rules[0].Name = %q, want %q", cfg.Agents.Dispatch.Rules[0].Name, "explicit") + } +} + // TestDefaultConfig_HeartbeatEnabled verifies heartbeat is enabled by default func TestDefaultConfig_HeartbeatEnabled(t *testing.T) { cfg := DefaultConfig() diff --git a/pkg/config/legacy_bindings.go b/pkg/config/legacy_bindings.go new file mode 100644 index 000000000..83fa08669 --- /dev/null +++ b/pkg/config/legacy_bindings.go @@ -0,0 +1,209 @@ +package config + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/sipeed/picoclaw/pkg/logger" +) + +const legacyDefaultAccountID = "default" + +type legacyBindingsEnvelope struct { + Bindings json.RawMessage `json:"bindings"` +} + +type legacyAgentBinding struct { + AgentID string `json:"agent_id"` + Match legacyBindingMatch `json:"match"` +} + +type legacyBindingMatch struct { + Channel string `json:"channel"` + AccountID string `json:"account_id,omitempty"` + Peer *legacyPeerMatch `json:"peer,omitempty"` + GuildID string `json:"guild_id,omitempty"` + TeamID string `json:"team_id,omitempty"` +} + +type legacyPeerMatch struct { + Kind string `json:"kind"` + ID string `json:"id"` +} + +func applyLegacyBindingsMigration(data []byte, cfg *Config) { + if cfg == nil { + return + } + + bindings, found, err := decodeLegacyBindings(data) + if err != nil { + logger.WarnF( + "legacy bindings config detected but could not be decoded", + map[string]any{"error": err}, + ) + return + } + if !found { + return + } + + if cfg.Agents.Dispatch != nil && len(cfg.Agents.Dispatch.Rules) > 0 { + logger.WarnF( + "legacy bindings config is deprecated and ignored because agents.dispatch.rules is configured", + map[string]any{"bindings": len(bindings), "dispatch_rules": len(cfg.Agents.Dispatch.Rules)}, + ) + return + } + + rules, dropped := migrateLegacyBindings(bindings) + if len(rules) == 0 { + logger.WarnF( + "legacy bindings config is deprecated and could not be migrated", + map[string]any{"bindings": len(bindings), "dropped_bindings": dropped}, + ) + return + } + + if cfg.Agents.Dispatch == nil { + cfg.Agents.Dispatch = &DispatchConfig{} + } + cfg.Agents.Dispatch.Rules = rules + + fields := map[string]any{ + "bindings": len(bindings), + "dispatch_rules": len(rules), + } + if dropped > 0 { + fields["dropped_bindings"] = dropped + } + logger.WarnF("legacy bindings config is deprecated; migrated to agents.dispatch.rules in memory", fields) +} + +func decodeLegacyBindings(data []byte) ([]legacyAgentBinding, bool, error) { + var envelope legacyBindingsEnvelope + if err := json.Unmarshal(data, &envelope); err != nil { + return nil, false, err + } + if len(envelope.Bindings) == 0 { + return nil, false, nil + } + + var bindings []legacyAgentBinding + if err := json.Unmarshal(envelope.Bindings, &bindings); err != nil { + return nil, true, err + } + return bindings, true, nil +} + +func migrateLegacyBindings(bindings []legacyAgentBinding) ([]DispatchRule, int) { + if len(bindings) == 0 { + return nil, 0 + } + + type prioritizedRule struct { + rule DispatchRule + index int + kind int + } + + prioritized := make([]prioritizedRule, 0, len(bindings)) + dropped := 0 + for i, binding := range bindings { + rule, kind, ok := migrateLegacyBinding(binding, i) + if !ok { + dropped++ + continue + } + prioritized = append(prioritized, prioritizedRule{rule: rule, index: i, kind: kind}) + } + if len(prioritized) == 0 { + return nil, dropped + } + + rules := make([]DispatchRule, 0, len(prioritized)) + for kind := 0; kind <= 4; kind++ { + for _, item := range prioritized { + if item.kind == kind { + rules = append(rules, item.rule) + } + } + } + return rules, dropped +} + +func migrateLegacyBinding(binding legacyAgentBinding, index int) (DispatchRule, int, bool) { + channel := strings.ToLower(strings.TrimSpace(binding.Match.Channel)) + agentID := strings.TrimSpace(binding.AgentID) + if channel == "" || agentID == "" { + return DispatchRule{}, 0, false + } + + rule := DispatchRule{ + Name: fmt.Sprintf("legacy-binding-%d", index+1), + Agent: agentID, + When: DispatchSelector{ + Channel: channel, + }, + } + + switch normalizeLegacyAccountSelector(binding.Match.AccountID) { + case "": + case "*": + default: + rule.When.Account = normalizeLegacyAccountSelector(binding.Match.AccountID) + } + + if peer := binding.Match.Peer; peer != nil { + peerKind := strings.ToLower(strings.TrimSpace(peer.Kind)) + peerID := strings.TrimSpace(peer.ID) + if peerID == "" { + return DispatchRule{}, 0, false + } + switch peerKind { + case "direct": + rule.When.Sender = peerID + return rule, 0, true + case "group", "channel": + rule.When.Chat = peerKind + ":" + peerID + return rule, 0, true + case "topic": + rule.When.Topic = "topic:" + peerID + return rule, 0, true + default: + return DispatchRule{}, 0, false + } + } + + if guildID := strings.TrimSpace(binding.Match.GuildID); guildID != "" { + rule.When.Space = "guild:" + guildID + return rule, 1, true + } + + if teamID := strings.TrimSpace(binding.Match.TeamID); teamID != "" { + rule.When.Space = "team:" + teamID + return rule, 2, true + } + + accountSelector := normalizeLegacyAccountSelector(binding.Match.AccountID) + if accountSelector == "*" { + rule.When.Account = "" + return rule, 4, true + } + + rule.When.Account = accountSelector + return rule, 3, true +} + +func normalizeLegacyAccountSelector(accountID string) string { + accountID = strings.TrimSpace(accountID) + switch accountID { + case "": + return legacyDefaultAccountID + case "*": + return "*" + default: + return strings.ToLower(accountID) + } +} diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index f6728330f..f6f9c50f0 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -224,33 +224,50 @@ func (s *JSONLStore) UpsertSessionMeta( } // ResolveSessionKey returns the canonical session key for a candidate key. -// It first checks direct key existence, then scans metadata aliases on miss. +// It short-circuits direct canonical keys when possible, then scans metadata +// once to resolve aliases or canonical metadata keys. func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (string, bool, error) { sessionKey = strings.TrimSpace(sessionKey) if sessionKey == "" { return "", false, nil } + hasDirectSession := s.sessionExists(sessionKey) + if hasDirectSession && shouldShortCircuitSessionResolve(sessionKey) { + return sessionKey, true, nil + } + entries, err := os.ReadDir(s.dir) if err != nil { return "", false, fmt.Errorf("memory: read sessions dir: %w", err) } + var directMetaMatch string for _, entry := range entries { if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta.json") { continue } + data, readErr := os.ReadFile(filepath.Join(s.dir, entry.Name())) if readErr != nil { - return "", false, fmt.Errorf("memory: read meta: %w", readErr) + log.Printf("memory: skipping unreadable meta %s: %v", entry.Name(), readErr) + continue } + var meta SessionMeta if err := json.Unmarshal(data, &meta); err != nil { - return "", false, fmt.Errorf("memory: decode meta: %w", err) + log.Printf("memory: skipping corrupt meta %s: %v", entry.Name(), err) + continue } + if meta.Key == "" { continue } + + if meta.Key == sessionKey { + directMetaMatch = meta.Key + } + for _, alias := range meta.Aliases { if alias == sessionKey && meta.Key != sessionKey { return meta.Key, true, nil @@ -258,30 +275,25 @@ func (s *JSONLStore) ResolveSessionKey(_ context.Context, sessionKey string) (st } } - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta.json") { - continue - } - data, readErr := os.ReadFile(filepath.Join(s.dir, entry.Name())) - if readErr != nil { - return "", false, fmt.Errorf("memory: read meta: %w", readErr) - } - var meta SessionMeta - if err := json.Unmarshal(data, &meta); err != nil { - return "", false, fmt.Errorf("memory: decode meta: %w", err) - } - if meta.Key == sessionKey { - return meta.Key, true, nil - } + if directMetaMatch != "" { + return directMetaMatch, true, nil } - if s.sessionExists(sessionKey) { + if hasDirectSession { return sessionKey, true, nil } return "", false, nil } +func shouldShortCircuitSessionResolve(sessionKey string) bool { + sessionKey = strings.TrimSpace(strings.ToLower(sessionKey)) + if sessionKey == "" { + return false + } + return !strings.ContainsAny(sessionKey, ":/\\") +} + // readMessages reads valid JSON lines from a .jsonl file, skipping // the first `skip` lines without unmarshaling them. This avoids the // cost of json.Unmarshal on logically truncated messages. diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index 71ce8d866..b64c1b25f 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -322,6 +322,63 @@ func TestResolveSessionKeyByAlias_PrefersMetadataOverLegacyFile(t *testing.T) { } } +func TestResolveSessionKey_DirectHitSkipsCorruptMetadata(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + if err := store.AddMessage(ctx, "canonical", "user", "hello"); err != nil { + t.Fatalf("AddMessage() error = %v", err) + } + if err := os.WriteFile( + filepath.Join(store.dir, "broken.meta.json"), + []byte("{not-json"), + 0o644, + ); err != nil { + t.Fatalf("WriteFile(broken.meta.json) error = %v", err) + } + + resolved, found, err := store.ResolveSessionKey(ctx, "canonical") + if err != nil { + t.Fatalf("ResolveSessionKey() error = %v", err) + } + if !found { + t.Fatal("ResolveSessionKey() did not find direct session") + } + if resolved != "canonical" { + t.Fatalf("resolved = %q, want %q", resolved, "canonical") + } +} + +func TestResolveSessionKey_SkipsCorruptMetadataDuringAliasScan(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + if err := store.AddMessage(ctx, "canonical", "user", "hello"); err != nil { + t.Fatalf("AddMessage() error = %v", err) + } + if err := store.UpsertSessionMeta(ctx, "canonical", nil, []string{"legacy:key"}); err != nil { + t.Fatalf("UpsertSessionMeta() error = %v", err) + } + if err := os.WriteFile( + filepath.Join(store.dir, "broken.meta.json"), + []byte("{not-json"), + 0o644, + ); err != nil { + t.Fatalf("WriteFile(broken.meta.json) error = %v", err) + } + + resolved, found, err := store.ResolveSessionKey(ctx, "legacy:key") + if err != nil { + t.Fatalf("ResolveSessionKey() error = %v", err) + } + if !found { + t.Fatal("ResolveSessionKey() did not find alias") + } + if resolved != "canonical" { + t.Fatalf("resolved = %q, want %q", resolved, "canonical") + } +} + func TestTruncateHistory_KeepLast(t *testing.T) { store := newTestStore(t) ctx := context.Background()