From 56a060ff61167c03086a271340574e2f34d28721 Mon Sep 17 00:00:00 2001 From: hsohinna Date: Thu, 19 Feb 2026 14:39:35 +0800 Subject: [PATCH] feat(onebot): enhance OneBot channel (#192) * fix: change BotStatus type to json.RawMessage and add isAPIResponse function * feat(onebot): add rich media, API callback, keepalive and voice transcription Comprehensive improvements to the OneBot channel for better NapCatQQ compatibility: - Add echo-based API callback mechanism (sendAPIRequest) for request/response correlation via pending map - Add WebSocket ping/pong keepalive (30s ping, 60s read deadline) - Fetch bot self ID via get_login_info on connect/reconnect - Refactor parseMessageContentEx into parseMessageSegments supporting image, record, video, file, reply, face, forward segments - Add voice transcription via Groq transcriber (SetTranscriber) - Switch to message segment array format for sending with auto reply quote via lastMessageID tracking - Add message_sent event handling and detailed notice event processing (recall, poke, group increase/decrease, friend add, etc.) - Use sync/atomic for echoCounter, optimize listen() lock pattern - Clean up pending callbacks on Stop(), defer temp file cleanup - Mount Groq transcriber on OneBot channel in main.go gateway * feat(onebot): add user ID allowlist check for incoming messages - Currently, the agent does not respond to messages sent by users outside the allowlist. * refactor(onebot): simplify channel implementation and add emoji reaction - onebot.go from 1179 to 980 lines (~17%) --- cmd/picoclaw/main.go | 6 + pkg/channels/onebot.go | 707 +++++++++++++++++++++++++++++------------ 2 files changed, 504 insertions(+), 209 deletions(-) diff --git a/cmd/picoclaw/main.go b/cmd/picoclaw/main.go index 128f8c421..36bf2ea83 100644 --- a/cmd/picoclaw/main.go +++ b/cmd/picoclaw/main.go @@ -623,6 +623,12 @@ func gatewayCmd() { logger.InfoC("voice", "Groq transcription attached to Slack channel") } } + if onebotChannel, ok := channelManager.GetChannel("onebot"); ok { + if oc, ok := onebotChannel.(*channels.OneBotChannel); ok { + oc.SetTranscriber(transcriber) + logger.InfoC("voice", "Groq transcription attached to OneBot channel") + } + } } enabledChannels := channelManager.GetEnabledChannels() diff --git a/pkg/channels/onebot.go b/pkg/channels/onebot.go index 5d97fab9c..53e82b44d 100644 --- a/pkg/channels/onebot.go +++ b/pkg/channels/onebot.go @@ -4,9 +4,11 @@ import ( "context" "encoding/json" "fmt" + "os" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -14,20 +16,28 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/logger" + "github.com/sipeed/picoclaw/pkg/utils" + "github.com/sipeed/picoclaw/pkg/voice" ) type OneBotChannel struct { *BaseChannel - config config.OneBotConfig - conn *websocket.Conn - ctx context.Context - cancel context.CancelFunc - dedup map[string]struct{} - dedupRing []string - dedupIdx int - mu sync.Mutex - writeMu sync.Mutex - echoCounter int64 + config config.OneBotConfig + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + dedup map[string]struct{} + dedupRing []string + dedupIdx int + mu sync.Mutex + writeMu sync.Mutex + echoCounter int64 + selfID int64 + pending map[string]chan json.RawMessage + pendingMu sync.Mutex + transcriber *voice.GroqTranscriber + lastMessageID sync.Map + pendingEmojiMsg sync.Map } type oneBotRawEvent struct { @@ -43,9 +53,11 @@ type oneBotRawEvent struct { SelfID json.RawMessage `json:"self_id"` Time json.RawMessage `json:"time"` MetaEventType string `json:"meta_event_type"` + NoticeType string `json:"notice_type"` Echo string `json:"echo"` RetCode json.RawMessage `json:"retcode"` - Status BotStatus `json:"status"` + Status json.RawMessage `json:"status"` + Data json.RawMessage `json:"data"` } type BotStatus struct { @@ -53,42 +65,36 @@ type BotStatus struct { Good bool `json:"good"` } +func isAPIResponse(raw json.RawMessage) bool { + if len(raw) == 0 { + return false + } + var s string + if json.Unmarshal(raw, &s) == nil { + return s == "ok" || s == "failed" + } + var bs BotStatus + if json.Unmarshal(raw, &bs) == nil { + return bs.Online || bs.Good + } + return false +} + type oneBotSender struct { UserID json.RawMessage `json:"user_id"` Nickname string `json:"nickname"` Card string `json:"card"` } -type oneBotEvent struct { - PostType string - MessageType string - SubType string - MessageID string - UserID int64 - GroupID int64 - Content string - RawContent string - IsBotMentioned bool - Sender oneBotSender - SelfID int64 - Time int64 - MetaEventType string -} - type oneBotAPIRequest struct { Action string `json:"action"` Params interface{} `json:"params"` Echo string `json:"echo,omitempty"` } -type oneBotSendPrivateMsgParams struct { - UserID int64 `json:"user_id"` - Message string `json:"message"` -} - -type oneBotSendGroupMsgParams struct { - GroupID int64 `json:"group_id"` - Message string `json:"message"` +type oneBotMessageSegment struct { + Type string `json:"type"` + Data map[string]interface{} `json:"data"` } func NewOneBotChannel(cfg config.OneBotConfig, messageBus *bus.MessageBus) (*OneBotChannel, error) { @@ -101,9 +107,30 @@ func NewOneBotChannel(cfg config.OneBotConfig, messageBus *bus.MessageBus) (*One dedup: make(map[string]struct{}, dedupSize), dedupRing: make([]string, dedupSize), dedupIdx: 0, + pending: make(map[string]chan json.RawMessage), }, nil } +func (c *OneBotChannel) SetTranscriber(transcriber *voice.GroqTranscriber) { + c.transcriber = transcriber +} + +func (c *OneBotChannel) setMsgEmojiLike(messageID string, emojiID int, set bool) { + go func() { + _, err := c.sendAPIRequest("set_msg_emoji_like", map[string]interface{}{ + "message_id": messageID, + "emoji_id": emojiID, + "set": set, + }, 5*time.Second) + if err != nil { + logger.DebugCF("onebot", "Failed to set emoji like", map[string]interface{}{ + "message_id": messageID, + "error": err.Error(), + }) + } + }() +} + func (c *OneBotChannel) Start(ctx context.Context) error { if c.config.WSUrl == "" { return fmt.Errorf("OneBot ws_url not configured") @@ -121,12 +148,12 @@ func (c *OneBotChannel) Start(ctx context.Context) error { }) } else { go c.listen() + c.fetchSelfID() } if c.config.ReconnectInterval > 0 { go c.reconnectLoop() } else { - // If reconnect is disabled but initial connection failed, we cannot recover if c.conn == nil { return fmt.Errorf("failed to connect to OneBot and reconnect is disabled") } @@ -152,14 +179,141 @@ func (c *OneBotChannel) connect() error { return err } + conn.SetPongHandler(func(appData string) error { + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.mu.Lock() c.conn = conn c.mu.Unlock() + go c.pinger(conn) + logger.InfoC("onebot", "WebSocket connected") return nil } +func (c *OneBotChannel) pinger(conn *websocket.Conn) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.writeMu.Lock() + err := conn.WriteMessage(websocket.PingMessage, nil) + c.writeMu.Unlock() + if err != nil { + logger.DebugCF("onebot", "Ping write failed, stopping pinger", map[string]interface{}{ + "error": err.Error(), + }) + return + } + } + } +} + +func (c *OneBotChannel) fetchSelfID() { + resp, err := c.sendAPIRequest("get_login_info", nil, 5*time.Second) + if err != nil { + logger.WarnCF("onebot", "Failed to get_login_info", map[string]interface{}{ + "error": err.Error(), + }) + return + } + + type loginInfo struct { + UserID json.RawMessage `json:"user_id"` + Nickname string `json:"nickname"` + } + for _, extract := range []func() (*loginInfo, error){ + func() (*loginInfo, error) { + var w struct { + Data loginInfo `json:"data"` + } + err := json.Unmarshal(resp, &w) + return &w.Data, err + }, + func() (*loginInfo, error) { + var f loginInfo + err := json.Unmarshal(resp, &f) + return &f, err + }, + } { + info, err := extract() + if err != nil || len(info.UserID) == 0 { + continue + } + if uid, err := parseJSONInt64(info.UserID); err == nil && uid > 0 { + atomic.StoreInt64(&c.selfID, uid) + logger.InfoCF("onebot", "Bot self ID retrieved", map[string]interface{}{ + "self_id": uid, + "nickname": info.Nickname, + }) + return + } + } + + logger.WarnCF("onebot", "Could not parse self ID from get_login_info response", map[string]interface{}{ + "response": string(resp), + }) +} + +func (c *OneBotChannel) sendAPIRequest(action string, params interface{}, timeout time.Duration) (json.RawMessage, error) { + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn == nil { + return nil, fmt.Errorf("WebSocket not connected") + } + + echo := fmt.Sprintf("api_%d_%d", time.Now().UnixNano(), atomic.AddInt64(&c.echoCounter, 1)) + + ch := make(chan json.RawMessage, 1) + c.pendingMu.Lock() + c.pending[echo] = ch + c.pendingMu.Unlock() + + defer func() { + c.pendingMu.Lock() + delete(c.pending, echo) + c.pendingMu.Unlock() + }() + + req := oneBotAPIRequest{ + Action: action, + Params: params, + Echo: echo, + } + + data, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal API request: %w", err) + } + + c.writeMu.Lock() + err = conn.WriteMessage(websocket.TextMessage, data) + c.writeMu.Unlock() + + if err != nil { + return nil, fmt.Errorf("failed to write API request: %w", err) + } + + select { + case resp := <-ch: + return resp, nil + case <-time.After(timeout): + return nil, fmt.Errorf("API request %s timed out after %v", action, timeout) + case <-c.ctx.Done(): + return nil, fmt.Errorf("context cancelled") + } +} + func (c *OneBotChannel) reconnectLoop() { interval := time.Duration(c.config.ReconnectInterval) * time.Second if interval < 5*time.Second { @@ -183,6 +337,7 @@ func (c *OneBotChannel) reconnectLoop() { }) } else { go c.listen() + c.fetchSelfID() } } } @@ -197,6 +352,13 @@ func (c *OneBotChannel) Stop(ctx context.Context) error { c.cancel() } + c.pendingMu.Lock() + for echo, ch := range c.pending { + close(ch) + delete(c.pending, echo) + } + c.pendingMu.Unlock() + c.mu.Lock() if c.conn != nil { c.conn.Close() @@ -225,10 +387,7 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error return err } - c.writeMu.Lock() - c.echoCounter++ - echo := fmt.Sprintf("send_%d", c.echoCounter) - c.writeMu.Unlock() + echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1)) req := oneBotAPIRequest{ Action: action, @@ -252,67 +411,78 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error return err } + if msgID, ok := c.pendingEmojiMsg.LoadAndDelete(msg.ChatID); ok { + if mid, ok := msgID.(string); ok && mid != "" { + c.setMsgEmojiLike(mid, 289, false) + } + } + return nil } +func (c *OneBotChannel) buildMessageSegments(chatID, content string) []oneBotMessageSegment { + var segments []oneBotMessageSegment + + if lastMsgID, ok := c.lastMessageID.Load(chatID); ok { + if msgID, ok := lastMsgID.(string); ok && msgID != "" { + segments = append(segments, oneBotMessageSegment{ + Type: "reply", + Data: map[string]interface{}{"id": msgID}, + }) + } + } + + segments = append(segments, oneBotMessageSegment{ + Type: "text", + Data: map[string]interface{}{"text": content}, + }) + + return segments +} + func (c *OneBotChannel) buildSendRequest(msg bus.OutboundMessage) (string, interface{}, error) { chatID := msg.ChatID + segments := c.buildMessageSegments(chatID, msg.Content) - if len(chatID) > 6 && chatID[:6] == "group:" { - groupID, err := strconv.ParseInt(chatID[6:], 10, 64) - if err != nil { - return "", nil, fmt.Errorf("invalid group ID in chatID: %s", chatID) - } - return "send_group_msg", oneBotSendGroupMsgParams{ - GroupID: groupID, - Message: msg.Content, - }, nil + var action, idKey string + var rawID string + if rest, ok := strings.CutPrefix(chatID, "group:"); ok { + action, idKey, rawID = "send_group_msg", "group_id", rest + } else if rest, ok := strings.CutPrefix(chatID, "private:"); ok { + action, idKey, rawID = "send_private_msg", "user_id", rest + } else { + action, idKey, rawID = "send_private_msg", "user_id", chatID } - if len(chatID) > 8 && chatID[:8] == "private:" { - userID, err := strconv.ParseInt(chatID[8:], 10, 64) - if err != nil { - return "", nil, fmt.Errorf("invalid user ID in chatID: %s", chatID) - } - return "send_private_msg", oneBotSendPrivateMsgParams{ - UserID: userID, - Message: msg.Content, - }, nil - } - - userID, err := strconv.ParseInt(chatID, 10, 64) + id, err := strconv.ParseInt(rawID, 10, 64) if err != nil { - return "", nil, fmt.Errorf("invalid chatID for OneBot: %s", chatID) + return "", nil, fmt.Errorf("invalid %s in chatID: %s", idKey, chatID) } - - return "send_private_msg", oneBotSendPrivateMsgParams{ - UserID: userID, - Message: msg.Content, - }, nil + return action, map[string]interface{}{idKey: id, "message": segments}, nil } func (c *OneBotChannel) listen() { + c.mu.Lock() + conn := c.conn + c.mu.Unlock() + + if conn == nil { + logger.WarnC("onebot", "WebSocket connection is nil, listener exiting") + return + } + for { select { case <-c.ctx.Done(): return default: - c.mu.Lock() - conn := c.conn - c.mu.Unlock() - - if conn == nil { - logger.WarnC("onebot", "WebSocket connection is nil, listener exiting") - return - } - _, message, err := conn.ReadMessage() if err != nil { logger.ErrorCF("onebot", "WebSocket read error", map[string]interface{}{ "error": err.Error(), }) c.mu.Lock() - if c.conn != nil { + if c.conn == conn { c.conn.Close() c.conn = nil } @@ -320,10 +490,7 @@ func (c *OneBotChannel) listen() { return } - logger.DebugCF("onebot", "Raw WebSocket message received", map[string]interface{}{ - "length": len(message), - "payload": string(message), - }) + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) var raw oneBotRawEvent if err := json.Unmarshal(message, &raw); err != nil { @@ -334,20 +501,37 @@ func (c *OneBotChannel) listen() { continue } - if raw.Echo != "" || raw.Status.Online || raw.Status.Good { - logger.DebugCF("onebot", "Received API response, skipping", map[string]interface{}{ - "echo": raw.Echo, - "status": raw.Status, - }) + logger.DebugCF("onebot", "WebSocket event", map[string]interface{}{ + "length": len(message), + "post_type": raw.PostType, + "sub_type": raw.SubType, + }) + + if raw.Echo != "" { + c.pendingMu.Lock() + ch, ok := c.pending[raw.Echo] + c.pendingMu.Unlock() + + if ok { + select { + case ch <- message: + default: + } + } else { + logger.DebugCF("onebot", "Received API response (no waiter)", map[string]interface{}{ + "echo": raw.Echo, + "status": string(raw.Status), + }) + } continue } - logger.DebugCF("onebot", "Parsed raw event", map[string]interface{}{ - "post_type": raw.PostType, - "message_type": raw.MessageType, - "sub_type": raw.SubType, - "meta_event_type": raw.MetaEventType, - }) + if isAPIResponse(raw.Status) { + logger.DebugCF("onebot", "Received API response without echo, skipping", map[string]interface{}{ + "status": string(raw.Status), + }) + continue + } c.handleRawEvent(&raw) } @@ -386,9 +570,12 @@ func parseJSONString(raw json.RawMessage) string { type parseMessageResult struct { Text string IsBotMentioned bool + Media []string + LocalFiles []string + ReplyTo string } -func parseMessageContentEx(raw json.RawMessage, selfID int64) parseMessageResult { +func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64) parseMessageResult { if len(raw) == 0 { return parseMessageResult{} } @@ -408,60 +595,155 @@ func parseMessageContentEx(raw json.RawMessage, selfID int64) parseMessageResult } var segments []map[string]interface{} - if err := json.Unmarshal(raw, &segments); err == nil { - var text string - mentioned := false - selfIDStr := strconv.FormatInt(selfID, 10) - for _, seg := range segments { - segType, _ := seg["type"].(string) - data, _ := seg["data"].(map[string]interface{}) - switch segType { - case "text": - if data != nil { - if t, ok := data["text"].(string); ok { - text += t - } + if err := json.Unmarshal(raw, &segments); err != nil { + return parseMessageResult{} + } + + var textParts []string + mentioned := false + selfIDStr := strconv.FormatInt(selfID, 10) + var media []string + var localFiles []string + var replyTo string + + for _, seg := range segments { + segType, _ := seg["type"].(string) + data, _ := seg["data"].(map[string]interface{}) + + switch segType { + case "text": + if data != nil { + if t, ok := data["text"].(string); ok { + textParts = append(textParts, t) } - case "at": - if data != nil && selfID > 0 { - qqVal := fmt.Sprintf("%v", data["qq"]) - if qqVal == selfIDStr || qqVal == "all" { - mentioned = true + } + + case "at": + if data != nil && selfID > 0 { + qqVal := fmt.Sprintf("%v", data["qq"]) + if qqVal == selfIDStr || qqVal == "all" { + mentioned = true + } + } + + case "image", "video", "file": + if data != nil { + url, _ := data["url"].(string) + if url != "" { + defaults := map[string]string{"image": "image.jpg", "video": "video.mp4", "file": "file"} + filename := defaults[segType] + if f, ok := data["file"].(string); ok && f != "" { + filename = f + } else if n, ok := data["name"].(string); ok && n != "" { + filename = n + } + localPath := utils.DownloadFile(url, filename, utils.DownloadOptions{ + LoggerPrefix: "onebot", + }) + if localPath != "" { + media = append(media, localPath) + localFiles = append(localFiles, localPath) + textParts = append(textParts, fmt.Sprintf("[%s]", segType)) } } } + + case "record": + if data != nil { + url, _ := data["url"].(string) + if url != "" { + localPath := utils.DownloadFile(url, "voice.amr", utils.DownloadOptions{ + LoggerPrefix: "onebot", + }) + if localPath != "" { + localFiles = append(localFiles, localPath) + if c.transcriber != nil && c.transcriber.IsAvailable() { + tctx, tcancel := context.WithTimeout(c.ctx, 30*time.Second) + result, err := c.transcriber.Transcribe(tctx, localPath) + tcancel() + if err != nil { + logger.WarnCF("onebot", "Voice transcription failed", map[string]interface{}{ + "error": err.Error(), + }) + textParts = append(textParts, "[voice (transcription failed)]") + media = append(media, localPath) + } else { + textParts = append(textParts, fmt.Sprintf("[voice transcription: %s]", result.Text)) + } + } else { + textParts = append(textParts, "[voice]") + media = append(media, localPath) + } + } + } + } + + case "reply": + if data != nil { + if id, ok := data["id"]; ok { + replyTo = fmt.Sprintf("%v", id) + } + } + + case "face": + if data != nil { + faceID, _ := data["id"] + textParts = append(textParts, fmt.Sprintf("[face:%v]", faceID)) + } + + case "forward": + textParts = append(textParts, "[forward message]") + + default: + } - return parseMessageResult{Text: strings.TrimSpace(text), IsBotMentioned: mentioned} } - return parseMessageResult{} + + return parseMessageResult{ + Text: strings.TrimSpace(strings.Join(textParts, "")), + IsBotMentioned: mentioned, + Media: media, + LocalFiles: localFiles, + ReplyTo: replyTo, + } } func (c *OneBotChannel) handleRawEvent(raw *oneBotRawEvent) { switch raw.PostType { case "message": - evt, err := c.normalizeMessageEvent(raw) - if err != nil { - logger.WarnCF("onebot", "Failed to normalize message event", map[string]interface{}{ - "error": err.Error(), - }) - return + if userID, err := parseJSONInt64(raw.UserID); err == nil && userID > 0 { + if !c.IsAllowed(strconv.FormatInt(userID, 10)) { + logger.DebugCF("onebot", "Message rejected by allowlist", map[string]interface{}{ + "user_id": userID, + }) + return + } } - c.handleMessage(evt) + c.handleMessage(raw) + + case "message_sent": + logger.DebugCF("onebot", "Bot sent message event", map[string]interface{}{ + "message_type": raw.MessageType, + "message_id": parseJSONString(raw.MessageID), + }) + case "meta_event": c.handleMetaEvent(raw) + case "notice": - logger.DebugCF("onebot", "Notice event received", map[string]interface{}{ - "sub_type": raw.SubType, - }) + c.handleNoticeEvent(raw) + case "request": logger.DebugCF("onebot", "Request event received", map[string]interface{}{ "sub_type": raw.SubType, }) + case "": logger.DebugCF("onebot", "Event with empty post_type (possibly API response)", map[string]interface{}{ "echo": raw.Echo, "status": raw.Status, }) + default: logger.DebugCF("onebot", "Unknown post_type", map[string]interface{}{ "post_type": raw.PostType, @@ -469,18 +751,51 @@ func (c *OneBotChannel) handleRawEvent(raw *oneBotRawEvent) { } } -func (c *OneBotChannel) normalizeMessageEvent(raw *oneBotRawEvent) (*oneBotEvent, error) { +func (c *OneBotChannel) handleMetaEvent(raw *oneBotRawEvent) { + if raw.MetaEventType == "lifecycle" { + logger.InfoCF("onebot", "Lifecycle event", map[string]interface{}{"sub_type": raw.SubType}) + } else if raw.MetaEventType != "heartbeat" { + logger.DebugCF("onebot", "Meta event: "+raw.MetaEventType, nil) + } +} + +func (c *OneBotChannel) handleNoticeEvent(raw *oneBotRawEvent) { + fields := map[string]interface{}{ + "notice_type": raw.NoticeType, + "sub_type": raw.SubType, + "group_id": parseJSONString(raw.GroupID), + "user_id": parseJSONString(raw.UserID), + "message_id": parseJSONString(raw.MessageID), + } + switch raw.NoticeType { + case "group_recall", "group_increase", "group_decrease", + "friend_add", "group_admin", "group_ban": + logger.InfoCF("onebot", "Notice: "+raw.NoticeType, fields) + default: + logger.DebugCF("onebot", "Notice: "+raw.NoticeType, fields) + } +} + +func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) { + // Parse fields from raw event userID, err := parseJSONInt64(raw.UserID) if err != nil { - return nil, fmt.Errorf("parse user_id: %w (raw: %s)", err, string(raw.UserID)) + logger.WarnCF("onebot", "Failed to parse user_id", map[string]interface{}{ + "error": err.Error(), + "raw": string(raw.UserID), + }) + return } groupID, _ := parseJSONInt64(raw.GroupID) selfID, _ := parseJSONInt64(raw.SelfID) - ts, _ := parseJSONInt64(raw.Time) messageID := parseJSONString(raw.MessageID) - parsed := parseMessageContentEx(raw.Message, selfID) + if selfID == 0 { + selfID = atomic.LoadInt64(&c.selfID) + } + + parsed := c.parseMessageSegments(raw.Message, selfID) isBotMentioned := parsed.IsBotMentioned content := raw.RawMessage @@ -495,6 +810,10 @@ func (c *OneBotChannel) normalizeMessageEvent(raw *oneBotRawEvent) (*oneBotEvent } } + if parsed.Text != "" && content != parsed.Text && (len(parsed.Media) > 0 || parsed.ReplyTo != "") { + content = parsed.Text + } + var sender oneBotSender if len(raw.Sender) > 0 { if err := json.Unmarshal(raw.Sender, &sender); err != nil { @@ -505,137 +824,107 @@ func (c *OneBotChannel) normalizeMessageEvent(raw *oneBotRawEvent) (*oneBotEvent } } - logger.DebugCF("onebot", "Normalized message event", map[string]interface{}{ - "message_type": raw.MessageType, - "user_id": userID, - "group_id": groupID, - "message_id": messageID, - "content_len": len(content), - "nickname": sender.Nickname, - }) - - return &oneBotEvent{ - PostType: raw.PostType, - MessageType: raw.MessageType, - SubType: raw.SubType, - MessageID: messageID, - UserID: userID, - GroupID: groupID, - Content: content, - RawContent: raw.RawMessage, - IsBotMentioned: isBotMentioned, - Sender: sender, - SelfID: selfID, - Time: ts, - MetaEventType: raw.MetaEventType, - }, nil -} - -func (c *OneBotChannel) handleMetaEvent(raw *oneBotRawEvent) { - switch raw.MetaEventType { - case "lifecycle": - logger.InfoCF("onebot", "Lifecycle event", map[string]interface{}{ - "sub_type": raw.SubType, - }) - case "heartbeat": - logger.DebugC("onebot", "Heartbeat received") - default: - logger.DebugCF("onebot", "Unknown meta_event_type", map[string]interface{}{ - "meta_event_type": raw.MetaEventType, - }) + // Clean up temp files when done + if len(parsed.LocalFiles) > 0 { + defer func() { + for _, f := range parsed.LocalFiles { + if err := os.Remove(f); err != nil { + logger.DebugCF("onebot", "Failed to remove temp file", map[string]interface{}{ + "path": f, + "error": err.Error(), + }) + } + } + }() } -} -func (c *OneBotChannel) handleMessage(evt *oneBotEvent) { - if c.isDuplicate(evt.MessageID) { + if c.isDuplicate(messageID) { logger.DebugCF("onebot", "Duplicate message, skipping", map[string]interface{}{ - "message_id": evt.MessageID, + "message_id": messageID, }) return } - content := evt.Content if content == "" { logger.DebugCF("onebot", "Received empty message, ignoring", map[string]interface{}{ - "message_id": evt.MessageID, + "message_id": messageID, }) return } - senderID := strconv.FormatInt(evt.UserID, 10) + senderID := strconv.FormatInt(userID, 10) var chatID string metadata := map[string]string{ - "message_id": evt.MessageID, + "message_id": messageID, } - switch evt.MessageType { + if parsed.ReplyTo != "" { + metadata["reply_to_message_id"] = parsed.ReplyTo + } + + switch raw.MessageType { case "private": chatID = "private:" + senderID - logger.InfoCF("onebot", "Received private message", map[string]interface{}{ - "sender": senderID, - "message_id": evt.MessageID, - "length": len(content), - "content": truncate(content, 100), - }) case "group": - groupIDStr := strconv.FormatInt(evt.GroupID, 10) + groupIDStr := strconv.FormatInt(groupID, 10) chatID = "group:" + groupIDStr metadata["group_id"] = groupIDStr - senderUserID, _ := parseJSONInt64(evt.Sender.UserID) + senderUserID, _ := parseJSONInt64(sender.UserID) if senderUserID > 0 { metadata["sender_user_id"] = strconv.FormatInt(senderUserID, 10) } - if evt.Sender.Card != "" { - metadata["sender_name"] = evt.Sender.Card - } else if evt.Sender.Nickname != "" { - metadata["sender_name"] = evt.Sender.Nickname + if sender.Card != "" { + metadata["sender_name"] = sender.Card + } else if sender.Nickname != "" { + metadata["sender_name"] = sender.Nickname } - triggered, strippedContent := c.checkGroupTrigger(content, evt.IsBotMentioned) + triggered, strippedContent := c.checkGroupTrigger(content, isBotMentioned) if !triggered { logger.DebugCF("onebot", "Group message ignored (no trigger)", map[string]interface{}{ "sender": senderID, "group": groupIDStr, - "is_mentioned": evt.IsBotMentioned, + "is_mentioned": isBotMentioned, "content": truncate(content, 100), }) return } content = strippedContent - logger.InfoCF("onebot", "Received group message", map[string]interface{}{ - "sender": senderID, - "group": groupIDStr, - "message_id": evt.MessageID, - "is_mentioned": evt.IsBotMentioned, - "length": len(content), - "content": truncate(content, 100), - }) - default: logger.WarnCF("onebot", "Unknown message type, cannot route", map[string]interface{}{ - "type": evt.MessageType, - "message_id": evt.MessageID, - "user_id": evt.UserID, + "type": raw.MessageType, + "message_id": messageID, + "user_id": userID, }) return } - if evt.Sender.Nickname != "" { - metadata["nickname"] = evt.Sender.Nickname - } - - logger.DebugCF("onebot", "Forwarding message to bus", map[string]interface{}{ - "sender_id": senderID, - "chat_id": chatID, - "content": truncate(content, 100), + logger.InfoCF("onebot", "Received "+raw.MessageType+" message", map[string]interface{}{ + "sender": senderID, + "chat_id": chatID, + "message_id": messageID, + "length": len(content), + "content": truncate(content, 100), + "media_count": len(parsed.Media), }) - c.HandleMessage(senderID, chatID, content, []string{}, metadata) + if sender.Nickname != "" { + metadata["nickname"] = sender.Nickname + } + + c.lastMessageID.Store(chatID, messageID) + + if raw.MessageType == "group" && messageID != "" && messageID != "0" { + c.setMsgEmojiLike(messageID, 289, true) + c.pendingEmojiMsg.Store(chatID, messageID) + } + + c.HandleMessage(senderID, chatID, content, parsed.Media, metadata) } func (c *OneBotChannel) isDuplicate(messageID string) bool {