diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index d2b6838c5..507a87d05 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -11,6 +11,8 @@ import ( // ErrBusClosed is returned when publishing to a closed MessageBus. var ErrBusClosed = errors.New("message bus closed") +const defaultBusBufferSize = 16 + type MessageBus struct { inbound chan InboundMessage outbound chan OutboundMessage @@ -21,9 +23,9 @@ type MessageBus struct { func NewMessageBus() *MessageBus { return &MessageBus{ - inbound: make(chan InboundMessage, 100), - outbound: make(chan OutboundMessage, 100), - outboundMedia: make(chan OutboundMediaMessage, 100), + inbound: make(chan InboundMessage, defaultBusBufferSize), + outbound: make(chan OutboundMessage, defaultBusBufferSize), + outboundMedia: make(chan OutboundMediaMessage, defaultBusBufferSize), done: make(chan struct{}), } } diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 47826824e..cc3f6961f 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -67,7 +67,7 @@ func TestPublishInbound_ContextCancel(t *testing.T) { // Fill the buffer ctx := context.Background() - for i := 0; i < 100; i++ { + for i := 0; i < defaultBusBufferSize; i++ { if err := mb.PublishInbound(ctx, InboundMessage{Content: "fill"}); err != nil { t.Fatalf("fill failed at %d: %v", i, err) } @@ -194,7 +194,7 @@ func TestPublishInbound_FullBuffer(t *testing.T) { ctx := context.Background() // Fill the buffer - for i := 0; i < 100; i++ { + for i := 0; i < defaultBusBufferSize; i++ { if err := mb.PublishInbound(ctx, InboundMessage{Content: "fill"}); err != nil { t.Fatalf("fill failed at %d: %v", i, err) } diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 418933af7..1aae47faa 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -2,10 +2,13 @@ package channels import ( "context" + "crypto/rand" + "encoding/binary" + "encoding/hex" + "strconv" "strings" "sync/atomic" - - "github.com/google/uuid" + "time" "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" @@ -14,6 +17,28 @@ import ( "github.com/sipeed/picoclaw/pkg/media" ) +var ( + fastIDCounter uint64 + fastIDPrefix string +) + +func init() { + // One-time read from crypto/rand for a unique prefix (single syscall). + var b [8]byte + if _, err := rand.Read(b[:]); err != nil { + // fallback to time-based prefix + binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano())) + } + fastIDPrefix = hex.EncodeToString(b[:]) +} + +// fastID generates a unique ID using a random prefix and an atomic counter. +// Much cheaper than uuid.New() which calls crypto/rand.Read on every invocation. +func fastID() string { + n := atomic.AddUint64(&fastIDCounter, 1) + return fastIDPrefix + strconv.FormatUint(n, 16) +} + type Channel interface { Name() string Start(ctx context.Context) error @@ -264,7 +289,7 @@ func (c *BaseChannel) GetPlaceholderRecorder() PlaceholderRecorder { func BuildMediaScope(channel, chatID, messageID string) string { id := messageID if id == "" { - id = uuid.New().String() + id = fastID() } return channel + ":" + chatID + ":" + id } diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 07c2ce1e2..ae6eb3ce4 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -26,14 +26,30 @@ import ( ) const ( - defaultChannelQueueSize = 100 + defaultChannelQueueSize = 16 defaultRateLimit = 10 // default 10 msg/s maxRetries = 3 rateLimitDelay = 1 * time.Second baseBackoff = 500 * time.Millisecond maxBackoff = 8 * time.Second + + janitorInterval = 10 * time.Second + typingStopTTL = 5 * time.Minute + placeholderTTL = 10 * time.Minute ) +// typingEntry wraps a typing stop function with a creation timestamp for TTL eviction. +type typingEntry struct { + stop func() + createdAt time.Time +} + +// placeholderEntry wraps a placeholder ID with a creation timestamp for TTL eviction. +type placeholderEntry struct { + id string + createdAt time.Time +} + // channelRateConfig maps channel name to per-second rate limit. var channelRateConfig = map[string]float64{ "telegram": 20, @@ -73,14 +89,14 @@ type asyncTask struct { // Implements PlaceholderRecorder. func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string) { key := channel + ":" + chatID - m.placeholders.Store(key, placeholderID) + m.placeholders.Store(key, placeholderEntry{id: placeholderID, createdAt: time.Now()}) } // RecordTypingStop registers a typing stop function for later invocation. // Implements PlaceholderRecorder. func (m *Manager) RecordTypingStop(channel, chatID string, stop func()) { key := channel + ":" + chatID - m.typingStops.Store(key, stop) + m.typingStops.Store(key, typingEntry{stop: stop, createdAt: time.Now()}) } // preSend handles typing stop and placeholder editing before sending a message. @@ -90,16 +106,16 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess // 1. Stop typing if v, loaded := m.typingStops.LoadAndDelete(key); loaded { - if stop, ok := v.(func()); ok { - stop() // idempotent, safe + if entry, ok := v.(typingEntry); ok { + entry.stop() // idempotent, safe } } // 2. Try editing placeholder if v, loaded := m.placeholders.LoadAndDelete(key); loaded { - if placeholderID, ok := v.(string); ok && placeholderID != "" { + if entry, ok := v.(placeholderEntry); ok && entry.id != "" { if editor, ok := ch.(MessageEditor); ok { - if err := editor.EditMessage(ctx, msg.ChatID, placeholderID, msg.Content); err == nil { + if err := editor.EditMessage(ctx, msg.ChatID, entry.id, msg.Content); err == nil { return true // edited successfully, skip Send } // edit failed → fall through to normal Send @@ -156,7 +172,6 @@ func (m *Manager) initChannel(name, displayName string) { setter.SetPlaceholderRecorder(m) } m.channels[name] = ch - m.workers[name] = newChannelWorker(name, ch) logger.InfoCF("channels", "Channel enabled successfully", map[string]any{ "channel": displayName, }) @@ -285,11 +300,11 @@ func (m *Manager) StartAll(ctx context.Context) error { "channel": name, "error": err.Error(), }) + continue } - } - - // Start per-channel workers - for name, w := range m.workers { + // Lazily create worker only after channel starts successfully + w := newChannelWorker(name, channel) + m.workers[name] = w go m.runWorker(dispatchCtx, name, w) go m.runMediaWorker(dispatchCtx, name, w) } @@ -298,6 +313,9 @@ func (m *Manager) StartAll(ctx context.Context) error { go m.dispatchOutbound(dispatchCtx) go m.dispatchOutboundMedia(dispatchCtx) + // Start the TTL janitor that cleans up stale typing/placeholder entries + go m.runTTLJanitor(dispatchCtx) + // Start shared HTTP server if configured if m.httpServer != nil { go func() { @@ -342,17 +360,25 @@ func (m *Manager) StopAll(ctx context.Context) error { // Close all worker queues and wait for them to drain for _, w := range m.workers { - close(w.queue) + if w != nil { + close(w.queue) + } } for _, w := range m.workers { - <-w.done + if w != nil { + <-w.done + } } // Close all media worker queues and wait for them to drain for _, w := range m.workers { - close(w.mediaQueue) + if w != nil { + close(w.mediaQueue) + } } for _, w := range m.workers { - <-w.mediaDone + if w != nil { + <-w.mediaDone + } } // Stop all channels @@ -487,40 +513,39 @@ func (m *Manager) dispatchOutbound(ctx context.Context) { logger.InfoC("channels", "Outbound dispatcher started") for { - select { - case <-ctx.Done(): + msg, ok := m.bus.SubscribeOutbound(ctx) + if !ok { logger.InfoC("channels", "Outbound dispatcher stopped") return - default: - msg, ok := m.bus.SubscribeOutbound(ctx) - if !ok { - continue - } + } - // Silently skip internal channels - if constants.IsInternalChannel(msg.Channel) { - continue - } + // Silently skip internal channels + if constants.IsInternalChannel(msg.Channel) { + continue + } - m.mu.RLock() - _, exists := m.channels[msg.Channel] - w, wExists := m.workers[msg.Channel] - m.mu.RUnlock() + m.mu.RLock() + _, exists := m.channels[msg.Channel] + w, wExists := m.workers[msg.Channel] + m.mu.RUnlock() - if !exists { - logger.WarnCF("channels", "Unknown channel for outbound message", map[string]any{ - "channel": msg.Channel, - }) - continue - } + if !exists { + logger.WarnCF("channels", "Unknown channel for outbound message", map[string]any{ + "channel": msg.Channel, + }) + continue + } - if wExists { - select { - case w.queue <- msg: - case <-ctx.Done(): - return - } + if wExists && w != nil { + select { + case w.queue <- msg: + case <-ctx.Done(): + return } + } else if exists { + logger.WarnCF("channels", "Channel has no active worker, skipping message", map[string]any{ + "channel": msg.Channel, + }) } } } @@ -529,40 +554,39 @@ func (m *Manager) dispatchOutboundMedia(ctx context.Context) { logger.InfoC("channels", "Outbound media dispatcher started") for { - select { - case <-ctx.Done(): + msg, ok := m.bus.SubscribeOutboundMedia(ctx) + if !ok { logger.InfoC("channels", "Outbound media dispatcher stopped") return - default: - msg, ok := m.bus.SubscribeOutboundMedia(ctx) - if !ok { - continue - } + } - // Silently skip internal channels - if constants.IsInternalChannel(msg.Channel) { - continue - } + // Silently skip internal channels + if constants.IsInternalChannel(msg.Channel) { + continue + } - m.mu.RLock() - _, exists := m.channels[msg.Channel] - w, wExists := m.workers[msg.Channel] - m.mu.RUnlock() + m.mu.RLock() + _, exists := m.channels[msg.Channel] + w, wExists := m.workers[msg.Channel] + m.mu.RUnlock() - if !exists { - logger.WarnCF("channels", "Unknown channel for outbound media message", map[string]any{ - "channel": msg.Channel, - }) - continue - } + if !exists { + logger.WarnCF("channels", "Unknown channel for outbound media message", map[string]any{ + "channel": msg.Channel, + }) + continue + } - if wExists { - select { - case w.mediaQueue <- msg: - case <-ctx.Done(): - return - } + if wExists && w != nil { + select { + case w.mediaQueue <- msg: + case <-ctx.Done(): + return } + } else if exists { + logger.WarnCF("channels", "Channel has no active worker, skipping media message", map[string]any{ + "channel": msg.Channel, + }) } } } @@ -644,6 +668,40 @@ func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channe }) } +// runTTLJanitor periodically scans the typingStops and placeholders maps +// and evicts entries that have exceeded their TTL. This prevents memory +// accumulation when outbound paths fail to trigger preSend (e.g. LLM errors). +func (m *Manager) runTTLJanitor(ctx context.Context) { + ticker := time.NewTicker(janitorInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + m.typingStops.Range(func(key, value any) bool { + if entry, ok := value.(typingEntry); ok { + if now.Sub(entry.createdAt) > typingStopTTL { + if _, loaded := m.typingStops.LoadAndDelete(key); loaded { + entry.stop() // idempotent, safe + } + } + } + return true + }) + m.placeholders.Range(func(key, value any) bool { + if entry, ok := value.(placeholderEntry); ok { + if now.Sub(entry.createdAt) > placeholderTTL { + m.placeholders.Delete(key) + } + } + return true + }) + } + } +} + func (m *Manager) GetChannel(name string) (Channel, bool) { m.mu.RLock() defer m.mu.RUnlock() @@ -680,13 +738,12 @@ func (m *Manager) RegisterChannel(name string, channel Channel) { m.mu.Lock() defer m.mu.Unlock() m.channels[name] = channel - m.workers[name] = newChannelWorker(name, channel) } func (m *Manager) UnregisterChannel(name string) { m.mu.Lock() defer m.mu.Unlock() - if w, ok := m.workers[name]; ok { + if w, ok := m.workers[name]; ok && w != nil { close(w.queue) <-w.done close(w.mediaQueue) @@ -712,7 +769,7 @@ func (m *Manager) SendToChannel(ctx context.Context, channelName, chatID, conten Content: content, } - if wExists { + if wExists && w != nil { select { case w.queue <- msg: return nil diff --git a/pkg/channels/manager_test.go b/pkg/channels/manager_test.go index 0573c0a8e..45590584b 100644 --- a/pkg/channels/manager_test.go +++ b/pkg/channels/manager_test.go @@ -632,3 +632,233 @@ func TestSendWithRetry_PreSendEditsPlaceholder(t *testing.T) { t.Fatal("expected Send to NOT be called when placeholder was edited") } } + +// --- Dispatcher exit tests (Step 1) --- + +func TestDispatcherExitsOnCancel(t *testing.T) { + mb := bus.NewMessageBus() + defer mb.Close() + + m := &Manager{ + channels: make(map[string]Channel), + workers: make(map[string]*channelWorker), + bus: mb, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + go func() { + m.dispatchOutbound(ctx) + close(done) + }() + + // Cancel context and verify the dispatcher exits quickly + cancel() + + select { + case <-done: + // success + case <-time.After(2 * time.Second): + t.Fatal("dispatchOutbound did not exit within 2s after context cancel") + } +} + +func TestDispatcherMediaExitsOnCancel(t *testing.T) { + mb := bus.NewMessageBus() + defer mb.Close() + + m := &Manager{ + channels: make(map[string]Channel), + workers: make(map[string]*channelWorker), + bus: mb, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + go func() { + m.dispatchOutboundMedia(ctx) + close(done) + }() + + cancel() + + select { + case <-done: + // success + case <-time.After(2 * time.Second): + t.Fatal("dispatchOutboundMedia did not exit within 2s after context cancel") + } +} + +// --- TTL Janitor tests (Step 2) --- + +func TestTypingStopJanitorEviction(t *testing.T) { + m := newTestManager() + + var stopCalled atomic.Bool + // Store a typing entry with a creation time far in the past + m.typingStops.Store("test:123", typingEntry{ + stop: func() { stopCalled.Store(true) }, + createdAt: time.Now().Add(-10 * time.Minute), // well past typingStopTTL + }) + + // Run janitor with a short-lived context + ctx, cancel := context.WithCancel(context.Background()) + + // Manually trigger the janitor logic once by simulating a tick + go func() { + // Override janitor to run immediately + now := time.Now() + m.typingStops.Range(func(key, value any) bool { + if entry, ok := value.(typingEntry); ok { + if now.Sub(entry.createdAt) > typingStopTTL { + if _, loaded := m.typingStops.LoadAndDelete(key); loaded { + entry.stop() + } + } + } + return true + }) + cancel() + }() + + <-ctx.Done() + + if !stopCalled.Load() { + t.Fatal("expected typing stop function to be called by janitor eviction") + } + + // Verify entry was deleted + if _, loaded := m.typingStops.Load("test:123"); loaded { + t.Fatal("expected typing entry to be deleted after eviction") + } +} + +func TestPlaceholderJanitorEviction(t *testing.T) { + m := newTestManager() + + // Store a placeholder entry with a creation time far in the past + m.placeholders.Store("test:456", placeholderEntry{ + id: "msg_old", + createdAt: time.Now().Add(-20 * time.Minute), // well past placeholderTTL + }) + + // Simulate janitor logic + now := time.Now() + m.placeholders.Range(func(key, value any) bool { + if entry, ok := value.(placeholderEntry); ok { + if now.Sub(entry.createdAt) > placeholderTTL { + m.placeholders.Delete(key) + } + } + return true + }) + + // Verify entry was deleted + if _, loaded := m.placeholders.Load("test:456"); loaded { + t.Fatal("expected placeholder entry to be deleted after eviction") + } +} + +func TestPreSendStillWorksWithWrappedTypes(t *testing.T) { + m := newTestManager() + var stopCalled bool + var editCalled bool + + ch := &mockMessageEditor{ + mockChannel: mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { + return nil + }, + }, + editFn: func(_ context.Context, chatID, messageID, content string) error { + editCalled = true + if messageID != "ph_id" { + t.Fatalf("expected messageID ph_id, got %s", messageID) + } + return nil + }, + } + + // Use the new wrapped types via the public API + m.RecordTypingStop("test", "chat1", func() { + stopCalled = true + }) + m.RecordPlaceholder("test", "chat1", "ph_id") + + msg := bus.OutboundMessage{Channel: "test", ChatID: "chat1", Content: "response"} + edited := m.preSend(context.Background(), "test", msg, ch) + + if !stopCalled { + t.Fatal("expected typing stop to be called via wrapped type") + } + if !editCalled { + t.Fatal("expected EditMessage to be called via wrapped type") + } + if !edited { + t.Fatal("expected preSend to return true") + } +} + +// --- Lazy worker creation tests (Step 6) --- + +func TestLazyWorkerCreation(t *testing.T) { + m := newTestManager() + + ch := &mockChannel{ + sendFn: func(_ context.Context, _ bus.OutboundMessage) error { + return nil + }, + } + + // RegisterChannel should NOT create a worker + m.RegisterChannel("lazy", ch) + + m.mu.RLock() + _, chExists := m.channels["lazy"] + _, wExists := m.workers["lazy"] + m.mu.RUnlock() + + if !chExists { + t.Fatal("expected channel to be registered") + } + if wExists { + t.Fatal("expected worker to NOT be created by RegisterChannel (lazy creation)") + } +} + +// --- FastID uniqueness test (Step 5) --- + +func TestBuildMediaScope_FastIDUniqueness(t *testing.T) { + seen := make(map[string]bool) + + for i := 0; i < 1000; i++ { + scope := BuildMediaScope("test", "chat1", "") + if seen[scope] { + t.Fatalf("duplicate scope generated: %s", scope) + } + seen[scope] = true + } + + // Verify format: "channel:chatID:id" + scope := BuildMediaScope("telegram", "42", "") + parts := 0 + for _, c := range scope { + if c == ':' { + parts++ + } + } + if parts != 2 { + t.Fatalf("expected scope to have 2 colons (channel:chatID:id), got: %s", scope) + } +} + +func TestBuildMediaScope_WithMessageID(t *testing.T) { + scope := BuildMediaScope("discord", "chat99", "msg123") + expected := "discord:chat99:msg123" + if scope != expected { + t.Fatalf("expected %s, got %s", expected, scope) + } +} diff --git a/pkg/channels/split.go b/pkg/channels/split.go index 27d76df1b..1c951a31f 100644 --- a/pkg/channels/split.go +++ b/pkg/channels/split.go @@ -19,6 +19,7 @@ func SplitMessage(content string, maxLen int) []string { } runes := []rune(content) + totalLen := len(runes) var messages []string // Dynamic buffer: 10% of maxLen, but at least 50 chars if possible @@ -30,9 +31,11 @@ func SplitMessage(content string, maxLen int) []string { codeBlockBuffer = maxLen / 2 } - for len(runes) > 0 { - if len(runes) <= maxLen { - messages = append(messages, string(runes)) + start := 0 + for start < totalLen { + remaining := totalLen - start + if remaining <= maxLen { + messages = append(messages, string(runes[start:totalLen])) break } @@ -42,77 +45,88 @@ func SplitMessage(content string, maxLen int) []string { effectiveLimit = maxLen / 2 } + end := start + effectiveLimit + // Find natural split point within the effective limit - msgEnd := findLastNewlineRunes(runes[:effectiveLimit], 200) - if msgEnd <= 0 { - msgEnd = findLastSpaceRunes(runes[:effectiveLimit], 100) + msgEnd := findLastNewlineInRange(runes, start, end, 200) + if msgEnd <= start { + msgEnd = findLastSpaceInRange(runes, start, end, 100) } - if msgEnd <= 0 { - msgEnd = effectiveLimit + if msgEnd <= start { + msgEnd = end } // Check if this would end with an incomplete code block - candidate := runes[:msgEnd] - unclosedIdx := findLastUnclosedCodeBlockRunes(candidate) + unclosedIdx := findLastUnclosedCodeBlockInRange(runes, start, msgEnd) if unclosedIdx >= 0 { // Message would end with incomplete code block // Try to extend up to maxLen to include the closing ``` - if len(runes) > msgEnd { - closingIdx := findNextClosingCodeBlockRunes(runes, msgEnd) - if closingIdx > 0 && closingIdx <= maxLen { + if totalLen > msgEnd { + closingIdx := findNextClosingCodeBlockInRange(runes, msgEnd, totalLen) + if closingIdx > 0 && closingIdx-start <= maxLen { // Extend to include the closing ``` msgEnd = closingIdx } else { // Code block is too long to fit in one chunk or missing closing fence. // Try to split inside by injecting closing and reopening fences. - fenceRunes := runes[unclosedIdx:] - headerEnd := findNewlineInRunes(fenceRunes) + headerEnd := findNewlineFrom(runes, unclosedIdx) var header string if headerEnd == -1 { header = strings.TrimSpace(string(runes[unclosedIdx : unclosedIdx+3])) } else { - header = strings.TrimSpace(string(runes[unclosedIdx : unclosedIdx+headerEnd])) + header = strings.TrimSpace(string(runes[unclosedIdx:headerEnd])) } headerEndIdx := unclosedIdx + len([]rune(header)) if headerEnd != -1 { - headerEndIdx = unclosedIdx + headerEnd + headerEndIdx = headerEnd } // If we have a reasonable amount of content after the header, split inside if msgEnd > headerEndIdx+20 { // Find a better split point closer to maxLen - innerLimit := maxLen - 5 // Leave room for "\n```" - betterEnd := findLastNewlineRunes(runes[:innerLimit], 200) + innerLimit := start + maxLen - 5 // Leave room for "\n```" + if innerLimit > totalLen { + innerLimit = totalLen + } + betterEnd := findLastNewlineInRange(runes, start, innerLimit, 200) if betterEnd > headerEndIdx { msgEnd = betterEnd } else { msgEnd = innerLimit } - chunk := strings.TrimRight(string(runes[:msgEnd]), " \t\n\r") + "\n```" + chunk := strings.TrimRight(string(runes[start:msgEnd]), " \t\n\r") + "\n```" messages = append(messages, chunk) - remaining := strings.TrimSpace(header + "\n" + string(runes[msgEnd:])) + remaining := strings.TrimSpace(header + "\n" + string(runes[msgEnd:totalLen])) + // Replace the tail of runes with the reconstructed remaining runes = []rune(remaining) + totalLen = len(runes) + start = 0 continue } // Otherwise, try to split before the code block starts - newEnd := findLastNewlineRunes(runes[:unclosedIdx], 200) - if newEnd <= 0 { - newEnd = findLastSpaceRunes(runes[:unclosedIdx], 100) + newEnd := findLastNewlineInRange(runes, start, unclosedIdx, 200) + if newEnd <= start { + newEnd = findLastSpaceInRange(runes, start, unclosedIdx, 100) } - if newEnd > 0 { + if newEnd > start { msgEnd = newEnd } else { // If we can't split before, we MUST split inside (last resort) - if unclosedIdx > 20 { + if unclosedIdx-start > 20 { msgEnd = unclosedIdx } else { - msgEnd = maxLen - 5 - chunk := strings.TrimRight(string(runes[:msgEnd]), " \t\n\r") + "\n```" + splitAt := start + maxLen - 5 + if splitAt > totalLen { + splitAt = totalLen + } + chunk := strings.TrimRight(string(runes[start:splitAt]), " \t\n\r") + "\n```" messages = append(messages, chunk) - remaining := strings.TrimSpace(header + "\n" + string(runes[msgEnd:])) + remaining := strings.TrimSpace(header + "\n" + string(runes[splitAt:totalLen])) runes = []rune(remaining) + totalLen = len(runes) + start = 0 continue } } @@ -120,29 +134,30 @@ func SplitMessage(content string, maxLen int) []string { } } - if msgEnd <= 0 { - msgEnd = effectiveLimit + if msgEnd <= start { + msgEnd = start + effectiveLimit } - messages = append(messages, string(runes[:msgEnd])) - remaining := strings.TrimSpace(string(runes[msgEnd:])) - runes = []rune(remaining) + messages = append(messages, string(runes[start:msgEnd])) + // Advance start, skipping leading whitespace of next chunk + start = msgEnd + for start < totalLen && (runes[start] == ' ' || runes[start] == '\t' || runes[start] == '\n' || runes[start] == '\r') { + start++ + } } return messages } -// findLastUnclosedCodeBlockRunes finds the last opening ``` that doesn't have a closing ``` -// Returns the rune position of the opening ``` or -1 if all code blocks are complete -func findLastUnclosedCodeBlockRunes(runes []rune) int { +// findLastUnclosedCodeBlockInRange finds the last opening ``` that doesn't have a closing ``` +// within runes[start:end]. Returns the absolute rune index or -1. +func findLastUnclosedCodeBlockInRange(runes []rune, start, end int) int { inCodeBlock := false lastOpenIdx := -1 - for i := 0; i < len(runes); i++ { - if i+2 < len(runes) && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' { - // Toggle code block state on each fence + for i := start; i < end; i++ { + if i+2 < end && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' { if !inCodeBlock { - // Entering a code block: record this opening fence lastOpenIdx = i } inCodeBlock = !inCodeBlock @@ -156,36 +171,21 @@ func findLastUnclosedCodeBlockRunes(runes []rune) int { return -1 } -// findNextClosingCodeBlockRunes finds the next closing ``` starting from a rune position -// Returns the rune position after the closing ``` or -1 if not found -func findNextClosingCodeBlockRunes(runes []rune, startIdx int) int { - for i := startIdx; i < len(runes); i++ { - if i+2 < len(runes) && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' { +// findNextClosingCodeBlockInRange finds the next closing ``` starting from startIdx +// within runes[startIdx:end]. Returns the absolute index after the closing ``` or -1. +func findNextClosingCodeBlockInRange(runes []rune, startIdx, end int) int { + for i := startIdx; i < end; i++ { + if i+2 < end && runes[i] == '`' && runes[i+1] == '`' && runes[i+2] == '`' { return i + 3 } } return -1 } -// findNewlineInRunes finds the first newline character in a rune slice. -// Returns the rune index of the newline or -1 if not found. -func findNewlineInRunes(runes []rune) int { - for i, r := range runes { - if r == '\n' { - return i - } - } - return -1 -} - -// findLastNewlineRunes finds the last newline character within the last N runes -// Returns the rune position of the newline or -1 if not found -func findLastNewlineRunes(runes []rune, searchWindow int) int { - searchStart := len(runes) - searchWindow - if searchStart < 0 { - searchStart = 0 - } - for i := len(runes) - 1; i >= searchStart; i-- { +// findNewlineFrom finds the first newline character starting from the given index. +// Returns the absolute index or -1 if not found. +func findNewlineFrom(runes []rune, from int) int { + for i := from; i < len(runes); i++ { if runes[i] == '\n' { return i } @@ -193,17 +193,32 @@ func findLastNewlineRunes(runes []rune, searchWindow int) int { return -1 } -// findLastSpaceRunes finds the last space character within the last N runes -// Returns the rune position of the space or -1 if not found -func findLastSpaceRunes(runes []rune, searchWindow int) int { - searchStart := len(runes) - searchWindow - if searchStart < 0 { - searchStart = 0 +// findLastNewlineInRange finds the last newline within the last searchWindow runes +// of the range runes[start:end]. Returns the absolute index or start-1 (indicating not found). +func findLastNewlineInRange(runes []rune, start, end, searchWindow int) int { + searchStart := end - searchWindow + if searchStart < start { + searchStart = start } - for i := len(runes) - 1; i >= searchStart; i-- { + for i := end - 1; i >= searchStart; i-- { + if runes[i] == '\n' { + return i + } + } + return start - 1 +} + +// findLastSpaceInRange finds the last space/tab within the last searchWindow runes +// of the range runes[start:end]. Returns the absolute index or start-1 (indicating not found). +func findLastSpaceInRange(runes []rune, start, end, searchWindow int) int { + searchStart := end - searchWindow + if searchStart < start { + searchStart = start + } + for i := end - 1; i >= searchStart; i-- { if runes[i] == ' ' || runes[i] == '\t' { return i } } - return -1 + return start - 1 }