mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
038fdf5000
Channels previously deleted downloaded media files via defer os.Remove, racing with the async Agent consumer. Introduce MediaStore to decouple file ownership: channels register files on download, Agent releases them after processing via ReleaseAll(scope). - New pkg/media with MediaStore interface + FileMediaStore implementation - InboundMessage gains MediaScope field for lifecycle tracking - BaseChannel gains SetMediaStore/GetMediaStore + BuildMediaScope helper - Manager injects MediaStore into channels; AgentLoop releases on completion - Telegram, Discord, Slack, OneBot, LINE channels migrated from defer os.Remove to store.Store() with media:// refs
393 lines
9.0 KiB
Go
393 lines
9.0 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// Inspired by and based on nanobot: https://github.com/HKUDS/nanobot
|
|
// License: MIT
|
|
//
|
|
// Copyright (c) 2026 PicoClaw contributors
|
|
|
|
package channels
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/constants"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/media"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
const defaultChannelQueueSize = 100
|
|
|
|
type channelWorker struct {
|
|
ch Channel
|
|
queue chan bus.OutboundMessage
|
|
done chan struct{}
|
|
}
|
|
|
|
type Manager struct {
|
|
channels map[string]Channel
|
|
workers map[string]*channelWorker
|
|
bus *bus.MessageBus
|
|
config *config.Config
|
|
mediaStore media.MediaStore
|
|
dispatchTask *asyncTask
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
type asyncTask struct {
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
|
|
m := &Manager{
|
|
channels: make(map[string]Channel),
|
|
workers: make(map[string]*channelWorker),
|
|
bus: messageBus,
|
|
config: cfg,
|
|
mediaStore: store,
|
|
}
|
|
|
|
if err := m.initChannels(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// initChannel is a helper that looks up a factory by name and creates the channel.
|
|
func (m *Manager) initChannel(name, displayName string) {
|
|
f, ok := getFactory(name)
|
|
if !ok {
|
|
logger.WarnCF("channels", "Factory not registered", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
return
|
|
}
|
|
logger.DebugCF("channels", "Attempting to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
ch, err := f(m.config, m.bus)
|
|
if err != nil {
|
|
logger.ErrorCF("channels", "Failed to initialize channel", map[string]any{
|
|
"channel": displayName,
|
|
"error": err.Error(),
|
|
})
|
|
} else {
|
|
// Inject MediaStore if channel supports it
|
|
if m.mediaStore != nil {
|
|
if setter, ok := ch.(interface{ SetMediaStore(s media.MediaStore) }); ok {
|
|
setter.SetMediaStore(m.mediaStore)
|
|
}
|
|
}
|
|
m.channels[name] = ch
|
|
m.workers[name] = &channelWorker{
|
|
ch: ch,
|
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
|
|
"channel": displayName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (m *Manager) initChannels() error {
|
|
logger.InfoC("channels", "Initializing channel manager")
|
|
|
|
if m.config.Channels.Telegram.Enabled && m.config.Channels.Telegram.Token != "" {
|
|
m.initChannel("telegram", "Telegram")
|
|
}
|
|
|
|
if m.config.Channels.WhatsApp.Enabled && m.config.Channels.WhatsApp.BridgeURL != "" {
|
|
m.initChannel("whatsapp", "WhatsApp")
|
|
}
|
|
|
|
if m.config.Channels.Feishu.Enabled {
|
|
m.initChannel("feishu", "Feishu")
|
|
}
|
|
|
|
if m.config.Channels.Discord.Enabled && m.config.Channels.Discord.Token != "" {
|
|
m.initChannel("discord", "Discord")
|
|
}
|
|
|
|
if m.config.Channels.MaixCam.Enabled {
|
|
m.initChannel("maixcam", "MaixCam")
|
|
}
|
|
|
|
if m.config.Channels.QQ.Enabled {
|
|
m.initChannel("qq", "QQ")
|
|
}
|
|
|
|
if m.config.Channels.DingTalk.Enabled && m.config.Channels.DingTalk.ClientID != "" {
|
|
m.initChannel("dingtalk", "DingTalk")
|
|
}
|
|
|
|
if m.config.Channels.Slack.Enabled && m.config.Channels.Slack.BotToken != "" {
|
|
m.initChannel("slack", "Slack")
|
|
}
|
|
|
|
if m.config.Channels.LINE.Enabled && m.config.Channels.LINE.ChannelAccessToken != "" {
|
|
m.initChannel("line", "LINE")
|
|
}
|
|
|
|
if m.config.Channels.OneBot.Enabled && m.config.Channels.OneBot.WSUrl != "" {
|
|
m.initChannel("onebot", "OneBot")
|
|
}
|
|
|
|
if m.config.Channels.WeCom.Enabled && m.config.Channels.WeCom.Token != "" {
|
|
m.initChannel("wecom", "WeCom")
|
|
}
|
|
|
|
if m.config.Channels.WeComApp.Enabled && m.config.Channels.WeComApp.CorpID != "" {
|
|
m.initChannel("wecom_app", "WeCom App")
|
|
}
|
|
|
|
logger.InfoCF("channels", "Channel initialization completed", map[string]any{
|
|
"enabled_channels": len(m.channels),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StartAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if len(m.channels) == 0 {
|
|
logger.WarnC("channels", "No channels enabled")
|
|
return nil
|
|
}
|
|
|
|
logger.InfoC("channels", "Starting all channels")
|
|
|
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
|
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Start(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Failed to start channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
// Start per-channel workers
|
|
for name, w := range m.workers {
|
|
go m.runWorker(dispatchCtx, name, w)
|
|
}
|
|
|
|
// Start the dispatcher that reads from the bus and routes to workers
|
|
go m.dispatchOutbound(dispatchCtx)
|
|
|
|
logger.InfoC("channels", "All channels started")
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StopAll(ctx context.Context) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
logger.InfoC("channels", "Stopping all channels")
|
|
|
|
// Cancel dispatcher first
|
|
if m.dispatchTask != nil {
|
|
m.dispatchTask.cancel()
|
|
m.dispatchTask = nil
|
|
}
|
|
|
|
// Close all worker queues and wait for them to drain
|
|
for _, w := range m.workers {
|
|
close(w.queue)
|
|
}
|
|
for _, w := range m.workers {
|
|
<-w.done
|
|
}
|
|
|
|
// Stop all channels
|
|
for name, channel := range m.channels {
|
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
|
"channel": name,
|
|
})
|
|
if err := channel.Stop(ctx); err != nil {
|
|
logger.ErrorCF("channels", "Error stopping channel", map[string]any{
|
|
"channel": name,
|
|
"error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
|
|
logger.InfoC("channels", "All channels stopped")
|
|
return nil
|
|
}
|
|
|
|
// runWorker processes outbound messages for a single channel, splitting
|
|
// messages that exceed the channel's maximum message length.
|
|
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
|
|
defer close(w.done)
|
|
for {
|
|
select {
|
|
case msg, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
maxLen := 0
|
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
|
maxLen = mlp.MaxMessageLength()
|
|
}
|
|
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
|
|
chunks := utils.SplitMessage(msg.Content, maxLen)
|
|
for _, chunk := range chunks {
|
|
chunkMsg := msg
|
|
chunkMsg.Content = chunk
|
|
if err := w.ch.Send(ctx, chunkMsg); err != nil {
|
|
logger.ErrorCF("channels", "Error sending chunk", map[string]any{
|
|
"channel": name, "error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
} else {
|
|
if err := w.ch.Send(ctx, msg); err != nil {
|
|
logger.ErrorCF("channels", "Error sending message", map[string]any{
|
|
"channel": name, "error": err.Error(),
|
|
})
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
|
logger.InfoC("channels", "Outbound dispatcher started")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.InfoC("channels", "Outbound dispatcher stopped")
|
|
return
|
|
default:
|
|
msg, ok := m.bus.SubscribeOutbound(ctx)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Silently skip internal channels
|
|
if constants.IsInternalChannel(msg.Channel) {
|
|
continue
|
|
}
|
|
|
|
m.mu.RLock()
|
|
_, exists := m.channels[msg.Channel]
|
|
w, wExists := m.workers[msg.Channel]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
logger.WarnCF("channels", "Unknown channel for outbound message", map[string]any{
|
|
"channel": msg.Channel,
|
|
})
|
|
continue
|
|
}
|
|
|
|
if wExists {
|
|
select {
|
|
case w.queue <- msg:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) GetChannel(name string) (Channel, bool) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
channel, ok := m.channels[name]
|
|
return channel, ok
|
|
}
|
|
|
|
func (m *Manager) GetStatus() map[string]any {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
status := make(map[string]any)
|
|
for name, channel := range m.channels {
|
|
status[name] = map[string]any{
|
|
"enabled": true,
|
|
"running": channel.IsRunning(),
|
|
}
|
|
}
|
|
return status
|
|
}
|
|
|
|
func (m *Manager) GetEnabledChannels() []string {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(m.channels))
|
|
for name := range m.channels {
|
|
names = append(names, name)
|
|
}
|
|
return names
|
|
}
|
|
|
|
func (m *Manager) RegisterChannel(name string, channel Channel) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.channels[name] = channel
|
|
m.workers[name] = &channelWorker{
|
|
ch: channel,
|
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) UnregisterChannel(name string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if w, ok := m.workers[name]; ok {
|
|
close(w.queue)
|
|
<-w.done
|
|
}
|
|
delete(m.workers, name)
|
|
delete(m.channels, name)
|
|
}
|
|
|
|
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
|
m.mu.RLock()
|
|
_, exists := m.channels[channelName]
|
|
w, wExists := m.workers[channelName]
|
|
m.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("channel %s not found", channelName)
|
|
}
|
|
|
|
msg := bus.OutboundMessage{
|
|
Channel: channelName,
|
|
ChatID: chatID,
|
|
Content: content,
|
|
}
|
|
|
|
if wExists {
|
|
select {
|
|
case w.queue <- msg:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Fallback: direct send (should not happen)
|
|
channel, _ := m.channels[channelName]
|
|
return channel.Send(ctx, msg)
|
|
}
|