mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
feat(channels): add MediaSender optional interface for outbound media
Add outbound media sending capability so the agent can publish media attachments (images, files, audio, video) through channels via the bus.

- Add MediaPart and OutboundMediaMessage types to bus
- Add PublishOutboundMedia/SubscribeOutboundMedia bus methods
- Add MediaSender interface discovered via type assertion by Manager
- Add media dispatch/worker in Manager with shared retry logic
- Extend ToolResult with Media field and MediaResult constructor
- Publish outbound media from agent loop on tool results
- Implement SendMedia for Telegram, Discord, Slack, LINE, OneBot, WeCom
This commit is contained in:
@@ -727,6 +727,19 @@ func (al *AgentLoop) runLLMIteration(
|
||||
})
|
||||
}
|
||||
|
||||
// If tool returned media refs, publish them as outbound media
|
||||
if len(toolResult.Media) > 0 && opts.SendResponse {
|
||||
parts := make([]bus.MediaPart, 0, len(toolResult.Media))
|
||||
for _, ref := range toolResult.Media {
|
||||
parts = append(parts, bus.MediaPart{Ref: ref})
|
||||
}
|
||||
al.bus.PublishOutboundMedia(ctx, bus.OutboundMediaMessage{
|
||||
Channel: opts.Channel,
|
||||
ChatID: opts.ChatID,
|
||||
Parts: parts,
|
||||
})
|
||||
}
|
||||
|
||||
// Determine content for LLM based on tool result
|
||||
contentForLLM := toolResult.ForLLM
|
||||
if contentForLLM == "" && toolResult.Err != nil {
|
||||
|
||||
+34
-7
@@ -10,17 +10,19 @@ import (
|
||||
var ErrBusClosed = errors.New("message bus closed")
|
||||
|
||||
type MessageBus struct {
|
||||
inbound chan InboundMessage
|
||||
outbound chan OutboundMessage
|
||||
done chan struct{}
|
||||
closed atomic.Bool
|
||||
inbound chan InboundMessage
|
||||
outbound chan OutboundMessage
|
||||
outboundMedia chan OutboundMediaMessage
|
||||
done chan struct{}
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
func NewMessageBus() *MessageBus {
|
||||
return &MessageBus{
|
||||
inbound: make(chan InboundMessage, 100),
|
||||
outbound: make(chan OutboundMessage, 100),
|
||||
done: make(chan struct{}),
|
||||
inbound: make(chan InboundMessage, 100),
|
||||
outbound: make(chan OutboundMessage, 100),
|
||||
outboundMedia: make(chan OutboundMediaMessage, 100),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,6 +76,31 @@ func (mb *MessageBus) SubscribeOutbound(ctx context.Context) (OutboundMessage, b
|
||||
}
|
||||
}
|
||||
|
||||
func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMediaMessage) error {
|
||||
if mb.closed.Load() {
|
||||
return ErrBusClosed
|
||||
}
|
||||
select {
|
||||
case mb.outboundMedia <- msg:
|
||||
return nil
|
||||
case <-mb.done:
|
||||
return ErrBusClosed
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMediaMessage, bool) {
|
||||
select {
|
||||
case msg, ok := <-mb.outboundMedia:
|
||||
return msg, ok
|
||||
case <-mb.done:
|
||||
return OutboundMediaMessage{}, false
|
||||
case <-ctx.Done():
|
||||
return OutboundMediaMessage{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func (mb *MessageBus) Close() {
|
||||
if mb.closed.CompareAndSwap(false, true) {
|
||||
close(mb.done)
|
||||
|
||||
@@ -24,3 +24,19 @@ type OutboundMessage struct {
|
||||
ChatID string `json:"chat_id"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
// MediaPart describes a single media attachment to send.
type MediaPart struct {
	// Type is the attachment kind: "image", "audio", "video" or "file".
	Type string `json:"type"`
	// Ref is the media store reference, e.g. "media://abc123".
	Ref string `json:"ref"`
	// Caption is optional text to accompany the attachment.
	Caption string `json:"caption,omitempty"`
	// Filename is a hint carrying the original file name.
	Filename string `json:"filename,omitempty"`
	// ContentType is a MIME type hint.
	ContentType string `json:"content_type,omitempty"`
}

// OutboundMediaMessage carries media attachments from Agent to channels via the bus.
type OutboundMediaMessage struct {
	// Channel names the destination channel.
	Channel string `json:"channel"`
	// ChatID identifies the conversation inside that channel.
	ChatID string `json:"chat_id"`
	// Parts lists the attachments to deliver.
	Parts []MediaPart `json:"parts"`
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package discord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -128,6 +129,103 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
|
||||
return c.sendChunk(ctx, channelID, msg.Content)
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
func (c *DiscordChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
c.stopTyping(msg.ChatID)
|
||||
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
channelID := msg.ChatID
|
||||
if channelID == "" {
|
||||
return fmt.Errorf("channel ID is empty")
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
// Collect all files into a single ChannelMessageSendComplex call
|
||||
files := make([]*discordgo.File, 0, len(msg.Parts))
|
||||
var caption string
|
||||
|
||||
for _, part := range msg.Parts {
|
||||
localPath, err := store.Resolve(part.Ref)
|
||||
if err != nil {
|
||||
logger.ErrorCF("discord", "Failed to resolve media ref", map[string]any{
|
||||
"ref": part.Ref,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
file, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
logger.ErrorCF("discord", "Failed to open media file", map[string]any{
|
||||
"path": localPath,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
// Note: discordgo reads from the Reader and we can't close it before send
|
||||
|
||||
filename := part.Filename
|
||||
if filename == "" {
|
||||
filename = "file"
|
||||
}
|
||||
|
||||
files = append(files, &discordgo.File{
|
||||
Name: filename,
|
||||
ContentType: part.ContentType,
|
||||
Reader: file,
|
||||
})
|
||||
|
||||
if part.Caption != "" && caption == "" {
|
||||
caption = part.Caption
|
||||
}
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
|
||||
defer cancel()
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := c.session.ChannelMessageSendComplex(channelID, &discordgo.MessageSend{
|
||||
Content: caption,
|
||||
Files: files,
|
||||
})
|
||||
done <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
// Close all file readers
|
||||
for _, f := range files {
|
||||
if closer, ok := f.Reader.(*os.File); ok {
|
||||
closer.Close()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("discord send media: %w", channels.ErrTemporary)
|
||||
}
|
||||
return nil
|
||||
case <-sendCtx.Done():
|
||||
// Close all file readers
|
||||
for _, f := range files {
|
||||
if closer, ok := f.Reader.(*os.File); ok {
|
||||
closer.Close()
|
||||
}
|
||||
}
|
||||
return sendCtx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
|
||||
// Use the passed ctx for timeout control
|
||||
sendCtx, cancel := context.WithTimeout(ctx, sendTimeout)
|
||||
|
||||
@@ -496,6 +496,36 @@ func (c *LINEChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
||||
return c.sendPush(ctx, msg.ChatID, msg.Content, quoteToken)
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
// LINE requires media to be accessible via public URL; since we only have local files,
|
||||
// we fall back to sending a text message with the filename/caption.
|
||||
// For full support, an external file hosting service would be needed.
|
||||
func (c *LINEChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
// LINE Messaging API requires publicly accessible URLs for media messages.
|
||||
// Since we only have local file paths, send caption text as fallback.
|
||||
for _, part := range msg.Parts {
|
||||
caption := part.Caption
|
||||
if caption == "" {
|
||||
caption = fmt.Sprintf("[%s: %s]", part.Type, part.Filename)
|
||||
}
|
||||
|
||||
if err := c.sendPush(ctx, msg.ChatID, caption, ""); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildTextMessage creates a text message object, optionally with quoteToken.
|
||||
func buildTextMessage(content, quoteToken string) map[string]string {
|
||||
msg := map[string]string{
|
||||
|
||||
+142
-8
@@ -44,10 +44,12 @@ var channelRateConfig = map[string]float64{
|
||||
}
|
||||
|
||||
type channelWorker struct {
|
||||
ch Channel
|
||||
queue chan bus.OutboundMessage
|
||||
done chan struct{}
|
||||
limiter *rate.Limiter
|
||||
ch Channel
|
||||
queue chan bus.OutboundMessage
|
||||
mediaQueue chan bus.OutboundMediaMessage
|
||||
done chan struct{}
|
||||
mediaDone chan struct{}
|
||||
limiter *rate.Limiter
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
@@ -239,10 +241,12 @@ func (m *Manager) StartAll(ctx context.Context) error {
|
||||
// Start per-channel workers
|
||||
for name, w := range m.workers {
|
||||
go m.runWorker(dispatchCtx, name, w)
|
||||
go m.runMediaWorker(dispatchCtx, name, w)
|
||||
}
|
||||
|
||||
// Start the dispatcher that reads from the bus and routes to workers
|
||||
go m.dispatchOutbound(dispatchCtx)
|
||||
go m.dispatchOutboundMedia(dispatchCtx)
|
||||
|
||||
// Start shared HTTP server if configured
|
||||
if m.httpServer != nil {
|
||||
@@ -293,6 +297,13 @@ func (m *Manager) StopAll(ctx context.Context) error {
|
||||
for _, w := range m.workers {
|
||||
<-w.done
|
||||
}
|
||||
// Close all media worker queues and wait for them to drain
|
||||
for _, w := range m.workers {
|
||||
close(w.mediaQueue)
|
||||
}
|
||||
for _, w := range m.workers {
|
||||
<-w.mediaDone
|
||||
}
|
||||
|
||||
// Stop all channels
|
||||
for name, channel := range m.channels {
|
||||
@@ -321,10 +332,12 @@ func newChannelWorker(name string, ch Channel) *channelWorker {
|
||||
burst := int(math.Max(1, math.Ceil(rateVal/2)))
|
||||
|
||||
return &channelWorker{
|
||||
ch: ch,
|
||||
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
||||
done: make(chan struct{}),
|
||||
limiter: rate.NewLimiter(rate.Limit(rateVal), burst),
|
||||
ch: ch,
|
||||
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
||||
mediaQueue: make(chan bus.OutboundMediaMessage, defaultChannelQueueSize),
|
||||
done: make(chan struct{}),
|
||||
mediaDone: make(chan struct{}),
|
||||
limiter: rate.NewLimiter(rate.Limit(rateVal), burst),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -457,6 +470,125 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) dispatchOutboundMedia(ctx context.Context) {
|
||||
logger.InfoC("channels", "Outbound media dispatcher started")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.InfoC("channels", "Outbound media dispatcher stopped")
|
||||
return
|
||||
default:
|
||||
msg, ok := m.bus.SubscribeOutboundMedia(ctx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Silently skip internal channels
|
||||
if constants.IsInternalChannel(msg.Channel) {
|
||||
continue
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
_, exists := m.channels[msg.Channel]
|
||||
w, wExists := m.workers[msg.Channel]
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
logger.WarnCF("channels", "Unknown channel for outbound media message", map[string]any{
|
||||
"channel": msg.Channel,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if wExists {
|
||||
select {
|
||||
case w.mediaQueue <- msg:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runMediaWorker processes outbound media messages for a single channel.
|
||||
func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWorker) {
|
||||
defer close(w.mediaDone)
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-w.mediaQueue:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
m.sendMediaWithRetry(ctx, name, w, msg)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendMediaWithRetry sends a media message through the channel with rate limiting and
|
||||
// retry logic. If the channel does not implement MediaSender, it silently skips.
|
||||
func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) {
|
||||
ms, ok := w.ch.(MediaSender)
|
||||
if !ok {
|
||||
logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
|
||||
"channel": name,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Rate limit: wait for token
|
||||
if err := w.limiter.Wait(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
||||
lastErr = ms.SendMedia(ctx, msg)
|
||||
if lastErr == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Permanent failures — don't retry
|
||||
if errors.Is(lastErr, ErrNotRunning) || errors.Is(lastErr, ErrSendFailed) {
|
||||
break
|
||||
}
|
||||
|
||||
// Last attempt exhausted — don't sleep
|
||||
if attempt == maxRetries {
|
||||
break
|
||||
}
|
||||
|
||||
// Rate limit error — fixed delay
|
||||
if errors.Is(lastErr, ErrRateLimit) {
|
||||
select {
|
||||
case <-time.After(rateLimitDelay):
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ErrTemporary or unknown error — exponential backoff
|
||||
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// All retries exhausted or permanent failure
|
||||
logger.ErrorCF("channels", "SendMedia failed", map[string]any{
|
||||
"channel": name,
|
||||
"chat_id": msg.ChatID,
|
||||
"error": lastErr.Error(),
|
||||
"retries": maxRetries,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) GetChannel(name string) (Channel, bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
@@ -502,6 +634,8 @@ func (m *Manager) UnregisterChannel(name string) {
|
||||
if w, ok := m.workers[name]; ok {
|
||||
close(w.queue)
|
||||
<-w.done
|
||||
close(w.mediaQueue)
|
||||
<-w.mediaDone
|
||||
}
|
||||
delete(m.workers, name)
|
||||
delete(m.channels, name)
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sipeed/picoclaw/pkg/bus"
|
||||
)
|
||||
|
||||
// MediaSender is an optional interface for channels that can send
|
||||
// media attachments (images, files, audio, video).
|
||||
// Manager discovers channels implementing this interface via type
|
||||
// assertion and routes OutboundMediaMessage to them.
|
||||
type MediaSender interface {
|
||||
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error
|
||||
}
|
||||
@@ -431,6 +431,117 @@ func (c *OneBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
func (c *OneBotChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
conn := c.conn
|
||||
c.mu.Unlock()
|
||||
|
||||
if conn == nil {
|
||||
return fmt.Errorf("OneBot WebSocket not connected")
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
// Build media segments
|
||||
var segments []oneBotMessageSegment
|
||||
for _, part := range msg.Parts {
|
||||
localPath, err := store.Resolve(part.Ref)
|
||||
if err != nil {
|
||||
logger.ErrorCF("onebot", "Failed to resolve media ref", map[string]any{
|
||||
"ref": part.Ref,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
segType := "image"
|
||||
switch part.Type {
|
||||
case "image":
|
||||
segType = "image"
|
||||
case "video":
|
||||
segType = "video"
|
||||
case "audio":
|
||||
segType = "record"
|
||||
default:
|
||||
segType = "file"
|
||||
}
|
||||
|
||||
segments = append(segments, oneBotMessageSegment{
|
||||
Type: segType,
|
||||
Data: map[string]any{"file": "file://" + localPath},
|
||||
})
|
||||
|
||||
if part.Caption != "" {
|
||||
segments = append(segments, oneBotMessageSegment{
|
||||
Type: "text",
|
||||
Data: map[string]any{"text": part.Caption},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
chatID := msg.ChatID
|
||||
var action, idKey string
|
||||
var rawID string
|
||||
if rest, ok := strings.CutPrefix(chatID, "group:"); ok {
|
||||
action, idKey, rawID = "send_group_msg", "group_id", rest
|
||||
} else if rest, ok := strings.CutPrefix(chatID, "private:"); ok {
|
||||
action, idKey, rawID = "send_private_msg", "user_id", rest
|
||||
} else {
|
||||
action, idKey, rawID = "send_private_msg", "user_id", chatID
|
||||
}
|
||||
|
||||
id, err := strconv.ParseInt(rawID, 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid %s in chatID: %s: %w", idKey, chatID, channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
echo := fmt.Sprintf("send_%d", atomic.AddInt64(&c.echoCounter, 1))
|
||||
|
||||
req := oneBotAPIRequest{
|
||||
Action: action,
|
||||
Params: map[string]any{idKey: id, "message": segments},
|
||||
Echo: echo,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal OneBot request: %w", err)
|
||||
}
|
||||
|
||||
c.writeMu.Lock()
|
||||
_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
err = conn.WriteMessage(websocket.TextMessage, data)
|
||||
_ = conn.SetWriteDeadline(time.Time{})
|
||||
c.writeMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
logger.ErrorCF("onebot", "Failed to send media message", map[string]any{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return fmt.Errorf("onebot send media: %w", channels.ErrTemporary)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *OneBotChannel) buildMessageSegments(chatID, content string) []oneBotMessageSegment {
|
||||
var segments []oneBotMessageSegment
|
||||
|
||||
|
||||
@@ -149,6 +149,60 @@ func (c *SlackChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
func (c *SlackChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
channelID, _ := parseSlackChatID(msg.ChatID)
|
||||
if channelID == "" {
|
||||
return fmt.Errorf("invalid slack chat ID: %s", msg.ChatID)
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
for _, part := range msg.Parts {
|
||||
localPath, err := store.Resolve(part.Ref)
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Failed to resolve media ref", map[string]any{
|
||||
"ref": part.Ref,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
filename := part.Filename
|
||||
if filename == "" {
|
||||
filename = "file"
|
||||
}
|
||||
|
||||
title := part.Caption
|
||||
if title == "" {
|
||||
title = filename
|
||||
}
|
||||
|
||||
_, err = c.api.UploadFileV2Context(ctx, slack.UploadFileV2Parameters{
|
||||
Channel: channelID,
|
||||
File: localPath,
|
||||
Filename: filename,
|
||||
Title: title,
|
||||
})
|
||||
if err != nil {
|
||||
logger.ErrorCF("slack", "Failed to upload media", map[string]any{
|
||||
"filename": filename,
|
||||
"error": err.Error(),
|
||||
})
|
||||
return fmt.Errorf("slack send media: %w", channels.ErrTemporary)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SlackChannel) eventLoop() {
|
||||
for {
|
||||
select {
|
||||
|
||||
@@ -231,6 +231,91 @@ func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
func (c *TelegramChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
chatID, err := parseChatID(msg.ChatID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid chat ID %s: %w", msg.ChatID, channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
for _, part := range msg.Parts {
|
||||
localPath, err := store.Resolve(part.Ref)
|
||||
if err != nil {
|
||||
logger.ErrorCF("telegram", "Failed to resolve media ref", map[string]any{
|
||||
"ref": part.Ref,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
file, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
logger.ErrorCF("telegram", "Failed to open media file", map[string]any{
|
||||
"path": localPath,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
filename := part.Filename
|
||||
if filename == "" {
|
||||
filename = "file"
|
||||
}
|
||||
|
||||
switch part.Type {
|
||||
case "image":
|
||||
params := &telego.SendPhotoParams{
|
||||
ChatID: tu.ID(chatID),
|
||||
Photo: telego.InputFile{File: file},
|
||||
Caption: part.Caption,
|
||||
}
|
||||
_, err = c.bot.SendPhoto(ctx, params)
|
||||
case "audio":
|
||||
params := &telego.SendAudioParams{
|
||||
ChatID: tu.ID(chatID),
|
||||
Audio: telego.InputFile{File: file},
|
||||
Caption: part.Caption,
|
||||
}
|
||||
_, err = c.bot.SendAudio(ctx, params)
|
||||
case "video":
|
||||
params := &telego.SendVideoParams{
|
||||
ChatID: tu.ID(chatID),
|
||||
Video: telego.InputFile{File: file},
|
||||
Caption: part.Caption,
|
||||
}
|
||||
_, err = c.bot.SendVideo(ctx, params)
|
||||
default: // "file" or unknown types
|
||||
params := &telego.SendDocumentParams{
|
||||
ChatID: tu.ID(chatID),
|
||||
Document: telego.InputFile{File: file},
|
||||
Caption: part.Caption,
|
||||
}
|
||||
_, err = c.bot.SendDocument(ctx, params)
|
||||
}
|
||||
|
||||
file.Close()
|
||||
|
||||
if err != nil {
|
||||
logger.ErrorCF("telegram", "Failed to send media", map[string]any{
|
||||
"type": part.Type,
|
||||
"error": err.Error(),
|
||||
})
|
||||
return fmt.Errorf("telegram send media: %w", channels.ErrTemporary)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Message) error {
|
||||
if message == nil {
|
||||
return fmt.Errorf("message is nil")
|
||||
|
||||
@@ -7,8 +7,11 @@ import (
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -187,6 +190,197 @@ func (c *WeComAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err
|
||||
return c.sendTextMessage(ctx, accessToken, msg.ChatID, msg.Content)
|
||||
}
|
||||
|
||||
// SendMedia implements the channels.MediaSender interface.
|
||||
func (c *WeComAppChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) error {
|
||||
if !c.IsRunning() {
|
||||
return channels.ErrNotRunning
|
||||
}
|
||||
|
||||
accessToken := c.getAccessToken()
|
||||
if accessToken == "" {
|
||||
return fmt.Errorf("no valid access token available: %w", channels.ErrTemporary)
|
||||
}
|
||||
|
||||
store := c.GetMediaStore()
|
||||
if store == nil {
|
||||
return fmt.Errorf("no media store available: %w", channels.ErrSendFailed)
|
||||
}
|
||||
|
||||
for _, part := range msg.Parts {
|
||||
localPath, err := store.Resolve(part.Ref)
|
||||
if err != nil {
|
||||
logger.ErrorCF("wecom_app", "Failed to resolve media ref", map[string]any{
|
||||
"ref": part.Ref,
|
||||
"error": err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Map part type to WeCom media type
|
||||
mediaType := "file"
|
||||
switch part.Type {
|
||||
case "image":
|
||||
mediaType = "image"
|
||||
case "audio":
|
||||
mediaType = "voice"
|
||||
case "video":
|
||||
mediaType = "video"
|
||||
default:
|
||||
mediaType = "file"
|
||||
}
|
||||
|
||||
// Upload media to get media_id
|
||||
mediaID, err := c.uploadMedia(ctx, accessToken, mediaType, localPath)
|
||||
if err != nil {
|
||||
logger.ErrorCF("wecom_app", "Failed to upload media", map[string]any{
|
||||
"type": mediaType,
|
||||
"error": err.Error(),
|
||||
})
|
||||
// Fallback: send caption as text
|
||||
if part.Caption != "" {
|
||||
_ = c.sendTextMessage(ctx, accessToken, msg.ChatID, part.Caption)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Send media message using the media_id
|
||||
if mediaType == "image" {
|
||||
err = c.sendImageMessage(ctx, accessToken, msg.ChatID, mediaID)
|
||||
} else {
|
||||
// For non-image types, send as text fallback with caption
|
||||
caption := part.Caption
|
||||
if caption == "" {
|
||||
caption = fmt.Sprintf("[%s: %s]", part.Type, part.Filename)
|
||||
}
|
||||
err = c.sendTextMessage(ctx, accessToken, msg.ChatID, caption)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadMedia uploads a local file to WeCom temporary media storage.
|
||||
func (c *WeComAppChannel) uploadMedia(ctx context.Context, accessToken, mediaType, localPath string) (string, error) {
|
||||
apiURL := fmt.Sprintf("%s/cgi-bin/media/upload?access_token=%s&type=%s",
|
||||
wecomAPIBase, url.QueryEscape(accessToken), url.QueryEscape(mediaType))
|
||||
|
||||
file, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to open file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
writer := multipart.NewWriter(body)
|
||||
|
||||
filename := filepath.Base(localPath)
|
||||
formFile, err := writer.CreateFormFile("media", filename)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create form file: %w", err)
|
||||
}
|
||||
|
||||
if _, err = io.Copy(formFile, file); err != nil {
|
||||
return "", fmt.Errorf("failed to copy file content: %w", err)
|
||||
}
|
||||
writer.Close()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", channels.ClassifyNetError(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
return "", channels.ClassifySendError(resp.StatusCode, fmt.Errorf("wecom upload error: %s", string(respBody)))
|
||||
}
|
||||
|
||||
var result struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
ErrMsg string `json:"errmsg"`
|
||||
MediaID string `json:"media_id"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return "", fmt.Errorf("failed to parse upload response: %w", err)
|
||||
}
|
||||
|
||||
if result.ErrCode != 0 {
|
||||
return "", fmt.Errorf("upload API error: %s (code: %d)", result.ErrMsg, result.ErrCode)
|
||||
}
|
||||
|
||||
return result.MediaID, nil
|
||||
}
|
||||
|
||||
// sendImageMessage sends an image message using a media_id.
|
||||
func (c *WeComAppChannel) sendImageMessage(ctx context.Context, accessToken, userID, mediaID string) error {
|
||||
apiURL := fmt.Sprintf("%s/cgi-bin/message/send?access_token=%s", wecomAPIBase, accessToken)
|
||||
|
||||
msg := WeComImageMessage{
|
||||
ToUser: userID,
|
||||
MsgType: "image",
|
||||
AgentID: c.config.AgentID,
|
||||
}
|
||||
msg.Image.MediaID = mediaID
|
||||
|
||||
jsonData, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal message: %w", err)
|
||||
}
|
||||
|
||||
timeout := c.config.ReplyTimeout
|
||||
if timeout <= 0 {
|
||||
timeout = 5
|
||||
}
|
||||
|
||||
reqCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiURL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return channels.ClassifyNetError(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
return channels.ClassifySendError(resp.StatusCode, fmt.Errorf("wecom_app API error: %s", string(respBody)))
|
||||
}
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
var sendResp WeComSendMessageResponse
|
||||
if err := json.Unmarshal(respBody, &sendResp); err != nil {
|
||||
return fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
if sendResp.ErrCode != 0 {
|
||||
return fmt.Errorf("API error: %s (code: %d)", sendResp.ErrMsg, sendResp.ErrCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WebhookPath returns the path for registering on the shared HTTP server.
|
||||
func (c *WeComAppChannel) WebhookPath() string {
|
||||
if c.config.WebhookPath != "" {
|
||||
|
||||
@@ -30,6 +30,10 @@ type ToolResult struct {
|
||||
// Err is the underlying error (not JSON serialized).
|
||||
// Used for internal error handling and logging.
|
||||
Err error `json:"-"`
|
||||
|
||||
// Media contains media store refs produced by this tool.
|
||||
// When non-empty, the agent will publish these as OutboundMediaMessage.
|
||||
Media []string `json:"media,omitempty"`
|
||||
}
|
||||
|
||||
// NewToolResult creates a basic ToolResult with content for the LLM.
|
||||
@@ -120,6 +124,19 @@ func UserResult(content string) *ToolResult {
|
||||
}
|
||||
}
|
||||
|
||||
// MediaResult creates a ToolResult with media refs for the user.
|
||||
// The agent will publish these refs as OutboundMediaMessage.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// result := MediaResult("Image generated successfully", []string{"media://abc123"})
|
||||
func MediaResult(forLLM string, mediaRefs []string) *ToolResult {
|
||||
return &ToolResult{
|
||||
ForLLM: forLLM,
|
||||
Media: mediaRefs,
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalJSON implements custom JSON serialization.
|
||||
// The Err field is excluded from JSON output via the json:"-" tag.
|
||||
func (tr *ToolResult) MarshalJSON() ([]byte, error) {
|
||||
|
||||
Reference in New Issue
Block a user