mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
refactor(channels): unify message splitting and add per-channel worker queues
Move message splitting from individual channels (Discord) to the Manager layer via per-channel worker goroutines. Each channel now declares its max message length through BaseChannelOption/MessageLengthProvider, and the Manager automatically splits oversized outbound messages before dispatch. This prevents one slow channel from blocking all others.

- Add WithMaxMessageLength option and MessageLengthProvider interface
- Set platform-specific limits (Discord 2000, Telegram 4096, Slack 40000, etc.)
- Convert SplitMessage to rune-aware counting for correct Unicode handling
- Replace single dispatcher goroutine with per-channel buffered worker queues
- Remove Discord's internal SplitMessage call (now handled centrally)
This commit is contained in:
+42
-8
@@ -17,21 +17,55 @@ type Channel interface {
|
|||||||
IsAllowed(senderID string) bool
|
IsAllowed(senderID string) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseChannel struct {
|
// BaseChannelOption is a functional option for configuring a BaseChannel.
|
||||||
config any
|
type BaseChannelOption func(*BaseChannel)
|
||||||
bus *bus.MessageBus
|
|
||||||
running atomic.Bool
|
// WithMaxMessageLength sets the maximum message length (in runes) for a channel.
|
||||||
name string
|
// Messages exceeding this limit will be automatically split by the Manager.
|
||||||
allowList []string
|
// A value of 0 means no limit.
|
||||||
|
func WithMaxMessageLength(n int) BaseChannelOption {
|
||||||
|
return func(c *BaseChannel) { c.maxMessageLength = n }
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBaseChannel(name string, config any, bus *bus.MessageBus, allowList []string) *BaseChannel {
|
// MessageLengthProvider is an opt-in interface that channels implement
|
||||||
return &BaseChannel{
|
// to advertise their maximum message length. The Manager uses this via
|
||||||
|
// type assertion to decide whether to split outbound messages.
|
||||||
|
type MessageLengthProvider interface {
|
||||||
|
MaxMessageLength() int
|
||||||
|
}
|
||||||
|
|
||||||
|
type BaseChannel struct {
|
||||||
|
config any
|
||||||
|
bus *bus.MessageBus
|
||||||
|
running atomic.Bool
|
||||||
|
name string
|
||||||
|
allowList []string
|
||||||
|
maxMessageLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBaseChannel(
|
||||||
|
name string,
|
||||||
|
config any,
|
||||||
|
bus *bus.MessageBus,
|
||||||
|
allowList []string,
|
||||||
|
opts ...BaseChannelOption,
|
||||||
|
) *BaseChannel {
|
||||||
|
bc := &BaseChannel{
|
||||||
config: config,
|
config: config,
|
||||||
bus: bus,
|
bus: bus,
|
||||||
name: name,
|
name: name,
|
||||||
allowList: allowList,
|
allowList: allowList,
|
||||||
}
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(bc)
|
||||||
|
}
|
||||||
|
return bc
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxMessageLength returns the maximum message length (in runes) for this channel.
|
||||||
|
// A value of 0 means no limit.
|
||||||
|
func (c *BaseChannel) MaxMessageLength() int {
|
||||||
|
return c.maxMessageLength
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *BaseChannel) Name() string {
|
func (c *BaseChannel) Name() string {
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func NewDingTalkChannel(cfg config.DingTalkConfig, messageBus *bus.MessageBus) (
|
|||||||
return nil, fmt.Errorf("dingtalk client_id and client_secret are required")
|
return nil, fmt.Errorf("dingtalk client_id and client_secret are required")
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("dingtalk", cfg, messageBus, cfg.AllowFrom, channels.WithMaxMessageLength(20000))
|
||||||
|
|
||||||
return &DingTalkChannel{
|
return &DingTalkChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func NewDiscordChannel(cfg config.DiscordConfig, bus *bus.MessageBus) (*DiscordC
|
|||||||
return nil, fmt.Errorf("failed to create discord session: %w", err)
|
return nil, fmt.Errorf("failed to create discord session: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("discord", cfg, bus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("discord", cfg, bus, cfg.AllowFrom, channels.WithMaxMessageLength(2000))
|
||||||
|
|
||||||
return &DiscordChannel{
|
return &DiscordChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
@@ -121,20 +121,11 @@ func (c *DiscordChannel) Send(ctx context.Context, msg bus.OutboundMessage) erro
|
|||||||
return fmt.Errorf("channel ID is empty")
|
return fmt.Errorf("channel ID is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
runes := []rune(msg.Content)
|
if len([]rune(msg.Content)) == 0 {
|
||||||
if len(runes) == 0 {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks := utils.SplitMessage(msg.Content, 2000) // Split messages into chunks, Discord length limit: 2000 chars
|
return c.sendChunk(ctx, channelID, msg.Content)
|
||||||
|
|
||||||
for _, chunk := range chunks {
|
|
||||||
if err := c.sendChunk(ctx, channelID, chunk); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
|
func (c *DiscordChannel) sendChunk(ctx context.Context, channelID, content string) error {
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ func NewLINEChannel(cfg config.LINEConfig, messageBus *bus.MessageBus) (*LINECha
|
|||||||
return nil, fmt.Errorf("line channel_secret and channel_access_token are required")
|
return nil, fmt.Errorf("line channel_secret and channel_access_token are required")
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("line", cfg, messageBus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("line", cfg, messageBus, cfg.AllowFrom, channels.WithMaxMessageLength(5000))
|
||||||
|
|
||||||
return &LINEChannel{
|
return &LINEChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
+103
-9
@@ -15,10 +15,20 @@ import (
|
|||||||
"github.com/sipeed/picoclaw/pkg/config"
|
"github.com/sipeed/picoclaw/pkg/config"
|
||||||
"github.com/sipeed/picoclaw/pkg/constants"
|
"github.com/sipeed/picoclaw/pkg/constants"
|
||||||
"github.com/sipeed/picoclaw/pkg/logger"
|
"github.com/sipeed/picoclaw/pkg/logger"
|
||||||
|
"github.com/sipeed/picoclaw/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultChannelQueueSize = 100
|
||||||
|
|
||||||
|
type channelWorker struct {
|
||||||
|
ch Channel
|
||||||
|
queue chan bus.OutboundMessage
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
channels map[string]Channel
|
channels map[string]Channel
|
||||||
|
workers map[string]*channelWorker
|
||||||
bus *bus.MessageBus
|
bus *bus.MessageBus
|
||||||
config *config.Config
|
config *config.Config
|
||||||
dispatchTask *asyncTask
|
dispatchTask *asyncTask
|
||||||
@@ -32,6 +42,7 @@ type asyncTask struct {
|
|||||||
func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error) {
|
func NewManager(cfg *config.Config, messageBus *bus.MessageBus) (*Manager, error) {
|
||||||
m := &Manager{
|
m := &Manager{
|
||||||
channels: make(map[string]Channel),
|
channels: make(map[string]Channel),
|
||||||
|
workers: make(map[string]*channelWorker),
|
||||||
bus: messageBus,
|
bus: messageBus,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
}
|
}
|
||||||
@@ -63,6 +74,11 @@ func (m *Manager) initChannel(name, displayName string) {
|
|||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
m.channels[name] = ch
|
m.channels[name] = ch
|
||||||
|
m.workers[name] = &channelWorker{
|
||||||
|
ch: ch,
|
||||||
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
|
logger.InfoCF("channels", "Channel enabled successfully", map[string]any{
|
||||||
"channel": displayName,
|
"channel": displayName,
|
||||||
})
|
})
|
||||||
@@ -141,8 +157,6 @@ func (m *Manager) StartAll(ctx context.Context) error {
|
|||||||
dispatchCtx, cancel := context.WithCancel(ctx)
|
dispatchCtx, cancel := context.WithCancel(ctx)
|
||||||
m.dispatchTask = &asyncTask{cancel: cancel}
|
m.dispatchTask = &asyncTask{cancel: cancel}
|
||||||
|
|
||||||
go m.dispatchOutbound(dispatchCtx)
|
|
||||||
|
|
||||||
for name, channel := range m.channels {
|
for name, channel := range m.channels {
|
||||||
logger.InfoCF("channels", "Starting channel", map[string]any{
|
logger.InfoCF("channels", "Starting channel", map[string]any{
|
||||||
"channel": name,
|
"channel": name,
|
||||||
@@ -155,6 +169,14 @@ func (m *Manager) StartAll(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start per-channel workers
|
||||||
|
for name, w := range m.workers {
|
||||||
|
go m.runWorker(dispatchCtx, name, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the dispatcher that reads from the bus and routes to workers
|
||||||
|
go m.dispatchOutbound(dispatchCtx)
|
||||||
|
|
||||||
logger.InfoC("channels", "All channels started")
|
logger.InfoC("channels", "All channels started")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -165,11 +187,21 @@ func (m *Manager) StopAll(ctx context.Context) error {
|
|||||||
|
|
||||||
logger.InfoC("channels", "Stopping all channels")
|
logger.InfoC("channels", "Stopping all channels")
|
||||||
|
|
||||||
|
// Cancel dispatcher first
|
||||||
if m.dispatchTask != nil {
|
if m.dispatchTask != nil {
|
||||||
m.dispatchTask.cancel()
|
m.dispatchTask.cancel()
|
||||||
m.dispatchTask = nil
|
m.dispatchTask = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close all worker queues and wait for them to drain
|
||||||
|
for _, w := range m.workers {
|
||||||
|
close(w.queue)
|
||||||
|
}
|
||||||
|
for _, w := range m.workers {
|
||||||
|
<-w.done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop all channels
|
||||||
for name, channel := range m.channels {
|
for name, channel := range m.channels {
|
||||||
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
logger.InfoCF("channels", "Stopping channel", map[string]any{
|
||||||
"channel": name,
|
"channel": name,
|
||||||
@@ -186,6 +218,44 @@ func (m *Manager) StopAll(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runWorker processes outbound messages for a single channel, splitting
|
||||||
|
// messages that exceed the channel's maximum message length.
|
||||||
|
func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) {
|
||||||
|
defer close(w.done)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-w.queue:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
maxLen := 0
|
||||||
|
if mlp, ok := w.ch.(MessageLengthProvider); ok {
|
||||||
|
maxLen = mlp.MaxMessageLength()
|
||||||
|
}
|
||||||
|
if maxLen > 0 && len([]rune(msg.Content)) > maxLen {
|
||||||
|
chunks := utils.SplitMessage(msg.Content, maxLen)
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
chunkMsg := msg
|
||||||
|
chunkMsg.Content = chunk
|
||||||
|
if err := w.ch.Send(ctx, chunkMsg); err != nil {
|
||||||
|
logger.ErrorCF("channels", "Error sending chunk", map[string]any{
|
||||||
|
"channel": name, "error": err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := w.ch.Send(ctx, msg); err != nil {
|
||||||
|
logger.ErrorCF("channels", "Error sending message", map[string]any{
|
||||||
|
"channel": name, "error": err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
func (m *Manager) dispatchOutbound(ctx context.Context) {
|
||||||
logger.InfoC("channels", "Outbound dispatcher started")
|
logger.InfoC("channels", "Outbound dispatcher started")
|
||||||
|
|
||||||
@@ -206,7 +276,8 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
channel, exists := m.channels[msg.Channel]
|
_, exists := m.channels[msg.Channel]
|
||||||
|
w, wExists := m.workers[msg.Channel]
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
@@ -216,11 +287,12 @@ func (m *Manager) dispatchOutbound(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := channel.Send(ctx, msg); err != nil {
|
if wExists {
|
||||||
logger.ErrorCF("channels", "Error sending message to channel", map[string]any{
|
select {
|
||||||
"channel": msg.Channel,
|
case w.queue <- msg:
|
||||||
"error": err.Error(),
|
case <-ctx.Done():
|
||||||
})
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -262,17 +334,28 @@ func (m *Manager) RegisterChannel(name string, channel Channel) {
|
|||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
m.channels[name] = channel
|
m.channels[name] = channel
|
||||||
|
m.workers[name] = &channelWorker{
|
||||||
|
ch: channel,
|
||||||
|
queue: make(chan bus.OutboundMessage, defaultChannelQueueSize),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) UnregisterChannel(name string) {
|
func (m *Manager) UnregisterChannel(name string) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
if w, ok := m.workers[name]; ok {
|
||||||
|
close(w.queue)
|
||||||
|
<-w.done
|
||||||
|
}
|
||||||
|
delete(m.workers, name)
|
||||||
delete(m.channels, name)
|
delete(m.channels, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, content string) error {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
channel, exists := m.channels[channelName]
|
_, exists := m.channels[channelName]
|
||||||
|
w, wExists := m.workers[channelName]
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
@@ -285,5 +368,16 @@ func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, conten
|
|||||||
Content: content,
|
Content: content,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if wExists {
|
||||||
|
select {
|
||||||
|
case w.queue <- msg:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: direct send (should not happen)
|
||||||
|
channel, _ := m.channels[channelName]
|
||||||
return channel.Send(ctx, msg)
|
return channel.Send(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ func NewSlackChannel(cfg config.SlackConfig, messageBus *bus.MessageBus) (*Slack
|
|||||||
|
|
||||||
socketClient := socketmode.New(api)
|
socketClient := socketmode.New(api)
|
||||||
|
|
||||||
base := channels.NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("slack", cfg, messageBus, cfg.AllowFrom, channels.WithMaxMessageLength(40000))
|
||||||
|
|
||||||
return &SlackChannel{
|
return &SlackChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
@@ -76,7 +76,13 @@ func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChann
|
|||||||
return nil, fmt.Errorf("failed to create telegram bot: %w", err)
|
return nil, fmt.Errorf("failed to create telegram bot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("telegram", telegramCfg, bus, telegramCfg.AllowFrom)
|
base := channels.NewBaseChannel(
|
||||||
|
"telegram",
|
||||||
|
telegramCfg,
|
||||||
|
bus,
|
||||||
|
telegramCfg.AllowFrom,
|
||||||
|
channels.WithMaxMessageLength(4096),
|
||||||
|
)
|
||||||
|
|
||||||
return &TelegramChannel{
|
return &TelegramChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) (
|
|||||||
return nil, fmt.Errorf("wecom_app corp_id, corp_secret and agent_id are required")
|
return nil, fmt.Errorf("wecom_app corp_id, corp_secret and agent_id are required")
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("wecom_app", cfg, messageBus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("wecom_app", cfg, messageBus, cfg.AllowFrom, channels.WithMaxMessageLength(2048))
|
||||||
|
|
||||||
return &WeComAppChannel{
|
return &WeComAppChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We
|
|||||||
return nil, fmt.Errorf("wecom token and webhook_url are required")
|
return nil, fmt.Errorf("wecom token and webhook_url are required")
|
||||||
}
|
}
|
||||||
|
|
||||||
base := channels.NewBaseChannel("wecom", cfg, messageBus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("wecom", cfg, messageBus, cfg.AllowFrom, channels.WithMaxMessageLength(2048))
|
||||||
|
|
||||||
return &WeComBotChannel{
|
return &WeComBotChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ type WhatsAppChannel struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewWhatsAppChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus) (*WhatsAppChannel, error) {
|
func NewWhatsAppChannel(cfg config.WhatsAppConfig, bus *bus.MessageBus) (*WhatsAppChannel, error) {
|
||||||
base := channels.NewBaseChannel("whatsapp", cfg, bus, cfg.AllowFrom)
|
base := channels.NewBaseChannel("whatsapp", cfg, bus, cfg.AllowFrom, channels.WithMaxMessageLength(65536))
|
||||||
|
|
||||||
return &WhatsAppChannel{
|
return &WhatsAppChannel{
|
||||||
BaseChannel: base,
|
BaseChannel: base,
|
||||||
|
|||||||
+68
-46
@@ -5,11 +5,20 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// SplitMessage splits long messages into chunks, preserving code block integrity.
|
// SplitMessage splits long messages into chunks, preserving code block integrity.
|
||||||
|
// The maxLen parameter is measured in runes (Unicode characters), not bytes.
|
||||||
// The function reserves a buffer (10% of maxLen, min 50) to leave room for closing code blocks,
|
// The function reserves a buffer (10% of maxLen, min 50) to leave room for closing code blocks,
|
||||||
// but may extend to maxLen when needed.
|
// but may extend to maxLen when needed.
|
||||||
// Call SplitMessage with the full text content and the maximum allowed length of a single message;
|
// Call SplitMessage with the full text content and the maximum allowed length of a single message;
|
||||||
// it returns a slice of message chunks that each respect maxLen and avoid splitting fenced code blocks.
|
// it returns a slice of message chunks that each respect maxLen and avoid splitting fenced code blocks.
|
||||||
func SplitMessage(content string, maxLen int) []string {
|
func SplitMessage(content string, maxLen int) []string {
|
||||||
|
if maxLen <= 0 {
|
||||||
|
if content == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return []string{content}
|
||||||
|
}
|
||||||
|
|
||||||
|
runes := []rune(content)
|
||||||
var messages []string
|
var messages []string
|
||||||
|
|
||||||
// Dynamic buffer: 10% of maxLen, but at least 50 chars if possible
|
// Dynamic buffer: 10% of maxLen, but at least 50 chars if possible
|
||||||
@@ -21,9 +30,9 @@ func SplitMessage(content string, maxLen int) []string {
|
|||||||
codeBlockBuffer = maxLen / 2
|
codeBlockBuffer = maxLen / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
for len(content) > 0 {
|
for len(runes) > 0 {
|
||||||
if len(content) <= maxLen {
|
if len(runes) <= maxLen {
|
||||||
messages = append(messages, content)
|
messages = append(messages, string(runes))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,56 +43,66 @@ func SplitMessage(content string, maxLen int) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Find natural split point within the effective limit
|
// Find natural split point within the effective limit
|
||||||
msgEnd := findLastNewline(content[:effectiveLimit], 200)
|
msgEnd := findLastNewlineRunes(runes[:effectiveLimit], 200)
|
||||||
if msgEnd <= 0 {
|
if msgEnd <= 0 {
|
||||||
msgEnd = findLastSpace(content[:effectiveLimit], 100)
|
msgEnd = findLastSpaceRunes(runes[:effectiveLimit], 100)
|
||||||
}
|
}
|
||||||
if msgEnd <= 0 {
|
if msgEnd <= 0 {
|
||||||
msgEnd = effectiveLimit
|
msgEnd = effectiveLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if this would end with an incomplete code block
|
// Check if this would end with an incomplete code block
|
||||||
candidate := content[:msgEnd]
|
candidate := runes[:msgEnd]
|
||||||
unclosedIdx := findLastUnclosedCodeBlock(candidate)
|
unclosedIdx := findLastUnclosedCodeBlockRunes(candidate)
|
||||||
|
|
||||||
if unclosedIdx >= 0 {
|
if unclosedIdx >= 0 {
|
||||||
// Message would end with incomplete code block
|
// Message would end with incomplete code block
|
||||||
// Try to extend up to maxLen to include the closing ```
|
// Try to extend up to maxLen to include the closing ```
|
||||||
if len(content) > msgEnd {
|
if len(runes) > msgEnd {
|
||||||
closingIdx := findNextClosingCodeBlock(content, msgEnd)
|
closingIdx := findNextClosingCodeBlockRunes(runes, msgEnd)
|
||||||
if closingIdx > 0 && closingIdx <= maxLen {
|
if closingIdx > 0 && closingIdx <= maxLen {
|
||||||
// Extend to include the closing ```
|
// Extend to include the closing ```
|
||||||
msgEnd = closingIdx
|
msgEnd = closingIdx
|
||||||
} else {
|
} else {
|
||||||
// Code block is too long to fit in one chunk or missing closing fence.
|
// Code block is too long to fit in one chunk or missing closing fence.
|
||||||
// Try to split inside by injecting closing and reopening fences.
|
// Try to split inside by injecting closing and reopening fences.
|
||||||
headerEnd := strings.Index(content[unclosedIdx:], "\n")
|
candidateStr := string(candidate)
|
||||||
|
unclosedStr := string(runes[unclosedIdx:])
|
||||||
|
headerEnd := strings.Index(unclosedStr, "\n")
|
||||||
|
var header string
|
||||||
if headerEnd == -1 {
|
if headerEnd == -1 {
|
||||||
headerEnd = unclosedIdx + 3
|
header = strings.TrimSpace(string(runes[unclosedIdx : unclosedIdx+3]))
|
||||||
} else {
|
} else {
|
||||||
headerEnd += unclosedIdx
|
header = strings.TrimSpace(string(runes[unclosedIdx : unclosedIdx+headerEnd]))
|
||||||
}
|
}
|
||||||
header := strings.TrimSpace(content[unclosedIdx:headerEnd])
|
headerEndIdx := unclosedIdx + len([]rune(header))
|
||||||
|
if headerEnd != -1 {
|
||||||
|
headerEndIdx = unclosedIdx + headerEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = candidateStr // used above for context
|
||||||
|
|
||||||
// If we have a reasonable amount of content after the header, split inside
|
// If we have a reasonable amount of content after the header, split inside
|
||||||
if msgEnd > headerEnd+20 {
|
if msgEnd > headerEndIdx+20 {
|
||||||
// Find a better split point closer to maxLen
|
// Find a better split point closer to maxLen
|
||||||
innerLimit := maxLen - 5 // Leave room for "\n```"
|
innerLimit := maxLen - 5 // Leave room for "\n```"
|
||||||
betterEnd := findLastNewline(content[:innerLimit], 200)
|
betterEnd := findLastNewlineRunes(runes[:innerLimit], 200)
|
||||||
if betterEnd > headerEnd {
|
if betterEnd > headerEndIdx {
|
||||||
msgEnd = betterEnd
|
msgEnd = betterEnd
|
||||||
} else {
|
} else {
|
||||||
msgEnd = innerLimit
|
msgEnd = innerLimit
|
||||||
}
|
}
|
||||||
messages = append(messages, strings.TrimRight(content[:msgEnd], " \t\n\r")+"\n```")
|
chunk := strings.TrimRight(string(runes[:msgEnd]), " \t\n\r") + "\n```"
|
||||||
content = strings.TrimSpace(header + "\n" + content[msgEnd:])
|
messages = append(messages, chunk)
|
||||||
|
remaining := strings.TrimSpace(header + "\n" + string(runes[msgEnd:]))
|
||||||
|
runes = []rune(remaining)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, try to split before the code block starts
|
// Otherwise, try to split before the code block starts
|
||||||
newEnd := findLastNewline(content[:unclosedIdx], 200)
|
newEnd := findLastNewlineRunes(runes[:unclosedIdx], 200)
|
||||||
if newEnd <= 0 {
|
if newEnd <= 0 {
|
||||||
newEnd = findLastSpace(content[:unclosedIdx], 100)
|
newEnd = findLastSpaceRunes(runes[:unclosedIdx], 100)
|
||||||
}
|
}
|
||||||
if newEnd > 0 {
|
if newEnd > 0 {
|
||||||
msgEnd = newEnd
|
msgEnd = newEnd
|
||||||
@@ -93,8 +112,10 @@ func SplitMessage(content string, maxLen int) []string {
|
|||||||
msgEnd = unclosedIdx
|
msgEnd = unclosedIdx
|
||||||
} else {
|
} else {
|
||||||
msgEnd = maxLen - 5
|
msgEnd = maxLen - 5
|
||||||
messages = append(messages, strings.TrimRight(content[:msgEnd], " \t\n\r")+"\n```")
|
chunk := strings.TrimRight(string(runes[:msgEnd]), " \t\n\r") + "\n```"
|
||||||
content = strings.TrimSpace(header + "\n" + content[msgEnd:])
|
messages = append(messages, chunk)
|
||||||
|
remaining := strings.TrimSpace(header + "\n" + string(runes[msgEnd:]))
|
||||||
|
runes = []rune(remaining)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -106,21 +127,22 @@ func SplitMessage(content string, maxLen int) []string {
|
|||||||
msgEnd = effectiveLimit
|
msgEnd = effectiveLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
messages = append(messages, content[:msgEnd])
|
messages = append(messages, string(runes[:msgEnd]))
|
||||||
content = strings.TrimSpace(content[msgEnd:])
|
remaining := strings.TrimSpace(string(runes[msgEnd:]))
|
||||||
|
runes = []rune(remaining)
|
||||||
}
|
}
|
||||||
|
|
||||||
return messages
|
return messages
|
||||||
}
|
}
|
||||||
|
|
||||||
// findLastUnclosedCodeBlock finds the last opening ``` that doesn't have a closing ```
|
// findLastUnclosedCodeBlockRunes finds the last opening ``` that doesn't have a closing ```
|
||||||
// Returns the position of the opening ``` or -1 if all code blocks are complete
|
// Returns the rune position of the opening ``` or -1 if all code blocks are complete
|
||||||
func findLastUnclosedCodeBlock(text string) int {
|
func findLastUnclosedCodeBlockRunes(runes []rune) int {
|
||||||
inCodeBlock := false
|
inCodeBlock := false
|
||||||
lastOpenIdx := -1
|
lastOpenIdx := -1
|
||||||
|
|
||||||
for i := 0; i < len(text); i++ {
|
for i := 0; i < len(runes); i++ {
|
||||||
if i+2 < len(text) && text[i] == '`' && text[i+1] == '`' && text[i+2] == '`' {
|
if i+2 < len(runes) && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' {
|
||||||
// Toggle code block state on each fence
|
// Toggle code block state on each fence
|
||||||
if !inCodeBlock {
|
if !inCodeBlock {
|
||||||
// Entering a code block: record this opening fence
|
// Entering a code block: record this opening fence
|
||||||
@@ -137,41 +159,41 @@ func findLastUnclosedCodeBlock(text string) int {
|
|||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// findNextClosingCodeBlock finds the next closing ``` starting from a position
|
// findNextClosingCodeBlockRunes finds the next closing ``` starting from a rune position
|
||||||
// Returns the position after the closing ``` or -1 if not found
|
// Returns the rune position after the closing ``` or -1 if not found
|
||||||
func findNextClosingCodeBlock(text string, startIdx int) int {
|
func findNextClosingCodeBlockRunes(runes []rune, startIdx int) int {
|
||||||
for i := startIdx; i < len(text); i++ {
|
for i := startIdx; i < len(runes); i++ {
|
||||||
if i+2 < len(text) && text[i] == '`' && text[i+1] == '`' && text[i+2] == '`' {
|
if i+2 < len(runes) && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' {
|
||||||
return i + 3
|
return i + 3
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// findLastNewline finds the last newline character within the last N characters
|
// findLastNewlineRunes finds the last newline character within the last N runes
|
||||||
// Returns the position of the newline or -1 if not found
|
// Returns the rune position of the newline or -1 if not found
|
||||||
func findLastNewline(s string, searchWindow int) int {
|
func findLastNewlineRunes(runes []rune, searchWindow int) int {
|
||||||
searchStart := len(s) - searchWindow
|
searchStart := len(runes) - searchWindow
|
||||||
if searchStart < 0 {
|
if searchStart < 0 {
|
||||||
searchStart = 0
|
searchStart = 0
|
||||||
}
|
}
|
||||||
for i := len(s) - 1; i >= searchStart; i-- {
|
for i := len(runes) - 1; i >= searchStart; i-- {
|
||||||
if s[i] == '\n' {
|
if runes[i] == '\n' {
|
||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// findLastSpace finds the last space character within the last N characters
|
// findLastSpaceRunes finds the last space character within the last N runes
|
||||||
// Returns the position of the space or -1 if not found
|
// Returns the rune position of the space or -1 if not found
|
||||||
func findLastSpace(s string, searchWindow int) int {
|
func findLastSpaceRunes(runes []rune, searchWindow int) int {
|
||||||
searchStart := len(s) - searchWindow
|
searchStart := len(runes) - searchWindow
|
||||||
if searchStart < 0 {
|
if searchStart < 0 {
|
||||||
searchStart = 0
|
searchStart = 0
|
||||||
}
|
}
|
||||||
for i := len(s) - 1; i >= searchStart; i-- {
|
for i := len(runes) - 1; i >= searchStart; i-- {
|
||||||
if s[i] == ' ' || s[i] == '\t' {
|
if runes[i] == ' ' || runes[i] == '\t' {
|
||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+43
-17
@@ -34,11 +34,15 @@ func TestSplitMessage(t *testing.T) {
|
|||||||
maxLen: 2000,
|
maxLen: 2000,
|
||||||
expectChunks: 2,
|
expectChunks: 2,
|
||||||
checkContent: func(t *testing.T, chunks []string) {
|
checkContent: func(t *testing.T, chunks []string) {
|
||||||
if len(chunks[0]) > 2000 {
|
if len([]rune(chunks[0])) > 2000 {
|
||||||
t.Errorf("Chunk 0 too large: %d", len(chunks[0]))
|
t.Errorf("Chunk 0 too large: %d runes", len([]rune(chunks[0])))
|
||||||
}
|
}
|
||||||
if len(chunks[0])+len(chunks[1]) != len(longText) {
|
if len([]rune(chunks[0]))+len([]rune(chunks[1])) != len([]rune(longText)) {
|
||||||
t.Errorf("Total length mismatch. Got %d, want %d", len(chunks[0])+len(chunks[1]), len(longText))
|
t.Errorf(
|
||||||
|
"Total rune length mismatch. Got %d, want %d",
|
||||||
|
len([]rune(chunks[0]))+len([]rune(chunks[1])),
|
||||||
|
len([]rune(longText)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -53,11 +57,11 @@ func TestSplitMessage(t *testing.T) {
|
|||||||
maxLen: 2000,
|
maxLen: 2000,
|
||||||
expectChunks: 2,
|
expectChunks: 2,
|
||||||
checkContent: func(t *testing.T, chunks []string) {
|
checkContent: func(t *testing.T, chunks []string) {
|
||||||
if len(chunks[0]) != 1750 {
|
if len([]rune(chunks[0])) != 1750 {
|
||||||
t.Errorf("Expected chunk 0 to be 1750 length (split at newline), got %d", len(chunks[0]))
|
t.Errorf("Expected chunk 0 to be 1750 runes (split at newline), got %d", len([]rune(chunks[0])))
|
||||||
}
|
}
|
||||||
if chunks[1] != strings.Repeat("b", 300) {
|
if chunks[1] != strings.Repeat("b", 300) {
|
||||||
t.Errorf("Chunk 1 content mismatch. Len: %d", len(chunks[1]))
|
t.Errorf("Chunk 1 content mismatch. Len: %d", len([]rune(chunks[1])))
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -78,17 +82,39 @@ func TestSplitMessage(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Preserve Unicode characters",
|
name: "Preserve Unicode characters (rune-aware)",
|
||||||
content: strings.Repeat("\u4e16", 1000), // 3000 bytes
|
content: strings.Repeat("\u4e16", 2500), // 2500 runes, 7500 bytes
|
||||||
maxLen: 2000,
|
maxLen: 2000,
|
||||||
expectChunks: 2,
|
expectChunks: 2,
|
||||||
checkContent: func(t *testing.T, chunks []string) {
|
checkContent: func(t *testing.T, chunks []string) {
|
||||||
// Just verify we didn't panic and got valid strings.
|
// Verify chunks contain valid unicode and don't split mid-rune
|
||||||
// Go strings are UTF-8, if we split mid-rune it would be bad,
|
for i, chunk := range chunks {
|
||||||
// but standard slicing might do that.
|
runeCount := len([]rune(chunk))
|
||||||
// Let's assume standard behavior is acceptable or check if it produces invalid rune?
|
if runeCount > 2000 {
|
||||||
if !strings.Contains(chunks[0], "\u4e16") {
|
t.Errorf("Chunk %d has %d runes, exceeds maxLen 2000", i, runeCount)
|
||||||
t.Error("Chunk should contain unicode characters")
|
}
|
||||||
|
if !strings.Contains(chunk, "\u4e16") {
|
||||||
|
t.Errorf("Chunk %d should contain unicode characters", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Verify total rune count is preserved
|
||||||
|
totalRunes := 0
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
totalRunes += len([]rune(chunk))
|
||||||
|
}
|
||||||
|
if totalRunes != 2500 {
|
||||||
|
t.Errorf("Total rune count mismatch. Got %d, want 2500", totalRunes)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Zero maxLen returns single chunk",
|
||||||
|
content: "Hello world",
|
||||||
|
maxLen: 0,
|
||||||
|
expectChunks: 1,
|
||||||
|
checkContent: func(t *testing.T, chunks []string) {
|
||||||
|
if chunks[0] != "Hello world" {
|
||||||
|
t.Errorf("Expected original content, got %q", chunks[0])
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -145,7 +171,7 @@ func TestSplitMessage_CodeBlockIntegrity(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// First chunk should contain meaningful content
|
// First chunk should contain meaningful content
|
||||||
if len(chunks[0]) > 40 {
|
if len([]rune(chunks[0])) > 40 {
|
||||||
t.Errorf("First chunk exceeded maxLen: length %d", len(chunks[0]))
|
t.Errorf("First chunk exceeded maxLen: length %d runes", len([]rune(chunks[0])))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user