feat(web): use a global WebSocket for Pico chat sessions (#1507)

- centralize Pico chat connection and session state in a shared store
- move chat lifecycle control out of usePicoChat
- hydrate and restore the active session across the app
This commit is contained in:
wenjie
2026-03-13 19:04:18 +08:00
committed by GitHub
parent 27fef9eab8
commit 4d8fdb0b3d
7 changed files with 549 additions and 431 deletions
@@ -15,6 +15,7 @@ import { useChatModels } from "@/hooks/use-chat-models"
import { useGateway } from "@/hooks/use-gateway"
import { usePicoChat } from "@/hooks/use-pico-chat"
import { useSessionHistory } from "@/hooks/use-session-history"
import { hydrateActiveSession } from "@/lib/pico-chat-controller"
export function ChatPage() {
const { t } = useTranslation()
@@ -67,6 +68,10 @@ export function ChatPage() {
syncScrollState(e.currentTarget)
}
useEffect(() => {
void hydrateActiveSession()
}, [])
useEffect(() => {
if (scrollRef.current) {
if (isAtBottom) {
+11 -431
View File
@@ -1,79 +1,12 @@
import dayjs from "dayjs"
import { useAtomValue } from "jotai"
import {
type SetStateAction,
useCallback,
useEffect,
useRef,
useState,
} from "react"
import { useTranslation } from "react-i18next"
import { toast } from "sonner"
import { getPicoToken } from "@/api/pico"
import { getSessionHistory } from "@/api/sessions"
import { gatewayAtom } from "@/store"
// Pico Protocol message types
interface PicoMessage {
type: string
id?: string
session_id?: string
timestamp?: number | string
payload?: Record<string, unknown>
}
export interface ChatMessage {
id: string
role: "user" | "assistant"
content: string
timestamp: number | string
}
type ConnectionState = "disconnected" | "connecting" | "connected" | "error"
const LAST_SESSION_STORAGE_KEY = "picoclaw:last-session-id"
function readStoredSessionId(): string {
const value = localStorage.getItem(LAST_SESSION_STORAGE_KEY)?.trim()
return value || ""
}
function writeStoredSessionId(sessionId: string) {
if (sessionId) {
localStorage.setItem(LAST_SESSION_STORAGE_KEY, sessionId)
return
}
localStorage.removeItem(LAST_SESSION_STORAGE_KEY)
}
function generateSessionId(): string {
const webCrypto = globalThis.crypto
if (webCrypto && typeof webCrypto.randomUUID === "function") {
return webCrypto.randomUUID()
}
if (webCrypto && typeof webCrypto.getRandomValues === "function") {
const bytes = new Uint8Array(16)
webCrypto.getRandomValues(bytes)
// RFC4122 v4: set version and variant bits.
bytes[6] = (bytes[6] & 0x0f) | 0x40
bytes[8] = (bytes[8] & 0x3f) | 0x80
const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, "0"))
return (
`${hex[0]}${hex[1]}${hex[2]}${hex[3]}-` +
`${hex[4]}${hex[5]}-` +
`${hex[6]}${hex[7]}-` +
`${hex[8]}${hex[9]}-` +
`${hex[10]}${hex[11]}${hex[12]}${hex[13]}${hex[14]}${hex[15]}`
)
}
return `session-${Date.now()}-${Math.random().toString(16).slice(2, 10)}`
}
newChatSession,
sendChatMessage,
switchChatSession,
} from "@/lib/pico-chat-controller"
import { chatAtom } from "@/store/chat"
const UNIX_MS_THRESHOLD = 1e12
@@ -124,369 +57,16 @@ export function formatMessageTime(dateRaw: number | string | Date): string {
}
export function usePicoChat() {
const { t } = useTranslation()
const { status: gatewayState } = useAtomValue(gatewayAtom)
const [messages, setMessages] = useState<ChatMessage[]>([])
const [connectionState, setConnectionState] =
useState<ConnectionState>("disconnected")
const [isTyping, setIsTyping] = useState(false)
const [activeSessionId, setActiveSessionId] = useState<string>(
() => readStoredSessionId() || generateSessionId(),
)
const wsRef = useRef<WebSocket | null>(null)
const isConnectingRef = useRef(false)
const msgIdCounter = useRef(0)
const activeSessionIdRef = useRef(activeSessionId)
const messagesRevisionRef = useRef(0)
const setTrackedMessages = useCallback(
(nextState: SetStateAction<ChatMessage[]>) => {
setMessages((prev) => {
const next =
typeof nextState === "function"
? (nextState as (prevState: ChatMessage[]) => ChatMessage[])(prev)
: nextState
if (next !== prev) {
messagesRevisionRef.current += 1
}
return next
})
},
[],
)
// Keep ref in sync
useEffect(() => {
activeSessionIdRef.current = activeSessionId
writeStoredSessionId(activeSessionId)
}, [activeSessionId])
const loadSessionMessages = useCallback(async (sessionId: string) => {
const detail = await getSessionHistory(sessionId)
const fallbackTime = detail.updated
return detail.messages.map((m, i) => ({
id: `hist-${i}-${Date.now()}`,
role: m.role as "user" | "assistant",
content: m.content,
timestamp: fallbackTime,
}))
}, [])
useEffect(() => {
const storedSessionId = readStoredSessionId()
if (!storedSessionId) {
return
}
const restoreRevision = messagesRevisionRef.current
let cancelled = false
void loadSessionMessages(storedSessionId)
.then((historyMessages) => {
if (cancelled) {
return
}
if (activeSessionIdRef.current !== storedSessionId) {
return
}
if (messagesRevisionRef.current !== restoreRevision) {
return
}
setTrackedMessages(historyMessages)
setIsTyping(false)
})
.catch((err) => {
console.error("Failed to restore last session history:", err)
if (cancelled) {
return
}
if (activeSessionIdRef.current !== storedSessionId) {
return
}
if (messagesRevisionRef.current !== restoreRevision) {
return
}
localStorage.removeItem(LAST_SESSION_STORAGE_KEY)
setTrackedMessages([])
setIsTyping(false)
})
return () => {
cancelled = true
}
}, [loadSessionMessages, setTrackedMessages])
const handlePicoMessage = useCallback(
(msg: PicoMessage) => {
const payload = msg.payload || {}
switch (msg.type) {
case "message.create": {
const content = (payload.content as string) || ""
const messageId =
(payload.message_id as string) || `pico-${Date.now()}`
// Use provided timestamp or current time
const timestampRaw =
msg.timestamp !== undefined &&
Number.isFinite(Number(msg.timestamp))
? normalizeUnixTimestamp(Number(msg.timestamp))
: Date.now()
setTrackedMessages((prev) => [
...prev,
{
id: messageId,
role: "assistant",
content,
timestamp: timestampRaw,
},
])
setIsTyping(false)
break
}
case "message.update": {
const content = (payload.content as string) || ""
const messageId = payload.message_id as string
if (!messageId) break
setTrackedMessages((prev) =>
prev.map((m) => (m.id === messageId ? { ...m, content } : m)),
)
break
}
case "typing.start":
setIsTyping(true)
break
case "typing.stop":
setIsTyping(false)
break
case "error":
console.error("Pico error:", payload)
setIsTyping(false)
break
case "pong":
// heartbeat response, ignore
break
default:
console.log("Unknown pico message type:", msg.type)
}
},
[setTrackedMessages],
)
const connect = useCallback(async () => {
if (
isConnectingRef.current ||
(wsRef.current &&
(wsRef.current.readyState === WebSocket.OPEN ||
wsRef.current.readyState === WebSocket.CONNECTING))
) {
return
}
isConnectingRef.current = true
setConnectionState("connecting")
try {
const { token, ws_url } = await getPicoToken()
if (!token) {
console.error("No pico token available")
setConnectionState("error")
isConnectingRef.current = false
return
}
// If the backend returns a localhost URL but we are accessing it via a LAN IP
// (e.g., from a mobile device during dev), rewrite the hostname to match.
let finalWsUrl = ws_url
try {
const parsedUrl = new URL(ws_url)
const isLocalHost =
parsedUrl.hostname === "localhost" ||
parsedUrl.hostname === "127.0.0.1" ||
parsedUrl.hostname === "0.0.0.0"
const isBrowserLocal =
window.location.hostname === "localhost" ||
window.location.hostname === "127.0.0.1"
if (isLocalHost && !isBrowserLocal) {
parsedUrl.hostname = window.location.hostname
finalWsUrl = parsedUrl.toString()
}
} catch (e) {
console.warn("Could not parse ws_url:", e)
}
// Build WebSocket URL with session_id
const sessionId = activeSessionIdRef.current
const url = `${finalWsUrl}?token=${encodeURIComponent(token)}&session_id=${encodeURIComponent(sessionId)}`
const socket = new WebSocket(url)
socket.onopen = () => {
setConnectionState("connected")
isConnectingRef.current = false
}
socket.onmessage = (event) => {
try {
const msg: PicoMessage = JSON.parse(event.data)
handlePicoMessage(msg)
} catch {
console.warn("Non-JSON message from pico:", event.data)
}
}
socket.onclose = () => {
setConnectionState("disconnected")
wsRef.current = null
isConnectingRef.current = false
}
socket.onerror = () => {
setConnectionState("error")
isConnectingRef.current = false
}
wsRef.current = socket
} catch (err) {
console.error("Failed to connect to pico:", err)
setConnectionState("error")
isConnectingRef.current = false
}
}, [handlePicoMessage])
const disconnect = useCallback(() => {
if (wsRef.current) {
wsRef.current.close()
wsRef.current = null
}
setConnectionState("disconnected")
isConnectingRef.current = false
}, [])
// Auto connect/disconnect based on gateway state
useEffect(() => {
// Wrap in setTimeout to avoid React calling setState synchronously during render
const timerId = setTimeout(() => {
if (gatewayState === "running") {
connect()
} else {
disconnect()
}
}, 0)
return () => clearTimeout(timerId)
}, [gatewayState, connect, disconnect])
// Cleanup on unmount
useEffect(() => {
return () => disconnect()
}, [disconnect])
const sendMessage = useCallback(
(content: string) => {
if (!wsRef.current || wsRef.current.readyState !== WebSocket.OPEN) {
console.warn("WebSocket not connected")
return
}
const id = `msg-${++msgIdCounter.current}-${Date.now()}`
const timestampRaw = Date.now()
// Add user message to local state
setTrackedMessages((prev) => [
...prev,
{ id, role: "user", content, timestamp: timestampRaw },
])
// Show typing indicator immediately
setIsTyping(true)
// Send via Pico Protocol
const picoMsg: PicoMessage = {
type: "message.send",
id,
payload: { content },
}
wsRef.current.send(JSON.stringify(picoMsg))
},
[setTrackedMessages],
)
// Switch to a historical session
const switchSession = useCallback(
async (sessionId: string) => {
if (sessionId === activeSessionIdRef.current) {
return
}
try {
const historyMessages = await loadSessionMessages(sessionId)
// Only switch the active websocket session after history has loaded successfully.
disconnect()
setActiveSessionId(sessionId)
setIsTyping(false)
setTrackedMessages(historyMessages)
} catch (err) {
console.error("Failed to load session history:", err)
toast.error(t("chat.historyOpenFailed"))
return
}
setTimeout(() => {
if (gatewayState === "running") {
connect()
}
}, 100)
},
[
connect,
disconnect,
gatewayState,
loadSessionMessages,
setTrackedMessages,
t,
],
)
// Start a new empty chat
const newChat = useCallback(() => {
if (messages.length === 0) {
return
}
disconnect()
const newId = generateSessionId()
setActiveSessionId(newId)
setTrackedMessages([])
setIsTyping(false)
// Reconnect with the fresh session
setTimeout(() => {
if (gatewayState === "running") {
connect()
}
}, 100)
}, [disconnect, connect, gatewayState, messages.length, setTrackedMessages])
const { messages, connectionState, isTyping, activeSessionId } =
useAtomValue(chatAtom)
return {
messages,
connectionState,
isTyping,
activeSessionId,
sendMessage,
switchSession,
newChat,
sendMessage: sendChatMessage,
switchSession: switchChatSession,
newChat: newChatSession,
}
}
@@ -0,0 +1,405 @@
import { getDefaultStore } from "jotai"
import { toast } from "sonner"
import { getPicoToken } from "@/api/pico"
import { getSessionHistory } from "@/api/sessions"
import i18n from "@/i18n"
import {
clearStoredSessionId,
generateSessionId,
normalizeUnixTimestamp,
readStoredSessionId,
} from "@/lib/pico-chat-state"
import { type ChatMessage, getChatState, updateChatStore } from "@/store/chat"
import { gatewayAtom } from "@/store/gateway"
interface PicoMessage {
type: string
id?: string
session_id?: string
timestamp?: number | string
payload?: Record<string, unknown>
}
const store = getDefaultStore()
let wsRef: WebSocket | null = null
let isConnecting = false
let msgIdCounter = 0
let activeSessionIdRef = getChatState().activeSessionId
let initialized = false
let unsubscribeGateway: (() => void) | null = null
let hydratePromise: Promise<void> | null = null
let connectionGeneration = 0
async function loadSessionMessages(sessionId: string): Promise<ChatMessage[]> {
const detail = await getSessionHistory(sessionId)
const fallbackTime = detail.updated
return detail.messages.map((message, index) => ({
id: `hist-${index}-${Date.now()}`,
role: message.role,
content: message.content,
timestamp: fallbackTime,
}))
}
function handlePicoMessage(message: PicoMessage) {
const payload = message.payload || {}
switch (message.type) {
case "message.create": {
const content = (payload.content as string) || ""
const messageId = (payload.message_id as string) || `pico-${Date.now()}`
const timestamp =
message.timestamp !== undefined &&
Number.isFinite(Number(message.timestamp))
? normalizeUnixTimestamp(Number(message.timestamp))
: Date.now()
updateChatStore((prev) => ({
messages: [
...prev.messages,
{
id: messageId,
role: "assistant",
content,
timestamp,
},
],
isTyping: false,
}))
break
}
case "message.update": {
const content = (payload.content as string) || ""
const messageId = payload.message_id as string
if (!messageId) {
break
}
updateChatStore((prev) => ({
messages: prev.messages.map((msg) =>
msg.id === messageId ? { ...msg, content } : msg,
),
}))
break
}
case "typing.start":
updateChatStore({ isTyping: true })
break
case "typing.stop":
updateChatStore({ isTyping: false })
break
case "error":
console.error("Pico error:", payload)
updateChatStore({ isTyping: false })
break
case "pong":
break
default:
console.log("Unknown pico message type:", message.type)
}
}
function setActiveSessionId(sessionId: string) {
activeSessionIdRef = sessionId
updateChatStore({ activeSessionId: sessionId })
}
export async function connectChat() {
if (store.get(gatewayAtom).status !== "running") {
return
}
if (
isConnecting ||
(wsRef &&
(wsRef.readyState === WebSocket.OPEN ||
wsRef.readyState === WebSocket.CONNECTING))
) {
return
}
const generation = connectionGeneration + 1
connectionGeneration = generation
isConnecting = true
updateChatStore({ connectionState: "connecting" })
try {
const { token, ws_url } = await getPicoToken()
if (generation !== connectionGeneration) {
return
}
if (!token) {
console.error("No pico token available")
updateChatStore({ connectionState: "error" })
isConnecting = false
return
}
let finalWsUrl = ws_url
try {
const parsedUrl = new URL(ws_url)
const isLocalHost =
parsedUrl.hostname === "localhost" ||
parsedUrl.hostname === "127.0.0.1" ||
parsedUrl.hostname === "0.0.0.0"
const isBrowserLocal =
window.location.hostname === "localhost" ||
window.location.hostname === "127.0.0.1"
if (isLocalHost && !isBrowserLocal) {
parsedUrl.hostname = window.location.hostname
finalWsUrl = parsedUrl.toString()
}
} catch (error) {
console.warn("Could not parse ws_url:", error)
}
const url = `${finalWsUrl}?token=${encodeURIComponent(token)}&session_id=${encodeURIComponent(activeSessionIdRef)}`
const socket = new WebSocket(url)
if (generation !== connectionGeneration) {
socket.close()
return
}
socket.onopen = () => {
if (wsRef !== socket) {
return
}
updateChatStore({ connectionState: "connected" })
isConnecting = false
}
socket.onmessage = (event) => {
try {
const message: PicoMessage = JSON.parse(event.data)
handlePicoMessage(message)
} catch {
console.warn("Non-JSON message from pico:", event.data)
}
}
socket.onclose = () => {
if (wsRef !== socket) {
return
}
wsRef = null
isConnecting = false
updateChatStore({
connectionState: "disconnected",
isTyping: false,
})
}
socket.onerror = () => {
if (wsRef !== socket) {
return
}
isConnecting = false
updateChatStore({ connectionState: "error" })
}
wsRef = socket
} catch (error) {
if (generation !== connectionGeneration) {
return
}
console.error("Failed to connect to pico:", error)
updateChatStore({ connectionState: "error" })
isConnecting = false
}
}
export function disconnectChat() {
connectionGeneration += 1
const socket = wsRef
wsRef = null
isConnecting = false
if (socket) {
socket.close()
}
updateChatStore({
connectionState: "disconnected",
isTyping: false,
})
}
export async function hydrateActiveSession() {
if (hydratePromise) {
return hydratePromise
}
const state = getChatState()
const storedSessionId = readStoredSessionId()
if (
!storedSessionId ||
state.hasHydratedActiveSession ||
state.messages.length > 0 ||
storedSessionId !== state.activeSessionId
) {
if (!state.hasHydratedActiveSession) {
updateChatStore({ hasHydratedActiveSession: true })
}
return
}
hydratePromise = loadSessionMessages(storedSessionId)
.then((historyMessages) => {
const currentState = getChatState()
if (currentState.activeSessionId !== storedSessionId) {
return
}
if (currentState.messages.length > 0) {
updateChatStore({ hasHydratedActiveSession: true })
return
}
updateChatStore({
messages: historyMessages,
isTyping: false,
hasHydratedActiveSession: true,
})
})
.catch((error) => {
console.error("Failed to restore last session history:", error)
const currentState = getChatState()
if (currentState.activeSessionId !== storedSessionId) {
return
}
if (currentState.messages.length > 0) {
updateChatStore({ hasHydratedActiveSession: true })
return
}
clearStoredSessionId()
updateChatStore({
messages: [],
isTyping: false,
hasHydratedActiveSession: true,
})
})
.finally(() => {
hydratePromise = null
})
return hydratePromise
}
export function sendChatMessage(content: string) {
if (!wsRef || wsRef.readyState !== WebSocket.OPEN) {
console.warn("WebSocket not connected")
return
}
const id = `msg-${++msgIdCounter}-${Date.now()}`
updateChatStore((prev) => ({
messages: [
...prev.messages,
{ id, role: "user", content, timestamp: Date.now() },
],
isTyping: true,
}))
wsRef.send(
JSON.stringify({
type: "message.send",
id,
payload: { content },
}),
)
}
export async function switchChatSession(sessionId: string) {
if (sessionId === activeSessionIdRef) {
return
}
try {
const historyMessages = await loadSessionMessages(sessionId)
disconnectChat()
setActiveSessionId(sessionId)
updateChatStore({
messages: historyMessages,
isTyping: false,
hasHydratedActiveSession: true,
})
if (store.get(gatewayAtom).status === "running") {
await connectChat()
}
} catch (error) {
console.error("Failed to load session history:", error)
toast.error(i18n.t("chat.historyOpenFailed"))
}
}
export async function newChatSession() {
if (getChatState().messages.length === 0) {
return
}
disconnectChat()
setActiveSessionId(generateSessionId())
updateChatStore({
messages: [],
isTyping: false,
hasHydratedActiveSession: true,
})
if (store.get(gatewayAtom).status === "running") {
await connectChat()
}
}
export function initializeChatStore() {
if (initialized) {
return
}
initialized = true
activeSessionIdRef = getChatState().activeSessionId
const syncConnectionWithGateway = () => {
if (store.get(gatewayAtom).status === "running") {
void connectChat()
return
}
disconnectChat()
}
unsubscribeGateway = store.sub(gatewayAtom, syncConnectionWithGateway)
if (!readStoredSessionId()) {
updateChatStore({ hasHydratedActiveSession: true })
}
syncConnectionWithGateway()
}
export function teardownChatStore() {
unsubscribeGateway?.()
unsubscribeGateway = null
initialized = false
disconnectChat()
}
+59
View File
@@ -0,0 +1,59 @@
const LAST_SESSION_STORAGE_KEY = "picoclaw:last-session-id"
const UNIX_MS_THRESHOLD = 1e12
function readStorageValue() {
return (
globalThis.localStorage?.getItem(LAST_SESSION_STORAGE_KEY)?.trim() || ""
)
}
export function readStoredSessionId(): string {
return readStorageValue()
}
export function writeStoredSessionId(sessionId: string) {
if (sessionId) {
globalThis.localStorage?.setItem(LAST_SESSION_STORAGE_KEY, sessionId)
return
}
globalThis.localStorage?.removeItem(LAST_SESSION_STORAGE_KEY)
}
export function clearStoredSessionId() {
globalThis.localStorage?.removeItem(LAST_SESSION_STORAGE_KEY)
}
export function generateSessionId(): string {
const webCrypto = globalThis.crypto
if (webCrypto && typeof webCrypto.randomUUID === "function") {
return webCrypto.randomUUID()
}
if (webCrypto && typeof webCrypto.getRandomValues === "function") {
const bytes = new Uint8Array(16)
webCrypto.getRandomValues(bytes)
bytes[6] = (bytes[6] & 0x0f) | 0x40
bytes[8] = (bytes[8] & 0x3f) | 0x80
const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, "0"))
return (
`${hex[0]}${hex[1]}${hex[2]}${hex[3]}-` +
`${hex[4]}${hex[5]}-` +
`${hex[6]}${hex[7]}-` +
`${hex[8]}${hex[9]}-` +
`${hex[10]}${hex[11]}${hex[12]}${hex[13]}${hex[14]}${hex[15]}`
)
}
return `session-${Date.now()}-${Math.random().toString(16).slice(2, 10)}`
}
export function getInitialActiveSessionId(): string {
return readStorageValue() || generateSessionId()
}
export function normalizeUnixTimestamp(timestamp: number): number {
return timestamp < UNIX_MS_THRESHOLD ? timestamp * 1000 : timestamp
}
+6
View File
@@ -1,9 +1,15 @@
import { Outlet, createRootRoute } from "@tanstack/react-router"
import { TanStackRouterDevtools } from "@tanstack/react-router-devtools"
import { useEffect } from "react"
import { AppLayout } from "@/components/app-layout"
import { initializeChatStore } from "@/lib/pico-chat-controller"
const RootLayout = () => {
useEffect(() => {
initializeChatStore()
}, [])
return (
<AppLayout>
<Outlet />
+62
View File
@@ -0,0 +1,62 @@
import { atom, getDefaultStore } from "jotai"
import {
getInitialActiveSessionId,
writeStoredSessionId,
} from "@/lib/pico-chat-state"
export interface ChatMessage {
id: string
role: "user" | "assistant"
content: string
timestamp: number | string
}
export type ConnectionState =
| "disconnected"
| "connecting"
| "connected"
| "error"
export interface ChatStoreState {
messages: ChatMessage[]
connectionState: ConnectionState
isTyping: boolean
activeSessionId: string
hasHydratedActiveSession: boolean
}
type ChatStorePatch = Partial<ChatStoreState>
const DEFAULT_CHAT_STATE: ChatStoreState = {
messages: [],
connectionState: "disconnected",
isTyping: false,
activeSessionId: getInitialActiveSessionId(),
hasHydratedActiveSession: false,
}
export const chatAtom = atom<ChatStoreState>(DEFAULT_CHAT_STATE)
const store = getDefaultStore()
export function getChatState() {
return store.get(chatAtom)
}
export function updateChatStore(
patch:
| ChatStorePatch
| ((prev: ChatStoreState) => ChatStorePatch | ChatStoreState),
) {
store.set(chatAtom, (prev) => {
const nextPatch = typeof patch === "function" ? patch(prev) : patch
const next = { ...prev, ...nextPatch }
if (next.activeSessionId !== prev.activeSessionId) {
writeStoredSessionId(next.activeSessionId)
}
return next
})
}
+1
View File
@@ -1 +1,2 @@
export * from "./gateway"
export * from "./chat"