fix(review): address copilot backpressure and SSE parse feedback

This commit is contained in:
lc6464
2026-04-11 11:18:41 +08:00
parent 86917faa9b
commit e9f55d776d
3 changed files with 83 additions and 3 deletions
+1 -1
View File
@@ -2261,7 +2261,7 @@ turnLoop:
reasoningContent = response.ReasoningContent
}
if ts.channel == "pico" {
al.publishPicoReasoning(turnCtx, reasoningContent, ts.chatID)
go al.publishPicoReasoning(turnCtx, reasoningContent, ts.chatID)
} else {
go al.handleReasoning(
turnCtx,
+5 -2
View File
@@ -481,14 +481,17 @@ func parseGeminiStreamResponse(
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
data := strings.TrimSpace(strings.TrimPrefix(line, "data: "))
if data == "" {
continue
}
if data == "[DONE]" {
break
}
var chunk geminiGenerateContentResponse
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
continue
return nil, fmt.Errorf("invalid gemini stream chunk: %w", err)
}
for _, candidate := range chunk.Candidates {
+77
View File
@@ -212,6 +212,83 @@ func TestGeminiProvider_ChatStreamParsesThoughtTextAndToolCalls(t *testing.T) {
}
}
func TestGeminiProvider_ChatStreamSkipsEmptyDataFrames(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("response writer is not flushable")
}
_, _ = fmt.Fprint(w, "data: \n\n")
flusher.Flush()
chunk := map[string]any{
"candidates": []any{map[string]any{
"content": map[string]any{
"parts": []any{map[string]any{"text": "ok"}},
},
"finishReason": "STOP",
}},
}
raw, err := json.Marshal(chunk)
if err != nil {
t.Fatalf("marshal chunk: %v", err)
}
_, _ = fmt.Fprintf(w, "data: %s\n\n", raw)
flusher.Flush()
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
}))
defer server.Close()
provider := NewGeminiProvider("test-key", server.URL, "", "", 0, nil, nil)
resp, err := provider.ChatStream(
t.Context(),
[]Message{{Role: "user", Content: "hello"}},
nil,
"gemini-2.5-flash",
nil,
nil,
)
if err != nil {
t.Fatalf("ChatStream() error = %v", err)
}
if resp.Content != "ok" {
t.Fatalf("Content = %q, want %q", resp.Content, "ok")
}
}
func TestGeminiProvider_ChatStreamReturnsErrorOnInvalidDataFrame(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("response writer is not flushable")
}
_, _ = fmt.Fprint(w, "data: {invalid-json}\n\n")
flusher.Flush()
}))
defer server.Close()
provider := NewGeminiProvider("test-key", server.URL, "", "", 0, nil, nil)
_, err := provider.ChatStream(
t.Context(),
[]Message{{Role: "user", Content: "hello"}},
nil,
"gemini-2.5-flash",
nil,
nil,
)
if err == nil {
t.Fatal("ChatStream() expected error for invalid SSE data frame")
}
if !strings.Contains(err.Error(), "invalid gemini stream chunk") {
t.Fatalf("error = %v, want contains %q", err, "invalid gemini stream chunk")
}
}
func TestGeminiProvider_BuildRequestBodyIncludesMediaAndThinkingConfig(t *testing.T) {
provider := NewGeminiProvider("test-key", "https://example.com/v1beta", "", "", 0, nil, nil)