refactor gateway/helpers and add server.pid to health (#1646)

This commit is contained in:
Cytown
2026-03-17 09:35:52 +08:00
committed by GitHub
parent 79b0568d75
commit be4a33cc15
2 changed files with 55 additions and 69 deletions
+52 -69
View File
@@ -7,6 +7,7 @@ import (
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/sipeed/picoclaw/cmd/picoclaw/internal"
@@ -43,7 +44,6 @@ import (
// Timeout constants for service operations
const (
serviceRestartTimeout = 30 * time.Second
serviceShutdownTimeout = 30 * time.Second
providerReloadTimeout = 30 * time.Second
gracefulShutdownTimeout = 15 * time.Second
@@ -121,7 +121,7 @@ func gatewayCmd(debug bool) error {
defer stopWatch()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
// Main event loop - wait for signals or config changes
for {
@@ -150,7 +150,8 @@ func setupAndStartServices(
// Setup cron tool and service
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
services.CronService = setupCronTool(
var err error
services.CronService, err = setupCronTool(
agentLoop,
msgBus,
cfg.WorkspacePath(),
@@ -158,7 +159,10 @@ func setupAndStartServices(
execTimeout,
cfg,
)
if err := services.CronService.Start(); err != nil {
if err != nil {
return nil, fmt.Errorf("error setting up cron service: %w", err)
}
if err = services.CronService.Start(); err != nil {
return nil, fmt.Errorf("error starting cron service: %w", err)
}
fmt.Println("✓ Cron service started")
@@ -170,26 +174,8 @@ func setupAndStartServices(
cfg.Heartbeat.Enabled,
)
services.HeartbeatService.SetBus(msgBus)
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
// Use cli:direct as fallback if no valid channel
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
// Use ProcessHeartbeat - no session history, each heartbeat is independent
var response string
var err error
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
// For heartbeat, always return silent - the subagent result will be
// sent to user via processSystemMessage when the async task completes
return tools.SilentResult(response)
})
if err := services.HeartbeatService.Start(); err != nil {
services.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop))
if err = services.HeartbeatService.Start(); err != nil {
return nil, fmt.Errorf("error starting heartbeat service: %w", err)
}
fmt.Println("✓ Heartbeat service started")
@@ -206,7 +192,6 @@ func setupAndStartServices(
}
// Create channel manager
var err error
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
if err != nil {
// Stop the media store if it's a FileMediaStore with cleanup
@@ -238,7 +223,7 @@ func setupAndStartServices(
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)
if err := services.ChannelManager.StartAll(context.Background()); err != nil {
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
return nil, fmt.Errorf("error starting channels: %w", err)
}
@@ -251,7 +236,7 @@ func setupAndStartServices(
MonitorUSB: cfg.Devices.MonitorUSB,
}, stateManager)
services.DeviceService.SetBus(msgBus)
if err := services.DeviceService.Start(context.Background()); err != nil {
if err = services.DeviceService.Start(context.Background()); err != nil {
logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()})
} else if cfg.Devices.Enabled {
fmt.Println("✓ Device event service started")
@@ -386,17 +371,13 @@ func restartServices(
services *gatewayServices,
msgBus *bus.MessageBus,
) error {
// Create an independent context with timeout for service restart
// This prevents cancellation from the main loop context during reload
ctx, cancel := context.WithTimeout(context.Background(), serviceRestartTimeout)
defer cancel()
// Get current config from agent loop (which has been updated if this is a reload)
cfg := al.GetConfig()
// Re-create and start cron service with new config
execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute
services.CronService = setupCronTool(
var err error
services.CronService, err = setupCronTool(
al,
msgBus,
cfg.WorkspacePath(),
@@ -404,7 +385,10 @@ func restartServices(
execTimeout,
cfg,
)
if err := services.CronService.Start(); err != nil {
if err != nil {
return fmt.Errorf("error restarting cron service: %w", err)
}
if err = services.CronService.Start(); err != nil {
return fmt.Errorf("error restarting cron service: %w", err)
}
fmt.Println(" ✓ Cron service restarted")
@@ -416,31 +400,12 @@ func restartServices(
cfg.Heartbeat.Enabled,
)
services.HeartbeatService.SetBus(msgBus)
services.HeartbeatService.SetHandler(func(prompt, channel, chatID string) *tools.ToolResult {
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
var response string
var err error
response, err = al.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
return tools.SilentResult(response)
})
if err := services.HeartbeatService.Start(); err != nil {
services.HeartbeatService.SetHandler(createHeartbeatHandler(al))
if err = services.HeartbeatService.Start(); err != nil {
return fmt.Errorf("error restarting heartbeat service: %w", err)
}
fmt.Println(" ✓ Heartbeat service restarted")
// Stop the old media store before creating a new one
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
fms.Stop()
}
// Re-create media store with new config
services.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
Enabled: cfg.Tools.MediaCleanup.Enabled,
@@ -454,13 +419,8 @@ func restartServices(
al.SetMediaStore(services.MediaStore)
// Re-create channel manager with new config
var err error
services.ChannelManager, err = channels.NewManager(cfg, msgBus, services.MediaStore)
if err != nil {
// Stop the media store if it's a FileMediaStore with cleanup
if fms, ok := services.MediaStore.(*media.FileMediaStore); ok {
fms.Stop()
}
return fmt.Errorf("error recreating channel manager: %w", err)
}
al.SetChannelManager(services.ChannelManager)
@@ -477,7 +437,8 @@ func restartServices(
services.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port)
services.ChannelManager.SetupHTTPServer(addr, services.HealthServer)
if err := services.ChannelManager.StartAll(ctx); err != nil {
// Use background context for lifecycle to ensure services persist after restartServices returns
if err = services.ChannelManager.StartAll(context.Background()); err != nil {
return fmt.Errorf("error restarting channels: %w", err)
}
fmt.Printf(
@@ -493,7 +454,7 @@ func restartServices(
MonitorUSB: cfg.Devices.MonitorUSB,
}, stateManager)
services.DeviceService.SetBus(msgBus)
if err := services.DeviceService.Start(ctx); err != nil {
if err := services.DeviceService.Start(context.Background()); err != nil {
logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()})
} else if cfg.Devices.Enabled {
fmt.Println(" ✓ Device event service restarted")
@@ -544,6 +505,10 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf
// Debounce - wait a bit to ensure file write is complete
time.Sleep(500 * time.Millisecond)
// Update last known state to prevent repeated reload attempts on failure
lastModTime = currentModTime
lastSize = currentSize
// Validate and load new config
newCfg, err := config.LoadConfig(configPath)
if err != nil {
@@ -561,10 +526,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf
logger.Info("✓ Config file validated and loaded")
// Update last known state
lastModTime = currentModTime
lastSize = currentSize
// Send new config to main loop (non-blocking)
select {
case configChan <- newCfg:
@@ -613,7 +574,7 @@ func setupCronTool(
restrict bool,
execTimeout time.Duration,
cfg *config.Config,
) *cron.CronService {
) (*cron.CronService, error) {
cronStorePath := filepath.Join(workspace, "cron", "jobs.json")
// Create cron service
@@ -625,7 +586,7 @@ func setupCronTool(
var err error
cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg)
if err != nil {
logger.Fatalf("Critical error during CronTool initialization: %v", err)
return nil, fmt.Errorf("critical error during CronTool initialization: %w", err)
}
agentLoop.RegisterTool(cronTool)
@@ -639,5 +600,27 @@ func setupCronTool(
})
}
return cronService
return cronService, nil
}
func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult {
return func(prompt, channel, chatID string) *tools.ToolResult {
// Use cli:direct as fallback if no valid channel
if channel == "" || chatID == "" {
channel, chatID = "cli", "direct"
}
// Use ProcessHeartbeat - no session history, each heartbeat is independent
var response string
var err error
response, err = agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID)
if err != nil {
return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err))
}
if response == "HEARTBEAT_OK" {
return tools.SilentResult("Heartbeat OK")
}
// For heartbeat, always return silent - the subagent result will be
// sent to user via processSystemMessage when the async task completes
return tools.SilentResult(response)
}
}
+3
View File
@@ -6,6 +6,7 @@ import (
"fmt"
"maps"
"net/http"
"os"
"sync"
"time"
)
@@ -29,6 +30,7 @@ type StatusResponse struct {
Status string `json:"status"`
Uptime string `json:"uptime"`
Checks map[string]Check `json:"checks,omitempty"`
Pid int `json:"pid"`
}
func NewServer(host string, port int) *Server {
@@ -112,6 +114,7 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
resp := StatusResponse{
Status: "ok",
Uptime: uptime.String(),
Pid: os.Getpid(),
}
json.NewEncoder(w).Encode(resp)