refactor(inbound): add inbound context compatibility bridge

This commit is contained in:
Hoshina
2026-04-01 13:35:18 +08:00
parent 3b3f95c44c
commit 9cfa3c3ba6
6 changed files with 417 additions and 0 deletions
+3
View File
@@ -1241,6 +1241,7 @@ func (al *AgentLoop) ProcessDirectWithChannel(
Content: content,
SessionKey: sessionKey,
}
msg.Context = bus.ContextFromLegacyInbound(msg)
return al.processMessage(ctx, msg)
}
@@ -1276,6 +1277,8 @@ func (al *AgentLoop) ProcessHeartbeat(
}
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
msg = bus.NormalizeInboundMessage(msg)
// Add message preview to log (show full content for error messages)
var logContent string
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
+1
View File
@@ -80,6 +80,7 @@ func publish[T any](ctx context.Context, mb *MessageBus, ch chan T, msg T) error
}
func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) error {
msg = NormalizeInboundMessage(msg)
return publish(ctx, mb, mb.inbound, msg)
}
+120
View File
@@ -34,6 +34,126 @@ func TestPublishConsume(t *testing.T) {
if got.Channel != "test" {
t.Fatalf("expected channel 'test', got %q", got.Channel)
}
if got.Context.Channel != "test" {
t.Fatalf("expected context channel 'test', got %q", got.Context.Channel)
}
if got.Context.ChatID != "chat1" {
t.Fatalf("expected context chat ID 'chat1', got %q", got.Context.ChatID)
}
if got.Context.SenderID != "user1" {
t.Fatalf("expected context sender ID 'user1', got %q", got.Context.SenderID)
}
}
func TestPublishInbound_NormalizesLegacyFieldsIntoContext(t *testing.T) {
mb := NewMessageBus()
defer mb.Close()
msg := InboundMessage{
Channel: "slack",
SenderID: "U123",
ChatID: "C456/1712",
Content: "hello",
MessageID: "1712.01",
Peer: Peer{Kind: "group", ID: "C456"},
Metadata: map[string]string{
"account_id": "workspace-a",
"team_id": "T001",
"reply_to_message_id": "1700.01",
"is_mentioned": "true",
"parent_peer_kind": "topic",
"parent_peer_id": "1712",
},
}
if err := mb.PublishInbound(context.Background(), msg); err != nil {
t.Fatalf("PublishInbound failed: %v", err)
}
got := <-mb.InboundChan()
if got.Context.Channel != "slack" {
t.Fatalf("expected context channel slack, got %q", got.Context.Channel)
}
if got.Context.Account != "workspace-a" {
t.Fatalf("expected context account workspace-a, got %q", got.Context.Account)
}
if got.Context.ChatType != "group" {
t.Fatalf("expected context chat type group, got %q", got.Context.ChatType)
}
if got.Context.TopicID != "1712" {
t.Fatalf("expected topic 1712, got %q", got.Context.TopicID)
}
if got.Context.SpaceType != "team" || got.Context.SpaceID != "T001" {
t.Fatalf("expected team space T001, got %q/%q", got.Context.SpaceType, got.Context.SpaceID)
}
if !got.Context.Mentioned {
t.Fatal("expected mentioned=true in context")
}
if got.Context.ReplyToMessageID != "1700.01" {
t.Fatalf("expected reply_to_message_id 1700.01, got %q", got.Context.ReplyToMessageID)
}
}
func TestPublishInbound_MirrorsContextIntoLegacyFields(t *testing.T) {
mb := NewMessageBus()
defer mb.Close()
msg := InboundMessage{
Context: InboundContext{
Channel: "telegram",
Account: "bot-a",
ChatID: "-1001",
ChatType: "group",
TopicID: "42",
SpaceID: "guild-9",
SpaceType: "guild",
SenderID: "user-1",
MessageID: "777",
Mentioned: true,
ReplyToMessageID: "666",
},
Content: "hi",
}
if err := mb.PublishInbound(context.Background(), msg); err != nil {
t.Fatalf("PublishInbound failed: %v", err)
}
got := <-mb.InboundChan()
if got.Channel != "telegram" {
t.Fatalf("expected legacy channel telegram, got %q", got.Channel)
}
if got.ChatID != "-1001" {
t.Fatalf("expected legacy chat ID -1001, got %q", got.ChatID)
}
if got.SenderID != "user-1" {
t.Fatalf("expected legacy sender ID user-1, got %q", got.SenderID)
}
if got.MessageID != "777" {
t.Fatalf("expected legacy message ID 777, got %q", got.MessageID)
}
if got.Peer.Kind != "group" || got.Peer.ID != "-1001" {
t.Fatalf("expected legacy peer group/-1001, got %q/%q", got.Peer.Kind, got.Peer.ID)
}
if got.Metadata["account_id"] != "bot-a" {
t.Fatalf("expected mirrored account_id bot-a, got %q", got.Metadata["account_id"])
}
if got.Metadata["guild_id"] != "guild-9" {
t.Fatalf("expected mirrored guild_id guild-9, got %q", got.Metadata["guild_id"])
}
if got.Metadata["parent_peer_kind"] != "topic" || got.Metadata["parent_peer_id"] != "42" {
t.Fatalf(
"expected mirrored topic parent peer, got %q/%q",
got.Metadata["parent_peer_kind"],
got.Metadata["parent_peer_id"],
)
}
if got.Metadata["reply_to_message_id"] != "666" {
t.Fatalf("expected mirrored reply_to_message_id 666, got %q", got.Metadata["reply_to_message_id"])
}
if got.Metadata["is_mentioned"] != "true" {
t.Fatalf("expected mirrored is_mentioned true, got %q", got.Metadata["is_mentioned"])
}
}
func TestPublishOutboundSubscribe(t *testing.T) {
+264
View File
@@ -0,0 +1,264 @@
package bus
import "strings"
const (
metadataKeyAccountID = "account_id"
metadataKeyGuildID = "guild_id"
metadataKeyTeamID = "team_id"
metadataKeyReplyToMessage = "reply_to_message_id"
metadataKeyReplyToSender = "reply_to_sender_id"
metadataKeyParentPeerKind = "parent_peer_kind"
metadataKeyParentPeerID = "parent_peer_id"
metadataKeyIsMentioned = "is_mentioned"
)
// ContextFromLegacyInbound builds a normalized inbound context from the legacy
// top-level fields on InboundMessage. This keeps older producers working while
// new producers migrate to writing Context directly.
func ContextFromLegacyInbound(msg InboundMessage) InboundContext {
ctx := InboundContext{
Channel: strings.TrimSpace(msg.Channel),
ChatID: strings.TrimSpace(msg.ChatID),
ChatType: normalizeKind(msg.Peer.Kind),
SenderID: firstNonEmpty(
strings.TrimSpace(msg.SenderID),
strings.TrimSpace(msg.Sender.CanonicalID),
strings.TrimSpace(msg.Sender.PlatformID),
),
MessageID: strings.TrimSpace(msg.MessageID),
Raw: cloneStringMap(msg.Metadata),
}
if account := metadataValue(msg.Metadata, metadataKeyAccountID); account != "" {
ctx.Account = account
}
if replyToMsgID := metadataValue(msg.Metadata, metadataKeyReplyToMessage); replyToMsgID != "" {
ctx.ReplyToMessageID = replyToMsgID
}
if replyToSenderID := metadataValue(msg.Metadata, metadataKeyReplyToSender); replyToSenderID != "" {
ctx.ReplyToSenderID = replyToSenderID
}
if isTruthy(metadataValue(msg.Metadata, metadataKeyIsMentioned)) {
ctx.Mentioned = true
}
parentKind := normalizeKind(metadataValue(msg.Metadata, metadataKeyParentPeerKind))
parentID := metadataValue(msg.Metadata, metadataKeyParentPeerID)
if parentKind == "topic" && parentID != "" {
ctx.TopicID = parentID
}
switch {
case metadataValue(msg.Metadata, metadataKeyGuildID) != "":
ctx.SpaceType = "guild"
ctx.SpaceID = metadataValue(msg.Metadata, metadataKeyGuildID)
case metadataValue(msg.Metadata, metadataKeyTeamID) != "":
ctx.SpaceType = "team"
ctx.SpaceID = metadataValue(msg.Metadata, metadataKeyTeamID)
}
return normalizeInboundContext(ctx)
}
// NormalizeInboundMessage ensures the normalized Context is present and mirrors
// missing legacy fields from it so older consumers continue to work during the
// migration period.
func NormalizeInboundMessage(msg InboundMessage) InboundMessage {
if msg.Context.isZero() {
msg.Context = ContextFromLegacyInbound(msg)
} else {
msg.Context = normalizeInboundContext(msg.Context)
}
if msg.Channel == "" {
msg.Channel = msg.Context.Channel
}
if msg.SenderID == "" {
msg.SenderID = msg.Context.SenderID
}
if msg.ChatID == "" {
msg.ChatID = msg.Context.ChatID
}
if msg.MessageID == "" {
msg.MessageID = msg.Context.MessageID
}
if msg.Peer.Kind == "" {
msg.Peer = peerFromContext(msg.Context)
}
msg.Metadata = mergeLegacyMetadata(msg.Metadata, msg.Context)
return msg
}
func (ctx InboundContext) isZero() bool {
return ctx.Channel == "" &&
ctx.Account == "" &&
ctx.ChatID == "" &&
ctx.ChatType == "" &&
ctx.TopicID == "" &&
ctx.SpaceID == "" &&
ctx.SpaceType == "" &&
ctx.SenderID == "" &&
ctx.MessageID == "" &&
!ctx.Mentioned &&
ctx.ReplyToMessageID == "" &&
ctx.ReplyToSenderID == "" &&
len(ctx.ReplyHandles) == 0 &&
len(ctx.Raw) == 0
}
func normalizeInboundContext(ctx InboundContext) InboundContext {
ctx.Channel = strings.TrimSpace(ctx.Channel)
ctx.Account = strings.TrimSpace(ctx.Account)
ctx.ChatID = strings.TrimSpace(ctx.ChatID)
ctx.ChatType = normalizeKind(ctx.ChatType)
ctx.TopicID = strings.TrimSpace(ctx.TopicID)
ctx.SpaceID = strings.TrimSpace(ctx.SpaceID)
ctx.SpaceType = normalizeKind(ctx.SpaceType)
ctx.SenderID = strings.TrimSpace(ctx.SenderID)
ctx.MessageID = strings.TrimSpace(ctx.MessageID)
ctx.ReplyToMessageID = strings.TrimSpace(ctx.ReplyToMessageID)
ctx.ReplyToSenderID = strings.TrimSpace(ctx.ReplyToSenderID)
ctx.ReplyHandles = cloneStringMap(ctx.ReplyHandles)
ctx.Raw = cloneStringMap(ctx.Raw)
return ctx
}
func peerFromContext(ctx InboundContext) Peer {
kind := normalizeKind(ctx.ChatType)
if kind == "" {
return Peer{}
}
switch kind {
case "direct":
return Peer{
Kind: "direct",
ID: firstNonEmpty(strings.TrimSpace(ctx.SenderID), strings.TrimSpace(ctx.ChatID)),
}
case "group", "channel":
return Peer{
Kind: kind,
ID: strings.TrimSpace(ctx.ChatID),
}
default:
return Peer{
Kind: kind,
ID: strings.TrimSpace(ctx.ChatID),
}
}
}
func mergeLegacyMetadata(existing map[string]string, ctx InboundContext) map[string]string {
merged := cloneStringMap(existing)
if len(merged) == 0 {
merged = cloneStringMap(ctx.Raw)
} else {
for k, v := range ctx.Raw {
if _, ok := merged[k]; !ok {
merged[k] = v
}
}
}
if ctx.Account != "" {
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyAccountID, ctx.Account)
}
if ctx.ReplyToMessageID != "" {
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyReplyToMessage, ctx.ReplyToMessageID)
}
if ctx.ReplyToSenderID != "" {
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyReplyToSender, ctx.ReplyToSenderID)
}
if ctx.Mentioned {
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyIsMentioned, "true")
}
if ctx.TopicID != "" {
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyParentPeerKind, "topic")
setMissing(merged, metadataKeyParentPeerID, ctx.TopicID)
}
switch normalizeKind(ctx.SpaceType) {
case "guild":
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyGuildID, ctx.SpaceID)
case "team", "workspace":
if merged == nil {
merged = make(map[string]string)
}
setMissing(merged, metadataKeyTeamID, ctx.SpaceID)
}
if len(merged) == 0 {
return nil
}
return merged
}
func setMissing(dst map[string]string, key, value string) {
if value == "" {
return
}
if _, ok := dst[key]; !ok {
dst[key] = value
}
}
func metadataValue(metadata map[string]string, key string) string {
if metadata == nil {
return ""
}
return strings.TrimSpace(metadata[key])
}
func cloneStringMap(src map[string]string) map[string]string {
if len(src) == 0 {
return nil
}
dst := make(map[string]string, len(src))
for k, v := range src {
dst[k] = v
}
return dst
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if value != "" {
return value
}
}
return ""
}
func normalizeKind(value string) string {
return strings.ToLower(strings.TrimSpace(value))
}
func isTruthy(value string) bool {
switch strings.ToLower(strings.TrimSpace(value)) {
case "1", "t", "true", "y", "yes", "on":
return true
default:
return false
}
}
+28
View File
@@ -15,11 +15,39 @@ type SenderInfo struct {
DisplayName string `json:"display_name,omitempty"` // display name
}
// InboundContext captures the normalized, platform-agnostic facts about an
// inbound message. This is the long-term source of truth for routing and
// session allocation. Legacy top-level fields on InboundMessage remain during
// the transition and are derived from this context when missing.
type InboundContext struct {
Channel string `json:"channel"`
Account string `json:"account,omitempty"`
ChatID string `json:"chat_id"`
ChatType string `json:"chat_type,omitempty"` // direct / group / channel
TopicID string `json:"topic_id,omitempty"`
SpaceID string `json:"space_id,omitempty"`
SpaceType string `json:"space_type,omitempty"` // guild / team / workspace / tenant
SenderID string `json:"sender_id"`
MessageID string `json:"message_id,omitempty"`
Mentioned bool `json:"mentioned,omitempty"`
ReplyToMessageID string `json:"reply_to_message_id,omitempty"`
ReplyToSenderID string `json:"reply_to_sender_id,omitempty"`
ReplyHandles map[string]string `json:"reply_handles,omitempty"`
Raw map[string]string `json:"raw,omitempty"`
}
type InboundMessage struct {
Channel string `json:"channel"`
SenderID string `json:"sender_id"`
Sender SenderInfo `json:"sender"`
ChatID string `json:"chat_id"`
Context InboundContext `json:"context"`
Content string `json:"content"`
Media []string `json:"media,omitempty"`
Peer Peer `json:"peer"` // routing peer
+1
View File
@@ -287,6 +287,7 @@ func (c *BaseChannel) HandleMessage(
MediaScope: scope,
Metadata: metadata,
}
msg.Context = bus.ContextFromLegacyInbound(msg)
// Auto-trigger typing indicator, message reaction, and placeholder before publishing.
// Each capability is independent — all three may fire for the same message.