mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
f8b656ec37
Add unified ShouldRespondInGroup to BaseChannel, replacing scattered per-channel group filtering logic. Introduce GroupTriggerConfig (with mention_only + prefixes), TypingConfig, and PlaceholderConfig types. Migrate Discord MentionOnly, OneBot checkGroupTrigger, and LINE hardcoded mention-only to the shared mechanism. Add group trigger entry points for Slack, Telegram, QQ, Feishu, DingTalk, and WeCom. Legacy config fields are preserved with automatic migration.
214 lines
6.1 KiB
Go
214 lines
6.1 KiB
Go
// PicoClaw - Ultra-lightweight personal AI agent
|
|
// DingTalk channel implementation using Stream Mode
|
|
|
|
package dingtalk
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
|
|
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/channels"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/utils"
|
|
)
|
|
|
|
// DingTalkChannel implements the Channel interface for DingTalk (钉钉)
|
|
// It uses WebSocket for receiving messages via stream mode and API for sending
|
|
type DingTalkChannel struct {
|
|
*channels.BaseChannel
|
|
config config.DingTalkConfig
|
|
clientID string
|
|
clientSecret string
|
|
streamClient *client.StreamClient
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
// Map to store session webhooks for each chat
|
|
sessionWebhooks sync.Map // chatID -> sessionWebhook
|
|
}
|
|
|
|
// NewDingTalkChannel creates a new DingTalk channel instance
|
|
func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (*DingTalkChannel, error) {
|
|
if cfg.ClientID == "" || cfg.ClientSecret == "" {
|
|
return nil, fmt.Errorf("dingtalk client_id and client_secret are required")
|
|
}
|
|
|
|
base := channels.NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom,
|
|
channels.WithMaxMessageLength(20000),
|
|
channels.WithGroupTrigger(cfg.GroupTrigger),
|
|
)
|
|
|
|
return &DingTalkChannel{
|
|
BaseChannel: base,
|
|
config: cfg,
|
|
clientID: cfg.ClientID,
|
|
clientSecret: cfg.ClientSecret,
|
|
}, nil
|
|
}
|
|
|
|
// Start initializes the DingTalk channel with Stream Mode
|
|
func (c *DingTalkChannel) Start(ctx context.Context) error {
|
|
logger.InfoC("dingtalk", "Starting DingTalk channel (Stream Mode)...")
|
|
|
|
c.ctx, c.cancel = context.WithCancel(ctx)
|
|
|
|
// Create credential config
|
|
cred := client.NewAppCredentialConfig(c.clientID, c.clientSecret)
|
|
|
|
// Create the stream client with options
|
|
c.streamClient = client.NewStreamClient(
|
|
client.WithAppCredential(cred),
|
|
client.WithAutoReconnect(true),
|
|
)
|
|
|
|
// Register chatbot callback handler (IChatBotMessageHandler is a function type)
|
|
c.streamClient.RegisterChatBotCallbackRouter(c.onChatBotMessageReceived)
|
|
|
|
// Start the stream client
|
|
if err := c.streamClient.Start(c.ctx); err != nil {
|
|
return fmt.Errorf("failed to start stream client: %w", err)
|
|
}
|
|
|
|
c.SetRunning(true)
|
|
logger.InfoC("dingtalk", "DingTalk channel started (Stream Mode)")
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully stops the DingTalk channel
|
|
func (c *DingTalkChannel) Stop(ctx context.Context) error {
|
|
logger.InfoC("dingtalk", "Stopping DingTalk channel...")
|
|
|
|
if c.cancel != nil {
|
|
c.cancel()
|
|
}
|
|
|
|
if c.streamClient != nil {
|
|
c.streamClient.Close()
|
|
}
|
|
|
|
c.SetRunning(false)
|
|
logger.InfoC("dingtalk", "DingTalk channel stopped")
|
|
return nil
|
|
}
|
|
|
|
// Send sends a message to DingTalk via the chatbot reply API
|
|
func (c *DingTalkChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
|
|
if !c.IsRunning() {
|
|
return channels.ErrNotRunning
|
|
}
|
|
|
|
// Get session webhook from storage
|
|
sessionWebhookRaw, ok := c.sessionWebhooks.Load(msg.ChatID)
|
|
if !ok {
|
|
return fmt.Errorf("no session_webhook found for chat %s, cannot send message", msg.ChatID)
|
|
}
|
|
|
|
sessionWebhook, ok := sessionWebhookRaw.(string)
|
|
if !ok {
|
|
return fmt.Errorf("invalid session_webhook type for chat %s", msg.ChatID)
|
|
}
|
|
|
|
logger.DebugCF("dingtalk", "Sending message", map[string]any{
|
|
"chat_id": msg.ChatID,
|
|
"preview": utils.Truncate(msg.Content, 100),
|
|
})
|
|
|
|
// Use the session webhook to send the reply
|
|
return c.SendDirectReply(ctx, sessionWebhook, msg.Content)
|
|
}
|
|
|
|
// onChatBotMessageReceived implements the IChatBotMessageHandler function signature
|
|
// This is called by the Stream SDK when a new message arrives
|
|
// IChatBotMessageHandler is: func(c context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error)
|
|
func (c *DingTalkChannel) onChatBotMessageReceived(
|
|
ctx context.Context,
|
|
data *chatbot.BotCallbackDataModel,
|
|
) ([]byte, error) {
|
|
// Extract message content from Text field
|
|
content := data.Text.Content
|
|
if content == "" {
|
|
// Try to extract from Content interface{} if Text is empty
|
|
if contentMap, ok := data.Content.(map[string]any); ok {
|
|
if textContent, ok := contentMap["content"].(string); ok {
|
|
content = textContent
|
|
}
|
|
}
|
|
}
|
|
|
|
if content == "" {
|
|
return nil, nil // Ignore empty messages
|
|
}
|
|
|
|
senderID := data.SenderStaffId
|
|
senderNick := data.SenderNick
|
|
chatID := senderID
|
|
if data.ConversationType != "1" {
|
|
// For group chats
|
|
chatID = data.ConversationId
|
|
}
|
|
|
|
// Store the session webhook for this chat so we can reply later
|
|
c.sessionWebhooks.Store(chatID, data.SessionWebhook)
|
|
|
|
metadata := map[string]string{
|
|
"sender_name": senderNick,
|
|
"conversation_id": data.ConversationId,
|
|
"conversation_type": data.ConversationType,
|
|
"platform": "dingtalk",
|
|
"session_webhook": data.SessionWebhook,
|
|
}
|
|
|
|
var peer bus.Peer
|
|
if data.ConversationType == "1" {
|
|
peer = bus.Peer{Kind: "direct", ID: senderID}
|
|
} else {
|
|
peer = bus.Peer{Kind: "group", ID: data.ConversationId}
|
|
// In group chats, apply unified group trigger filtering
|
|
respond, cleaned := c.ShouldRespondInGroup(false, content)
|
|
if !respond {
|
|
return nil, nil
|
|
}
|
|
content = cleaned
|
|
}
|
|
|
|
logger.DebugCF("dingtalk", "Received message", map[string]any{
|
|
"sender_nick": senderNick,
|
|
"sender_id": senderID,
|
|
"preview": utils.Truncate(content, 50),
|
|
})
|
|
|
|
// Handle the message through the base channel
|
|
c.HandleMessage(peer, "", senderID, chatID, content, nil, metadata)
|
|
|
|
// Return nil to indicate we've handled the message asynchronously
|
|
// The response will be sent through the message bus
|
|
return nil, nil
|
|
}
|
|
|
|
// SendDirectReply sends a direct reply using the session webhook
|
|
func (c *DingTalkChannel) SendDirectReply(ctx context.Context, sessionWebhook, content string) error {
|
|
replier := chatbot.NewChatbotReplier()
|
|
|
|
// Convert string content to []byte for the API
|
|
contentBytes := []byte(content)
|
|
titleBytes := []byte("PicoClaw")
|
|
|
|
// Send markdown formatted reply
|
|
err := replier.SimpleReplyMarkdown(
|
|
ctx,
|
|
sessionWebhook,
|
|
titleBytes,
|
|
contentBytes,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("dingtalk send: %w", channels.ErrTemporary)
|
|
}
|
|
|
|
return nil
|
|
}
|