feat: add web gateway hot reload and polling state sync (#1684)

* feat(gateway): support hot reload and empty startup

- extract gateway runtime into pkg/gateway
- add gateway.hot_reload config with default and example values
- allow starting the gateway without a default model via --allow-empty
- stop treating missing enabled channels as a startup error
- update related tests

* feat: replace gateway SSE updates with polling-based state sync

- remove gateway SSE broadcasting and event endpoint
- add polling-based gateway status refresh with stopping state handling
- detect when gateway restart is required after default model changes
- resolve gateway health and websocket proxy targets from configured host
- update gateway UI labels and add backend/frontend test coverage
This commit is contained in:
wenjie
2026-03-17 18:46:00 +08:00
committed by GitHub
parent 11207186c8
commit 8a44410e37
24 changed files with 700 additions and 543 deletions
+11 -1
View File
@@ -5,6 +5,8 @@ import (
"github.com/spf13/cobra"
"github.com/sipeed/picoclaw/cmd/picoclaw/internal"
"github.com/sipeed/picoclaw/pkg/gateway"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/utils"
)
@@ -12,6 +14,7 @@ import (
func NewGatewayCommand() *cobra.Command {
var debug bool
var noTruncate bool
var allowEmpty bool
cmd := &cobra.Command{
Use: "gateway",
@@ -31,12 +34,19 @@ func NewGatewayCommand() *cobra.Command {
return nil
},
RunE: func(_ *cobra.Command, _ []string) error {
return gatewayCmd(debug)
return gateway.Run(debug, internal.GetConfigPath(), allowEmpty)
},
}
cmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
cmd.Flags().BoolVarP(&noTruncate, "no-truncate", "T", false, "Disable string truncation in debug logs")
cmd.Flags().BoolVarP(
&allowEmpty,
"allow-empty",
"E",
false,
"Continue starting even when no default model is configured",
)
return cmd
}
@@ -28,4 +28,5 @@ func TestNewGatewayCommand(t *testing.T) {
assert.True(t, cmd.HasFlags())
assert.NotNil(t, cmd.Flags().Lookup("debug"))
assert.NotNil(t, cmd.Flags().Lookup("allow-empty"))
}
-626
View File
@@ -1,626 +0,0 @@
package gateway
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/sipeed/picoclaw/cmd/picoclaw/internal"
"github.com/sipeed/picoclaw/pkg/agent"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
_ "github.com/sipeed/picoclaw/pkg/channels/dingtalk"
_ "github.com/sipeed/picoclaw/pkg/channels/discord"
_ "github.com/sipeed/picoclaw/pkg/channels/feishu"
_ "github.com/sipeed/picoclaw/pkg/channels/irc"
_ "github.com/sipeed/picoclaw/pkg/channels/line"
_ "github.com/sipeed/picoclaw/pkg/channels/maixcam"
_ "github.com/sipeed/picoclaw/pkg/channels/matrix"
_ "github.com/sipeed/picoclaw/pkg/channels/onebot"
_ "github.com/sipeed/picoclaw/pkg/channels/pico"
_ "github.com/sipeed/picoclaw/pkg/channels/qq"
_ "github.com/sipeed/picoclaw/pkg/channels/slack"
_ "github.com/sipeed/picoclaw/pkg/channels/telegram"
_ "github.com/sipeed/picoclaw/pkg/channels/wecom"
_ "github.com/sipeed/picoclaw/pkg/channels/whatsapp"
_ "github.com/sipeed/picoclaw/pkg/channels/whatsapp_native"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/cron"
"github.com/sipeed/picoclaw/pkg/devices"
"github.com/sipeed/picoclaw/pkg/health"
"github.com/sipeed/picoclaw/pkg/heartbeat"
"github.com/sipeed/picoclaw/pkg/logger"
"github.com/sipeed/picoclaw/pkg/media"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/voice"
)
// Timeout constants for service operations.
const (
	// serviceShutdownTimeout bounds how long handleConfigReload waits for
	// stopAndCleanupServices to stop the running services before a reload.
	serviceShutdownTimeout = 30 * time.Second
	// providerReloadTimeout bounds the agent loop's ReloadProviderAndConfig
	// call made by handleConfigReload.
	providerReloadTimeout = 30 * time.Second
	// gracefulShutdownTimeout bounds how long shutdownGateway waits for
	// stopAndCleanupServices when the gateway is shutting down.
	gracefulShutdownTimeout = 15 * time.Second
)
// gatewayServices holds references to all running services so that they can
// be stopped together (stopAndCleanupServices) and replaced in place when the
// configuration is reloaded (restartServices).
type gatewayServices struct {
	// CronService is built by setupCronTool and started during setup/restart.
	CronService *cron.CronService
	// HeartbeatService is wired to the message bus and to the handler returned
	// by createHeartbeatHandler.
	HeartbeatService *heartbeat.HeartbeatService
	// MediaStore is created with media.NewFileMediaStoreWithCleanup; it is only
	// started/stopped explicitly when it is a *media.FileMediaStore.
	MediaStore media.MediaStore
	// ChannelManager is created by channels.NewManager and is stopped with a
	// timeout-bound context.
	ChannelManager *channels.Manager
	// DeviceService is configured from cfg.Devices; a failure to start it is
	// logged rather than treated as fatal.
	DeviceService *devices.Service
	// HealthServer is handed to ChannelManager.SetupHTTPServer; it is not
	// stopped directly by stopAndCleanupServices.
	HealthServer *health.Server
}
// gatewayCmd runs the gateway until SIGINT or SIGTERM is received.
//
// It loads the configuration, creates the LLM provider and the agent loop,
// starts all supporting services, and then blocks in an event loop that either
// shuts everything down on a signal or reloads the services when the config
// file watcher delivers a new configuration. When debug is true the logger
// level is set to DEBUG.
//
// It returns an error when the config cannot be loaded, the provider cannot be
// created, or setupAndStartServices fails; it returns nil after a
// signal-triggered shutdown. A failed config reload is logged and does not
// terminate the loop.
func gatewayCmd(debug bool) error {
	if debug {
		logger.SetLevel(logger.DEBUG)
		fmt.Println("🔍 Debug mode enabled")
	}

	configPath := internal.GetConfigPath()
	cfg, err := internal.LoadConfig()
	if err != nil {
		return fmt.Errorf("error loading config: %w", err)
	}

	provider, modelID, err := providers.CreateProvider(cfg)
	if err != nil {
		return fmt.Errorf("error creating provider: %w", err)
	}
	// Use the resolved model ID from provider creation
	if modelID != "" {
		cfg.Agents.Defaults.ModelName = modelID
	}

	msgBus := bus.NewMessageBus()
	agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)

	// Print agent startup info.
	fmt.Println("\n📦 Agent Status:")
	startupInfo := agentLoop.GetStartupInfo()
	// Use checked assertions: a missing or differently-typed entry yields a nil
	// map (whose lookups return nil) instead of panicking during startup.
	toolsInfo, _ := startupInfo["tools"].(map[string]any)
	skillsInfo, _ := startupInfo["skills"].(map[string]any)
	// %v prints integers exactly like %d but stays readable for a nil value.
	fmt.Printf(" • Tools: %v loaded\n", toolsInfo["count"])
	fmt.Printf(" • Skills: %v/%v available\n",
		skillsInfo["available"],
		skillsInfo["total"])

	// Log to file as well
	logger.InfoCF("agent", "Agent initialized",
		map[string]any{
			"tools_count":      toolsInfo["count"],
			"skills_total":     skillsInfo["total"],
			"skills_available": skillsInfo["available"],
		})

	// Setup and start all services
	services, err := setupAndStartServices(cfg, agentLoop, msgBus)
	if err != nil {
		return err
	}

	fmt.Printf("✓ Gateway started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port)
	fmt.Println("Press Ctrl+C to stop")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go agentLoop.Run(ctx)

	// Setup config file watcher for hot reload
	configReloadChan, stopWatch := setupConfigWatcherPolling(configPath, debug)
	defer stopWatch()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	// Unregister the channel on return so the signal package stops relaying
	// to a channel nobody reads any more.
	defer signal.Stop(sigChan)

	// Main event loop - wait for signals or config changes
	for {
		select {
		case <-sigChan:
			logger.Info("Shutting down...")
			shutdownGateway(services, agentLoop, provider, true)
			return nil
		case newCfg := <-configReloadChan:
			// provider is passed by address so a successful reload replaces
			// the reference that is later handed to shutdownGateway.
			if err := handleConfigReload(ctx, agentLoop, newCfg, &provider, services, msgBus); err != nil {
				logger.Errorf("Config reload failed: %v", err)
			}
		}
	}
}
// setupAndStartServices initializes and starts all gateway services: cron,
// heartbeat, media store, channel manager (with the shared HTTP/health
// server) and the device service. It also injects the channel manager, media
// store and, when one is detected, the voice transcriber into agentLoop.
//
// On success it returns the populated gatewayServices. On failure it returns
// a nil services value and an error, after stopping every service that had
// already been started so nothing is left running behind the caller's back.
// A device service start failure is only logged and is not treated as fatal.
func setupAndStartServices(
	cfg *config.Config,
	agentLoop *agent.AgentLoop,
	msgBus *bus.MessageBus,
) (*gatewayServices, error) {
	services := &gatewayServices{}

	// Tear down whatever was started if any later step fails. Fields are only
	// left non-nil for services that actually need stopping.
	ready := false
	defer func() {
		if !ready {
			stopAndCleanupServices(services, serviceShutdownTimeout)
		}
	}()

	// Setup cron tool and service
	execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
	var err error
	services.CronService, err = setupCronTool(
		agentLoop,
		msgBus,
		cfg.WorkspacePath(),
		cfg.Agents.Defaults.RestrictToWorkspace,
		execTimeout,
		cfg,
	)
	if err != nil {
		services.CronService = nil
		return nil, fmt.Errorf("error setting up cron service: %w", err)
	}
	if err = services.CronService.Start(); err != nil {
		services.CronService = nil // never started, nothing to stop
		return nil, fmt.Errorf("error starting cron service: %w", err)
	}
	fmt.Println("✓ Cron service started")

	// Setup heartbeat service
	services.HeartbeatService = heartbeat.NewHeartbeatService(
		cfg.WorkspacePath(),
		cfg.Heartbeat.Interval,
		cfg.Heartbeat.Enabled,
	)
	services.HeartbeatService.SetBus(msgBus)
	services.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop))
	if err = services.HeartbeatService.Start(); err != nil {
		services.HeartbeatService = nil // never started, nothing to stop
		return nil, fmt.Errorf("error starting heartbeat service: %w", err)
	}
	fmt.Println("✓ Heartbeat service started")

	// Create media store for file lifecycle management with TTL cleanup
	services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
		Enabled:  cfg.Tools.MediaCleanup.Enabled,
		MaxAge:   time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute,
		Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute,
	})
	// Start the media store if it's a FileMediaStore with cleanup
	if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
		fms.Start()
	}

	// Create channel manager. The deferred cleanup stops the media store (and
	// the services started above) if this fails.
	services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
	if err != nil {
		services.ChannelManager = nil
		return nil, fmt.Errorf("error creating channel manager: %w", err)
	}

	// Inject channel manager and media store into agent loop
	agentLoop.SetChannelManager(services.ChannelManager)
	agentLoop.SetMediaStore(services.MediaStore)

	// Wire up voice transcription if a supported provider is configured.
	if transcriber := voice.DetectTranscriber(cfg); transcriber != nil {
		agentLoop.SetTranscriber(transcriber)
		logger.InfoCF("voice", "Transcription enabled (agent-level)", map[string]any{"provider": transcriber.Name()})
	}

	enabledChannels := services.ChannelManager.GetEnabledChannels()
	if len(enabledChannels) > 0 {
		fmt.Printf("✓ Channels enabled: %s\n", enabledChannels)
	} else {
		fmt.Println("⚠ Warning: No channels enabled")
	}

	// Setup shared HTTP server with health endpoints and webhook handlers
	addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port)
	services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
	services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)

	// The manager is kept in services on failure so the deferred cleanup can
	// stop any channels that did come up.
	if err = services.ChannelManager.StartAll(context.Background()); err != nil {
		return nil, fmt.Errorf("error starting channels: %w", err)
	}
	fmt.Printf("✓ Health endpoints available at http://%s:%d/health and /ready\n", cfg.Gateway.Host, cfg.Gateway.Port)

	// Setup state manager and device service
	stateManager := state.NewManager(cfg.WorkspacePath())
	services.DeviceService = devices.NewService(devices.Config{
		Enabled:    cfg.Devices.Enabled,
		MonitorUSB: cfg.Devices.MonitorUSB,
	}, stateManager)
	services.DeviceService.SetBus(msgBus)
	if err = services.DeviceService.Start(context.Background()); err != nil {
		logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()})
	} else if cfg.Devices.Enabled {
		fmt.Println("✓ Device event service started")
	}

	ready = true
	return services, nil
}
// stopAndCleanupServices stops every non-nil service held in services, in the
// order: channels, devices, heartbeat, cron, media store. The channel manager
// is stopped with a context that expires after shutdownTimeout; the remaining
// services are stopped through their plain Stop methods.
func stopAndCleanupServices(
	services *gatewayServices,
	shutdownTimeout time.Duration,
) {
	stopCtx, release := context.WithTimeout(context.Background(), shutdownTimeout)
	defer release()

	if mgr := services.ChannelManager; mgr != nil {
		mgr.StopAll(stopCtx)
	}
	if dev := services.DeviceService; dev != nil {
		dev.Stop()
	}
	if hb := services.HeartbeatService; hb != nil {
		hb.Stop()
	}
	if cronSvc := services.CronService; cronSvc != nil {
		cronSvc.Stop()
	}

	// Only a FileMediaStore has a cleanup loop to stop. The assertion fails for
	// a nil interface value, so no separate nil check is required.
	if store, isFileStore := services.MediaStore.(*media.FileMediaStore); isFileStore {
		store.Stop()
	}
}
// shutdownGateway performs a complete gateway shutdown: it optionally closes
// the provider, stops all services within gracefulShutdownTimeout, and then
// stops and closes the agent loop.
//
// The provider is closed only when fullShutdown is true and the provider
// implements providers.StatefulProvider.
func shutdownGateway(
	services *gatewayServices,
	agentLoop *agent.AgentLoop,
	provider providers.LLMProvider,
	fullShutdown bool,
) {
	if fullShutdown {
		if stateful, closable := provider.(providers.StatefulProvider); closable {
			stateful.Close()
		}
	}

	stopAndCleanupServices(services, gracefulShutdownTimeout)

	agentLoop.Stop()
	agentLoop.Close()

	logger.Info("✓ Gateway stopped")
}
// handleConfigReload applies a changed configuration. It stops all services,
// creates a provider from newCfg, asks the agent loop to swap in the new
// provider and config, and finally restarts the services.
//
// If creating the provider or reloading the agent loop fails, the services
// are restarted from the agent loop's current config and the error is
// returned; a provider that was created but not adopted is closed when it is
// stateful. *providerRef is updated only after the agent loop accepted the
// new provider.
//
// ctx is accepted for the caller's convenience but is not consulted here; the
// reload itself runs under a providerReloadTimeout derived from
// context.Background().
func handleConfigReload(
	ctx context.Context,
	al *agent.AgentLoop,
	newCfg *config.Config,
	providerRef *providers.LLMProvider,
	services *gatewayServices,
	msgBus *bus.MessageBus,
) error {
	logger.Info("🔄 Config file changed, reloading...")

	modelName := newCfg.Agents.Defaults.ModelName
	if modelName == "" {
		modelName = newCfg.Agents.Defaults.Model
	}
	logger.Infof(" New model is '%s', recreating provider...", modelName)

	// Everything is stopped up front; each failure path below brings the
	// services back through fallBack.
	logger.Info(" Stopping all services...")
	stopAndCleanupServices(services, serviceShutdownTimeout)

	// fallBack restarts the services from whatever config the agent loop
	// still holds. A failure here is logged, not returned.
	fallBack := func() {
		logger.Warn(" Attempting to restart services with old provider and config...")
		if restartErr := restartServices(al, services, msgBus); restartErr != nil {
			logger.Errorf(" ⚠ Failed to restart services: %v", restartErr)
		}
	}

	// Build the replacement provider from the updated config.
	candidate, resolvedModelID, err := providers.CreateProvider(newCfg)
	if err != nil {
		logger.Errorf(" ⚠ Error creating new provider: %v", err)
		fallBack()
		return fmt.Errorf("error creating new provider: %w", err)
	}
	if resolvedModelID != "" {
		newCfg.Agents.Defaults.ModelName = resolvedModelID
	}

	// Hand provider and config to the agent loop in a single call, bounded by
	// providerReloadTimeout.
	swapCtx, swapCancel := context.WithTimeout(context.Background(), providerReloadTimeout)
	defer swapCancel()

	if err = al.ReloadProviderAndConfig(swapCtx, candidate, newCfg); err != nil {
		logger.Errorf(" ⚠ Error reloading agent loop: %v", err)
		// The candidate was never adopted, so release it here.
		if stateful, closable := candidate.(providers.StatefulProvider); closable {
			stateful.Close()
		}
		fallBack()
		return fmt.Errorf("error reloading agent loop: %w", err)
	}

	// The swap succeeded; publish the new provider to the caller.
	*providerRef = candidate

	logger.Info(" Restarting all services with new configuration...")
	if err = restartServices(al, services, msgBus); err != nil {
		logger.Errorf(" ⚠ Error restarting services: %v", err)
		return fmt.Errorf("error restarting services: %w", err)
	}

	logger.Info(" ✓ Provider, configuration, and services reloaded successfully (thread-safe)")
	return nil
}
// restartServices re-creates and starts every gateway service using the
// config currently held by the agent loop (al.GetConfig()), storing the new
// instances in services and re-injecting the media store, channel manager and
// voice transcriber into the agent loop.
//
// It returns an error as soon as the cron service, heartbeat service or
// channel manager cannot be created or started. A device service start
// failure is only logged as a warning.
func restartServices(
	al *agent.AgentLoop,
	services *gatewayServices,
	msgBus *bus.MessageBus,
) error {
	// After a successful reload this is the new config; on a fallback restart
	// it is the config the agent loop still holds.
	active := al.GetConfig()

	// --- cron ---
	cronTimeout := time.Duration(active.Tools.Cron.ExecTimeoutMinutes) * time.Minute
	cronSvc, err := setupCronTool(
		al,
		msgBus,
		active.WorkspacePath(),
		active.Agents.Defaults.RestrictToWorkspace,
		cronTimeout,
		active,
	)
	services.CronService = cronSvc
	if err != nil {
		return fmt.Errorf("error restarting cron service: %w", err)
	}
	if err = services.CronService.Start(); err != nil {
		return fmt.Errorf("error restarting cron service: %w", err)
	}
	fmt.Println(" ✓ Cron service restarted")

	// --- heartbeat ---
	services.HeartbeatService = heartbeat.NewHeartbeatService(
		active.WorkspacePath(),
		active.Heartbeat.Interval,
		active.Heartbeat.Enabled,
	)
	services.HeartbeatService.SetBus(msgBus)
	services.HeartbeatService.SetHandler(createHeartbeatHandler(al))
	if err = services.HeartbeatService.Start(); err != nil {
		return fmt.Errorf("error restarting heartbeat service: %w", err)
	}
	fmt.Println(" ✓ Heartbeat service restarted")

	// --- media store ---
	cleanup := active.Tools.MediaCleanup
	services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
		Enabled:  cleanup.Enabled,
		MaxAge:   time.Duration(cleanup.MaxAge) * time.Minute,
		Interval: time.Duration(cleanup.Interval) * time.Minute,
	})
	// Only a FileMediaStore has a cleanup loop to start.
	if fileStore, isFileStore := services.MediaStore.(*media.FileMediaStore); isFileStore {
		fileStore.Start()
	}
	al.SetMediaStore(services.MediaStore)

	// --- channels ---
	services.ChannelManager, err = channels.NewManager(active, msgBus, services.MediaStore)
	if err != nil {
		return fmt.Errorf("error recreating channel manager: %w", err)
	}
	al.SetChannelManager(services.ChannelManager)

	if names := services.ChannelManager.GetEnabledChannels(); len(names) == 0 {
		fmt.Println(" ⚠ Warning: No channels enabled")
	} else {
		fmt.Printf(" ✓ Channels enabled: %s\n", names)
	}

	// --- shared HTTP server ---
	listenAddr := fmt.Sprintf("%s:%d", active.Gateway.Host, active.Gateway.Port)
	services.HealthServer = health.NewServer(active.Gateway.Host, active.Gateway.Port)
	services.ChannelManager.SetupHTTPServer(listenAddr, services.HealthServer)

	// A background context is used so the channels are not tied to the
	// lifetime of this call.
	if err = services.ChannelManager.StartAll(context.Background()); err != nil {
		return fmt.Errorf("error restarting channels: %w", err)
	}
	fmt.Printf(
		" ✓ Channels restarted, health endpoints at http://%s:%d/health and ready\n",
		active.Gateway.Host,
		active.Gateway.Port,
	)

	// --- devices ---
	deviceState := state.NewManager(active.WorkspacePath())
	services.DeviceService = devices.NewService(devices.Config{
		Enabled:    active.Devices.Enabled,
		MonitorUSB: active.Devices.MonitorUSB,
	}, deviceState)
	services.DeviceService.SetBus(msgBus)
	if startErr := services.DeviceService.Start(context.Background()); startErr != nil {
		logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": startErr.Error()})
	} else if active.Devices.Enabled {
		fmt.Println(" ✓ Device event service restarted")
	}

	// --- voice transcription ---
	// The transcriber is always set, so a nil result clears a previous one.
	transcriber := voice.DetectTranscriber(active)
	al.SetTranscriber(transcriber)
	if transcriber == nil {
		logger.InfoCF("voice", "Transcription disabled", nil)
	} else {
		logger.InfoCF("voice", "Transcription re-enabled (agent-level)", map[string]any{"provider": transcriber.Name()})
	}

	return nil
}
// setupConfigWatcherPolling starts a goroutine that polls configPath every two
// seconds and, when the file's modification time or size differs from the last
// observed values, loads and validates it and sends the result on the returned
// channel.
//
// The returned channel has capacity one; if a previous update has not been
// consumed yet the new one is dropped with a warning. A config that fails to
// load or fails ValidateModelList is logged and skipped, and is not retried
// until the file changes again. When debug is true each detected change is
// also logged at debug level.
//
// The returned function stops the goroutine and waits for it to exit. It is
// safe to call more than once, and it interrupts a pending debounce wait
// instead of blocking until it elapses.
func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Config, func()) {
	const (
		pollInterval  = 2 * time.Second
		debounceDelay = 500 * time.Millisecond
	)

	configChan := make(chan *config.Config, 1)
	stop := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Get initial file info
		lastModTime := getFileModTime(configPath)
		lastSize := getFileSize(configPath)

		ticker := time.NewTicker(pollInterval)
		defer ticker.Stop()

		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
			}

			// Any difference counts as a change, including a modification time
			// that moved backwards (e.g. the file was replaced by an older copy).
			if getFileModTime(configPath).Equal(lastModTime) && getFileSize(configPath) == lastSize {
				continue
			}
			if debug {
				logger.Debugf("🔍 Config file change detected")
			}

			// Debounce - give the writer time to finish, but stay responsive
			// to a stop request while waiting.
			debounce := time.NewTimer(debounceDelay)
			select {
			case <-debounce.C:
			case <-stop:
				debounce.Stop()
				return
			}

			// Record the state as of after the debounce: this is the content
			// about to be loaded, so a write that completed during the wait
			// does not cause a second, redundant reload on the next tick.
			// Recording it before loading also prevents repeated reload
			// attempts when the load or validation below fails.
			lastModTime = getFileModTime(configPath)
			lastSize = getFileSize(configPath)

			// Validate and load new config
			newCfg, err := config.LoadConfig(configPath)
			if err != nil {
				logger.Errorf("⚠ Error loading new config: %v", err)
				logger.Warn(" Using previous valid config")
				continue
			}
			if err := newCfg.ValidateModelList(); err != nil {
				logger.Errorf(" ⚠ New config validation failed: %v", err)
				logger.Warn(" Using previous valid config")
				continue
			}
			logger.Info("✓ Config file validated and loaded")

			// Send new config to main loop (non-blocking)
			select {
			case configChan <- newCfg:
			default:
				// Channel full, skip this update
				logger.Warn("⚠ Previous config reload still in progress, skipping")
			}
		}
	}()

	var stopOnce sync.Once
	stopFunc := func() {
		// Closing a channel twice panics, so guard the close.
		stopOnce.Do(func() { close(stop) })
		wg.Wait()
	}
	return configChan, stopFunc
}
// getFileModTime reports the modification time of the file at path. When the
// file cannot be stat'ed (for instance because it does not exist) the zero
// time.Time is returned.
func getFileModTime(path string) time.Time {
	if info, err := os.Stat(path); err == nil {
		return info.ModTime()
	}
	return time.Time{}
}
// getFileSize reports the size in bytes of the file at path. When the file
// cannot be stat'ed (for instance because it does not exist) it reports 0.
func getFileSize(path string) int64 {
	if info, err := os.Stat(path); err == nil {
		return info.Size()
	}
	return 0
}
// setupCronTool creates the cron service backed by <workspace>/cron/jobs.json
// and, when the "cron" tool is enabled in cfg, creates a CronTool, registers
// it with agentLoop and routes the service's job callback to the tool.
//
// It returns the (not yet started) cron service, or an error when the CronTool
// cannot be constructed. When the tool is disabled the service is returned
// without a job handler.
func setupCronTool(
	agentLoop *agent.AgentLoop,
	msgBus *bus.MessageBus,
	workspace string,
	restrict bool,
	execTimeout time.Duration,
	cfg *config.Config,
) (*cron.CronService, error) {
	svc := cron.NewCronService(filepath.Join(workspace, "cron", "jobs.json"), nil)

	// Without the tool there is nothing to register and no handler to install.
	if !cfg.Tools.IsToolEnabled("cron") {
		return svc, nil
	}

	tool, err := tools.NewCronTool(svc, agentLoop, msgBus, workspace, restrict, execTimeout, cfg)
	if err != nil {
		return nil, fmt.Errorf("critical error during CronTool initialization: %w", err)
	}
	agentLoop.RegisterTool(tool)

	// Install the job handler only when a tool instance actually exists.
	if tool != nil {
		svc.SetOnJob(func(job *cron.CronJob) (string, error) {
			return tool.ExecuteJob(context.Background(), job), nil
		})
	}

	return svc, nil
}
// createHeartbeatHandler returns the callback used by the heartbeat service.
//
// The callback forwards the prompt to agentLoop.ProcessHeartbeat, substituting
// channel "cli" and chat ID "direct" when either value is empty. A processing
// error is reported as an error result; every successful response is returned
// as a silent result, with the literal response "HEARTBEAT_OK" mapped to the
// text "Heartbeat OK".
func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult {
	return func(prompt, channel, chatID string) *tools.ToolResult {
		// Fall back to cli:direct when no complete target was supplied.
		if channel == "" || chatID == "" {
			channel, chatID = "cli", "direct"
		}

		reply, err := agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
		switch {
		case err != nil:
			return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
		case reply == "HEARTBEAT_OK":
			return tools.SilentResult("Heartbeat OK")
		default:
			// Heartbeat responses are always silent here.
			return tools.SilentResult(reply)
		}
	}
}