From c8bac699fef02e509a367b627dde61aed5a4a666 Mon Sep 17 00:00:00 2001 From: lc6464 <64722907+lc6464@users.noreply.github.com> Date: Fri, 10 Apr 2026 20:23:12 +0800 Subject: [PATCH] fix(pico): separate thought and normal messages --- pkg/agent/loop.go | 55 ++++++++++++++-- pkg/agent/loop_test.go | 56 ++++++++++++++++ pkg/channels/pico/client.go | 8 ++- pkg/channels/pico/client_test.go | 64 +++++++++++++++++++ pkg/channels/pico/pico.go | 16 ++++- pkg/channels/pico/protocol.go | 10 +++ pkg/providers/antigravity_provider.go | 17 +++-- pkg/providers/antigravity_provider_test.go | 24 +++++++ .../src/components/chat/assistant-message.tsx | 38 +++++++++-- .../src/components/chat/chat-page.tsx | 1 + web/frontend/src/features/chat/history.ts | 3 +- web/frontend/src/features/chat/protocol.ts | 27 +++++++- web/frontend/src/i18n/locales/en.json | 1 + web/frontend/src/i18n/locales/zh.json | 1 + web/frontend/src/store/chat.ts | 3 + 15 files changed, 300 insertions(+), 24 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index ac230aa86..03fdfec82 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -105,6 +105,8 @@ const ( toolLimitResponse = "I've reached `max_tool_iterations` without a final response. Increase `max_tool_iterations` in config.json if this task needs more tool steps." handledToolResponseSummary = "Requested output delivered via tool attachment." sessionKeyAgentPrefix = "agent:" + metadataKeyMessageKind = "message_kind" + messageKindThought = "thought" metadataKeyAccountID = "account_id" metadataKeyGuildID = "guild_id" metadataKeyTeamID = "team_id" @@ -1622,6 +1624,41 @@ func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string return "" } +func (al *AgentLoop) publishPicoReasoning(ctx context.Context, reasoningContent, chatID string) { + if reasoningContent == "" || chatID == "" { + return + } + + if ctx.Err() != nil { + return + } + + pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) + defer pubCancel() + + if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ + Channel: "pico", + ChatID: chatID, + Content: reasoningContent, + Metadata: map[string]string{ + metadataKeyMessageKind: messageKindThought, + }, + }); err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || + errors.Is(err, bus.ErrBusClosed) { + logger.DebugCF("agent", "Pico reasoning publish skipped (timeout/cancel)", map[string]any{ + "channel": "pico", + "error": err.Error(), + }) + } else { + logger.WarnCF("agent", "Failed to publish pico reasoning (best-effort)", map[string]any{ + "channel": "pico", + "error": err.Error(), + }) + } + } +} + func (al *AgentLoop) handleReasoning( ctx context.Context, reasoningContent, channelName, channelID string, @@ -2223,12 +2260,16 @@ turnLoop: if reasoningContent == "" { reasoningContent = response.ReasoningContent } - go al.handleReasoning( - turnCtx, - reasoningContent, - ts.channel, - al.targetReasoningChannelID(ts.channel), - ) + if ts.channel == "pico" { + al.publishPicoReasoning(turnCtx, reasoningContent, ts.chatID) + } else { + go al.handleReasoning( + turnCtx, + reasoningContent, + ts.channel, + al.targetReasoningChannelID(ts.channel), + ) + } al.emitEvent( EventKindLLMResponse, ts.eventMeta("runTurn", "turn.llm.response"), @@ -2277,7 +2318,7 @@ turnLoop: if len(response.ToolCalls) == 0 || gracefulTerminal { responseContent := response.Content - if responseContent == "" && response.ReasoningContent != "" { + if responseContent == "" && response.ReasoningContent != "" && ts.channel != "pico" { responseContent = response.ReasoningContent } if steerMsgs := al.dequeueSteeringMessagesForScope(ts.sessionKey); len(steerMsgs) > 0 { diff --git a/pkg/agent/loop_test.go b/pkg/agent/loop_test.go index a67c8d040..56ea000c8 100644 --- a/pkg/agent/loop_test.go +++ b/pkg/agent/loop_test.go @@ -2660,6 +2660,62 @@ func TestProcessMessage_PublishesReasoningContentToReasoningChannel(t *testing.T } } +func TestProcessMessage_PicoPublishesReasoningAsThoughtMessage(t *testing.T) { + tmpDir := t.TempDir() + cfg := &config.Config{ + Agents: config.AgentsConfig{ + Defaults: config.AgentDefaults{ + Workspace: tmpDir, + ModelName: "test-model", + MaxTokens: 4096, + MaxToolIterations: 10, + }, + }, + } + + msgBus := bus.NewMessageBus() + provider := &reasoningContentProvider{ + response: "final answer", + reasoningContent: "thinking trace", + } + al := NewAgentLoop(cfg, msgBus, provider) + + response, err := al.processMessage(context.Background(), bus.InboundMessage{ + Channel: "pico", + SenderID: "user1", + ChatID: "pico:test-session", + Content: "hello", + }) + if err != nil { + t.Fatalf("processMessage() error = %v", err) + } + if response != "final answer" { + t.Fatalf("processMessage() response = %q, want %q", response, "final answer") + } + + var thoughtMsg *bus.OutboundMessage + deadline := time.After(3 * time.Second) + + for thoughtMsg == nil { + select { + case outbound := <-msgBus.OutboundChan(): + msg := outbound + if msg.Content == "thinking trace" { + thoughtMsg = &msg + } + case <-deadline: + t.Fatal("expected thought outbound message for pico") + } + } + + if thoughtMsg.Channel != "pico" || thoughtMsg.ChatID != "pico:test-session" { + t.Fatalf("thought message route = %s/%s, want pico/pico:test-session", thoughtMsg.Channel, thoughtMsg.ChatID) + } + if thoughtMsg.Metadata[metadataKeyMessageKind] != messageKindThought { + t.Fatalf("thought metadata kind = %q, want %q", thoughtMsg.Metadata[metadataKeyMessageKind], messageKindThought) + } +} + func TestProcessHeartbeat_DoesNotPublishToolFeedback(t *testing.T) { tmpDir := t.TempDir() heartbeatFile := filepath.Join(tmpDir, "heartbeat-task.txt") diff --git a/pkg/channels/pico/client.go b/pkg/channels/pico/client.go index b4bfd09e5..bf3e38cf4 100644 --- a/pkg/channels/pico/client.go +++ b/pkg/channels/pico/client.go @@ -242,7 +242,11 @@ func (c *PicoClientChannel) handleInbound(pc *picoConn, msg PicoMessage) { } func (c *PicoClientChannel) handleServerMessage(pc *picoConn, msg PicoMessage) { - content, _ := msg.Payload["content"].(string) + if isThoughtPayload(msg.Payload) { + return + } + + content, _ := msg.Payload[PayloadKeyContent].(string) if strings.TrimSpace(content) == "" { return } @@ -285,7 +289,7 @@ func (c *PicoClientChannel) Send(ctx context.Context, msg bus.OutboundMessage) ( } outMsg := newMessage(TypeMessageSend, map[string]any{ - "content": msg.Content, + PayloadKeyContent: msg.Content, }) outMsg.SessionID = strings.TrimPrefix(msg.ChatID, "pico_client:") return nil, pc.writeJSON(outMsg) diff --git a/pkg/channels/pico/client_test.go b/pkg/channels/pico/client_test.go index b40606647..732589432 100644 --- a/pkg/channels/pico/client_test.go +++ b/pkg/channels/pico/client_test.go @@ -316,3 +316,67 @@ func TestPicoChannel_HandleMessageSend_AllowsMediaOnly(t *testing.T) { t.Fatal("timed out waiting for inbound media message") } } + +func TestIsThoughtPayload(t *testing.T) { + tests := []struct { + name string + payload map[string]any + want bool + }{ + { + name: "explicit thought bool", + payload: map[string]any{PayloadKeyThought: true}, + want: true, + }, + { + name: "thought false", + payload: map[string]any{PayloadKeyThought: false}, + want: false, + }, + { + name: "thought string ignored", + payload: map[string]any{PayloadKeyThought: "true"}, + want: false, + }, + { + name: "default normal", + payload: map[string]any{PayloadKeyContent: "hello"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isThoughtPayload(tt.payload); got != tt.want { + t.Fatalf("isThoughtPayload() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestPicoClientChannel_HandleServerMessage_IgnoresThought(t *testing.T) { + mb := bus.NewMessageBus() + ch, err := NewPicoClientChannel(config.PicoClientConfig{ + URL: "ws://localhost:8080/ws", + }, mb) + if err != nil { + t.Fatalf("NewPicoClientChannel() error = %v", err) + } + + ch.ctx = context.Background() + pc := &picoConn{sessionID: "sess-thought"} + + ch.handleServerMessage(pc, PicoMessage{ + Type: TypeMessageCreate, + Payload: map[string]any{ + PayloadKeyContent: "internal reasoning", + PayloadKeyThought: true, + }, + }) + + select { + case msg := <-mb.InboundChan(): + t.Fatalf("expected no inbound publish for thought payload, got %+v", msg) + case <-time.After(150 * time.Millisecond): + } +} diff --git a/pkg/channels/pico/pico.go b/pkg/channels/pico/pico.go index e22da1ba1..6525c2d4a 100644 --- a/pkg/channels/pico/pico.go +++ b/pkg/channels/pico/pico.go @@ -39,6 +39,13 @@ var allowedInlineImageMIMETypes = map[string]struct{}{ "image/bmp": {}, } +func outboundMessageIsThought(metadata map[string]string) bool { + if len(metadata) == 0 { + return false + } + return strings.EqualFold(strings.TrimSpace(metadata["message_kind"]), MessageKindThought) +} + // writeJSON sends a JSON message to the connection with write locking. func (pc *picoConn) writeJSON(v any) error { if pc.closed.Load() { @@ -247,9 +254,11 @@ func (c *PicoChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]stri if !c.IsRunning() { return nil, channels.ErrNotRunning } + isThought := outboundMessageIsThought(msg.Metadata) outMsg := newMessage(TypeMessageCreate, map[string]any{ - "content": msg.Content, + PayloadKeyContent: msg.Content, + PayloadKeyThought: isThought, }) return nil, c.broadcastToSession(msg.ChatID, outMsg) @@ -288,8 +297,9 @@ func (c *PicoChannel) SendPlaceholder(ctx context.Context, chatID string) (strin msgID := uuid.New().String() outMsg := newMessage(TypeMessageCreate, map[string]any{ - "content": text, - "message_id": msgID, + PayloadKeyContent: text, + PayloadKeyThought: false, + "message_id": msgID, }) if err := c.broadcastToSession(chatID, outMsg); err != nil { diff --git a/pkg/channels/pico/protocol.go b/pkg/channels/pico/protocol.go index 3f8ba8643..ecdc2d140 100644 --- a/pkg/channels/pico/protocol.go +++ b/pkg/channels/pico/protocol.go @@ -19,6 +19,11 @@ const ( TypePong = "pong" PicoTokenPrefix = "pico-" + + PayloadKeyContent = "content" + PayloadKeyThought = "thought" + + MessageKindThought = "thought" ) // PicoMessage is the wire format for all Pico Protocol messages. @@ -39,6 +44,11 @@ func newMessage(msgType string, payload map[string]any) PicoMessage { } } +func isThoughtPayload(payload map[string]any) bool { + thought, _ := payload[PayloadKeyThought].(bool) + return thought +} + func newErrorWithPayload(code, message string, extra map[string]any) PicoMessage { payload := map[string]any{ "code": code, diff --git a/pkg/providers/antigravity_provider.go b/pkg/providers/antigravity_provider.go index 8a1890212..b5ab847d5 100644 --- a/pkg/providers/antigravity_provider.go +++ b/pkg/providers/antigravity_provider.go @@ -389,6 +389,7 @@ type antigravityJSONResponse struct { Content struct { Parts []struct { Text string `json:"text,omitempty"` + Thought bool `json:"thought,omitempty"` ThoughtSignature string `json:"thoughtSignature,omitempty"` ThoughtSignatureSnake string `json:"thought_signature,omitempty"` FunctionCall *antigravityFunctionCall `json:"functionCall,omitempty"` @@ -406,6 +407,7 @@ type antigravityJSONResponse struct { func (p *AntigravityProvider) parseSSEResponse(body string) (*LLMResponse, error) { var contentParts []string + var reasoningParts []string var toolCalls []ToolCall var usage *UsageInfo var finishReason string @@ -433,7 +435,11 @@ func (p *AntigravityProvider) parseSSEResponse(body string) (*LLMResponse, error for _, candidate := range resp.Candidates { for _, part := range candidate.Content.Parts { if part.Text != "" { - contentParts = append(contentParts, part.Text) + if part.Thought { + reasoningParts = append(reasoningParts, part.Text) + } else { + contentParts = append(contentParts, part.Text) + } } if part.FunctionCall != nil { argumentsJSON, _ := json.Marshal(part.FunctionCall.Args) @@ -475,10 +481,11 @@ func (p *AntigravityProvider) parseSSEResponse(body string) (*LLMResponse, error } return &LLMResponse{ - Content: strings.Join(contentParts, ""), - ToolCalls: toolCalls, - FinishReason: mappedFinish, - Usage: usage, + Content: strings.Join(contentParts, ""), + ReasoningContent: strings.Join(reasoningParts, ""), + ToolCalls: toolCalls, + FinishReason: mappedFinish, + Usage: usage, }, nil } diff --git a/pkg/providers/antigravity_provider_test.go b/pkg/providers/antigravity_provider_test.go index 238765321..9155e2d56 100644 --- a/pkg/providers/antigravity_provider_test.go +++ b/pkg/providers/antigravity_provider_test.go @@ -54,3 +54,27 @@ func TestResolveToolResponseNameInfersNameFromGeneratedCallID(t *testing.T) { t.Fatalf("expected inferred tool name search_docs, got %q", got) } } + +func TestParseSSEResponse_SplitsThoughtAndVisibleContent(t *testing.T) { + p := &AntigravityProvider{} + body := "data: {\"response\":{\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"hidden reasoning\",\"thought\":true},{\"text\":\"visible answer\"}],\"role\":\"model\"},\"finishReason\":\"STOP\"}],\"usageMetadata\":{\"promptTokenCount\":8,\"candidatesTokenCount\":17,\"totalTokenCount\":216}}}\n" + + "data: [DONE]\n" + + resp, err := p.parseSSEResponse(body) + if err != nil { + t.Fatalf("parseSSEResponse() error = %v", err) + } + + if resp.Content != "visible answer" { + t.Fatalf("Content = %q, want %q", resp.Content, "visible answer") + } + if resp.ReasoningContent != "hidden reasoning" { + t.Fatalf("ReasoningContent = %q, want %q", resp.ReasoningContent, "hidden reasoning") + } + if resp.FinishReason != "stop" { + t.Fatalf("FinishReason = %q, want %q", resp.FinishReason, "stop") + } + if resp.Usage == nil || resp.Usage.TotalTokens != 216 { + t.Fatalf("Usage.TotalTokens = %v, want %d", resp.Usage, 216) + } +} diff --git a/web/frontend/src/components/chat/assistant-message.tsx b/web/frontend/src/components/chat/assistant-message.tsx index 9966226b2..418516172 100644 --- a/web/frontend/src/components/chat/assistant-message.tsx +++ b/web/frontend/src/components/chat/assistant-message.tsx @@ -1,5 +1,6 @@ -import { IconCheck, IconCopy } from "@tabler/icons-react" +import { IconBrain, IconCheck, IconCopy } from "@tabler/icons-react" import { useState } from "react" +import { useTranslation } from "react-i18next" import ReactMarkdown from "react-markdown" import rehypeRaw from "rehype-raw" import rehypeSanitize from "rehype-sanitize" @@ -7,16 +8,20 @@ import remarkGfm from "remark-gfm" import { Button } from "@/components/ui/button" import { formatMessageTime } from "@/hooks/use-pico-chat" +import { cn } from "@/lib/utils" interface AssistantMessageProps { content: string + isThought?: boolean timestamp?: string | number } export function AssistantMessage({ content, + isThought = false, timestamp = "", }: AssistantMessageProps) { + const { t } = useTranslation() const [isCopied, setIsCopied] = useState(false) const formattedTimestamp = timestamp !== "" ? formatMessageTime(timestamp) : "" @@ -33,6 +38,12 @@ export function AssistantMessage({
PicoClaw + {isThought && ( + + + {t("chat.reasoningLabel")} + + )} {formattedTimestamp && ( <> @@ -42,8 +53,22 @@ export function AssistantMessage({
-
-
+
+
{isCopied ? ( diff --git a/web/frontend/src/components/chat/chat-page.tsx b/web/frontend/src/components/chat/chat-page.tsx index 38a0fc6b1..e8e07a801 100644 --- a/web/frontend/src/components/chat/chat-page.tsx +++ b/web/frontend/src/components/chat/chat-page.tsx @@ -247,6 +247,7 @@ export function ChatPage() { {msg.role === "assistant" ? ( ) : ( diff --git a/web/frontend/src/features/chat/history.ts b/web/frontend/src/features/chat/history.ts index 850b3319e..92beb06b7 100644 --- a/web/frontend/src/features/chat/history.ts +++ b/web/frontend/src/features/chat/history.ts @@ -24,6 +24,7 @@ export async function loadSessionMessages( id: `hist-${index}-${Date.now()}`, role: message.role, content: message.content, + kind: message.role === "assistant" ? "normal" : undefined, attachments: toChatAttachments(message.media), timestamp: fallbackTime, })) @@ -50,7 +51,7 @@ function messageSignature(message: ChatMessage): string { return `${message.role}\u0000${message.content}\u0000${normalizeMessageTimestamp( message.timestamp, - )}\u0000${attachmentSignature}` + )}\u0000${message.kind ?? ""}\u0000${attachmentSignature}` } function comparableTimestamp(timestamp: number | string): number { diff --git a/web/frontend/src/features/chat/protocol.ts b/web/frontend/src/features/chat/protocol.ts index 7429aef01..a7edfc21b 100644 --- a/web/frontend/src/features/chat/protocol.ts +++ b/web/frontend/src/features/chat/protocol.ts @@ -1,7 +1,10 @@ import { toast } from "sonner" import { normalizeUnixTimestamp } from "@/features/chat/state" -import { updateChatStore } from "@/store/chat" +import { + type AssistantMessageKind, + updateChatStore, +} from "@/store/chat" export interface PicoMessage { type: string @@ -11,6 +14,16 @@ export interface PicoMessage { payload?: Record } +function parseAssistantMessageKind( + payload: Record, +): AssistantMessageKind { + return payload.thought === true ? "thought" : "normal" +} + +function hasAssistantKindPayload(payload: Record): boolean { + return typeof payload.thought === "boolean" +} + export function handlePicoMessage( message: PicoMessage, expectedSessionId: string, @@ -25,6 +38,7 @@ export function handlePicoMessage( case "message.create": { const content = (payload.content as string) || "" const messageId = (payload.message_id as string) || `pico-${Date.now()}` + const kind = parseAssistantMessageKind(payload) const timestamp = message.timestamp !== undefined && Number.isFinite(Number(message.timestamp)) @@ -38,6 +52,7 @@ export function handlePicoMessage( id: messageId, role: "assistant", content, + kind, timestamp, }, ], @@ -49,13 +64,21 @@ export function handlePicoMessage( case "message.update": { const content = (payload.content as string) || "" const messageId = payload.message_id as string + const hasKind = hasAssistantKindPayload(payload) + const kind = parseAssistantMessageKind(payload) if (!messageId) { break } updateChatStore((prev) => ({ messages: prev.messages.map((msg) => - msg.id === messageId ? { ...msg, content } : msg, + msg.id === messageId + ? { + ...msg, + content, + ...(hasKind ? { kind } : {}), + } + : msg, ), })) break diff --git a/web/frontend/src/i18n/locales/en.json b/web/frontend/src/i18n/locales/en.json index b53abeb76..2434d4576 100644 --- a/web/frontend/src/i18n/locales/en.json +++ b/web/frontend/src/i18n/locales/en.json @@ -47,6 +47,7 @@ "step3": "Preparing response...", "step4": "Almost there..." }, + "reasoningLabel": "Reasoning", "history": "History", "noHistory": "No chat history yet", "historyLoadFailed": "Failed to load chat history", diff --git a/web/frontend/src/i18n/locales/zh.json b/web/frontend/src/i18n/locales/zh.json index e2e8eae04..c03d4181d 100644 --- a/web/frontend/src/i18n/locales/zh.json +++ b/web/frontend/src/i18n/locales/zh.json @@ -47,6 +47,7 @@ "step3": "准备回复...", "step4": "马上就好..." }, + "reasoningLabel": "思考", "history": "历史记录", "noHistory": "暂无对话历史", "historyLoadFailed": "加载历史记录失败", diff --git a/web/frontend/src/store/chat.ts b/web/frontend/src/store/chat.ts index 21eb5edff..2c6f70610 100644 --- a/web/frontend/src/store/chat.ts +++ b/web/frontend/src/store/chat.ts @@ -11,11 +11,14 @@ export interface ChatAttachment { filename?: string } +export type AssistantMessageKind = "normal" | "thought" + export interface ChatMessage { id: string role: "user" | "assistant" content: string timestamp: number | string + kind?: AssistantMessageKind attachments?: ChatAttachment[] }