diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go
index ebbeec0c1..091332d1a 100644
--- a/pkg/agent/loop.go
+++ b/pkg/agent/loop.go
@@ -728,6 +728,23 @@ func (al *AgentLoop) runLLMIteration(
 				})
 			}
 
+			// If tool returned media refs, publish them as outbound media
+			// NOTE(review): only Ref is set on each part, so Type, Filename and
+			// ContentType reach the channels empty and SendMedia implementations
+			// that switch on part.Type take their default branch. The error
+			// returned by PublishOutboundMedia is also dropped here.
+			if len(toolResult.Media) > 0 && opts.SendResponse {
+				parts := make([]bus.MediaPart, 0, len(toolResult.Media))
+				for _, ref := range toolResult.Media {
+					parts = append(parts, bus.MediaPart{Ref: ref})
+				}
+				al.bus.PublishOutboundMedia(ctx, bus.OutboundMediaMessage{
+					Channel: opts.Channel,
+					ChatID:  opts.ChatID,
+					Parts:   parts,
+				})
+			}
+
 			// Determine content for LLM based on tool result
 			contentForLLM := toolResult.ForLLM
 			if contentForLLM == "" && toolResult.Err != nil {
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 100ddc456..6a1c987b7 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -10,17 +10,19 @@ import (
 var ErrBusClosed = errors.New("message bus closed")
 
 type MessageBus struct {
-	inbound  chan InboundMessage
-	outbound chan OutboundMessage
-	done     chan struct{}
-	closed   atomic.Bool
+	inbound       chan InboundMessage
+	outbound      chan OutboundMessage
+	outboundMedia chan OutboundMediaMessage
+	done          chan struct{}
+	closed        atomic.Bool
 }
 
 func NewMessageBus() *MessageBus {
 	return &MessageBus{
-		inbound:  make(chan InboundMessage, 100),
-		outbound: make(chan OutboundMessage, 100),
-		done:     make(chan struct{}),
+		inbound:       make(chan InboundMessage, 100),
+		outbound:      make(chan OutboundMessage, 100),
+		outboundMedia: make(chan OutboundMediaMessage, 100),
+		done:          make(chan struct{}),
 	}
 }
 
@@ -74,6 +76,35 @@ func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, b
 	}
 }
 
+// PublishOutboundMedia enqueues msg on the outbound media channel.
+// It returns ErrBusClosed if the bus is closed, or ctx.Err() if ctx ends first.
+func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error {
+	if mb.closed.Load() {
+		return ErrBusClosed
+	}
+	select {
+	case mb.outboundMedia <- msg:
+		return nil
+	case <-mb.done:
+		return ErrBusClosed
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// SubscribeOutboundMedia blocks until an outbound media message is available.
+// The bool is false when the bus is closed or ctx is done.
+func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMediaMessage, bool) {
+	select {
+	case msg, ok := <-mb.outboundMedia:
+		return msg, ok
+	case <-mb.done:
+		return OutboundMediaMessage{}, false
+	case <-ctx.Done():
+		return OutboundMediaMessage{}, false
+	}
+}
+
 func (mb *MessageBus) Close() {
 	if mb.closed.CompareAndSwap(false, true) {
 		close(mb.done)
diff --git a/pkg/bus/types.go b/pkg/bus/types.go
index 358829c55..1a7a14170 100644
--- a/pkg/bus/types.go
+++ b/pkg/bus/types.go
@@ -24,3 +24,19 @@ type OutboundMessage struct {
 	ChatID  string `json:"chat_id"`
 	Content string `json:"content"`
 }
+
+// MediaPart describes a single media attachment to send.
+type MediaPart struct {
+	Type        string `json:"type"`                   // "image" | "audio" | "video" | "file"
+	Ref         string `json:"ref"`                    // media store ref, e.g. "media://abc123"
+	Caption     string `json:"caption,omitempty"`      // optional caption text
+	Filename    string `json:"filename,omitempty"`     // original filename hint
+	ContentType string `json:"content_type,omitempty"` // MIME type hint
+}
+
+// OutboundMediaMessage carries media attachments from Agent to channels via the bus.
+type OutboundMediaMessage struct {
+	Channel string      `json:"channel"`
+	ChatID  string      `json:"chat_id"`
+	Parts   []MediaPart `json:"parts"`
+}
diff --git a/pkg/channels/discord/discord.go b/pkg/channels/discord/discord.go
index d5524f7f9..7987f45a9 100644
--- a/pkg/channels/discord/discord.go
+++ b/pkg/channels/discord/discord.go
@@ -3,6 +3,7 @@ package discord
 import (
 	"context"
 	"fmt"
+	"os"
 	"strings"
 	"sync"
 	"time"
@@ -128,6 +129,106 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
 	return c.sendChunk(ctx, channelID, msg.Content)
 }
 
+// SendMedia implements the channels.MediaSender interface.
+//
+// Every part that resolves and opens is attached to one
+// ChannelMessageSendComplex call, with the first non-empty caption as the
+// message content. Parts that fail to resolve or open are logged and skipped.
+func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	c.stopTyping(msg.ChatID)
+
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	channelID := msg.ChatID
+	if channelID == "" {
+		return fmt.Errorf("channel ID is empty: %w", channels.ErrSendFailed)
+	}
+
+	store := c.GetMediaStore()
+	if store == nil {
+		return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
+	}
+
+	// Collect all files into a single ChannelMessageSendComplex call.
+	// opened keeps the handles so they can be closed without type-asserting
+	// the Reader field back to *os.File.
+	files := make([]*discordgo.File, 0, len(msg.Parts))
+	opened := make([]*os.File, 0, len(msg.Parts))
+	var caption string
+
+	for _, part := range msg.Parts {
+		localPath, err := store.Resolve(part.Ref)
+		if err != nil {
+			logger.ErrorCF("discord", "Failed to resolve media ref", map[string]any{
+				"ref":   part.Ref,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		file, err := os.Open(localPath)
+		if err != nil {
+			logger.ErrorCF("discord", "Failed to open media file", map[string]any{
+				"path":  localPath,
+				"error": err.Error(),
+			})
+			continue
+		}
+		opened = append(opened, file)
+
+		filename := part.Filename
+		if filename == "" {
+			filename = "file"
+		}
+
+		files = append(files, &discordgo.File{
+			Name:        filename,
+			ContentType: part.ContentType,
+			Reader:      file,
+		})
+
+		if part.Caption != "" && caption == "" {
+			caption = part.Caption
+		}
+	}
+
+	if len(files) == 0 {
+		return nil
+	}
+
+	sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
+	defer cancel()
+
+	done := make(chan error, 1)
+	go func() {
+		// The send reads from the files until it returns, so they are closed
+		// here and not by the caller: on timeout the caller returns while this
+		// goroutine may still be uploading.
+		defer func() {
+			for _, f := range opened {
+				_ = f.Close() // read-only handle; a close error is not actionable
+			}
+		}()
+		_, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
+			Content: caption,
+			Files:   files,
+		})
+		done <- err
+	}()
+
+	select {
+	case err := <-done:
+		if err != nil {
+			return fmt.Errorf("discord send media: %v: %w", err, channels.ErrTemporary)
+		}
+		return nil
+	case <-sendCtx.Done():
+		return sendCtx.Err()
+	}
+}
+
 func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
 	// Use the passed ctx for timeout control
 	sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go
index 6ae048468..5b0af4f1d 100644
--- a/pkg/channels/line/line.go
+++ b/pkg/channels/line/line.go
@@ -496,6 +496,34 @@ func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
 	return c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken)
 }
 
+// SendMedia implements the channels.MediaSender interface.
+// LINE requires media to be accessible via public URL; since we only have local files,
+// we fall back to sending a text message with the filename/caption.
+// For full support, an external file hosting service would be needed.
+func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	// No media store lookup: the refs are never resolved on this path, so a
+	// missing store must not block the text fallback.
+	//
+	// LINE Messaging API requires publicly accessible URLs for media messages.
+	// Since we only have local file paths, send caption text as fallback.
+	for _, part := range msg.Parts {
+		caption := part.Caption
+		if caption == "" {
+			caption = fmt.Sprintf("[%s: %s]", part.Type, part.Filename)
+		}
+
+		if err := c.sendPush(ctx, msg.ChatID, caption, ""); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // buildTextMessage creates a text message object, optionally with quoteToken.
func buildTextMessage(content, quoteToken string) map[string]string {
 	msg := map[string]string{
diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go
index dadc068e9..92412edeb 100644
--- a/pkg/channels/manager.go
+++ b/pkg/channels/manager.go
@@ -44,10 +44,12 @@ var channelRateConfig = map[string]float64{
 }
 
 type channelWorker struct {
-	ch      Channel
-	queue   chan bus.OutboundMessage
-	done    chan struct{}
-	limiter *rate.Limiter
+	ch         Channel
+	queue      chan bus.OutboundMessage
+	mediaQueue chan bus.OutboundMediaMessage
+	done       chan struct{}
+	mediaDone  chan struct{}
+	limiter    *rate.Limiter
 }
 
 type Manager struct {
@@ -239,10 +241,12 @@ func (m *Manager) StartAll(ctx context.Context) error {
 	// Start per-channel workers
 	for name, w := range m.workers {
 		go m.runWorker(dispatchCtx, name, w)
+		go m.runMediaWorker(dispatchCtx, name, w)
 	}
 
 	// Start the dispatcher that reads from the bus and routes to workers
 	go m.dispatchOutbound(dispatchCtx)
+	go m.dispatchOutboundMedia(dispatchCtx)
 
 	// Start shared HTTP server if configured
 	if m.httpServer != nil {
@@ -293,6 +297,13 @@ func (m *Manager) StopAll(ctx context.Context) error {
 	for _, w := range m.workers {
 		<-w.done
 	}
+	// Close all media worker queues and wait for them to drain
+	for _, w := range m.workers {
+		close(w.mediaQueue)
+	}
+	for _, w := range m.workers {
+		<-w.mediaDone
+	}
 
 	// Stop all channels
 	for name, channel := range m.channels {
@@ -321,10 +332,12 @@ func newChannelWorker(name string, ch Channel) *channelWorker {
 	burst := int(math.Max(1, math.Ceil(rateVal/2)))
 
 	return &channelWorker{
-		ch:      ch,
-		queue:   make(chan bus.OutboundMessage, defaultChannelQueueSize),
-		done:    make(chan struct{}),
-		limiter: rate.NewLimiter(rate.Limit(rateVal), burst),
+		ch:         ch,
+		queue:      make(chan bus.OutboundMessage, defaultChannelQueueSize),
+		mediaQueue: make(chan bus.OutboundMediaMessage, defaultChannelQueueSize),
+		done:       make(chan struct{}),
+		mediaDone:  make(chan struct{}),
+		limiter:    rate.NewLimiter(rate.Limit(rateVal), burst),
 	}
 }
 
@@ -457,6 +470,125 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
 	}
 }
 
+// dispatchOutboundMedia reads outbound media messages from the bus and hands
+// each one to the media queue of the worker for its target channel.
+func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
+	logger.InfoC("channels", "Outbound media dispatcher started")
+	defer logger.InfoC("channels", "Outbound media dispatcher stopped")
+
+	for ctx.Err() == nil {
+		msg, ok := m.bus.SubscribeOutboundMedia(ctx)
+		if !ok {
+			// Not ok means ctx is done or the bus has shut down. Neither
+			// recovers, and retrying would spin on an already-closed channel.
+			return
+		}
+
+		// Silently skip internal channels
+		if constants.IsInternalChannel(msg.Channel) {
+			continue
+		}
+
+		m.mu.RLock()
+		_, exists := m.channels[msg.Channel]
+		w, wExists := m.workers[msg.Channel]
+		m.mu.RUnlock()
+
+		if !exists {
+			logger.WarnCF("channels", "Unknown channel for outbound media message", map[string]any{
+				"channel": msg.Channel,
+			})
+			continue
+		}
+
+		// Block until the worker accepts the message or the dispatcher is stopped.
+		if wExists {
+			select {
+			case w.mediaQueue <- msg:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+// runMediaWorker processes outbound media messages for a single channel.
+func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
+	defer close(w.mediaDone)
+	for {
+		select {
+		case msg, ok := <-w.mediaQueue:
+			if !ok {
+				return
+			}
+			m.sendMediaWithRetry(ctx, name, w, msg)
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// sendMediaWithRetry sends a media message through the channel with rate limiting and
+// retry logic. If the channel does not implement MediaSender, it silently skips.
+func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) {
+	ms, ok := w.ch.(MediaSender)
+	if !ok {
+		logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
+			"channel": name,
+		})
+		return
+	}
+
+	// Rate limit: wait for token
+	if err := w.limiter.Wait(ctx); err != nil {
+		return
+	}
+
+	var lastErr error
+	for attempt := 0; attempt <= maxRetries; attempt++ {
+		lastErr = ms.SendMedia(ctx, msg)
+		if lastErr == nil {
+			return
+		}
+
+		// Permanent failures — don't retry
+		if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
+			break
+		}
+
+		// Last attempt exhausted — don't sleep
+		if attempt == maxRetries {
+			break
+		}
+
+		// Rate limit error — fixed delay
+		if errors.Is(lastErr, ErrRateLimit) {
+			select {
+			case <-time.After(rateLimitDelay):
+				continue
+			case <-ctx.Done():
+				return
+			}
+		}
+
+		// ErrTemporary or unknown error — exponential backoff
+		backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
+		select {
+		case <-time.After(backoff):
+		case <-ctx.Done():
+			return
+		}
+	}
+
+	// All retries exhausted or permanent failure
+	logger.ErrorCF("channels", "SendMedia failed", map[string]any{
+		"channel": name,
+		"chat_id": msg.ChatID,
+		"error":   lastErr.Error(),
+		"retries": maxRetries,
+	})
+}
+
 func (m *Manager) GetChannel(name string) (Channel, bool) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
@@ -502,6 +634,8 @@ func (m *Manager) UnregisterChannel(name string) {
 	if w, ok := m.workers[name]; ok {
 		close(w.queue)
 		<-w.done
+		close(w.mediaQueue)
+		<-w.mediaDone
 	}
 	delete(m.workers, name)
 	delete(m.channels, name)
diff --git a/pkg/channels/media.go b/pkg/channels/media.go
new file mode 100644
index 000000000..c645a6180
--- /dev/null
+++ b/pkg/channels/media.go
@@ -0,0 +1,15 @@
+package channels
+
+import (
+	"context"
+
+	"github.com/sipeed/picoclaw/pkg/bus"
+)
+
+// MediaSender is an optional interface for channels that can send
+// media attachments (images, files, audio, video).
+// Manager discovers channels implementing this interface via type
+// assertion and routes OutboundMediaMessage to them.
+type MediaSender interface {
+	SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
+}
diff --git a/pkg/channels/onebot/onebot.go b/pkg/channels/onebot/onebot.go
index 76950663e..fb357cf27 100644
--- a/pkg/channels/onebot/onebot.go
+++ b/pkg/channels/onebot/onebot.go
@@ -431,6 +431,120 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
 	return nil
 }
 
+// SendMedia implements the channels.MediaSender interface.
+//
+// Each part that resolves becomes one segment whose "file" value is
+// "file://" plus the local path, followed by a text segment when the part
+// has a caption. All segments are written as a single request.
+func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
+
+	c.mu.Lock()
+	conn := c.conn
+	c.mu.Unlock()
+
+	if conn == nil {
+		return fmt.Errorf("OneBot WebSocket not connected: %w", channels.ErrTemporary)
+	}
+
+	store := c.GetMediaStore()
+	if store == nil {
+		return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
+	}
+
+	// Build media segments
+	var segments []oneBotMessageSegment
+	for _, part := range msg.Parts {
+		localPath, err := store.Resolve(part.Ref)
+		if err != nil {
+			logger.ErrorCF("onebot", "Failed to resolve media ref", map[string]any{
+				"ref":   part.Ref,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		var segType string
+		switch part.Type {
+		case "image":
+			segType = "image"
+		case "video":
+			segType = "video"
+		case "audio":
+			segType = "record"
+		default:
+			segType = "file"
+		}
+
+		segments = append(segments, oneBotMessageSegment{
+			Type: segType,
+			Data: map[string]any{"file": "file://" + localPath},
+		})
+
+		if part.Caption != "" {
+			segments = append(segments, oneBotMessageSegment{
+				Type: "text",
+				Data: map[string]any{"text": part.Caption},
+			})
+		}
+	}
+
+	if len(segments) == 0 {
+		return nil
+	}
+
+	chatID := msg.ChatID
+	var action, idKey, rawID string
+	if rest, ok := strings.CutPrefix(chatID, "group:"); ok {
+		action, idKey, rawID = "send_group_msg", "group_id", rest
+	} else if rest, ok := strings.CutPrefix(chatID, "private:"); ok {
+		action, idKey, rawID = "send_private_msg", "user_id", rest
+	} else {
+		action, idKey, rawID = "send_private_msg", "user_id", chatID
+	}
+
+	id, err := strconv.ParseInt(rawID, 10, 64)
+	if err != nil {
+		return fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed)
+	}
+
+	echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1))
+
+	req := oneBotAPIRequest{
+		Action: action,
+		Params: map[string]any{idKey: id, "message": segments},
+		Echo:   echo,
+	}
+
+	data, err := json.Marshal(req)
+	if err != nil {
+		return fmt.Errorf("failed to marshal OneBot request: %w", err)
+	}
+
+	c.writeMu.Lock()
+	_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+	err = conn.WriteMessage(websocket.TextMessage, data)
+	_ = conn.SetWriteDeadline(time.Time{})
+	c.writeMu.Unlock()
+
+	if err != nil {
+		logger.ErrorCF("onebot", "Failed to send media message", map[string]any{
+			"error": err.Error(),
+		})
+		return fmt.Errorf("onebot send media: %v: %w", err, channels.ErrTemporary)
+	}
+
+	return nil
+}
+
 func (c *OneBotChannel) buildMessageSegments(chatID, content string) []oneBotMessageSegment {
 	var segments []oneBotMessageSegment
 
diff --git a/pkg/channels/slack/slack.go b/pkg/channels/slack/slack.go
index 9e066e00a..f2dda15ac 100644
--- a/pkg/channels/slack/slack.go
+++ b/pkg/channels/slack/slack.go
@@ -149,6 +149,65 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
 	return nil
 }
 
+// SendMedia implements the channels.MediaSender interface.
+//
+// Parts are uploaded one at a time with UploadFileV2Context. A part whose ref
+// cannot be resolved is logged and skipped; a failed upload aborts the rest.
+func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	channelID, _ := parseSlackChatID(msg.ChatID)
+	if channelID == "" {
+		return fmt.Errorf("invalid slack chat ID: %s: %w", msg.ChatID, channels.ErrSendFailed)
+	}
+
+	store := c.GetMediaStore()
+	if store == nil {
+		return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
+	}
+
+	for _, part := range msg.Parts {
+		localPath, err := store.Resolve(part.Ref)
+		if err != nil {
+			logger.ErrorCF("slack", "Failed to resolve media ref", map[string]any{
+				"ref":   part.Ref,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		filename := part.Filename
+		if filename == "" {
+			filename = "file"
+		}
+
+		title := part.Caption
+		if title == "" {
+			title = filename
+		}
+
+		// NOTE(review): slack-go's UploadFileV2 has rejected FileSize == 0 in some
+		// releases — confirm the version in use accepts File without FileSize.
+		_, err = c.api.UploadFileV2Context(ctx, slack.UploadFileV2Parameters{
+			Channel:  channelID,
+			File:     localPath,
+			Filename: filename,
+			Title:    title,
+		})
+		if err != nil {
+			logger.ErrorCF("slack", "Failed to upload media", map[string]any{
+				"filename": filename,
+				"error":    err.Error(),
+			})
+			return fmt.Errorf("slack send media: %v: %w", err, channels.ErrTemporary)
+		}
+	}
+
+	return nil
+}
+
 func (c *SlackChannel) eventLoop() {
 	for {
 		select {
diff --git a/pkg/channels/telegram/telegram.go b/pkg/channels/telegram/telegram.go
index a07eb6579..f9390b8ed 100644
--- a/pkg/channels/telegram/telegram.go
+++ b/pkg/channels/telegram/telegram.go
@@ -231,6 +231,91 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
 	return nil
 }
 
+// SendMedia implements the channels.MediaSender interface.
+func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	chatID, err := parseChatID(msg.ChatID)
+	if err != nil {
+		return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
+	}
+
+	store := c.GetMediaStore()
+	if store == nil {
+		return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
+	}
+
+	// Each part goes out as its own message, in order. A part that cannot be
+	// resolved or opened is logged and skipped; a failed send stops the loop.
+	for _, part := range msg.Parts {
+		localPath, err := store.Resolve(part.Ref)
+		if err != nil {
+			logger.ErrorCF("telegram", "Failed to resolve media ref", map[string]any{
+				"ref":   part.Ref,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		file, err := os.Open(localPath)
+		if err != nil {
+			logger.ErrorCF("telegram", "Failed to open media file", map[string]any{
+				"path":  localPath,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		// NOTE(review): part.Filename is not forwarded to Telegram here —
+		// confirm whether the upload should carry it.
+		switch part.Type {
+		case "image":
+			params := &telego.SendPhotoParams{
+				ChatID:  tu.ID(chatID),
+				Photo:   telego.InputFile{File: file},
+				Caption: part.Caption,
+			}
+			_, err = c.bot.SendPhoto(ctx, params)
+		case "audio":
+			params := &telego.SendAudioParams{
+				ChatID:  tu.ID(chatID),
+				Audio:   telego.InputFile{File: file},
+				Caption: part.Caption,
+			}
+			_, err = c.bot.SendAudio(ctx, params)
+		case "video":
+			params := &telego.SendVideoParams{
+				ChatID:  tu.ID(chatID),
+				Video:   telego.InputFile{File: file},
+				Caption: part.Caption,
+			}
+			_, err = c.bot.SendVideo(ctx, params)
+		default: // "file" or unknown types
+			params := &telego.SendDocumentParams{
+				ChatID:   tu.ID(chatID),
+				Document: telego.InputFile{File: file},
+				Caption:  part.Caption,
+			}
+			_, err = c.bot.SendDocument(ctx, params)
+		}
+
+		// The send call has returned, so the handle is no longer being read.
+		_ = file.Close()
+
+		if err != nil {
+			logger.ErrorCF("telegram", "Failed to send media", map[string]any{
+				"type":  part.Type,
+				"error": err.Error(),
+			})
+			return fmt.Errorf("telegram send media: %v: %w", err, channels.ErrTemporary)
+		}
+	}
+
+	return nil
+}
+
 func
(c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error {
 	if message == nil {
 		return fmt.Errorf("message is nil")
diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go
index 52750505c..4c2a4d326 100644
--- a/pkg/channels/wecom/app.go
+++ b/pkg/channels/wecom/app.go
@@ -7,8 +7,11 @@ import (
 	"encoding/xml"
 	"fmt"
 	"io"
+	"mime/multipart"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -187,6 +190,196 @@ func (c *WeComAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
 	return c.sendTextMessage(ctx, accessToken, msg.ChatID, msg.Content)
 }
 
+// SendMedia implements the channels.MediaSender interface.
+//
+// Only image parts are delivered as media: they are uploaded to obtain a
+// media_id and sent with sendImageMessage. Every other part type is sent as
+// a text message carrying the caption or a "[type: filename]" placeholder.
+func (c *WeComAppChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
+	if !c.IsRunning() {
+		return channels.ErrNotRunning
+	}
+
+	accessToken := c.getAccessToken()
+	if accessToken == "" {
+		return fmt.Errorf("no valid access token available: %w", channels.ErrTemporary)
+	}
+
+	store := c.GetMediaStore()
+	if store == nil {
+		return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
+	}
+
+	for _, part := range msg.Parts {
+		localPath, err := store.Resolve(part.Ref)
+		if err != nil {
+			logger.ErrorCF("wecom_app", "Failed to resolve media ref", map[string]any{
+				"ref":   part.Ref,
+				"error": err.Error(),
+			})
+			continue
+		}
+
+		if part.Type != "image" {
+			// For non-image types, send as text fallback with caption. Nothing is
+			// uploaded: the media_id would not be used by a text message.
+			caption := part.Caption
+			if caption == "" {
+				caption = fmt.Sprintf("[%s: %s]", part.Type, part.Filename)
+			}
+			if err := c.sendTextMessage(ctx, accessToken, msg.ChatID, caption); err != nil {
+				return err
+			}
+			continue
+		}
+
+		// Upload media to get media_id
+		mediaID, err := c.uploadMedia(ctx, accessToken, "image", localPath)
+		if err != nil {
+			logger.ErrorCF("wecom_app", "Failed to upload media", map[string]any{
+				"type":  "image",
+				"error": err.Error(),
+			})
+			// Fallback: send caption as text
+			if part.Caption != "" {
+				_ = c.sendTextMessage(ctx, accessToken, msg.ChatID, part.Caption)
+			}
+			continue
+		}
+
+		// Send media message using the media_id
+		if err := c.sendImageMessage(ctx, accessToken, msg.ChatID, mediaID); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// uploadMedia uploads a local file to WeCom temporary media storage.
+// It returns the media_id reported by the upload API.
+func (c *WeComAppChannel) uploadMedia(ctx context.Context, accessToken, mediaType, localPath string) (string, error) {
+	apiURL := fmt.Sprintf("%s/cgi-bin/media/upload?access_token=%s&type=%s",
+		wecomAPIBase, url.QueryEscape(accessToken), url.QueryEscape(mediaType))
+
+	file, err := os.Open(localPath)
+	if err != nil {
+		return "", fmt.Errorf("failed to open file: %w", err)
+	}
+	defer file.Close()
+
+	body := &bytes.Buffer{}
+	writer := multipart.NewWriter(body)
+
+	filename := filepath.Base(localPath)
+	formFile, err := writer.CreateFormFile("media", filename)
+	if err != nil {
+		return "", fmt.Errorf("failed to create form file: %w", err)
+	}
+
+	if _, err = io.Copy(formFile, file); err != nil {
+		return "", fmt.Errorf("failed to copy file content: %w", err)
+	}
+	// Close writes the terminating boundary; without it the body is malformed.
+	if err = writer.Close(); err != nil {
+		return "", fmt.Errorf("failed to finalize multipart body: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, body)
+	if err != nil {
+		return "", fmt.Errorf("failed to create request: %w", err)
+	}
+	req.Header.Set("Content-Type", writer.FormDataContentType())
+
+	client := &http.Client{Timeout: 30 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", channels.ClassifyNetError(err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return "", channels.ClassifySendError(resp.StatusCode, fmt.Errorf("wecom upload error: %s", string(respBody)))
+	}
+
+	var result struct {
+		ErrCode int    `json:"errcode"`
+		ErrMsg  string `json:"errmsg"`
+		MediaID string `json:"media_id"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		return "", fmt.Errorf("failed to parse upload response: %w", err)
+	}
+
+	if result.ErrCode != 0 {
+		return "", fmt.Errorf("upload API error: %s (code: %d)", result.ErrMsg, result.ErrCode)
+	}
+
+	return result.MediaID, nil
+}
+
+// sendImageMessage sends an image message using a media_id.
+// The request is bounded by c.config.ReplyTimeout seconds (5 when that value
+// is zero or negative).
+func (c *WeComAppChannel) sendImageMessage(ctx context.Context, accessToken, userID, mediaID string) error {
+	apiURL := fmt.Sprintf("%s/cgi-bin/message/send?access_token=%s", wecomAPIBase, url.QueryEscape(accessToken))
+
+	msg := WeComImageMessage{
+		ToUser:  userID,
+		MsgType: "image",
+		AgentID: c.config.AgentID,
+	}
+	msg.Image.MediaID = mediaID
+
+	jsonData, err := json.Marshal(msg)
+	if err != nil {
+		return fmt.Errorf("failed to marshal message: %w", err)
+	}
+
+	timeout := c.config.ReplyTimeout
+	if timeout <= 0 {
+		timeout = 5
+	}
+
+	reqCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiURL, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return fmt.Errorf("failed to create request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return channels.ClassifyNetError(err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return channels.ClassifySendError(resp.StatusCode, fmt.Errorf("wecom_app API error: %s", string(respBody)))
+	}
+
+	respBody, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return fmt.Errorf("failed to read response: %w", err)
+	}
+
+	var sendResp WeComSendMessageResponse
+	if err := json.Unmarshal(respBody, &sendResp); err != nil {
+		return fmt.Errorf("failed to parse response: %w", err)
+	}
+
+	if sendResp.ErrCode != 0 {
+		return fmt.Errorf("API error: %s (code: %d)", sendResp.ErrMsg, sendResp.ErrCode)
+	}
+
+	return nil
+}
+
 // WebhookPath returns the path for registering on the shared HTTP server.
 func (c *WeComAppChannel) WebhookPath() string {
 	if c.config.WebhookPath != "" {
diff --git a/pkg/tools/result.go b/pkg/tools/result.go
index b13055b1c..cab833284 100644
--- a/pkg/tools/result.go
+++ b/pkg/tools/result.go
@@ -30,6 +30,10 @@ type ToolResult struct {
 	// Err is the underlying error (not JSON serialized).
 	// Used for internal error handling and logging.
 	Err error `json:"-"`
+
+	// Media contains media store refs produced by this tool.
+	// When non-empty and the run has SendResponse set, the agent publishes these as an OutboundMediaMessage.
+	Media []string `json:"media,omitempty"`
 }
 
 // NewToolResult creates a basic ToolResult with content for the LLM.
@@ -120,6 +124,19 @@ func UserResult(content string) *ToolResult {
 	}
 }
 
+// MediaResult creates a ToolResult with media refs for the user.
+// The agent will publish these refs as OutboundMediaMessage.
+//
+// Example:
+//
+//	result := MediaResult("Image generated successfully", []string{"media://abc123"})
+func MediaResult(forLLM string, mediaRefs []string) *ToolResult {
+	return &ToolResult{
+		ForLLM: forLLM,
+		Media:  mediaRefs,
+	}
+}
+
 // MarshalJSON implements custom JSON serialization.
 // The Err field is excluded from JSON output via the json:"-" tag.
 func (tr *ToolResult) MarshalJSON() ([]byte, error) {