add systray ui for all platform (#1649)

* add systray ui for all platform

* update from getlantern/systray to fyne.io/systray for fix test
This commit is contained in:
Cytown
2026-03-17 14:12:32 +08:00
committed by GitHub
parent cef0f28881
commit e41423483e
15 changed files with 674 additions and 231 deletions
+18 -3
View File
@@ -40,9 +40,13 @@ func (b *EventBroadcaster) Subscribe() chan string {
// Unsubscribe removes a listener channel and closes it.
func (b *EventBroadcaster) Unsubscribe(ch chan string) {
b.mu.Lock()
delete(b.clients, ch)
b.mu.Unlock()
close(ch)
defer b.mu.Unlock()
// Check if the channel is still registered before closing
if _, exists := b.clients[ch]; exists {
delete(b.clients, ch)
close(ch)
}
}
// Broadcast sends a GatewayEvent to all connected SSE clients.
@@ -63,3 +67,14 @@ func (b *EventBroadcaster) Broadcast(event GatewayEvent) {
}
}
}
// Shutdown closes all subscriber channels, notifying all SSE clients to disconnect.
// This should be called when the server is shutting down.
func (b *EventBroadcaster) Shutdown() {
// Close all channels to notify listeners
for ch := range b.clients {
b.Unsubscribe(ch)
}
// Clear the map
b.clients = make(map[chan string]struct{})
}
+248 -125
View File
@@ -3,10 +3,10 @@ package api
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"os/exec"
@@ -18,6 +18,7 @@ import (
"time"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/health"
"github.com/sipeed/picoclaw/web/backend/utils"
)
@@ -48,6 +49,27 @@ var gatewayHealthGet = func(url string, timeout time.Duration) (*http.Response,
return client.Get(url)
}
// getGatewayHealth checks the gateway health endpoint and returns the status response
// Returns (*health.StatusResponse, statusCode, error). If error is not nil, the other values are not valid.
func getGatewayHealth(port int, timeout time.Duration) (*health.StatusResponse, int, error) {
if port == 0 {
port = 18790
}
url := fmt.Sprintf("http://127.0.0.1:%d/health", port)
resp, err := gatewayHealthGet(url, timeout)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
var healthResponse health.StatusResponse
if decErr := json.NewDecoder(resp.Body).Decode(&healthResponse); decErr != nil {
return nil, resp.StatusCode, decErr
}
return &healthResponse, resp.StatusCode, nil
}
// registerGatewayRoutes binds gateway lifecycle endpoints to the ServeMux.
func (h *Handler) registerGatewayRoutes(mux *http.ServeMux) {
mux.HandleFunc("GET /api/gateway/status", h.handleGatewayStatus)
@@ -62,12 +84,35 @@ func (h *Handler) registerGatewayRoutes(mux *http.ServeMux) {
// TryAutoStartGateway checks whether gateway start preconditions are met and
// starts it when possible. Intended to be called by the backend at startup.
func (h *Handler) TryAutoStartGateway() {
// Check if gateway is already running via health endpoint
cfg, cfgErr := config.LoadConfig(h.configPath)
if cfgErr == nil && cfg != nil {
healthResp, statusCode, err := getGatewayHealth(cfg.Gateway.Port, 2*time.Second)
if err == nil && statusCode == http.StatusOK {
// Gateway is already running, attach to the existing process
pid := healthResp.Pid
gateway.mu.Lock()
defer gateway.mu.Unlock()
ready, reason, err := h.gatewayStartReady()
if err != nil {
log.Printf("Skip auto-starting gateway: %v", err)
return
}
if !ready {
log.Printf("Skip auto-starting gateway: %s", reason)
return
}
_, err = h.startGatewayLocked("starting", pid)
if err != nil {
log.Printf("Failed to attach to running gateway (PID: %d): %v", pid, err)
}
return
}
}
gateway.mu.Lock()
defer gateway.mu.Unlock()
if isGatewayProcessAliveLocked() {
return
}
if gateway.cmd != nil && gateway.cmd.Process != nil {
gateway.cmd = nil
}
@@ -82,7 +127,7 @@ func (h *Handler) TryAutoStartGateway() {
return
}
pid, err := h.startGatewayLocked("starting")
pid, err := h.startGatewayLocked("starting", 0)
if err != nil {
log.Printf("Failed to auto-start gateway: %v", err)
return
@@ -125,10 +170,6 @@ func lookupModelConfig(cfg *config.Config, modelName string) *config.ModelConfig
return modelCfg
}
func isGatewayProcessAliveLocked() bool {
return isCmdProcessAliveLocked(gateway.cmd)
}
func isCmdProcessAliveLocked(cmd *exec.Cmd) bool {
if cmd == nil || cmd.Process == nil {
return false
@@ -157,6 +198,28 @@ func setGatewayRuntimeStatusLocked(status string) {
gateway.startupDeadline = time.Time{}
}
// attachToGatewayProcess attaches to an existing gateway process by PID
// and updates the gateway state accordingly.
// Assumes gateway.mu is held by the caller.
func attachToGatewayProcessLocked(pid int, cfg *config.Config) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process for PID %d: %w", pid, err)
}
gateway.cmd = &exec.Cmd{Process: process}
setGatewayRuntimeStatusLocked("running")
// Update bootDefaultModel from config
if cfg != nil {
defaultModelName := strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
gateway.bootDefaultModel = defaultModelName
}
log.Printf("Attached to gateway process (PID: %d)", pid)
return nil
}
func gatewayStatusOnHealthFailureLocked() string {
if gateway.runtimeStatus == "starting" || gateway.runtimeStatus == "restarting" {
if gateway.startupDeadline.IsZero() || time.Now().Before(gateway.startupDeadline) {
@@ -238,24 +301,41 @@ func stopGatewayProcessForRestart(cmd *exec.Cmd) error {
return fmt.Errorf("existing gateway did not exit before restart")
}
func gatewayRestartRequired(status, bootDefaultModel, configDefaultModel string) bool {
return status == "running" &&
bootDefaultModel != "" &&
configDefaultModel != "" &&
bootDefaultModel != configDefaultModel
}
func (h *Handler) startGatewayLocked(initialStatus string) (int, error) {
func (h *Handler) startGatewayLocked(initialStatus string, existingPid int) (int, error) {
cfg, err := config.LoadConfig(h.configPath)
if err != nil {
return 0, fmt.Errorf("failed to load config: %w", err)
}
defaultModelName := strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
var cmd *exec.Cmd
var pid int
if existingPid > 0 {
// Attach to existing process
pid = existingPid
gateway.cmd = nil // Clear first to ensure clean state
if err = attachToGatewayProcessLocked(pid, cfg); err != nil {
return 0, err
}
// Broadcast the attached state
gateway.events.Broadcast(GatewayEvent{
Status: initialStatus,
PID: pid,
BootDefaultModel: defaultModelName,
ConfigDefaultModel: defaultModelName,
RestartRequired: false,
})
return pid, nil
}
// Start new process
// Locate the picoclaw executable
execPath := utils.FindPicoclawBinary()
cmd := exec.Command(execPath, "gateway")
cmd = exec.Command(execPath, "gateway")
cmd.Env = os.Environ()
// Forward the launcher's config path via the environment variable that
// GetConfigPath() already reads, so the gateway sub-process uses the same
@@ -293,7 +373,7 @@ func (h *Handler) startGatewayLocked(initialStatus string) (int, error) {
gateway.cmd = cmd
gateway.bootDefaultModel = defaultModelName
setGatewayRuntimeStatusLocked(initialStatus)
pid := cmd.Process.Pid
pid = cmd.Process.Pid
log.Printf("Started picoclaw gateway (PID: %d) from %s", pid, execPath)
// Broadcast the launch state immediately so clients can reflect it without polling.
@@ -351,30 +431,22 @@ func (h *Handler) startGatewayLocked(initialStatus string) (int, error) {
if err != nil {
continue
}
healthHost := gatewayProbeHost(h.effectiveGatewayBindHost(cfg))
healthPort := cfg.Gateway.Port
if healthPort == 0 {
healthPort = 18790
}
healthURL := fmt.Sprintf("http://%s/health", net.JoinHostPort(healthHost, strconv.Itoa(healthPort)))
resp, err := gatewayHealthGet(healthURL, 1*time.Second)
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
gateway.mu.Lock()
if gateway.cmd == cmd {
setGatewayRuntimeStatusLocked("running")
}
gateway.mu.Unlock()
gateway.events.Broadcast(GatewayEvent{
Status: "running",
PID: pid,
BootDefaultModel: defaultModelName,
ConfigDefaultModel: defaultModelName,
RestartRequired: false,
})
return
healthResp, statusCode, err := getGatewayHealth(cfg.Gateway.Port, 1*time.Second)
if err == nil && statusCode == http.StatusOK && healthResp.Pid == pid {
// Verify the health endpoint returns the expected pid
gateway.mu.Lock()
if gateway.cmd == cmd {
setGatewayRuntimeStatusLocked("running")
}
gateway.mu.Unlock()
gateway.events.Broadcast(GatewayEvent{
Status: "running",
PID: pid,
BootDefaultModel: defaultModelName,
ConfigDefaultModel: defaultModelName,
RestartRequired: false,
})
return
}
}
}()
@@ -386,19 +458,54 @@ func (h *Handler) startGatewayLocked(initialStatus string) (int, error) {
//
// POST /api/gateway/start
func (h *Handler) handleGatewayStart(w http.ResponseWriter, r *http.Request) {
// Prevent duplicate starts by checking health endpoint
cfg, cfgErr := config.LoadConfig(h.configPath)
if cfgErr == nil && cfg != nil {
healthResp, statusCode, err := getGatewayHealth(cfg.Gateway.Port, 2*time.Second)
if err == nil && statusCode == http.StatusOK {
// Gateway is already running, attach to the existing process
pid := healthResp.Pid
gateway.mu.Lock()
ready, reason, err := h.gatewayStartReady()
if err != nil {
gateway.mu.Unlock()
http.Error(
w,
fmt.Sprintf("Failed to validate gateway start conditions: %v", err),
http.StatusInternalServerError,
)
return
}
if !ready {
gateway.mu.Unlock()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]any{
"status": "precondition_failed",
"message": reason,
})
return
}
_, err = h.startGatewayLocked("starting", pid)
gateway.mu.Unlock()
if err != nil {
log.Printf("Failed to attach to running gateway (PID: %d): %v", pid, err)
http.Error(w, fmt.Sprintf("Failed to attach to gateway: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]any{
"status": "ok",
"pid": pid,
})
return
}
}
gateway.mu.Lock()
defer gateway.mu.Unlock()
// Prevent duplicate starts
if isGatewayProcessAliveLocked() {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusConflict)
json.NewEncoder(w).Encode(map[string]any{
"status": "already_running",
"pid": gateway.cmd.Process.Pid,
})
return
}
if gateway.cmd != nil && gateway.cmd.Process != nil {
gateway.cmd = nil
setGatewayRuntimeStatusLocked("stopped")
@@ -423,7 +530,7 @@ func (h *Handler) handleGatewayStart(w http.ResponseWriter, r *http.Request) {
return
}
pid, err := h.startGatewayLocked("starting")
pid, err := h.startGatewayLocked("starting", 0)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to start gateway: %v", err), http.StatusInternalServerError)
return
@@ -475,27 +582,16 @@ func (h *Handler) handleGatewayStop(w http.ResponseWriter, r *http.Request) {
})
}
// handleGatewayRestart stops the gateway (if running) and starts a new instance.
//
// POST /api/gateway/restart
func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
// RestartGateway restarts the gateway process. This is a non-blocking operation
// that stops the current gateway (if running) and starts a new one.
// Returns the PID of the new gateway process or an error.
func (h *Handler) RestartGateway() (int, error) {
ready, reason, err := h.gatewayStartReady()
if err != nil {
http.Error(
w,
fmt.Sprintf("Failed to validate gateway start conditions: %v", err),
http.StatusInternalServerError,
)
return
return 0, fmt.Errorf("failed to validate gateway start conditions: %w", err)
}
if !ready {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]any{
"status": "precondition_failed",
"message": reason,
})
return
return 0, &preconditionFailedError{reason: reason}
}
gateway.mu.Lock()
@@ -519,8 +615,7 @@ func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
}
}
gateway.mu.Unlock()
http.Error(w, fmt.Sprintf("Failed to restart gateway: %v", err), http.StatusInternalServerError)
return
return 0, fmt.Errorf("failed to stop gateway: %w", err)
}
gateway.mu.Lock()
@@ -528,7 +623,7 @@ func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
gateway.cmd = nil
gateway.bootDefaultModel = ""
}
pid, err := h.startGatewayLocked("restarting")
pid, err := h.startGatewayLocked("restarting", 0)
if err != nil {
gateway.cmd = nil
gateway.bootDefaultModel = ""
@@ -536,6 +631,43 @@ func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
}
gateway.mu.Unlock()
if err != nil {
return 0, fmt.Errorf("failed to start gateway: %w", err)
}
return pid, nil
}
// preconditionFailedError is returned when gateway restart preconditions are not met
type preconditionFailedError struct {
reason string
}
func (e *preconditionFailedError) Error() string {
return e.reason
}
// IsBadRequest returns true if the error should result in a 400 Bad Request status
func (e *preconditionFailedError) IsBadRequest() bool {
return true
}
// handleGatewayRestart stops the gateway (if running) and starts a new instance.
//
// POST /api/gateway/restart
func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
pid, err := h.RestartGateway()
if err != nil {
// Check if it's a precondition failed error
var precondErr *preconditionFailedError
if errors.As(err, &precondErr) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]any{
"status": "precondition_failed",
"message": precondErr.reason,
})
return
}
http.Error(w, fmt.Sprintf("Failed to restart gateway: %v", err), http.StatusInternalServerError)
return
}
@@ -573,83 +705,74 @@ func (h *Handler) handleGatewayStatus(w http.ResponseWriter, r *http.Request) {
func (h *Handler) gatewayStatusData() map[string]any {
data := map[string]any{}
cfg, cfgErr := config.LoadConfig(h.configPath)
configDefaultModel := ""
if cfgErr == nil && cfg != nil {
configDefaultModel = strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
configDefaultModel := strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
if configDefaultModel != "" {
data["config_default_model"] = configDefaultModel
}
}
// Check process state
gateway.mu.Lock()
processAlive := isGatewayProcessAliveLocked()
bootDefaultModel := ""
if processAlive {
data["pid"] = gateway.cmd.Process.Pid
if gateway.bootDefaultModel != "" {
data["boot_default_model"] = gateway.bootDefaultModel
bootDefaultModel = gateway.bootDefaultModel
}
// Probe health endpoint to get pid and status
port := 0
if cfgErr == nil && cfg != nil {
port = cfg.Gateway.Port
}
gateway.mu.Unlock()
if !processAlive {
healthResp, statusCode, err := getGatewayHealth(port, 2*time.Second)
if err != nil {
gateway.mu.Lock()
data["gateway_status"] = currentGatewayStatusLocked(false)
data["gateway_status"] = currentGatewayStatusLocked(true)
gateway.mu.Unlock()
log.Printf("Gateway health check failed: %v", err)
} else {
// Process is alive — probe its health endpoint
host := "127.0.0.1"
port := 18790
if cfgErr == nil && cfg != nil {
host = gatewayProbeHost(h.effectiveGatewayBindHost(cfg))
if cfg.Gateway.Port != 0 {
port = cfg.Gateway.Port
}
}
url := fmt.Sprintf("http://%s/health", net.JoinHostPort(host, strconv.Itoa(port)))
resp, err := gatewayHealthGet(url, 2*time.Second)
if err != nil {
log.Printf("Gateway health status: %d", statusCode)
if statusCode != http.StatusOK {
gateway.mu.Lock()
data["gateway_status"] = currentGatewayStatusLocked(true)
setGatewayRuntimeStatusLocked("error")
gateway.mu.Unlock()
data["gateway_status"] = "error"
data["status_code"] = statusCode
} else {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
gateway.mu.Lock()
setGatewayRuntimeStatusLocked("error")
gateway.mu.Unlock()
data["gateway_status"] = "error"
data["status_code"] = resp.StatusCode
gateway.mu.Lock()
// Check if this pid matches our tracked process
if gateway.cmd != nil && gateway.cmd.Process != nil && gateway.cmd.Process.Pid == healthResp.Pid {
setGatewayRuntimeStatusLocked("running")
bootDefaultModel := gateway.bootDefaultModel
if bootDefaultModel != "" {
data["boot_default_model"] = bootDefaultModel
}
data["gateway_status"] = "running"
data["pid"] = healthResp.Pid
} else {
var healthData map[string]any
if decErr := json.NewDecoder(resp.Body).Decode(&healthData); decErr != nil {
gateway.mu.Lock()
// Health endpoint responded with a different pid
// This could be a manual restart, try to attach to the new process
oldPid := "none"
if gateway.cmd != nil && gateway.cmd.Process != nil {
oldPid = fmt.Sprintf("%d", gateway.cmd.Process.Pid)
}
log.Printf("Detected new gateway PID (old: %s, new: %d), attempting to attach", oldPid, healthResp.Pid)
if err := attachToGatewayProcessLocked(healthResp.Pid, cfg); err != nil {
// Failed to find the process, treat as error
setGatewayRuntimeStatusLocked("error")
gateway.mu.Unlock()
data["gateway_status"] = "error"
data["pid"] = healthResp.Pid
log.Printf("Failed to attach to new gateway process (PID: %d): %v", healthResp.Pid, err)
} else {
gateway.mu.Lock()
setGatewayRuntimeStatusLocked("running")
gateway.mu.Unlock()
for k, v := range healthData {
data[k] = v
// Successfully attached, update response data
bootDefaultModel := gateway.bootDefaultModel
if bootDefaultModel != "" {
data["boot_default_model"] = bootDefaultModel
}
data["gateway_status"] = "running"
data["pid"] = healthResp.Pid
}
}
gateway.mu.Unlock()
}
}
status, _ := data["gateway_status"].(string)
data["gateway_restart_required"] = gatewayRestartRequired(
status,
bootDefaultModel,
configDefaultModel,
)
data["gateway_restart_required"] = false
ready, reason, readyErr := h.gatewayStartReady()
if readyErr != nil {
-54
View File
@@ -494,60 +494,6 @@ func TestGatewayStatusReturnsRestartingDuringRestartGap(t *testing.T) {
}
}
func TestGatewayStatusIncludesRestartRequiredWhenModelsDiffer(t *testing.T) {
resetGatewayTestState(t)
configPath := filepath.Join(t.TempDir(), "config.json")
cfg := config.DefaultConfig()
cfg.Agents.Defaults.ModelName = cfg.ModelList[0].ModelName
cfg.ModelList[0].APIKey = "test-key"
if err := config.SaveConfig(configPath, cfg); err != nil {
t.Fatalf("SaveConfig() error = %v", err)
}
h := NewHandler(configPath)
mux := http.NewServeMux()
h.RegisterRoutes(mux)
cmd := startLongRunningProcess(t)
t.Cleanup(func() {
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
_ = cmd.Wait()
})
gateway.mu.Lock()
gateway.cmd = cmd
gateway.bootDefaultModel = "previous-model"
setGatewayRuntimeStatusLocked("running")
gateway.mu.Unlock()
gatewayHealthGet = func(string, time.Duration) (*http.Response, error) {
rec := httptest.NewRecorder()
rec.WriteHeader(http.StatusOK)
_, _ = rec.WriteString(`{"ok":true}`)
return rec.Result(), nil
}
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/gateway/status", nil)
mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK)
}
var body map[string]any
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
if got := body["gateway_restart_required"]; got != true {
t.Fatalf("gateway_restart_required = %#v, want true", got)
}
}
func TestGatewayRestartKeepsRunningProcessWhenPreconditionsFail(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.json")
cfg := config.DefaultConfig()
+5
View File
@@ -70,3 +70,8 @@ func (h *Handler) RegisterRoutes(mux *http.ServeMux) {
// Launcher service parameters (port/public)
h.registerLauncherConfigRoutes(mux)
}
// Shutdown gracefully shuts down the handler, closing all SSE connections.
func (h *Handler) Shutdown() {
gateway.events.Shutdown()
}