fix(web): refactor pico chat flow and fix proxied websocket URLs (#1639)

- move chat controller, state, protocol, history, and websocket logic into a dedicated chat feature module
- improve chat reconnection, session hydration, and send gating based on actual websocket state
- preserve gateway status during transient SSE disconnects and update stop state immediately
- generate wss websocket URLs behind HTTPS proxies and add backend tests for forwarded proto handling
This commit is contained in:
wenjie
2026-03-16 16:25:16 +08:00
committed by GitHub
parent 0c94e6f7b3
commit c513ad22d7
16 changed files with 509 additions and 215 deletions
@@ -0,0 +1,459 @@
import { getDefaultStore } from "jotai"
import { toast } from "sonner"
import { getPicoToken } from "@/api/pico"
import {
loadSessionMessages,
mergeHistoryMessages,
} from "@/features/chat/history"
import { type PicoMessage, handlePicoMessage } from "@/features/chat/protocol"
import {
clearStoredSessionId,
generateSessionId,
readStoredSessionId,
} from "@/features/chat/state"
import {
invalidateSocket,
isCurrentSocket,
normalizeWsUrlForBrowser,
} from "@/features/chat/websocket"
import i18n from "@/i18n"
import { getChatState, updateChatStore } from "@/store/chat"
import { type GatewayState, gatewayAtom } from "@/store/gateway"
const store = getDefaultStore()
let wsRef: WebSocket | null = null
let isConnecting = false
let msgIdCounter = 0
let activeSessionIdRef = getChatState().activeSessionId
let initialized = false
let unsubscribeGateway: (() => void) | null = null
let hydratePromise: Promise<void> | null = null
let connectionGeneration = 0
let reconnectTimer: number | null = null
let reconnectAttempts = 0
let shouldMaintainConnection = false
function clearReconnectTimer() {
if (reconnectTimer !== null) {
window.clearTimeout(reconnectTimer)
reconnectTimer = null
}
}
function shouldReconnectFor(generation: number, sessionId: string): boolean {
return (
shouldMaintainConnection &&
generation === connectionGeneration &&
sessionId === activeSessionIdRef &&
store.get(gatewayAtom).status === "running"
)
}
function scheduleReconnect(generation: number, sessionId: string) {
if (!shouldReconnectFor(generation, sessionId) || reconnectTimer !== null) {
return
}
const delay = Math.min(1000 * 2 ** reconnectAttempts, 5000)
reconnectAttempts += 1
reconnectTimer = window.setTimeout(() => {
reconnectTimer = null
if (!shouldReconnectFor(generation, sessionId)) {
return
}
void connectChat()
}, delay)
}
function needsActiveSessionHydration(): boolean {
const state = getChatState()
const storedSessionId = readStoredSessionId()
return Boolean(
storedSessionId &&
storedSessionId === state.activeSessionId &&
!state.hasHydratedActiveSession,
)
}
function setActiveSessionId(sessionId: string) {
activeSessionIdRef = sessionId
updateChatStore({ activeSessionId: sessionId })
}
function disconnectChatInternal({
clearDesiredConnection,
}: {
clearDesiredConnection: boolean
}) {
connectionGeneration += 1
clearReconnectTimer()
if (clearDesiredConnection) {
shouldMaintainConnection = false
}
const socket = wsRef
wsRef = null
isConnecting = false
invalidateSocket(socket)
updateChatStore({
connectionState: "disconnected",
isTyping: false,
})
}
export async function connectChat() {
if (
store.get(gatewayAtom).status !== "running" ||
needsActiveSessionHydration()
) {
return
}
if (
isConnecting ||
(wsRef &&
(wsRef.readyState === WebSocket.OPEN ||
wsRef.readyState === WebSocket.CONNECTING))
) {
return
}
const generation = connectionGeneration + 1
connectionGeneration = generation
isConnecting = true
clearReconnectTimer()
updateChatStore({ connectionState: "connecting" })
try {
const { token, ws_url } = await getPicoToken()
const sessionId = activeSessionIdRef
if (generation !== connectionGeneration) {
isConnecting = false
return
}
if (!token) {
console.error("No pico token available")
updateChatStore({ connectionState: "error" })
isConnecting = false
scheduleReconnect(generation, sessionId)
return
}
const finalWsUrl = normalizeWsUrlForBrowser(ws_url)
const url = `${finalWsUrl}?session_id=${encodeURIComponent(sessionId)}`
const socket = new WebSocket(url, [`token.${token}`])
if (generation !== connectionGeneration) {
isConnecting = false
invalidateSocket(socket)
return
}
socket.onopen = () => {
if (
!isCurrentSocket({
socket,
currentSocket: wsRef,
generation,
currentGeneration: connectionGeneration,
sessionId,
currentSessionId: activeSessionIdRef,
})
) {
return
}
updateChatStore({ connectionState: "connected" })
isConnecting = false
reconnectAttempts = 0
}
socket.onmessage = (event) => {
if (
!isCurrentSocket({
socket,
currentSocket: wsRef,
generation,
currentGeneration: connectionGeneration,
sessionId,
currentSessionId: activeSessionIdRef,
})
) {
return
}
try {
const message = JSON.parse(event.data) as PicoMessage
handlePicoMessage(message, sessionId)
} catch {
console.warn("Non-JSON message from pico:", event.data)
}
}
socket.onclose = () => {
if (
!isCurrentSocket({
socket,
currentSocket: wsRef,
generation,
currentGeneration: connectionGeneration,
sessionId,
currentSessionId: activeSessionIdRef,
})
) {
return
}
wsRef = null
isConnecting = false
updateChatStore({
connectionState: "disconnected",
isTyping: false,
})
scheduleReconnect(generation, sessionId)
}
socket.onerror = () => {
if (
!isCurrentSocket({
socket,
currentSocket: wsRef,
generation,
currentGeneration: connectionGeneration,
sessionId,
currentSessionId: activeSessionIdRef,
})
) {
return
}
isConnecting = false
updateChatStore({ connectionState: "error" })
scheduleReconnect(generation, sessionId)
}
wsRef = socket
} catch (error) {
if (generation !== connectionGeneration) {
isConnecting = false
return
}
console.error("Failed to connect to pico:", error)
updateChatStore({ connectionState: "error" })
isConnecting = false
scheduleReconnect(generation, activeSessionIdRef)
}
}
export function disconnectChat() {
disconnectChatInternal({ clearDesiredConnection: true })
}
export async function hydrateActiveSession() {
if (hydratePromise) {
return hydratePromise
}
const state = getChatState()
const storedSessionId = readStoredSessionId()
if (
!storedSessionId ||
state.hasHydratedActiveSession ||
storedSessionId !== state.activeSessionId
) {
if (!state.hasHydratedActiveSession) {
updateChatStore({ hasHydratedActiveSession: true })
}
return
}
hydratePromise = loadSessionMessages(storedSessionId)
.then((historyMessages) => {
const currentState = getChatState()
if (currentState.activeSessionId !== storedSessionId) {
return
}
if (currentState.messages.length > 0) {
updateChatStore({
messages: mergeHistoryMessages(
historyMessages,
currentState.messages,
),
hasHydratedActiveSession: true,
})
return
}
updateChatStore({
messages: historyMessages,
isTyping: false,
hasHydratedActiveSession: true,
})
})
.catch((error) => {
console.error("Failed to restore last session history:", error)
const currentState = getChatState()
if (currentState.activeSessionId !== storedSessionId) {
return
}
if (currentState.messages.length > 0) {
updateChatStore({ hasHydratedActiveSession: true })
return
}
clearStoredSessionId()
updateChatStore({
messages: [],
isTyping: false,
hasHydratedActiveSession: true,
})
})
.finally(() => {
hydratePromise = null
})
return hydratePromise
}
export function sendChatMessage(content: string) {
if (!wsRef || wsRef.readyState !== WebSocket.OPEN) {
console.warn("WebSocket not connected")
return false
}
const socket = wsRef
const id = `msg-${++msgIdCounter}-${Date.now()}`
updateChatStore((prev) => ({
messages: [
...prev.messages,
{ id, role: "user", content, timestamp: Date.now() },
],
isTyping: true,
}))
try {
socket.send(
JSON.stringify({
type: "message.send",
id,
payload: { content },
}),
)
return true
} catch (error) {
console.error("Failed to send pico message:", error)
updateChatStore((prev) => ({
messages: prev.messages.filter((message) => message.id !== id),
isTyping: false,
}))
return false
}
}
export async function switchChatSession(sessionId: string) {
if (sessionId === activeSessionIdRef) {
return
}
try {
const historyMessages = await loadSessionMessages(sessionId)
disconnectChatInternal({ clearDesiredConnection: false })
setActiveSessionId(sessionId)
updateChatStore({
messages: historyMessages,
isTyping: false,
hasHydratedActiveSession: true,
})
if (store.get(gatewayAtom).status === "running") {
shouldMaintainConnection = true
await connectChat()
}
} catch (error) {
console.error("Failed to load session history:", error)
toast.error(i18n.t("chat.historyOpenFailed"))
}
}
export async function newChatSession() {
if (getChatState().messages.length === 0) {
return
}
disconnectChatInternal({ clearDesiredConnection: false })
setActiveSessionId(generateSessionId())
updateChatStore({
messages: [],
isTyping: false,
hasHydratedActiveSession: true,
})
if (store.get(gatewayAtom).status === "running") {
shouldMaintainConnection = true
await connectChat()
}
}
export function initializeChatStore() {
if (initialized) {
return
}
initialized = true
activeSessionIdRef = getChatState().activeSessionId
let lastGatewayStatus: GatewayState | null = null
const syncConnectionWithGateway = (force: boolean = false) => {
const gatewayStatus = store.get(gatewayAtom).status
if (!force && gatewayStatus === lastGatewayStatus) {
return
}
lastGatewayStatus = gatewayStatus
if (gatewayStatus === "running") {
shouldMaintainConnection = true
if (needsActiveSessionHydration()) {
return
}
void connectChat()
return
}
if (gatewayStatus === "stopped" || gatewayStatus === "error") {
disconnectChatInternal({ clearDesiredConnection: true })
}
}
unsubscribeGateway = store.sub(gatewayAtom, syncConnectionWithGateway)
if (!readStoredSessionId()) {
updateChatStore({ hasHydratedActiveSession: true })
syncConnectionWithGateway(true)
return
}
void hydrateActiveSession().finally(() => {
if (!initialized) {
return
}
syncConnectionWithGateway(true)
})
}
export function teardownChatStore() {
unsubscribeGateway?.()
unsubscribeGateway = null
initialized = false
disconnectChat()
}
+68
View File
@@ -0,0 +1,68 @@
import { getSessionHistory } from "@/api/sessions"
import { normalizeUnixTimestamp } from "@/features/chat/state"
import type { ChatMessage } from "@/store/chat"
export async function loadSessionMessages(
sessionId: string,
): Promise<ChatMessage[]> {
const detail = await getSessionHistory(sessionId)
const fallbackTime = detail.updated
return detail.messages.map((message, index) => ({
id: `hist-${index}-${Date.now()}`,
role: message.role,
content: message.content,
timestamp: fallbackTime,
}))
}
function normalizeMessageTimestamp(timestamp: number | string): string {
if (typeof timestamp === "number") {
return String(normalizeUnixTimestamp(timestamp))
}
const trimmed = timestamp.trim()
if (/^-?\d+(\.\d+)?$/.test(trimmed)) {
return String(normalizeUnixTimestamp(Number(trimmed)))
}
const parsed = Date.parse(trimmed)
return Number.isNaN(parsed) ? trimmed : String(parsed)
}
function messageSignature(message: ChatMessage): string {
return `${message.role}\u0000${message.content}\u0000${normalizeMessageTimestamp(
message.timestamp,
)}`
}
function comparableTimestamp(timestamp: number | string): number {
const normalized = normalizeMessageTimestamp(timestamp)
const numeric = Number(normalized)
return Number.isFinite(numeric) ? numeric : 0
}
export function mergeHistoryMessages(
historyMessages: ChatMessage[],
currentMessages: ChatMessage[],
): ChatMessage[] {
const currentIds = new Set(currentMessages.map((message) => message.id))
const currentSignatures = new Set(
currentMessages.map((message) => messageSignature(message)),
)
const merged = [
...historyMessages.filter(
(message) =>
!currentIds.has(message.id) &&
!currentSignatures.has(messageSignature(message)),
),
...currentMessages,
]
return merged.sort(
(left, right) =>
comparableTimestamp(left.timestamp) -
comparableTimestamp(right.timestamp),
)
}
@@ -0,0 +1,81 @@
import { normalizeUnixTimestamp } from "@/features/chat/state"
import { updateChatStore } from "@/store/chat"
export interface PicoMessage {
type: string
id?: string
session_id?: string
timestamp?: number | string
payload?: Record<string, unknown>
}
export function handlePicoMessage(
message: PicoMessage,
expectedSessionId: string,
) {
if (message.session_id && message.session_id !== expectedSessionId) {
return
}
const payload = message.payload || {}
switch (message.type) {
case "message.create": {
const content = (payload.content as string) || ""
const messageId = (payload.message_id as string) || `pico-${Date.now()}`
const timestamp =
message.timestamp !== undefined &&
Number.isFinite(Number(message.timestamp))
? normalizeUnixTimestamp(Number(message.timestamp))
: Date.now()
updateChatStore((prev) => ({
messages: [
...prev.messages,
{
id: messageId,
role: "assistant",
content,
timestamp,
},
],
isTyping: false,
}))
break
}
case "message.update": {
const content = (payload.content as string) || ""
const messageId = payload.message_id as string
if (!messageId) {
break
}
updateChatStore((prev) => ({
messages: prev.messages.map((msg) =>
msg.id === messageId ? { ...msg, content } : msg,
),
}))
break
}
case "typing.start":
updateChatStore({ isTyping: true })
break
case "typing.stop":
updateChatStore({ isTyping: false })
break
case "error":
console.error("Pico error:", payload)
updateChatStore({ isTyping: false })
break
case "pong":
break
default:
console.log("Unknown pico message type:", message.type)
}
}
+59
View File
@@ -0,0 +1,59 @@
const LAST_SESSION_STORAGE_KEY = "picoclaw:last-session-id"
const UNIX_MS_THRESHOLD = 1e12
function readStorageValue() {
return (
globalThis.localStorage?.getItem(LAST_SESSION_STORAGE_KEY)?.trim() || ""
)
}
export function readStoredSessionId(): string {
return readStorageValue()
}
export function writeStoredSessionId(sessionId: string) {
if (sessionId) {
globalThis.localStorage?.setItem(LAST_SESSION_STORAGE_KEY, sessionId)
return
}
globalThis.localStorage?.removeItem(LAST_SESSION_STORAGE_KEY)
}
export function clearStoredSessionId() {
globalThis.localStorage?.removeItem(LAST_SESSION_STORAGE_KEY)
}
export function generateSessionId(): string {
const webCrypto = globalThis.crypto
if (webCrypto && typeof webCrypto.randomUUID === "function") {
return webCrypto.randomUUID()
}
if (webCrypto && typeof webCrypto.getRandomValues === "function") {
const bytes = new Uint8Array(16)
webCrypto.getRandomValues(bytes)
bytes[6] = (bytes[6] & 0x0f) | 0x40
bytes[8] = (bytes[8] & 0x3f) | 0x80
const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, "0"))
return (
`${hex[0]}${hex[1]}${hex[2]}${hex[3]}-` +
`${hex[4]}${hex[5]}-` +
`${hex[6]}${hex[7]}-` +
`${hex[8]}${hex[9]}-` +
`${hex[10]}${hex[11]}${hex[12]}${hex[13]}${hex[14]}${hex[15]}`
)
}
return `session-${Date.now()}-${Math.random().toString(16).slice(2, 10)}`
}
export function getInitialActiveSessionId(): string {
return readStorageValue() || generateSessionId()
}
export function normalizeUnixTimestamp(timestamp: number): number {
return timestamp < UNIX_MS_THRESHOLD ? timestamp * 1000 : timestamp
}
@@ -0,0 +1,57 @@
export function normalizeWsUrlForBrowser(wsUrl: string): string {
let finalWsUrl = wsUrl
try {
const parsedUrl = new URL(wsUrl)
const isLocalHost =
parsedUrl.hostname === "localhost" ||
parsedUrl.hostname === "127.0.0.1" ||
parsedUrl.hostname === "0.0.0.0"
const isBrowserLocal =
window.location.hostname === "localhost" ||
window.location.hostname === "127.0.0.1"
if (isLocalHost && !isBrowserLocal) {
parsedUrl.hostname = window.location.hostname
finalWsUrl = parsedUrl.toString()
}
} catch (error) {
console.warn("Could not parse ws_url:", error)
}
return finalWsUrl
}
export function invalidateSocket(socket: WebSocket | null) {
if (!socket) {
return
}
socket.onopen = null
socket.onmessage = null
socket.onclose = null
socket.onerror = null
socket.close()
}
export function isCurrentSocket({
socket,
currentSocket,
generation,
currentGeneration,
sessionId,
currentSessionId,
}: {
socket: WebSocket
currentSocket: WebSocket | null
generation: number
currentGeneration: number
sessionId: string
currentSessionId: string
}): boolean {
return (
currentSocket === socket &&
generation === currentGeneration &&
sessionId === currentSessionId
)
}