mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
c69c48ad46
- add a dedicated /api/gateway/logs endpoint for incremental log polling - keep /api/gateway/status focused on runtime and health data only - update frontend log fetching to use the new API and add backend tests covering the status/logs separation and cleared-log behavior
775 lines
20 KiB
Go
775 lines
20 KiB
Go
package api
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/sipeed/picoclaw/pkg/config"
|
|
"github.com/sipeed/picoclaw/web/backend/utils"
|
|
)
|
|
|
|
// gateway holds the state for the managed gateway process.
|
|
var gateway = struct {
|
|
mu sync.Mutex
|
|
cmd *exec.Cmd
|
|
bootDefaultModel string
|
|
runtimeStatus string
|
|
startupDeadline time.Time
|
|
logs *LogBuffer
|
|
events *EventBroadcaster
|
|
}{
|
|
runtimeStatus: "stopped",
|
|
logs: NewLogBuffer(200),
|
|
events: NewEventBroadcaster(),
|
|
}
|
|
|
|
// Timing knobs for gateway lifecycle handling. They are variables rather than
// constants (presumably so tests can shorten them — confirm).
var (
	// gatewayStartupWindow is how long a "starting"/"restarting" status is
	// tolerated before failures are reported as "error".
	gatewayStartupWindow = 15 * time.Second

	// gatewayRestartGracePeriod is how long a restart waits for the old
	// process to exit after the initial stop signal.
	gatewayRestartGracePeriod = 5 * time.Second

	// gatewayRestartForceKillWindow is how long a restart waits for the old
	// process to exit after the follow-up SIGKILL.
	gatewayRestartForceKillWindow = 3 * time.Second

	// gatewayRestartPollInterval is the sleep between liveness checks while
	// waiting for the old process to exit.
	gatewayRestartPollInterval = 100 * time.Millisecond
)
|
|
|
|
// gatewayHealthGet issues the HTTP GET used for gateway health probes, using
// a fresh client whose overall timeout is the given duration. It is a
// variable so the implementation can be swapped out.
var gatewayHealthGet = func(url string, timeout time.Duration) (*http.Response, error) {
	return (&http.Client{Timeout: timeout}).Get(url)
}
|
|
|
|
// registerGatewayRoutes binds gateway lifecycle endpoints to the ServeMux.
|
|
func (h *Handler) registerGatewayRoutes(mux *http.ServeMux) {
|
|
mux.HandleFunc("GET /api/gateway/status", h.handleGatewayStatus)
|
|
mux.HandleFunc("GET /api/gateway/events", h.handleGatewayEvents)
|
|
mux.HandleFunc("GET /api/gateway/logs", h.handleGatewayLogs)
|
|
mux.HandleFunc("POST /api/gateway/logs/clear", h.handleGatewayClearLogs)
|
|
mux.HandleFunc("POST /api/gateway/start", h.handleGatewayStart)
|
|
mux.HandleFunc("POST /api/gateway/stop", h.handleGatewayStop)
|
|
mux.HandleFunc("POST /api/gateway/restart", h.handleGatewayRestart)
|
|
}
|
|
|
|
// TryAutoStartGateway checks whether gateway start preconditions are met and
|
|
// starts it when possible. Intended to be called by the backend at startup.
|
|
func (h *Handler) TryAutoStartGateway() {
|
|
gateway.mu.Lock()
|
|
defer gateway.mu.Unlock()
|
|
|
|
if isGatewayProcessAliveLocked() {
|
|
return
|
|
}
|
|
if gateway.cmd != nil && gateway.cmd.Process != nil {
|
|
gateway.cmd = nil
|
|
}
|
|
|
|
ready, reason, err := h.gatewayStartReady()
|
|
if err != nil {
|
|
log.Printf("Skip auto-starting gateway: %v", err)
|
|
return
|
|
}
|
|
if !ready {
|
|
log.Printf("Skip auto-starting gateway: %s", reason)
|
|
return
|
|
}
|
|
|
|
pid, err := h.startGatewayLocked("starting")
|
|
if err != nil {
|
|
log.Printf("Failed to auto-start gateway: %v", err)
|
|
return
|
|
}
|
|
log.Printf("Gateway auto-started (PID: %d)", pid)
|
|
}
|
|
|
|
// gatewayStartReady validates whether current config can start the gateway.
|
|
func (h *Handler) gatewayStartReady() (bool, string, error) {
|
|
cfg, err := config.LoadConfig(h.configPath)
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
modelName := strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
|
|
if modelName == "" {
|
|
return false, "no default model configured", nil
|
|
}
|
|
|
|
modelCfg := lookupModelConfig(cfg, modelName)
|
|
if modelCfg == nil {
|
|
return false, fmt.Sprintf("default model %q is invalid", modelName), nil
|
|
}
|
|
|
|
if !hasModelConfiguration(*modelCfg) {
|
|
return false, fmt.Sprintf("default model %q has no credentials configured", modelName), nil
|
|
}
|
|
if requiresRuntimeProbe(*modelCfg) && !probeLocalModelAvailability(*modelCfg) {
|
|
return false, fmt.Sprintf("default model %q is not reachable", modelName), nil
|
|
}
|
|
|
|
return true, "", nil
|
|
}
|
|
|
|
func lookupModelConfig(cfg *config.Config, modelName string) *config.ModelConfig {
|
|
modelCfg, err := cfg.GetModelConfig(modelName)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return modelCfg
|
|
}
|
|
|
|
func isGatewayProcessAliveLocked() bool {
|
|
return isCmdProcessAliveLocked(gateway.cmd)
|
|
}
|
|
|
|
// isCmdProcessAliveLocked reports whether cmd refers to a process that still
// appears to be running. A nil command, or one that was never started, is
// reported as not alive.
func isCmdProcessAliveLocked(cmd *exec.Cmd) bool {
	switch {
	case cmd == nil || cmd.Process == nil:
		return false
	case cmd.ProcessState != nil && cmd.ProcessState.Exited():
		// Wait() sets ProcessState once the process has exited.
		return false
	case runtime.GOOS == "windows":
		// Signal(0) probing is not supported on Windows; a command we still
		// own that has not reported exit is treated as alive.
		return true
	}

	// Signal 0 delivers nothing; it only checks that the process can be signalled.
	probeErr := cmd.Process.Signal(syscall.Signal(0))
	return probeErr == nil
}
|
|
|
|
func setGatewayRuntimeStatusLocked(status string) {
|
|
gateway.runtimeStatus = status
|
|
if status == "starting" || status == "restarting" {
|
|
gateway.startupDeadline = time.Now().Add(gatewayStartupWindow)
|
|
return
|
|
}
|
|
gateway.startupDeadline = time.Time{}
|
|
}
|
|
|
|
func gatewayStatusOnHealthFailureLocked() string {
|
|
if gateway.runtimeStatus == "starting" || gateway.runtimeStatus == "restarting" {
|
|
if gateway.startupDeadline.IsZero() || time.Now().Before(gateway.startupDeadline) {
|
|
return gateway.runtimeStatus
|
|
}
|
|
return "error"
|
|
}
|
|
if gateway.runtimeStatus == "running" {
|
|
return "running"
|
|
}
|
|
if gateway.runtimeStatus == "error" {
|
|
return "error"
|
|
}
|
|
return "error"
|
|
}
|
|
|
|
func currentGatewayStatusLocked(processAlive bool) string {
|
|
if !processAlive {
|
|
if gateway.runtimeStatus == "restarting" {
|
|
if gateway.startupDeadline.IsZero() || time.Now().Before(gateway.startupDeadline) {
|
|
return "restarting"
|
|
}
|
|
return "error"
|
|
}
|
|
if gateway.runtimeStatus == "error" {
|
|
return "error"
|
|
}
|
|
return "stopped"
|
|
}
|
|
return gatewayStatusOnHealthFailureLocked()
|
|
}
|
|
|
|
func waitForGatewayProcessExit(cmd *exec.Cmd, timeout time.Duration) bool {
|
|
if cmd == nil || cmd.Process == nil {
|
|
return true
|
|
}
|
|
|
|
deadline := time.Now().Add(timeout)
|
|
for {
|
|
if !isCmdProcessAliveLocked(cmd) {
|
|
return true
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return false
|
|
}
|
|
time.Sleep(gatewayRestartPollInterval)
|
|
}
|
|
}
|
|
|
|
func stopGatewayProcessForRestart(cmd *exec.Cmd) error {
|
|
if cmd == nil || cmd.Process == nil || !isCmdProcessAliveLocked(cmd) {
|
|
return nil
|
|
}
|
|
|
|
var stopErr error
|
|
if runtime.GOOS == "windows" {
|
|
stopErr = cmd.Process.Kill()
|
|
} else {
|
|
stopErr = cmd.Process.Signal(syscall.SIGTERM)
|
|
}
|
|
if stopErr != nil && isCmdProcessAliveLocked(cmd) {
|
|
return fmt.Errorf("failed to stop existing gateway: %w", stopErr)
|
|
}
|
|
|
|
if waitForGatewayProcessExit(cmd, gatewayRestartGracePeriod) {
|
|
return nil
|
|
}
|
|
|
|
if runtime.GOOS != "windows" {
|
|
killErr := cmd.Process.Signal(syscall.SIGKILL)
|
|
if killErr != nil && isCmdProcessAliveLocked(cmd) {
|
|
return fmt.Errorf("failed to force-stop existing gateway: %w", killErr)
|
|
}
|
|
if waitForGatewayProcessExit(cmd, gatewayRestartForceKillWindow) {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("existing gateway did not exit before restart")
|
|
}
|
|
|
|
// gatewayRestartRequired reports whether a running gateway was booted with a
// default model different from the one now in config. It is false unless the
// status is "running" and both model names are non-empty.
func gatewayRestartRequired(status, bootDefaultModel, configDefaultModel string) bool {
	if status != "running" {
		return false
	}
	if bootDefaultModel == "" || configDefaultModel == "" {
		return false
	}
	return bootDefaultModel != configDefaultModel
}
|
|
|
|
func (h *Handler) startGatewayLocked(initialStatus string) (int, error) {
|
|
cfg, err := config.LoadConfig(h.configPath)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
defaultModelName := strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
|
|
|
|
// Locate the picoclaw executable
|
|
execPath := utils.FindPicoclawBinary()
|
|
|
|
cmd := exec.Command(execPath, "gateway")
|
|
cmd.Env = os.Environ()
|
|
// Forward the launcher's config path via the environment variable that
|
|
// GetConfigPath() already reads, so the gateway sub-process uses the same
|
|
// config file without requiring a --config flag on the gateway subcommand.
|
|
if h.configPath != "" {
|
|
cmd.Env = append(cmd.Env, "PICOCLAW_CONFIG="+h.configPath)
|
|
}
|
|
if host := h.gatewayHostOverride(); host != "" {
|
|
cmd.Env = append(cmd.Env, "PICOCLAW_GATEWAY_HOST="+host)
|
|
}
|
|
|
|
stdoutPipe, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to create stdout pipe: %w", err)
|
|
}
|
|
|
|
stderrPipe, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to create stderr pipe: %w", err)
|
|
}
|
|
|
|
// Clear old logs for this new run
|
|
gateway.logs.Reset()
|
|
|
|
// Ensure Pico Channel is configured before starting gateway
|
|
if _, err := h.ensurePicoChannel(); err != nil {
|
|
log.Printf("Warning: failed to ensure pico channel: %v", err)
|
|
// Non-fatal: gateway can still start without pico channel
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return 0, fmt.Errorf("failed to start gateway: %w", err)
|
|
}
|
|
|
|
gateway.cmd = cmd
|
|
gateway.bootDefaultModel = defaultModelName
|
|
setGatewayRuntimeStatusLocked(initialStatus)
|
|
pid := cmd.Process.Pid
|
|
log.Printf("Started picoclaw gateway (PID: %d) from %s", pid, execPath)
|
|
|
|
// Broadcast the launch state immediately so clients can reflect it without polling.
|
|
gateway.events.Broadcast(GatewayEvent{
|
|
Status: initialStatus,
|
|
PID: pid,
|
|
BootDefaultModel: defaultModelName,
|
|
ConfigDefaultModel: defaultModelName,
|
|
RestartRequired: false,
|
|
})
|
|
|
|
// Capture stdout/stderr in background
|
|
go scanPipe(stdoutPipe, gateway.logs)
|
|
go scanPipe(stderrPipe, gateway.logs)
|
|
|
|
// Wait for exit in background and clean up
|
|
go func() {
|
|
if err := cmd.Wait(); err != nil {
|
|
log.Printf("Gateway process exited: %v", err)
|
|
} else {
|
|
log.Printf("Gateway process exited normally")
|
|
}
|
|
|
|
gateway.mu.Lock()
|
|
shouldBroadcastStopped := false
|
|
if gateway.cmd == cmd {
|
|
gateway.cmd = nil
|
|
gateway.bootDefaultModel = ""
|
|
if gateway.runtimeStatus != "restarting" {
|
|
setGatewayRuntimeStatusLocked("stopped")
|
|
shouldBroadcastStopped = true
|
|
}
|
|
}
|
|
gateway.mu.Unlock()
|
|
|
|
if shouldBroadcastStopped {
|
|
gateway.events.Broadcast(GatewayEvent{
|
|
Status: "stopped",
|
|
RestartRequired: false,
|
|
})
|
|
}
|
|
}()
|
|
|
|
// Start a goroutine to probe health and broadcast "running" once ready
|
|
go func() {
|
|
for i := 0; i < 30; i++ { // try for up to 15 seconds
|
|
time.Sleep(500 * time.Millisecond)
|
|
gateway.mu.Lock()
|
|
stillOurs := gateway.cmd == cmd
|
|
gateway.mu.Unlock()
|
|
if !stillOurs {
|
|
return
|
|
}
|
|
cfg, err := config.LoadConfig(h.configPath)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
healthHost := gatewayProbeHost(h.effectiveGatewayBindHost(cfg))
|
|
healthPort := cfg.Gateway.Port
|
|
if healthPort == 0 {
|
|
healthPort = 18790
|
|
}
|
|
healthURL := fmt.Sprintf("http://%s/health", net.JoinHostPort(healthHost, strconv.Itoa(healthPort)))
|
|
resp, err := gatewayHealthGet(healthURL, 1*time.Second)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
if resp.StatusCode == http.StatusOK {
|
|
gateway.mu.Lock()
|
|
if gateway.cmd == cmd {
|
|
setGatewayRuntimeStatusLocked("running")
|
|
}
|
|
gateway.mu.Unlock()
|
|
gateway.events.Broadcast(GatewayEvent{
|
|
Status: "running",
|
|
PID: pid,
|
|
BootDefaultModel: defaultModelName,
|
|
ConfigDefaultModel: defaultModelName,
|
|
RestartRequired: false,
|
|
})
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return pid, nil
|
|
}
|
|
|
|
// handleGatewayStart starts the picoclaw gateway subprocess.
|
|
//
|
|
// POST /api/gateway/start
|
|
func (h *Handler) handleGatewayStart(w http.ResponseWriter, r *http.Request) {
|
|
gateway.mu.Lock()
|
|
defer gateway.mu.Unlock()
|
|
|
|
// Prevent duplicate starts
|
|
if isGatewayProcessAliveLocked() {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusConflict)
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "already_running",
|
|
"pid": gateway.cmd.Process.Pid,
|
|
})
|
|
return
|
|
}
|
|
if gateway.cmd != nil && gateway.cmd.Process != nil {
|
|
gateway.cmd = nil
|
|
setGatewayRuntimeStatusLocked("stopped")
|
|
}
|
|
|
|
ready, reason, err := h.gatewayStartReady()
|
|
if err != nil {
|
|
http.Error(
|
|
w,
|
|
fmt.Sprintf("Failed to validate gateway start conditions: %v", err),
|
|
http.StatusInternalServerError,
|
|
)
|
|
return
|
|
}
|
|
if !ready {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "precondition_failed",
|
|
"message": reason,
|
|
})
|
|
return
|
|
}
|
|
|
|
pid, err := h.startGatewayLocked("starting")
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to start gateway: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "ok",
|
|
"pid": pid,
|
|
})
|
|
}
|
|
|
|
// handleGatewayStop stops the running gateway subprocess gracefully.
|
|
//
|
|
// POST /api/gateway/stop
|
|
func (h *Handler) handleGatewayStop(w http.ResponseWriter, r *http.Request) {
|
|
gateway.mu.Lock()
|
|
defer gateway.mu.Unlock()
|
|
|
|
if gateway.cmd == nil || gateway.cmd.Process == nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "not_running",
|
|
})
|
|
return
|
|
}
|
|
|
|
pid := gateway.cmd.Process.Pid
|
|
|
|
// Send SIGTERM for graceful shutdown (SIGKILL on Windows)
|
|
var sigErr error
|
|
if runtime.GOOS == "windows" {
|
|
sigErr = gateway.cmd.Process.Kill()
|
|
} else {
|
|
sigErr = gateway.cmd.Process.Signal(syscall.SIGTERM)
|
|
}
|
|
|
|
if sigErr != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to stop gateway (PID %d): %v", pid, sigErr), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
log.Printf("Sent stop signal to gateway (PID: %d)", pid)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "ok",
|
|
"pid": pid,
|
|
})
|
|
}
|
|
|
|
// handleGatewayRestart stops the gateway (if running) and starts a new instance.
|
|
//
|
|
// POST /api/gateway/restart
|
|
func (h *Handler) handleGatewayRestart(w http.ResponseWriter, r *http.Request) {
|
|
ready, reason, err := h.gatewayStartReady()
|
|
if err != nil {
|
|
http.Error(
|
|
w,
|
|
fmt.Sprintf("Failed to validate gateway start conditions: %v", err),
|
|
http.StatusInternalServerError,
|
|
)
|
|
return
|
|
}
|
|
if !ready {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "precondition_failed",
|
|
"message": reason,
|
|
})
|
|
return
|
|
}
|
|
|
|
gateway.mu.Lock()
|
|
previousCmd := gateway.cmd
|
|
setGatewayRuntimeStatusLocked("restarting")
|
|
gateway.events.Broadcast(GatewayEvent{
|
|
Status: "restarting",
|
|
RestartRequired: false,
|
|
})
|
|
gateway.mu.Unlock()
|
|
|
|
if err = stopGatewayProcessForRestart(previousCmd); err != nil {
|
|
gateway.mu.Lock()
|
|
if gateway.cmd == previousCmd {
|
|
if isCmdProcessAliveLocked(previousCmd) {
|
|
setGatewayRuntimeStatusLocked("running")
|
|
} else {
|
|
gateway.cmd = nil
|
|
gateway.bootDefaultModel = ""
|
|
setGatewayRuntimeStatusLocked("error")
|
|
}
|
|
}
|
|
gateway.mu.Unlock()
|
|
http.Error(w, fmt.Sprintf("Failed to restart gateway: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
gateway.mu.Lock()
|
|
if gateway.cmd == previousCmd {
|
|
gateway.cmd = nil
|
|
gateway.bootDefaultModel = ""
|
|
}
|
|
pid, err := h.startGatewayLocked("restarting")
|
|
if err != nil {
|
|
gateway.cmd = nil
|
|
gateway.bootDefaultModel = ""
|
|
setGatewayRuntimeStatusLocked("error")
|
|
}
|
|
gateway.mu.Unlock()
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to restart gateway: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "ok",
|
|
"pid": pid,
|
|
})
|
|
}
|
|
|
|
// handleGatewayClearLogs clears the in-memory gateway log buffer.
|
|
//
|
|
// POST /api/gateway/logs/clear
|
|
func (h *Handler) handleGatewayClearLogs(w http.ResponseWriter, r *http.Request) {
|
|
gateway.logs.Clear()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "cleared",
|
|
"log_total": 0,
|
|
"log_run_id": gateway.logs.RunID(),
|
|
})
|
|
}
|
|
|
|
// handleGatewayStatus returns the gateway run status and health info.
|
|
//
|
|
// GET /api/gateway/status
|
|
func (h *Handler) handleGatewayStatus(w http.ResponseWriter, r *http.Request) {
|
|
data := h.gatewayStatusData()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(data)
|
|
}
|
|
|
|
func (h *Handler) gatewayStatusData() map[string]any {
|
|
data := map[string]any{}
|
|
cfg, cfgErr := config.LoadConfig(h.configPath)
|
|
configDefaultModel := ""
|
|
if cfgErr == nil && cfg != nil {
|
|
configDefaultModel = strings.TrimSpace(cfg.Agents.Defaults.GetModelName())
|
|
if configDefaultModel != "" {
|
|
data["config_default_model"] = configDefaultModel
|
|
}
|
|
}
|
|
|
|
// Check process state
|
|
gateway.mu.Lock()
|
|
processAlive := isGatewayProcessAliveLocked()
|
|
bootDefaultModel := ""
|
|
if processAlive {
|
|
data["pid"] = gateway.cmd.Process.Pid
|
|
if gateway.bootDefaultModel != "" {
|
|
data["boot_default_model"] = gateway.bootDefaultModel
|
|
bootDefaultModel = gateway.bootDefaultModel
|
|
}
|
|
}
|
|
gateway.mu.Unlock()
|
|
|
|
if !processAlive {
|
|
gateway.mu.Lock()
|
|
data["gateway_status"] = currentGatewayStatusLocked(false)
|
|
gateway.mu.Unlock()
|
|
} else {
|
|
// Process is alive — probe its health endpoint
|
|
host := "127.0.0.1"
|
|
port := 18790
|
|
if cfgErr == nil && cfg != nil {
|
|
host = gatewayProbeHost(h.effectiveGatewayBindHost(cfg))
|
|
if cfg.Gateway.Port != 0 {
|
|
port = cfg.Gateway.Port
|
|
}
|
|
}
|
|
|
|
url := fmt.Sprintf("http://%s/health", net.JoinHostPort(host, strconv.Itoa(port)))
|
|
resp, err := gatewayHealthGet(url, 2*time.Second)
|
|
|
|
if err != nil {
|
|
gateway.mu.Lock()
|
|
data["gateway_status"] = currentGatewayStatusLocked(true)
|
|
gateway.mu.Unlock()
|
|
} else {
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
gateway.mu.Lock()
|
|
setGatewayRuntimeStatusLocked("error")
|
|
gateway.mu.Unlock()
|
|
data["gateway_status"] = "error"
|
|
data["status_code"] = resp.StatusCode
|
|
} else {
|
|
var healthData map[string]any
|
|
if decErr := json.NewDecoder(resp.Body).Decode(&healthData); decErr != nil {
|
|
gateway.mu.Lock()
|
|
setGatewayRuntimeStatusLocked("error")
|
|
gateway.mu.Unlock()
|
|
data["gateway_status"] = "error"
|
|
} else {
|
|
gateway.mu.Lock()
|
|
setGatewayRuntimeStatusLocked("running")
|
|
gateway.mu.Unlock()
|
|
for k, v := range healthData {
|
|
data[k] = v
|
|
}
|
|
data["gateway_status"] = "running"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
status, _ := data["gateway_status"].(string)
|
|
data["gateway_restart_required"] = gatewayRestartRequired(
|
|
status,
|
|
bootDefaultModel,
|
|
configDefaultModel,
|
|
)
|
|
|
|
ready, reason, readyErr := h.gatewayStartReady()
|
|
if readyErr != nil {
|
|
data["gateway_start_allowed"] = false
|
|
data["gateway_start_reason"] = readyErr.Error()
|
|
} else {
|
|
data["gateway_start_allowed"] = ready
|
|
if !ready {
|
|
data["gateway_start_reason"] = reason
|
|
}
|
|
}
|
|
|
|
return data
|
|
}
|
|
|
|
// handleGatewayLogs returns buffered gateway logs, optionally incrementally.
|
|
//
|
|
// GET /api/gateway/logs
|
|
func (h *Handler) handleGatewayLogs(w http.ResponseWriter, r *http.Request) {
|
|
data := gatewayLogsData(r)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(data)
|
|
}
|
|
|
|
// gatewayLogsData reads log_offset and log_run_id query params from the request
|
|
// and returns incremental log lines.
|
|
func gatewayLogsData(r *http.Request) map[string]any {
|
|
data := map[string]any{}
|
|
clientOffset := 0
|
|
clientRunID := -1
|
|
|
|
if v := r.URL.Query().Get("log_offset"); v != "" {
|
|
if n, err := strconv.Atoi(v); err == nil {
|
|
clientOffset = n
|
|
}
|
|
}
|
|
|
|
if v := r.URL.Query().Get("log_run_id"); v != "" {
|
|
if n, err := strconv.Atoi(v); err == nil {
|
|
clientRunID = n
|
|
}
|
|
}
|
|
|
|
runID := gateway.logs.RunID()
|
|
|
|
if runID == 0 {
|
|
data["logs"] = []string{}
|
|
data["log_total"] = 0
|
|
data["log_run_id"] = 0
|
|
return data
|
|
}
|
|
|
|
// If runID changed, reset offset to get all logs from new run
|
|
offset := clientOffset
|
|
if clientRunID != runID {
|
|
offset = 0
|
|
}
|
|
|
|
lines, total, runID := gateway.logs.LinesSince(offset)
|
|
if lines == nil {
|
|
lines = []string{}
|
|
}
|
|
|
|
data["logs"] = lines
|
|
data["log_total"] = total
|
|
data["log_run_id"] = runID
|
|
return data
|
|
}
|
|
|
|
// handleGatewayEvents serves an SSE stream of gateway state change events.
|
|
//
|
|
// GET /api/gateway/events
|
|
func (h *Handler) handleGatewayEvents(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "SSE not supported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
// Subscribe to gateway events
|
|
ch := gateway.events.Subscribe()
|
|
defer gateway.events.Unsubscribe(ch)
|
|
|
|
// Send initial status so the client doesn't start blank
|
|
initial := h.currentGatewayStatus()
|
|
fmt.Fprintf(w, "data: %s\n\n", initial)
|
|
flusher.Flush()
|
|
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
return
|
|
case data, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", data)
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
// currentGatewayStatus returns the current gateway status as a JSON string.
|
|
func (h *Handler) currentGatewayStatus() string {
|
|
data := h.gatewayStatusData()
|
|
encoded, _ := json.Marshal(data)
|
|
return string(encoded)
|
|
}
|
|
|
|
// scanPipe reads lines from r and appends them to buf. Returns when r reaches EOF.
|
|
func scanPipe(r io.Reader, buf *LogBuffer) {
|
|
scanner := bufio.NewScanner(r)
|
|
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
for scanner.Scan() {
|
|
buf.Append(scanner.Text())
|
|
}
|
|
}
|