mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
e7e21df354
* fix(agent): honor explicit thinking off * fix(agent): address thinking off lint failures * Clarify unset thinking level display * fix ci
490 lines
13 KiB
Go
490 lines
13 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/bus"
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/pkg/logger"
|
|
"github.com/sipeed/picoclaw/pkg/providers"
|
|
)
|
|
|
|
func (p *Pipeline) tryConfiguredStreamingLLM(
|
|
ctx context.Context,
|
|
ts *turnState,
|
|
exec *turnExecution,
|
|
messagesForCall []providers.Message,
|
|
toolDefsForCall []providers.ToolDefinition,
|
|
) (*providers.LLMResponse, bool, error) {
|
|
exec.streamingPublisher = nil
|
|
exec.streamingFallback = false
|
|
if !p.configuredStreamingEligible(ts, exec) {
|
|
return nil, false, nil
|
|
}
|
|
streamProvider, ok := exec.activeProvider.(providers.StreamingProvider)
|
|
if !ok {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.activeModel,
|
|
"reason": "provider_not_streaming",
|
|
})
|
|
return nil, false, nil
|
|
}
|
|
|
|
streamer, ok := p.Bus.GetStreamer(ctx, ts.channel, ts.chatID, ts.sessionKey)
|
|
if !ok || streamer == nil {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"chat_id": ts.chatID,
|
|
"model": exec.activeModel,
|
|
"reason": "streamer_unavailable",
|
|
})
|
|
return nil, false, nil
|
|
}
|
|
|
|
publisher := &streamingChunkPublisher{
|
|
streamer: streamer,
|
|
channel: ts.channel,
|
|
chatID: ts.chatID,
|
|
modelName: exec.llmModelName,
|
|
}
|
|
|
|
logger.DebugCF("agent", "configured streaming enabled", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"chat_id": ts.chatID,
|
|
"model": exec.llmModel,
|
|
})
|
|
|
|
chunkCount := 0
|
|
firstChunkAt := time.Time{}
|
|
lastChunkAt := time.Time{}
|
|
recordChunk := func() {
|
|
now := time.Now()
|
|
chunkCount++
|
|
if firstChunkAt.IsZero() {
|
|
firstChunkAt = now
|
|
}
|
|
lastChunkAt = now
|
|
}
|
|
var response *providers.LLMResponse
|
|
var streamErr error
|
|
if eventProvider, ok := exec.activeProvider.(providers.StreamingEventProvider); ok {
|
|
response, streamErr = eventProvider.ChatStreamEvents(
|
|
ctx,
|
|
messagesForCall,
|
|
toolDefsForCall,
|
|
exec.llmModel,
|
|
exec.llmOpts,
|
|
func(chunk providers.StreamChunk) {
|
|
recordChunk()
|
|
if !exec.suppressReasoning && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
|
publisher.UpdateReasoning(ctx, chunk.ReasoningContent)
|
|
}
|
|
if strings.TrimSpace(chunk.Content) != "" {
|
|
publisher.Update(ctx, chunk.Content)
|
|
}
|
|
},
|
|
)
|
|
} else {
|
|
response, streamErr = streamProvider.ChatStream(
|
|
ctx,
|
|
messagesForCall,
|
|
toolDefsForCall,
|
|
exec.llmModel,
|
|
exec.llmOpts,
|
|
func(accumulated string) {
|
|
recordChunk()
|
|
publisher.Update(ctx, accumulated)
|
|
},
|
|
)
|
|
}
|
|
logConfiguredStreamingSummary(ts, exec, chunkCount, firstChunkAt, lastChunkAt, streamErr)
|
|
if streamErr == nil {
|
|
if updateErr := publisher.Err(); updateErr != nil {
|
|
logFields := map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.llmModel,
|
|
"error": updateErr.Error(),
|
|
}
|
|
if publisher.Published() {
|
|
logger.WarnCF("agent", "ChatStream update failed after visible output", logFields)
|
|
return nil, true, configuredStreamingVisibleError{err: updateErr}
|
|
}
|
|
logger.WarnCF("agent", "ChatStream update failed before visible output; retrying with Chat", logFields)
|
|
publisher.Cancel(ctx)
|
|
fallbackResponse, err := exec.activeProvider.Chat(
|
|
ctx,
|
|
messagesForCall,
|
|
toolDefsForCall,
|
|
exec.llmModel,
|
|
exec.llmOpts,
|
|
)
|
|
if err == nil && fallbackResponse != nil {
|
|
exec.streamingFallback = true
|
|
}
|
|
return fallbackResponse, true, err
|
|
}
|
|
}
|
|
if streamErr != nil {
|
|
if !publisher.Published() {
|
|
logger.WarnCF("agent", "ChatStream failed before visible output; retrying with Chat", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.llmModel,
|
|
"error": streamErr.Error(),
|
|
})
|
|
publisher.Cancel(ctx)
|
|
fallbackResponse, err := exec.activeProvider.Chat(
|
|
ctx,
|
|
messagesForCall,
|
|
toolDefsForCall,
|
|
exec.llmModel,
|
|
exec.llmOpts,
|
|
)
|
|
if err == nil && fallbackResponse != nil {
|
|
exec.streamingFallback = true
|
|
}
|
|
return fallbackResponse, true, err
|
|
}
|
|
return nil, true, configuredStreamingVisibleError{err: streamErr}
|
|
}
|
|
|
|
if response != nil {
|
|
exec.streamingPublisher = publisher
|
|
}
|
|
|
|
return response, true, nil
|
|
}
|
|
|
|
func logConfiguredStreamingSummary(
|
|
ts *turnState,
|
|
exec *turnExecution,
|
|
chunkCount int,
|
|
firstChunkAt time.Time,
|
|
lastChunkAt time.Time,
|
|
streamErr error,
|
|
) {
|
|
fields := map[string]any{
|
|
"chunks": chunkCount,
|
|
}
|
|
if ts != nil {
|
|
fields["agent_id"] = ts.agent.ID
|
|
fields["channel"] = ts.channel
|
|
}
|
|
if exec != nil {
|
|
fields["model"] = exec.llmModel
|
|
}
|
|
if !firstChunkAt.IsZero() && !lastChunkAt.IsZero() {
|
|
fields["chunk_span_ms"] = lastChunkAt.Sub(firstChunkAt).Milliseconds()
|
|
}
|
|
if streamErr != nil {
|
|
fields["error"] = streamErr.Error()
|
|
}
|
|
logger.DebugCF("agent", "configured streaming completed", fields)
|
|
}
|
|
|
|
type configuredStreamingVisibleError struct {
|
|
err error
|
|
}
|
|
|
|
func (e configuredStreamingVisibleError) Error() string {
|
|
if e.err == nil {
|
|
return "configured streaming failed after visible output"
|
|
}
|
|
return e.err.Error()
|
|
}
|
|
|
|
func (e configuredStreamingVisibleError) Unwrap() error {
|
|
return e.err
|
|
}
|
|
|
|
func isConfiguredStreamingVisibleError(err error) bool {
|
|
var visibleErr configuredStreamingVisibleError
|
|
return errors.As(err, &visibleErr)
|
|
}
|
|
|
|
func finalizeConfiguredStreamingLLM(
|
|
ctx context.Context,
|
|
ts *turnState,
|
|
exec *turnExecution,
|
|
content string,
|
|
contextUsage *bus.ContextUsage,
|
|
) error {
|
|
if exec == nil || exec.streamingPublisher == nil {
|
|
return nil
|
|
}
|
|
publisher := exec.streamingPublisher
|
|
exec.streamingPublisher = nil
|
|
visibleBeforeFinalize := publisher.Published()
|
|
if err := publisher.Finalize(ctx, content, contextUsage); err != nil {
|
|
if visibleBeforeFinalize {
|
|
logger.WarnCF("agent", "stream final flush failed after visible output", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.llmModel,
|
|
"error": err.Error(),
|
|
})
|
|
return configuredStreamingVisibleError{err: err}
|
|
}
|
|
publisher.Cancel(ctx)
|
|
logger.WarnCF("agent", "stream final flush failed", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.llmModel,
|
|
"error": err.Error(),
|
|
})
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func cancelConfiguredStreamingLLM(ctx context.Context, exec *turnExecution) {
|
|
if exec == nil || exec.streamingPublisher == nil {
|
|
return
|
|
}
|
|
publisher := exec.streamingPublisher
|
|
exec.streamingPublisher = nil
|
|
publisher.Cancel(ctx)
|
|
}
|
|
|
|
func (p *Pipeline) configuredStreamingEligible(ts *turnState, exec *turnExecution) bool {
|
|
if p == nil || ts == nil || exec == nil || p.Bus == nil {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"reason": "missing_pipeline_state",
|
|
})
|
|
return false
|
|
}
|
|
if strings.TrimSpace(ts.channel) == "" || strings.TrimSpace(ts.chatID) == "" {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"chat_id": ts.chatID,
|
|
"model": exec.activeModel,
|
|
"reason": "missing_channel_context",
|
|
})
|
|
return false
|
|
}
|
|
if !ts.opts.SendResponse && !ts.opts.AllowInterimPicoPublish {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"chat_id": ts.chatID,
|
|
"model": exec.activeModel,
|
|
"reason": "turn_output_disabled",
|
|
})
|
|
return false
|
|
}
|
|
if len(exec.activeCandidates) != 1 {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.activeModel,
|
|
"candidates": len(exec.activeCandidates),
|
|
"reason": "fallback_candidates_enabled",
|
|
})
|
|
return false
|
|
}
|
|
if exec.activeModelConfig == nil || !exec.activeModelConfig.Streaming.Enabled {
|
|
modelName := ""
|
|
modelStreaming := false
|
|
if exec.activeModelConfig != nil {
|
|
modelName = exec.activeModelConfig.ModelName
|
|
modelStreaming = exec.activeModelConfig.Streaming.Enabled
|
|
}
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.activeModel,
|
|
"model_name": modelName,
|
|
"model_streaming": modelStreaming,
|
|
"has_model_config": exec.activeModelConfig != nil,
|
|
"reason": "model_streaming_disabled",
|
|
})
|
|
return false
|
|
}
|
|
channelStreaming, ok := p.channelStreamingConfig(ts.channel)
|
|
if !ok || !channelStreaming.Enabled {
|
|
logger.DebugCF("agent", "configured streaming not used", map[string]any{
|
|
"agent_id": ts.agent.ID,
|
|
"channel": ts.channel,
|
|
"model": exec.activeModel,
|
|
"channel_streaming": channelStreaming.Enabled,
|
|
"has_channel_config": ok,
|
|
"reason": "channel_streaming_disabled",
|
|
})
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *Pipeline) channelStreamingConfig(channelName string) (config.StreamingConfig, bool) {
|
|
if p == nil || p.Cfg == nil || p.Cfg.Channels == nil {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
ch := p.Cfg.Channels[channelName]
|
|
if ch == nil {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
decoded, err := ch.GetDecoded()
|
|
if err != nil {
|
|
logger.WarnCF("agent", "channel streaming config decode failed", map[string]any{
|
|
"channel": channelName,
|
|
"error": err.Error(),
|
|
})
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
return streamingConfigFromDecodedSettings(decoded)
|
|
}
|
|
|
|
func streamingConfigFromDecodedSettings(decoded any) (config.StreamingConfig, bool) {
|
|
value := reflect.ValueOf(decoded)
|
|
if !value.IsValid() {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
if value.Kind() == reflect.Ptr {
|
|
if value.IsNil() {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
value = value.Elem()
|
|
}
|
|
if value.Kind() != reflect.Struct {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
|
|
field := value.FieldByName("Streaming")
|
|
if !field.IsValid() || !field.CanInterface() {
|
|
return config.StreamingConfig{}, false
|
|
}
|
|
streaming, ok := field.Interface().(config.StreamingConfig)
|
|
return streaming, ok
|
|
}
|
|
|
|
type streamingChunkPublisher struct {
|
|
streamer bus.Streamer
|
|
channel string
|
|
chatID string
|
|
modelName string
|
|
published bool
|
|
reasoningPublished bool
|
|
err error
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) Update(ctx context.Context, accumulated string) {
|
|
if p == nil || p.streamer == nil || strings.TrimSpace(accumulated) == "" {
|
|
return
|
|
}
|
|
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
|
|
setter.SetModelName(p.modelName)
|
|
}
|
|
if err := p.streamer.Update(ctx, accumulated); err != nil {
|
|
p.err = err
|
|
logger.WarnCF("agent", "stream update failed", map[string]any{
|
|
"channel": p.channel,
|
|
"chat_id": p.chatID,
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
p.published = true
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) UpdateReasoning(ctx context.Context, accumulated string) {
|
|
if p == nil || p.streamer == nil || strings.TrimSpace(accumulated) == "" {
|
|
return
|
|
}
|
|
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
|
|
setter.SetModelName(p.modelName)
|
|
}
|
|
reasoningStreamer, ok := p.streamer.(bus.ReasoningStreamer)
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := reasoningStreamer.UpdateReasoning(ctx, accumulated); err != nil {
|
|
p.err = err
|
|
logger.WarnCF("agent", "stream reasoning update failed", map[string]any{
|
|
"channel": p.channel,
|
|
"chat_id": p.chatID,
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
p.reasoningPublished = true
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) Published() bool {
|
|
return p != nil && p.published
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) ReasoningPublished() bool {
|
|
return p != nil && p.reasoningPublished
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) Err() error {
|
|
if p == nil {
|
|
return nil
|
|
}
|
|
return p.err
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) Finalize(ctx context.Context, content string, contextUsage *bus.ContextUsage) error {
|
|
if p == nil || p.streamer == nil {
|
|
return nil
|
|
}
|
|
if strings.TrimSpace(content) == "" && !p.published {
|
|
return nil
|
|
}
|
|
if setter, ok := p.streamer.(interface{ SetModelName(modelName string) }); ok {
|
|
setter.SetModelName(p.modelName)
|
|
}
|
|
var err error
|
|
if streamer, ok := p.streamer.(bus.ContextUsageStreamer); ok {
|
|
err = streamer.FinalizeWithContext(ctx, content, contextUsage)
|
|
} else {
|
|
err = p.streamer.Finalize(ctx, content)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("stream finalize: %w", err)
|
|
}
|
|
p.published = true
|
|
return nil
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) FinalizeReasoning(ctx context.Context, content string) error {
|
|
if p == nil || p.streamer == nil || !p.reasoningPublished || strings.TrimSpace(content) == "" {
|
|
return nil
|
|
}
|
|
reasoningStreamer, ok := p.streamer.(bus.ReasoningStreamer)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if err := reasoningStreamer.FinalizeReasoning(ctx, content); err != nil {
|
|
return fmt.Errorf("stream reasoning finalize: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) ClearFinalizedStreamMarker() {
|
|
if p == nil || p.streamer == nil {
|
|
return
|
|
}
|
|
if cleaner, ok := p.streamer.(interface{ ClearFinalizedStreamMarker() }); ok {
|
|
cleaner.ClearFinalizedStreamMarker()
|
|
}
|
|
}
|
|
|
|
func (p *streamingChunkPublisher) Cancel(ctx context.Context) {
|
|
if p == nil || p.streamer == nil {
|
|
return
|
|
}
|
|
p.streamer.Cancel(ctx)
|
|
}
|