Files
picoclaw/pkg/agent/pipeline_streaming.go
T
lxowalle e7e21df354 fix(agent): honor explicit thinking off (#2898)
* fix(agent): honor explicit thinking off

* fix(agent): address thinking off lint failures

* Clarify unset thinking level display

* fix ci
2026-05-21 11:07:39 +08:00

490 lines
13 KiB
Go

package agent
import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/providers"
)
func (p *Pipeline) tryConfiguredStreamingLLM(
ctx context.Context,
ts *turnState,
exec *turnExecution,
messagesForCall []providers.Message,
toolDefsForCall []providers.ToolDefinition,
) (*providers.LLMResponse, bool, error) {
exec.streamingPublisher = nil
exec.streamingFallback = false
if !p.configuredStreamingEligible(ts, exec) {
return nil, false, nil
}
streamProvider, ok := exec.activeProvider.(providers.StreamingProvider)
if !ok {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.activeModel,
"reason": "provider_not_streaming",
})
return nil, false, nil
}
streamer, ok := p.Bus.GetStreamer(ctx, ts.channel, ts.chatID, ts.sessionKey)
if !ok || streamer == nil {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"chat_id": ts.chatID,
"model": exec.activeModel,
"reason": "streamer_unavailable",
})
return nil, false, nil
}
publisher := &streamingChunkPublisher{
streamer: streamer,
channel: ts.channel,
chatID: ts.chatID,
modelName: exec.llmModelName,
}
logger.DebugCF("agent", "configured streaming enabled", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"chat_id": ts.chatID,
"model": exec.llmModel,
})
chunkCount := 0
firstChunkAt := time.Time{}
lastChunkAt := time.Time{}
recordChunk := func() {
now := time.Now()
chunkCount++
if firstChunkAt.IsZero() {
firstChunkAt = now
}
lastChunkAt = now
}
var response *providers.LLMResponse
var streamErr error
if eventProvider, ok := exec.activeProvider.(providers.StreamingEventProvider); ok {
response, streamErr = eventProvider.ChatStreamEvents(
ctx,
messagesForCall,
toolDefsForCall,
exec.llmModel,
exec.llmOpts,
func(chunk providers.StreamChunk) {
recordChunk()
if !exec.suppressReasoning && strings.TrimSpace(chunk.ReasoningContent) != "" {
publisher.UpdateReasoning(ctx, chunk.ReasoningContent)
}
if strings.TrimSpace(chunk.Content) != "" {
publisher.Update(ctx, chunk.Content)
}
},
)
} else {
response, streamErr = streamProvider.ChatStream(
ctx,
messagesForCall,
toolDefsForCall,
exec.llmModel,
exec.llmOpts,
func(accumulated string) {
recordChunk()
publisher.Update(ctx, accumulated)
},
)
}
logConfiguredStreamingSummary(ts, exec, chunkCount, firstChunkAt, lastChunkAt, streamErr)
if streamErr == nil {
if updateErr := publisher.Err(); updateErr != nil {
logFields := map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.llmModel,
"error": updateErr.Error(),
}
if publisher.Published() {
logger.WarnCF("agent", "ChatStream update failed after visible output", logFields)
return nil, true, configuredStreamingVisibleError{err: updateErr}
}
logger.WarnCF("agent", "ChatStream update failed before visible output; retrying with Chat", logFields)
publisher.Cancel(ctx)
fallbackResponse, err := exec.activeProvider.Chat(
ctx,
messagesForCall,
toolDefsForCall,
exec.llmModel,
exec.llmOpts,
)
if err == nil && fallbackResponse != nil {
exec.streamingFallback = true
}
return fallbackResponse, true, err
}
}
if streamErr != nil {
if !publisher.Published() {
logger.WarnCF("agent", "ChatStream failed before visible output; retrying with Chat", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.llmModel,
"error": streamErr.Error(),
})
publisher.Cancel(ctx)
fallbackResponse, err := exec.activeProvider.Chat(
ctx,
messagesForCall,
toolDefsForCall,
exec.llmModel,
exec.llmOpts,
)
if err == nil && fallbackResponse != nil {
exec.streamingFallback = true
}
return fallbackResponse, true, err
}
return nil, true, configuredStreamingVisibleError{err: streamErr}
}
if response != nil {
exec.streamingPublisher = publisher
}
return response, true, nil
}
func logConfiguredStreamingSummary(
ts *turnState,
exec *turnExecution,
chunkCount int,
firstChunkAt time.Time,
lastChunkAt time.Time,
streamErr error,
) {
fields := map[string]any{
"chunks": chunkCount,
}
if ts != nil {
fields["agent_id"] = ts.agent.ID
fields["channel"] = ts.channel
}
if exec != nil {
fields["model"] = exec.llmModel
}
if !firstChunkAt.IsZero() && !lastChunkAt.IsZero() {
fields["chunk_span_ms"] = lastChunkAt.Sub(firstChunkAt).Milliseconds()
}
if streamErr != nil {
fields["error"] = streamErr.Error()
}
logger.DebugCF("agent", "configured streaming completed", fields)
}
type configuredStreamingVisibleError struct {
err error
}
func (e configuredStreamingVisibleError) Error() string {
if e.err == nil {
return "configured streaming failed after visible output"
}
return e.err.Error()
}
func (e configuredStreamingVisibleError) Unwrap() error {
return e.err
}
func isConfiguredStreamingVisibleError(err error) bool {
var visibleErr configuredStreamingVisibleError
return errors.As(err, &visibleErr)
}
func finalizeConfiguredStreamingLLM(
ctx context.Context,
ts *turnState,
exec *turnExecution,
content string,
contextUsage *bus.ContextUsage,
) error {
if exec == nil || exec.streamingPublisher == nil {
return nil
}
publisher := exec.streamingPublisher
exec.streamingPublisher = nil
visibleBeforeFinalize := publisher.Published()
if err := publisher.Finalize(ctx, content, contextUsage); err != nil {
if visibleBeforeFinalize {
logger.WarnCF("agent", "stream final flush failed after visible output", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.llmModel,
"error": err.Error(),
})
return configuredStreamingVisibleError{err: err}
}
publisher.Cancel(ctx)
logger.WarnCF("agent", "stream final flush failed", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.llmModel,
"error": err.Error(),
})
return err
}
return nil
}
func cancelConfiguredStreamingLLM(ctx context.Context, exec *turnExecution) {
if exec == nil || exec.streamingPublisher == nil {
return
}
publisher := exec.streamingPublisher
exec.streamingPublisher = nil
publisher.Cancel(ctx)
}
func (p *Pipeline) configuredStreamingEligible(ts *turnState, exec *turnExecution) bool {
if p == nil || ts == nil || exec == nil || p.Bus == nil {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"reason": "missing_pipeline_state",
})
return false
}
if strings.TrimSpace(ts.channel) == "" || strings.TrimSpace(ts.chatID) == "" {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"chat_id": ts.chatID,
"model": exec.activeModel,
"reason": "missing_channel_context",
})
return false
}
if !ts.opts.SendResponse && !ts.opts.AllowInterimPicoPublish {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"chat_id": ts.chatID,
"model": exec.activeModel,
"reason": "turn_output_disabled",
})
return false
}
if len(exec.activeCandidates) != 1 {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.activeModel,
"candidates": len(exec.activeCandidates),
"reason": "fallback_candidates_enabled",
})
return false
}
if exec.activeModelConfig == nil || !exec.activeModelConfig.Streaming.Enabled {
modelName := ""
modelStreaming := false
if exec.activeModelConfig != nil {
modelName = exec.activeModelConfig.ModelName
modelStreaming = exec.activeModelConfig.Streaming.Enabled
}
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.activeModel,
"model_name": modelName,
"model_streaming": modelStreaming,
"has_model_config": exec.activeModelConfig != nil,
"reason": "model_streaming_disabled",
})
return false
}
channelStreaming, ok := p.channelStreamingConfig(ts.channel)
if !ok || !channelStreaming.Enabled {
logger.DebugCF("agent", "configured streaming not used", map[string]any{
"agent_id": ts.agent.ID,
"channel": ts.channel,
"model": exec.activeModel,
"channel_streaming": channelStreaming.Enabled,
"has_channel_config": ok,
"reason": "channel_streaming_disabled",
})
return false
}
return true
}
func (p *Pipeline) channelStreamingConfig(channelName string) (config.StreamingConfig, bool) {
if p == nil || p.Cfg == nil || p.Cfg.Channels == nil {
return config.StreamingConfig{}, false
}
ch := p.Cfg.Channels[channelName]
if ch == nil {
return config.StreamingConfig{}, false
}
decoded, err := ch.GetDecoded()
if err != nil {
logger.WarnCF("agent", "channel streaming config decode failed", map[string]any{
"channel": channelName,
"error": err.Error(),
})
return config.StreamingConfig{}, false
}
return streamingConfigFromDecodedSettings(decoded)
}
func streamingConfigFromDecodedSettings(decoded any) (config.StreamingConfig, bool) {
value := reflect.ValueOf(decoded)
if !value.IsValid() {
return config.StreamingConfig{}, false
}
if value.Kind() == reflect.Ptr {
if value.IsNil() {
return config.StreamingConfig{}, false
}
value = value.Elem()
}
if value.Kind() != reflect.Struct {
return config.StreamingConfig{}, false
}
field := value.FieldByName("Streaming")
if !field.IsValid() || !field.CanInterface() {
return config.StreamingConfig{}, false
}
streaming, ok := field.Interface().(config.StreamingConfig)
return streaming, ok
}
type streamingChunkPublisher struct {
streamer bus.Streamer
channel string
chatID string
modelName string
published bool
reasoningPublished bool
err error
}
func (p *streamingChunkPublisher) Update(ctx context.Context, accumulated string) {
if p == nil || p.streamer == nil || strings.TrimSpace(accumulated) == "" {
return
}
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
setter.SetModelName(p.modelName)
}
if err := p.streamer.Update(ctx, accumulated); err != nil {
p.err = err
logger.WarnCF("agent", "stream update failed", map[string]any{
"channel": p.channel,
"chat_id": p.chatID,
"error": err.Error(),
})
return
}
p.published = true
}
func (p *streamingChunkPublisher) UpdateReasoning(ctx context.Context, accumulated string) {
if p == nil || p.streamer == nil || strings.TrimSpace(accumulated) == "" {
return
}
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
setter.SetModelName(p.modelName)
}
reasoningStreamer, ok := p.streamer.(bus.ReasoningStreamer)
if !ok {
return
}
if err := reasoningStreamer.UpdateReasoning(ctx, accumulated); err != nil {
p.err = err
logger.WarnCF("agent", "stream reasoning update failed", map[string]any{
"channel": p.channel,
"chat_id": p.chatID,
"error": err.Error(),
})
return
}
p.reasoningPublished = true
}
func (p *streamingChunkPublisher) Published() bool {
return p != nil && p.published
}
func (p *streamingChunkPublisher) ReasoningPublished() bool {
return p != nil && p.reasoningPublished
}
func (p *streamingChunkPublisher) Err() error {
if p == nil {
return nil
}
return p.err
}
func (p *streamingChunkPublisher) Finalize(ctx context.Context, content string, contextUsage *bus.ContextUsage) error {
if p == nil || p.streamer == nil {
return nil
}
if strings.TrimSpace(content) == "" && !p.published {
return nil
}
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
setter.SetModelName(p.modelName)
}
var err error
if streamer, ok := p.streamer.(bus.ContextUsageStreamer); ok {
err = streamer.FinalizeWithContext(ctx, content, contextUsage)
} else {
err = p.streamer.Finalize(ctx, content)
}
if err != nil {
return fmt.Errorf("stream finalize: %w", err)
}
p.published = true
return nil
}
func (p *streamingChunkPublisher) FinalizeReasoning(ctx context.Context, content string) error {
if p == nil || p.streamer == nil || !p.reasoningPublished || strings.TrimSpace(content) == "" {
return nil
}
reasoningStreamer, ok := p.streamer.(bus.ReasoningStreamer)
if !ok {
return nil
}
if err := reasoningStreamer.FinalizeReasoning(ctx, content); err != nil {
return fmt.Errorf("stream reasoning finalize: %w", err)
}
return nil
}
func (p *streamingChunkPublisher) ClearFinalizedStreamMarker() {
if p == nil || p.streamer == nil {
return
}
if cleaner, ok := p.streamer.(interface{ ClearFinalizedStreamMarker() }); ok {
cleaner.ClearFinalizedStreamMarker()
}
}
func (p *streamingChunkPublisher) Cancel(ctx context.Context) {
if p == nil || p.streamer == nil {
return
}
p.streamer.Cancel(ctx)
}