feat(routing): add ordered dispatch rules

This commit is contained in:
Hoshina
2026-04-01 22:13:04 +08:00
parent 82bfe0d9a0
commit bef17d6453
6 changed files with 524 additions and 31 deletions
+74 -6
View File
@@ -755,12 +755,12 @@ func TestAppendEventContextFields_IncludesInboundRouteAndScope(t *testing.T) {
SenderID: "U123",
Mentioned: true,
},
Route: &routing.ResolvedRoute{
AgentID: "support",
Channel: "slack",
AccountID: "workspace-a",
MatchedBy: "default",
SessionPolicy: routing.SessionPolicy{
Route: &routing.ResolvedRoute{
AgentID: "support",
Channel: "slack",
AccountID: "workspace-a",
MatchedBy: "default",
SessionPolicy: routing.SessionPolicy{
Dimensions: []string{"chat", "sender"},
IdentityLinks: map[string][]string{
"canonical-user": {"slack:U123"},
@@ -853,6 +853,74 @@ func TestResolveMessageRoute_UsesInboundContextAccount(t *testing.T) {
}
}
func TestResolveMessageRoute_UsesDispatchRulesInOrder(t *testing.T) {
tmpDir := t.TempDir()
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
ModelName: "test-model",
},
List: []config.AgentConfig{
{ID: "main", Default: true},
{ID: "support"},
{ID: "sales"},
},
Dispatch: &config.DispatchConfig{
Rules: []config.DispatchRule{
{
Name: "support-group",
Agent: "support",
When: config.DispatchSelector{
Channel: "telegram",
Chat: "group:-100123",
},
SessionDimensions: []string{"chat"},
},
{
Name: "vip-in-group",
Agent: "sales",
When: config.DispatchSelector{
Channel: "telegram",
Chat: "group:-100123",
Sender: "12345",
},
SessionDimensions: []string{"chat", "sender"},
},
},
},
},
Session: config.SessionConfig{
Dimensions: []string{"sender"},
},
}
msgBus := bus.NewMessageBus()
al := NewAgentLoop(cfg, msgBus, &simpleMockProvider{response: "ok"})
route, _, err := al.resolveMessageRoute(testInboundMessage(bus.InboundMessage{
Context: bus.InboundContext{
Channel: "telegram",
ChatID: "-100123",
ChatType: "group",
SenderID: "12345",
},
Content: "hello",
}))
if err != nil {
t.Fatalf("resolveMessageRoute() error = %v", err)
}
if route.AgentID != "support" {
t.Fatalf("AgentID = %q, want support", route.AgentID)
}
if route.MatchedBy != "dispatch.rule:support-group" {
t.Fatalf("MatchedBy = %q, want dispatch.rule:support-group", route.MatchedBy)
}
if got := route.SessionPolicy.Dimensions; len(got) != 1 || got[0] != "chat" {
t.Fatalf("SessionPolicy.Dimensions = %v, want [chat]", got)
}
}
func TestProcessMessage_MediaArtifactCanBeForwardedBySendFile(t *testing.T) {
tmpDir := t.TempDir()
cfg := config.DefaultConfig()
+24 -2
View File
@@ -117,8 +117,9 @@ func (c *Config) MarshalJSON() ([]byte, error) {
}
type AgentsConfig struct {
Defaults AgentDefaults `json:"defaults"`
List []AgentConfig `json:"list,omitempty"`
Defaults AgentDefaults `json:"defaults"`
List []AgentConfig `json:"list,omitempty"`
Dispatch *DispatchConfig `json:"dispatch,omitempty"`
}
// AgentModelConfig supports both string and structured model config.
@@ -175,6 +176,27 @@ type SubagentsConfig struct {
Model *AgentModelConfig `json:"model,omitempty"`
}
type DispatchConfig struct {
Rules []DispatchRule `json:"rules,omitempty"`
}
type DispatchRule struct {
Name string `json:"name,omitempty"`
Agent string `json:"agent"`
When DispatchSelector `json:"when"`
SessionDimensions []string `json:"session_dimensions,omitempty"`
}
type DispatchSelector struct {
Channel string `json:"channel,omitempty"`
Account string `json:"account,omitempty"`
Space string `json:"space,omitempty"`
Chat string `json:"chat,omitempty"`
Topic string `json:"topic,omitempty"`
Sender string `json:"sender,omitempty"`
Mentioned *bool `json:"mentioned,omitempty"`
}
type SessionConfig struct {
Dimensions []string `json:"dimensions,omitempty"`
IdentityLinks map[string][]string `json:"identity_links,omitempty"`
+57 -14
View File
@@ -126,16 +126,6 @@ func TestAgentConfig_FullParse(t *testing.T) {
}
]
},
"bindings": [
{
"agent_id": "support",
"match": {
"channel": "telegram",
"account_id": "*",
"peer": {"kind": "direct", "id": "user123"}
}
}
],
"session": {
"dimensions": ["sender"],
"identity_links": {
@@ -175,9 +165,9 @@ func TestAgentConfig_FullParse(t *testing.T) {
t.Errorf("support.Subagents = %+v", support.Subagents)
}
if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "sender" {
t.Errorf("Session.Dimensions = %v", cfg.Session.Dimensions)
}
if len(cfg.Session.Dimensions) != 1 || cfg.Session.Dimensions[0] != "sender" {
t.Errorf("Session.Dimensions = %v", cfg.Session.Dimensions)
}
if len(cfg.Session.IdentityLinks) != 1 {
t.Errorf("Session.IdentityLinks = %v", cfg.Session.IdentityLinks)
}
@@ -209,6 +199,60 @@ func TestConfig_BackwardCompat_NoAgentsList(t *testing.T) {
}
}
func TestAgentConfig_ParsesDispatchRules(t *testing.T) {
jsonData := `{
"agents": {
"defaults": {
"workspace": "~/.picoclaw/workspace",
"model": "glm-4.7"
},
"list": [
{ "id": "main", "default": true },
{ "id": "support" }
],
"dispatch": {
"rules": [
{
"name": "support-vip",
"agent": "support",
"when": {
"channel": "telegram",
"chat": "group:-100123",
"sender": "12345",
"mentioned": true
},
"session_dimensions": ["chat", "sender"]
}
]
}
}
}`
cfg := DefaultConfig()
if err := json.Unmarshal([]byte(jsonData), cfg); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if cfg.Agents.Dispatch == nil {
t.Fatal("Agents.Dispatch should not be nil")
}
if len(cfg.Agents.Dispatch.Rules) != 1 {
t.Fatalf("Dispatch.Rules len = %d, want 1", len(cfg.Agents.Dispatch.Rules))
}
rule := cfg.Agents.Dispatch.Rules[0]
if rule.Name != "support-vip" || rule.Agent != "support" {
t.Fatalf("rule = %+v", rule)
}
if rule.When.Channel != "telegram" || rule.When.Chat != "group:-100123" || rule.When.Sender != "12345" {
t.Fatalf("rule.When = %+v", rule.When)
}
if rule.When.Mentioned == nil || !*rule.When.Mentioned {
t.Fatalf("rule.When.Mentioned = %+v, want true", rule.When.Mentioned)
}
if got := rule.SessionDimensions; len(got) != 2 || got[0] != "chat" || got[1] != "sender" {
t.Fatalf("rule.SessionDimensions = %v, want [chat sender]", got)
}
}
// TestDefaultConfig_HeartbeatEnabled verifies heartbeat is enabled by default
func TestDefaultConfig_HeartbeatEnabled(t *testing.T) {
cfg := DefaultConfig()
@@ -964,7 +1008,6 @@ func TestLoadConfig_TelegramPlaceholderTextAcceptsSingleString(t *testing.T) {
data := `{
"version": 1,
"agents": { "defaults": { "workspace": "", "model": "", "max_tokens": 0, "max_tool_iterations": 0 } },
"bindings": [],
"session": {},
"channels": {
"telegram": {
+185 -4
View File
@@ -1,6 +1,7 @@
package routing
import (
"fmt"
"strings"
"github.com/sipeed/picoclaw/pkg/bus"
@@ -19,7 +20,7 @@ type ResolvedRoute struct {
Channel string
AccountID string
SessionPolicy SessionPolicy
MatchedBy string // currently always "default" until the new binding system lands
MatchedBy string
}
// RouteResolver determines which agent handles a message.
@@ -38,12 +39,24 @@ func NewRouteResolver(cfg *config.Config) *RouteResolver {
func (r *RouteResolver) ResolveRoute(inbound bus.InboundContext) ResolvedRoute {
channel := strings.ToLower(strings.TrimSpace(inbound.Channel))
accountID := NormalizeAccountID(inbound.Account)
identityLinks := cloneIdentityLinks(r.cfg.Session.IdentityLinks)
view := buildDispatchView(inbound, identityLinks)
if rule := r.matchDispatchRule(view); rule != nil {
return ResolvedRoute{
AgentID: r.pickAgentID(rule.Agent),
Channel: channel,
AccountID: accountID,
SessionPolicy: r.sessionPolicy(rule),
MatchedBy: matchedByForRule(rule),
}
}
return ResolvedRoute{
AgentID: r.pickAgentID(r.resolveDefaultAgentID()),
Channel: channel,
AccountID: accountID,
SessionPolicy: r.sessionPolicy(),
SessionPolicy: r.sessionPolicy(nil),
MatchedBy: "default",
}
}
@@ -85,9 +98,13 @@ func (r *RouteResolver) resolveDefaultAgentID() string {
return DefaultAgentID
}
func (r *RouteResolver) sessionPolicy() SessionPolicy {
func (r *RouteResolver) sessionPolicy(rule *config.DispatchRule) SessionPolicy {
dimensions := r.cfg.Session.Dimensions
if rule != nil && len(rule.SessionDimensions) > 0 {
dimensions = rule.SessionDimensions
}
return SessionPolicy{
Dimensions: normalizeSessionDimensions(r.cfg.Session.Dimensions),
Dimensions: normalizeSessionDimensions(dimensions),
IdentityLinks: cloneIdentityLinks(r.cfg.Session.IdentityLinks),
}
}
@@ -130,3 +147,167 @@ func cloneIdentityLinks(src map[string][]string) map[string][]string {
}
return cloned
}
type dispatchView struct {
Channel string
Account string
Space string
Chat string
Topic string
Sender string
Mentioned bool
}
func (r *RouteResolver) matchDispatchRule(view dispatchView) *config.DispatchRule {
if r.cfg == nil || r.cfg.Agents.Dispatch == nil || len(r.cfg.Agents.Dispatch.Rules) == 0 {
return nil
}
for i := range r.cfg.Agents.Dispatch.Rules {
rule := &r.cfg.Agents.Dispatch.Rules[i]
if !selectorHasAnyConstraint(rule.When) {
continue
}
if ruleMatchesView(*rule, view) {
return rule
}
}
return nil
}
func ruleMatchesView(rule config.DispatchRule, view dispatchView) bool {
when := normalizeDispatchSelector(rule.When)
if when.Channel != "" && when.Channel != view.Channel {
return false
}
if when.Account != "" && when.Account != view.Account {
return false
}
if when.Space != "" && when.Space != view.Space {
return false
}
if when.Chat != "" && when.Chat != view.Chat {
return false
}
if when.Topic != "" && when.Topic != view.Topic {
return false
}
if when.Sender != "" && when.Sender != view.Sender {
return false
}
if when.Mentioned != nil && *when.Mentioned != view.Mentioned {
return false
}
return true
}
func matchedByForRule(rule *config.DispatchRule) string {
if rule == nil {
return "default"
}
name := strings.TrimSpace(rule.Name)
if name == "" {
return "dispatch.rule"
}
return "dispatch.rule:" + strings.ToLower(name)
}
func buildDispatchView(inbound bus.InboundContext, identityLinks map[string][]string) dispatchView {
view := dispatchView{
Channel: strings.ToLower(strings.TrimSpace(inbound.Channel)),
Account: NormalizeAccountID(inbound.Account),
Mentioned: inbound.Mentioned,
}
if spaceID := strings.TrimSpace(inbound.SpaceID); spaceID != "" {
spaceType := strings.ToLower(strings.TrimSpace(inbound.SpaceType))
if spaceType == "" {
spaceType = "space"
}
view.Space = fmt.Sprintf("%s:%s", spaceType, strings.ToLower(spaceID))
}
if chatID := strings.TrimSpace(inbound.ChatID); chatID != "" {
chatType := strings.ToLower(strings.TrimSpace(inbound.ChatType))
if chatType == "" {
chatType = "direct"
}
view.Chat = fmt.Sprintf("%s:%s", chatType, strings.ToLower(chatID))
}
if topicID := strings.TrimSpace(inbound.TopicID); topicID != "" {
view.Topic = "topic:" + strings.ToLower(topicID)
}
view.Sender = canonicalDispatchSenderID(inbound.Channel, inbound.SenderID, identityLinks)
return view
}
func normalizeDispatchSelector(selector config.DispatchSelector) config.DispatchSelector {
selector.Channel = strings.ToLower(strings.TrimSpace(selector.Channel))
selector.Account = NormalizeAccountID(selector.Account)
selector.Space = strings.ToLower(strings.TrimSpace(selector.Space))
selector.Chat = strings.ToLower(strings.TrimSpace(selector.Chat))
selector.Topic = strings.ToLower(strings.TrimSpace(selector.Topic))
selector.Sender = strings.ToLower(strings.TrimSpace(selector.Sender))
return selector
}
func selectorHasAnyConstraint(selector config.DispatchSelector) bool {
return strings.TrimSpace(selector.Channel) != "" ||
strings.TrimSpace(selector.Account) != "" ||
strings.TrimSpace(selector.Space) != "" ||
strings.TrimSpace(selector.Chat) != "" ||
strings.TrimSpace(selector.Topic) != "" ||
strings.TrimSpace(selector.Sender) != "" ||
selector.Mentioned != nil
}
func canonicalDispatchSenderID(channel, rawID string, identityLinks map[string][]string) string {
normalizedID := strings.TrimSpace(rawID)
if normalizedID == "" {
return ""
}
if linked := resolveLinkedDispatchID(identityLinks, channel, normalizedID); linked != "" {
normalizedID = linked
}
return strings.ToLower(normalizedID)
}
func resolveLinkedDispatchID(identityLinks map[string][]string, channel, peerID string) string {
if len(identityLinks) == 0 {
return ""
}
peerID = strings.TrimSpace(peerID)
if peerID == "" {
return ""
}
candidates := make(map[string]bool)
rawCandidate := strings.ToLower(peerID)
if rawCandidate != "" {
candidates[rawCandidate] = true
}
channel = strings.ToLower(strings.TrimSpace(channel))
if channel != "" {
candidates[fmt.Sprintf("%s:%s", channel, rawCandidate)] = true
}
if idx := strings.Index(rawCandidate, ":"); idx > 0 && idx < len(rawCandidate)-1 {
candidates[rawCandidate[idx+1:]] = true
}
for canonical, ids := range identityLinks {
canonicalName := strings.TrimSpace(canonical)
if canonicalName == "" {
continue
}
for _, id := range ids {
normalized := strings.ToLower(strings.TrimSpace(id))
if normalized != "" && candidates[normalized] {
return canonicalName
}
}
}
return ""
}
+116
View File
@@ -71,6 +71,122 @@ func TestResolveRoute_UsesNormalizedInboundContextFields(t *testing.T) {
}
}
func TestResolveRoute_DispatchFirstMatchWins(t *testing.T) {
cfg := testConfig([]config.AgentConfig{
{ID: "main", Default: true},
{ID: "support"},
{ID: "sales"},
})
cfg.Agents.Dispatch = &config.DispatchConfig{
Rules: []config.DispatchRule{
{
Name: "support-group",
Agent: "support",
When: config.DispatchSelector{
Channel: "telegram",
Chat: "group:-100123",
},
},
{
Name: "vip-in-group",
Agent: "sales",
When: config.DispatchSelector{
Channel: "telegram",
Chat: "group:-100123",
Sender: "12345",
},
},
},
}
r := NewRouteResolver(cfg)
route := r.ResolveRoute(bus.InboundContext{
Channel: "telegram",
ChatID: "-100123",
ChatType: "group",
SenderID: "12345",
})
if route.AgentID != "support" {
t.Fatalf("AgentID = %q, want support", route.AgentID)
}
if route.MatchedBy != "dispatch.rule:support-group" {
t.Fatalf("MatchedBy = %q, want dispatch.rule:support-group", route.MatchedBy)
}
}
func TestResolveRoute_DispatchOverridesSessionDimensions(t *testing.T) {
cfg := testConfig([]config.AgentConfig{
{ID: "main", Default: true},
{ID: "support"},
})
cfg.Session.Dimensions = []string{"chat"}
cfg.Agents.Dispatch = &config.DispatchConfig{
Rules: []config.DispatchRule{
{
Name: "support-dm",
Agent: "support",
When: config.DispatchSelector{
Channel: "telegram",
Chat: "direct:user-1",
},
SessionDimensions: []string{"chat", "sender"},
},
},
}
r := NewRouteResolver(cfg)
route := r.ResolveRoute(bus.InboundContext{
Channel: "telegram",
ChatID: "user-1",
ChatType: "direct",
SenderID: "user-1",
})
if route.AgentID != "support" {
t.Fatalf("AgentID = %q, want support", route.AgentID)
}
if got := route.SessionPolicy.Dimensions; len(got) != 2 || got[0] != "chat" || got[1] != "sender" {
t.Fatalf("SessionPolicy.Dimensions = %v, want [chat sender]", got)
}
}
func TestResolveRoute_DispatchMentionedRule(t *testing.T) {
cfg := testConfig([]config.AgentConfig{
{ID: "main", Default: true},
{ID: "support"},
})
mentioned := true
cfg.Agents.Dispatch = &config.DispatchConfig{
Rules: []config.DispatchRule{
{
Name: "slack-mentions",
Agent: "support",
When: config.DispatchSelector{
Channel: "slack",
Space: "workspace:t001",
Mentioned: &mentioned,
},
},
},
}
r := NewRouteResolver(cfg)
route := r.ResolveRoute(bus.InboundContext{
Channel: "slack",
ChatID: "C123",
ChatType: "channel",
SpaceID: "T001",
SpaceType: "workspace",
SenderID: "U123",
Mentioned: true,
})
if route.AgentID != "support" {
t.Fatalf("AgentID = %q, want support", route.AgentID)
}
}
func TestResolveRoute_InvalidAgentFallsToDefault(t *testing.T) {
agents := []config.AgentConfig{
{ID: "main", Default: true},