diff --git a/cmd/picoclaw/internal/gateway/helpers.go b/cmd/picoclaw/internal/gateway/helpers.go index e3a51b5e9..5ebf26d78 100644 --- a/cmd/picoclaw/internal/gateway/helpers.go +++ b/cmd/picoclaw/internal/gateway/helpers.go @@ -2,9 +2,7 @@ package gateway import ( "context" - "errors" "fmt" - "net/http" "os" "os/signal" "path/filepath" @@ -200,16 +198,16 @@ func gatewayCmd(debug bool) error { fmt.Println("✓ Device event service started") } + // Setup shared HTTP server with health endpoints and webhook handlers + healthServer := health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) + addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) + channelManager.SetupHTTPServer(addr, healthServer) + if err := channelManager.StartAll(ctx); err != nil { fmt.Printf("Error starting channels: %v\n", err) } - healthServer := health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) - go func() { - if err := healthServer.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.ErrorCF("health", "Health server error", map[string]any{"error": err.Error()}) - } - }() + fmt.Printf("✓ Health endpoints available at http://%s:%d/health and /ready\n", cfg.Gateway.Host, cfg.Gateway.Port) go agentLoop.Run(ctx) @@ -224,12 +222,11 @@ func gatewayCmd(debug bool) error { } cancel() msgBus.Close() - healthServer.Stop(context.Background()) + channelManager.StopAll(ctx) deviceService.Stop() heartbeatService.Stop() cronService.Stop() agentLoop.Stop() - channelManager.StopAll(ctx) fmt.Println("✓ Gateway stopped") return nil diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index fd06334d5..6ae048468 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -44,7 +44,6 @@ type replyTokenEntry struct { type LINEChannel struct { *channels.BaseChannel config config.LINEConfig - httpServer *http.Server botUserID string // Bot's user ID botBasicID string // Bot's basic ID (e.g. @216ru...) botDisplayName string // Bot's display name for text-based mention detection @@ -68,7 +67,7 @@ func NewLINEChannel(cfg config.LINEConfig, messageBus *bus.MessageBus) (*LINECha }, nil } -// Start launches the HTTP webhook server. +// Start initializes the LINE channel. func (c *LINEChannel) Start(ctx context.Context) error { logger.InfoC("line", "Starting LINE channel (Webhook Mode)") @@ -87,31 +86,6 @@ func (c *LINEChannel) Start(ctx context.Context) error { }) } - mux := http.NewServeMux() - path := c.config.WebhookPath - if path == "" { - path = "/webhook/line" - } - mux.HandleFunc(path, c.webhookHandler) - - addr := fmt.Sprintf("%s:%d", c.config.WebhookHost, c.config.WebhookPort) - c.httpServer = &http.Server{ - Addr: addr, - Handler: mux, - } - - go func() { - logger.InfoCF("line", "LINE webhook server listening", map[string]any{ - "addr": addr, - "path": path, - }) - if err := c.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.ErrorCF("line", "Webhook server error", map[string]any{ - "error": err.Error(), - }) - } - }() - c.SetRunning(true) logger.InfoC("line", "LINE channel started (Webhook Mode)") return nil @@ -151,7 +125,7 @@ func (c *LINEChannel) fetchBotInfo() error { return nil } -// Stop gracefully shuts down the HTTP server. +// Stop gracefully stops the LINE channel. func (c *LINEChannel) Stop(ctx context.Context) error { logger.InfoC("line", "Stopping LINE channel") @@ -159,21 +133,24 @@ func (c *LINEChannel) Stop(ctx context.Context) error { c.cancel() } - if c.httpServer != nil { - shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := c.httpServer.Shutdown(shutdownCtx); err != nil { - logger.ErrorCF("line", "Webhook server shutdown error", map[string]any{ - "error": err.Error(), - }) - } - } - c.SetRunning(false) logger.InfoC("line", "LINE channel stopped") return nil } +// WebhookPath returns the path for registering on the shared HTTP server. +func (c *LINEChannel) WebhookPath() string { + if c.config.WebhookPath != "" { + return c.config.WebhookPath + } + return "/webhook/line" +} + +// ServeHTTP implements http.Handler for the shared HTTP server. +func (c *LINEChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c.webhookHandler(w, r) +} + // webhookHandler handles incoming LINE webhook requests. func (c *LINEChannel) webhookHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 1bc321cec..dadc068e9 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math" + "net/http" "sync" "time" @@ -19,6 +20,7 @@ import ( "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/config" "github.com/sipeed/picoclaw/pkg/constants" + "github.com/sipeed/picoclaw/pkg/health" "github.com/sipeed/picoclaw/pkg/logger" "github.com/sipeed/picoclaw/pkg/media" "github.com/sipeed/picoclaw/pkg/utils" @@ -55,6 +57,8 @@ type Manager struct { config *config.Config mediaStore media.MediaStore dispatchTask *asyncTask + mux *http.ServeMux + httpServer *http.Server mu sync.RWMutex } @@ -169,6 +173,43 @@ func (m *Manager) initChannels() error { return nil } +// SetupHTTPServer creates a shared HTTP server with the given listen address. +// It registers health endpoints from the health server and discovers channels +// that implement WebhookHandler and/or HealthChecker to register their handlers. +func (m *Manager) SetupHTTPServer(addr string, healthServer *health.Server) { + m.mux = http.NewServeMux() + + // Register health endpoints + if healthServer != nil { + healthServer.RegisterOnMux(m.mux) + } + + // Discover and register webhook handlers and health checkers + for name, ch := range m.channels { + if wh, ok := ch.(WebhookHandler); ok { + m.mux.Handle(wh.WebhookPath(), wh) + logger.InfoCF("channels", "Webhook handler registered", map[string]any{ + "channel": name, + "path": wh.WebhookPath(), + }) + } + if hc, ok := ch.(HealthChecker); ok { + m.mux.HandleFunc(hc.HealthPath(), hc.HealthHandler) + logger.InfoCF("channels", "Health endpoint registered", map[string]any{ + "channel": name, + "path": hc.HealthPath(), + }) + } + } + + m.httpServer = &http.Server{ + Addr: addr, + Handler: m.mux, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + } +} + func (m *Manager) StartAll(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() @@ -203,6 +244,20 @@ func (m *Manager) StartAll(ctx context.Context) error { // Start the dispatcher that reads from the bus and routes to workers go m.dispatchOutbound(dispatchCtx) + // Start shared HTTP server if configured + if m.httpServer != nil { + go func() { + logger.InfoCF("channels", "Shared HTTP server listening", map[string]any{ + "addr": m.httpServer.Addr, + }) + if err := m.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.ErrorCF("channels", "Shared HTTP server error", map[string]any{ + "error": err.Error(), + }) + } + }() + } + logger.InfoC("channels", "All channels started") return nil } @@ -213,7 +268,19 @@ func (m *Manager) StopAll(ctx context.Context) error { logger.InfoC("channels", "Stopping all channels") - // Cancel dispatcher first + // Shutdown shared HTTP server first + if m.httpServer != nil { + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := m.httpServer.Shutdown(shutdownCtx); err != nil { + logger.ErrorCF("channels", "Shared HTTP server shutdown error", map[string]any{ + "error": err.Error(), + }) + } + m.httpServer = nil + } + + // Cancel dispatcher if m.dispatchTask != nil { m.dispatchTask.cancel() m.dispatchTask = nil diff --git a/pkg/channels/webhook.go b/pkg/channels/webhook.go new file mode 100644 index 000000000..3cf27baf6 --- /dev/null +++ b/pkg/channels/webhook.go @@ -0,0 +1,20 @@ +package channels + +import "net/http" + +// WebhookHandler is an optional interface for channels that receive messages +// via HTTP webhooks. Manager discovers channels implementing this interface +// and registers them on the shared HTTP server. +type WebhookHandler interface { + // WebhookPath returns the path to mount this handler on the shared server. + // Examples: "/webhook/line", "/webhook/wecom" + WebhookPath() string + http.Handler // ServeHTTP(w http.ResponseWriter, r *http.Request) +} + +// HealthChecker is an optional interface for channels that expose +// a health check endpoint on the shared HTTP server. +type HealthChecker interface { + HealthPath() string + HealthHandler(w http.ResponseWriter, r *http.Request) +} diff --git a/pkg/channels/wecom/app.go b/pkg/channels/wecom/app.go index 41861e8fc..52750505c 100644 --- a/pkg/channels/wecom/app.go +++ b/pkg/channels/wecom/app.go @@ -28,7 +28,6 @@ const ( type WeComAppChannel struct { *channels.BaseChannel config config.WeComAppConfig - server *http.Server accessToken string tokenExpiry time.Time tokenMu sync.RWMutex @@ -134,7 +133,7 @@ func (c *WeComAppChannel) Name() string { return "wecom_app" } -// Start initializes the WeCom App channel with HTTP webhook server +// Start initializes the WeCom App channel func (c *WeComAppChannel) Start(ctx context.Context) error { logger.InfoC("wecom_app", "Starting WeCom App channel...") @@ -150,37 +149,8 @@ func (c *WeComAppChannel) Start(ctx context.Context) error { // Start token refresh goroutine go c.tokenRefreshLoop() - // Setup HTTP server for webhook - mux := http.NewServeMux() - webhookPath := c.config.WebhookPath - if webhookPath == "" { - webhookPath = "/webhook/wecom-app" - } - mux.HandleFunc(webhookPath, c.handleWebhook) - - // Health check endpoint - mux.HandleFunc("/health/wecom-app", c.handleHealth) - - addr := fmt.Sprintf("%s:%d", c.config.WebhookHost, c.config.WebhookPort) - c.server = &http.Server{ - Addr: addr, - Handler: mux, - } - c.SetRunning(true) - logger.InfoCF("wecom_app", "WeCom App channel started", map[string]any{ - "address": addr, - "path": webhookPath, - }) - - // Start server in goroutine - go func() { - if err := c.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.ErrorCF("wecom_app", "HTTP server error", map[string]any{ - "error": err.Error(), - }) - } - }() + logger.InfoC("wecom_app", "WeCom App channel started") return nil } @@ -193,12 +163,6 @@ func (c *WeComAppChannel) Stop(ctx context.Context) error { c.cancel() } - if c.server != nil { - shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - c.server.Shutdown(shutdownCtx) - } - c.SetRunning(false) logger.InfoC("wecom_app", "WeCom App channel stopped") return nil @@ -223,6 +187,29 @@ func (c *WeComAppChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return c.sendTextMessage(ctx, accessToken, msg.ChatID, msg.Content) } +// WebhookPath returns the path for registering on the shared HTTP server. +func (c *WeComAppChannel) WebhookPath() string { + if c.config.WebhookPath != "" { + return c.config.WebhookPath + } + return "/webhook/wecom-app" +} + +// ServeHTTP implements http.Handler for the shared HTTP server. +func (c *WeComAppChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c.handleWebhook(w, r) +} + +// HealthPath returns the health check endpoint path. +func (c *WeComAppChannel) HealthPath() string { + return "/health/wecom-app" +} + +// HealthHandler handles health check requests. +func (c *WeComAppChannel) HealthHandler(w http.ResponseWriter, r *http.Request) { + c.handleHealth(w, r) +} + // handleWebhook handles incoming webhook requests from WeCom func (c *WeComAppChannel) handleWebhook(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/pkg/channels/wecom/bot.go b/pkg/channels/wecom/bot.go index 7960802fb..d5912bddc 100644 --- a/pkg/channels/wecom/bot.go +++ b/pkg/channels/wecom/bot.go @@ -24,7 +24,6 @@ import ( type WeComBotChannel struct { *channels.BaseChannel config config.WeComConfig - server *http.Server ctx context.Context cancel context.CancelFunc processedMsgs map[string]bool // Message deduplication: msg_id -> processed @@ -101,43 +100,14 @@ func (c *WeComBotChannel) Name() string { return "wecom" } -// Start initializes the WeCom Bot channel with HTTP webhook server +// Start initializes the WeCom Bot channel func (c *WeComBotChannel) Start(ctx context.Context) error { logger.InfoC("wecom", "Starting WeCom Bot channel...") c.ctx, c.cancel = context.WithCancel(ctx) - // Setup HTTP server for webhook - mux := http.NewServeMux() - webhookPath := c.config.WebhookPath - if webhookPath == "" { - webhookPath = "/webhook/wecom" - } - mux.HandleFunc(webhookPath, c.handleWebhook) - - // Health check endpoint - mux.HandleFunc("/health/wecom", c.handleHealth) - - addr := fmt.Sprintf("%s:%d", c.config.WebhookHost, c.config.WebhookPort) - c.server = &http.Server{ - Addr: addr, - Handler: mux, - } - c.SetRunning(true) - logger.InfoCF("wecom", "WeCom Bot channel started", map[string]any{ - "address": addr, - "path": webhookPath, - }) - - // Start server in goroutine - go func() { - if err := c.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.ErrorCF("wecom", "HTTP server error", map[string]any{ - "error": err.Error(), - }) - } - }() + logger.InfoC("wecom", "WeCom Bot channel started") return nil } @@ -150,12 +120,6 @@ func (c *WeComBotChannel) Stop(ctx context.Context) error { c.cancel() } - if c.server != nil { - shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - c.server.Shutdown(shutdownCtx) - } - c.SetRunning(false) logger.InfoC("wecom", "WeCom Bot channel stopped") return nil @@ -177,6 +141,29 @@ func (c *WeComBotChannel) Send(ctx context.Context, msg bus.OutboundMessage) err return c.sendWebhookReply(ctx, msg.ChatID, msg.Content) } +// WebhookPath returns the path for registering on the shared HTTP server. +func (c *WeComBotChannel) WebhookPath() string { + if c.config.WebhookPath != "" { + return c.config.WebhookPath + } + return "/webhook/wecom" +} + +// ServeHTTP implements http.Handler for the shared HTTP server. +func (c *WeComBotChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c.handleWebhook(w, r) +} + +// HealthPath returns the health check endpoint path. +func (c *WeComBotChannel) HealthPath() string { + return "/health/wecom" +} + +// HealthHandler handles health check requests. +func (c *WeComBotChannel) HealthHandler(w http.ResponseWriter, r *http.Request) { + c.handleHealth(w, r) +} + // handleWebhook handles incoming webhook requests from WeCom func (c *WeComBotChannel) handleWebhook(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/pkg/health/server.go b/pkg/health/server.go index 77b36034d..de1ff60fe 100644 --- a/pkg/health/server.go +++ b/pkg/health/server.go @@ -156,6 +156,13 @@ func (s *Server) readyHandler(w http.ResponseWriter, r *http.Request) { }) } +// RegisterOnMux registers /health and /ready handlers onto the given mux. +// This allows the health endpoints to be served by a shared HTTP server. +func (s *Server) RegisterOnMux(mux *http.ServeMux) { + mux.HandleFunc("/health", s.healthHandler) + mux.HandleFunc("/ready", s.readyHandler) +} + func statusString(ok bool) string { if ok { return "ok"