diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index d7461e76f..84b783985 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -1241,6 +1241,7 @@ func (al *AgentLoop) ProcessDirectWithChannel( Content: content, SessionKey: sessionKey, } + msg.Context = bus.ContextFromLegacyInbound(msg) return al.processMessage(ctx, msg) } @@ -1276,6 +1277,8 @@ func (al *AgentLoop) ProcessHeartbeat( } func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { + msg = bus.NormalizeInboundMessage(msg) + // Add message preview to log (show full content for error messages) var logContent string if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") { diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 37fcb74c5..f6a339ff0 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -80,6 +80,7 @@ func publish[T any](ctx context.Context, mb *MessageBus, ch chan T, msg T) error } func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) error { + msg = NormalizeInboundMessage(msg) return publish(ctx, mb, mb.inbound, msg) } diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 9b6324ca6..ab79c0d49 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -34,6 +34,126 @@ func TestPublishConsume(t *testing.T) { if got.Channel != "test" { t.Fatalf("expected channel 'test', got %q", got.Channel) } + if got.Context.Channel != "test" { + t.Fatalf("expected context channel 'test', got %q", got.Context.Channel) + } + if got.Context.ChatID != "chat1" { + t.Fatalf("expected context chat ID 'chat1', got %q", got.Context.ChatID) + } + if got.Context.SenderID != "user1" { + t.Fatalf("expected context sender ID 'user1', got %q", got.Context.SenderID) + } +} + +func TestPublishInbound_NormalizesLegacyFieldsIntoContext(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + msg := InboundMessage{ + Channel: "slack", + SenderID: "U123", + ChatID: "C456/1712", + Content: "hello", + MessageID: "1712.01", + Peer: Peer{Kind: "group", ID: "C456"}, + Metadata: map[string]string{ + "account_id": "workspace-a", + "team_id": "T001", + "reply_to_message_id": "1700.01", + "is_mentioned": "true", + "parent_peer_kind": "topic", + "parent_peer_id": "1712", + }, + } + + if err := mb.PublishInbound(context.Background(), msg); err != nil { + t.Fatalf("PublishInbound failed: %v", err) + } + + got := <-mb.InboundChan() + if got.Context.Channel != "slack" { + t.Fatalf("expected context channel slack, got %q", got.Context.Channel) + } + if got.Context.Account != "workspace-a" { + t.Fatalf("expected context account workspace-a, got %q", got.Context.Account) + } + if got.Context.ChatType != "group" { + t.Fatalf("expected context chat type group, got %q", got.Context.ChatType) + } + if got.Context.TopicID != "1712" { + t.Fatalf("expected topic 1712, got %q", got.Context.TopicID) + } + if got.Context.SpaceType != "team" || got.Context.SpaceID != "T001" { + t.Fatalf("expected team space T001, got %q/%q", got.Context.SpaceType, got.Context.SpaceID) + } + if !got.Context.Mentioned { + t.Fatal("expected mentioned=true in context") + } + if got.Context.ReplyToMessageID != "1700.01" { + t.Fatalf("expected reply_to_message_id 1700.01, got %q", got.Context.ReplyToMessageID) + } +} + +func TestPublishInbound_MirrorsContextIntoLegacyFields(t *testing.T) { + mb := NewMessageBus() + defer mb.Close() + + msg := InboundMessage{ + Context: InboundContext{ + Channel: "telegram", + Account: "bot-a", + ChatID: "-1001", + ChatType: "group", + TopicID: "42", + SpaceID: "guild-9", + SpaceType: "guild", + SenderID: "user-1", + MessageID: "777", + Mentioned: true, + ReplyToMessageID: "666", + }, + Content: "hi", + } + + if err := mb.PublishInbound(context.Background(), msg); err != nil { + t.Fatalf("PublishInbound failed: %v", err) + } + + got := <-mb.InboundChan() + if got.Channel != "telegram" { + t.Fatalf("expected legacy channel telegram, got %q", got.Channel) + } + if got.ChatID != "-1001" { + t.Fatalf("expected legacy chat ID -1001, got %q", got.ChatID) + } + if got.SenderID != "user-1" { + t.Fatalf("expected legacy sender ID user-1, got %q", got.SenderID) + } + if got.MessageID != "777" { + t.Fatalf("expected legacy message ID 777, got %q", got.MessageID) + } + if got.Peer.Kind != "group" || got.Peer.ID != "-1001" { + t.Fatalf("expected legacy peer group/-1001, got %q/%q", got.Peer.Kind, got.Peer.ID) + } + if got.Metadata["account_id"] != "bot-a" { + t.Fatalf("expected mirrored account_id bot-a, got %q", got.Metadata["account_id"]) + } + if got.Metadata["guild_id"] != "guild-9" { + t.Fatalf("expected mirrored guild_id guild-9, got %q", got.Metadata["guild_id"]) + } + if got.Metadata["parent_peer_kind"] != "topic" || got.Metadata["parent_peer_id"] != "42" { + t.Fatalf( + "expected mirrored topic parent peer, got %q/%q", + got.Metadata["parent_peer_kind"], + got.Metadata["parent_peer_id"], + ) + } + if got.Metadata["reply_to_message_id"] != "666" { + t.Fatalf("expected mirrored reply_to_message_id 666, got %q", got.Metadata["reply_to_message_id"]) + } + if got.Metadata["is_mentioned"] != "true" { + t.Fatalf("expected mirrored is_mentioned true, got %q", got.Metadata["is_mentioned"]) + } } func TestPublishOutboundSubscribe(t *testing.T) { diff --git a/pkg/bus/inbound_context.go b/pkg/bus/inbound_context.go new file mode 100644 index 000000000..501f27be4 --- /dev/null +++ b/pkg/bus/inbound_context.go @@ -0,0 +1,264 @@ +package bus + +import "strings" + +const ( + metadataKeyAccountID = "account_id" + metadataKeyGuildID = "guild_id" + metadataKeyTeamID = "team_id" + metadataKeyReplyToMessage = "reply_to_message_id" + metadataKeyReplyToSender = "reply_to_sender_id" + metadataKeyParentPeerKind = "parent_peer_kind" + metadataKeyParentPeerID = "parent_peer_id" + metadataKeyIsMentioned = "is_mentioned" +) + +// ContextFromLegacyInbound builds a normalized inbound context from the legacy +// top-level fields on InboundMessage. This keeps older producers working while +// new producers migrate to writing Context directly. +func ContextFromLegacyInbound(msg InboundMessage) InboundContext { + ctx := InboundContext{ + Channel: strings.TrimSpace(msg.Channel), + ChatID: strings.TrimSpace(msg.ChatID), + ChatType: normalizeKind(msg.Peer.Kind), + SenderID: firstNonEmpty( + strings.TrimSpace(msg.SenderID), + strings.TrimSpace(msg.Sender.CanonicalID), + strings.TrimSpace(msg.Sender.PlatformID), + ), + MessageID: strings.TrimSpace(msg.MessageID), + Raw: cloneStringMap(msg.Metadata), + } + + if account := metadataValue(msg.Metadata, metadataKeyAccountID); account != "" { + ctx.Account = account + } + if replyToMsgID := metadataValue(msg.Metadata, metadataKeyReplyToMessage); replyToMsgID != "" { + ctx.ReplyToMessageID = replyToMsgID + } + if replyToSenderID := metadataValue(msg.Metadata, metadataKeyReplyToSender); replyToSenderID != "" { + ctx.ReplyToSenderID = replyToSenderID + } + if isTruthy(metadataValue(msg.Metadata, metadataKeyIsMentioned)) { + ctx.Mentioned = true + } + + parentKind := normalizeKind(metadataValue(msg.Metadata, metadataKeyParentPeerKind)) + parentID := metadataValue(msg.Metadata, metadataKeyParentPeerID) + if parentKind == "topic" && parentID != "" { + ctx.TopicID = parentID + } + + switch { + case metadataValue(msg.Metadata, metadataKeyGuildID) != "": + ctx.SpaceType = "guild" + ctx.SpaceID = metadataValue(msg.Metadata, metadataKeyGuildID) + case metadataValue(msg.Metadata, metadataKeyTeamID) != "": + ctx.SpaceType = "team" + ctx.SpaceID = metadataValue(msg.Metadata, metadataKeyTeamID) + } + + return normalizeInboundContext(ctx) +} + +// NormalizeInboundMessage ensures the normalized Context is present and mirrors +// missing legacy fields from it so older consumers continue to work during the +// migration period. +func NormalizeInboundMessage(msg InboundMessage) InboundMessage { + if msg.Context.isZero() { + msg.Context = ContextFromLegacyInbound(msg) + } else { + msg.Context = normalizeInboundContext(msg.Context) + } + + if msg.Channel == "" { + msg.Channel = msg.Context.Channel + } + if msg.SenderID == "" { + msg.SenderID = msg.Context.SenderID + } + if msg.ChatID == "" { + msg.ChatID = msg.Context.ChatID + } + if msg.MessageID == "" { + msg.MessageID = msg.Context.MessageID + } + if msg.Peer.Kind == "" { + msg.Peer = peerFromContext(msg.Context) + } + + msg.Metadata = mergeLegacyMetadata(msg.Metadata, msg.Context) + return msg +} + +func (ctx InboundContext) isZero() bool { + return ctx.Channel == "" && + ctx.Account == "" && + ctx.ChatID == "" && + ctx.ChatType == "" && + ctx.TopicID == "" && + ctx.SpaceID == "" && + ctx.SpaceType == "" && + ctx.SenderID == "" && + ctx.MessageID == "" && + !ctx.Mentioned && + ctx.ReplyToMessageID == "" && + ctx.ReplyToSenderID == "" && + len(ctx.ReplyHandles) == 0 && + len(ctx.Raw) == 0 +} + +func normalizeInboundContext(ctx InboundContext) InboundContext { + ctx.Channel = strings.TrimSpace(ctx.Channel) + ctx.Account = strings.TrimSpace(ctx.Account) + ctx.ChatID = strings.TrimSpace(ctx.ChatID) + ctx.ChatType = normalizeKind(ctx.ChatType) + ctx.TopicID = strings.TrimSpace(ctx.TopicID) + ctx.SpaceID = strings.TrimSpace(ctx.SpaceID) + ctx.SpaceType = normalizeKind(ctx.SpaceType) + ctx.SenderID = strings.TrimSpace(ctx.SenderID) + ctx.MessageID = strings.TrimSpace(ctx.MessageID) + ctx.ReplyToMessageID = strings.TrimSpace(ctx.ReplyToMessageID) + ctx.ReplyToSenderID = strings.TrimSpace(ctx.ReplyToSenderID) + ctx.ReplyHandles = cloneStringMap(ctx.ReplyHandles) + ctx.Raw = cloneStringMap(ctx.Raw) + return ctx +} + +func peerFromContext(ctx InboundContext) Peer { + kind := normalizeKind(ctx.ChatType) + if kind == "" { + return Peer{} + } + + switch kind { + case "direct": + return Peer{ + Kind: "direct", + ID: firstNonEmpty(strings.TrimSpace(ctx.SenderID), strings.TrimSpace(ctx.ChatID)), + } + case "group", "channel": + return Peer{ + Kind: kind, + ID: strings.TrimSpace(ctx.ChatID), + } + default: + return Peer{ + Kind: kind, + ID: strings.TrimSpace(ctx.ChatID), + } + } +} + +func mergeLegacyMetadata(existing map[string]string, ctx InboundContext) map[string]string { + merged := cloneStringMap(existing) + if len(merged) == 0 { + merged = cloneStringMap(ctx.Raw) + } else { + for k, v := range ctx.Raw { + if _, ok := merged[k]; !ok { + merged[k] = v + } + } + } + + if ctx.Account != "" { + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyAccountID, ctx.Account) + } + if ctx.ReplyToMessageID != "" { + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyReplyToMessage, ctx.ReplyToMessageID) + } + if ctx.ReplyToSenderID != "" { + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyReplyToSender, ctx.ReplyToSenderID) + } + if ctx.Mentioned { + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyIsMentioned, "true") + } + if ctx.TopicID != "" { + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyParentPeerKind, "topic") + setMissing(merged, metadataKeyParentPeerID, ctx.TopicID) + } + + switch normalizeKind(ctx.SpaceType) { + case "guild": + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyGuildID, ctx.SpaceID) + case "team", "workspace": + if merged == nil { + merged = make(map[string]string) + } + setMissing(merged, metadataKeyTeamID, ctx.SpaceID) + } + + if len(merged) == 0 { + return nil + } + return merged +} + +func setMissing(dst map[string]string, key, value string) { + if value == "" { + return + } + if _, ok := dst[key]; !ok { + dst[key] = value + } +} + +func metadataValue(metadata map[string]string, key string) string { + if metadata == nil { + return "" + } + return strings.TrimSpace(metadata[key]) +} + +func cloneStringMap(src map[string]string) map[string]string { + if len(src) == 0 { + return nil + } + + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + +func normalizeKind(value string) string { + return strings.ToLower(strings.TrimSpace(value)) +} + +func isTruthy(value string) bool { + switch strings.ToLower(strings.TrimSpace(value)) { + case "1", "t", "true", "y", "yes", "on": + return true + default: + return false + } +} diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 12da3f1dd..0c4cd707b 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -15,11 +15,39 @@ type SenderInfo struct { DisplayName string `json:"display_name,omitempty"` // display name } +// InboundContext captures the normalized, platform-agnostic facts about an +// inbound message. This is the long-term source of truth for routing and +// session allocation. Legacy top-level fields on InboundMessage remain during +// the transition and are derived from this context when missing. +type InboundContext struct { + Channel string `json:"channel"` + Account string `json:"account,omitempty"` + + ChatID string `json:"chat_id"` + ChatType string `json:"chat_type,omitempty"` // direct / group / channel + TopicID string `json:"topic_id,omitempty"` + + SpaceID string `json:"space_id,omitempty"` + SpaceType string `json:"space_type,omitempty"` // guild / team / workspace / tenant + + SenderID string `json:"sender_id"` + MessageID string `json:"message_id,omitempty"` + + Mentioned bool `json:"mentioned,omitempty"` + + ReplyToMessageID string `json:"reply_to_message_id,omitempty"` + ReplyToSenderID string `json:"reply_to_sender_id,omitempty"` + + ReplyHandles map[string]string `json:"reply_handles,omitempty"` + Raw map[string]string `json:"raw,omitempty"` +} + type InboundMessage struct { Channel string `json:"channel"` SenderID string `json:"sender_id"` Sender SenderInfo `json:"sender"` ChatID string `json:"chat_id"` + Context InboundContext `json:"context"` Content string `json:"content"` Media []string `json:"media,omitempty"` Peer Peer `json:"peer"` // routing peer diff --git a/pkg/channels/base.go b/pkg/channels/base.go index bd4ced849..fd68ebcc2 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -287,6 +287,7 @@ func (c *BaseChannel) HandleMessage( MediaScope: scope, Metadata: metadata, } + msg.Context = bus.ContextFromLegacyInbound(msg) // Auto-trigger typing indicator, message reaction, and placeholder before publishing. // Each capability is independent — all three may fire for the same message.