mirror of
https://github.com/sipeed/picoclaw.git
synced 2026-06-12 18:08:54 +00:00
feat(pico): add pico_client outbound WebSocket channel (#1198)
* feat(pico): add pico_client outbound WebSocket channel Add a client-mode counterpart to the existing pico server channel. pico_client connects to a remote Pico Protocol WebSocket server, enabling picoclaw to bridge messages with external Pico-compatible services. Includes config, factory registration, manager wiring, 8 unit tests, and a minimal echo-server example for interactive testing. * fix(pico): address PR #1198 review — goroutine leak, race, auth - Add per-connection context cancel to picoConn to prevent pingLoop goroutine leak on disconnect - Re-acquire mutex in StartTyping stop closure to avoid stale conn race - Remove query-param token auth from echo server (header-only) - Move ListenAndServe to main goroutine where log.Fatal is safe Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: replace ConsumeInbound with InboundChan select in client test MessageBus does not expose a ConsumeInbound method. Use a select on InboundChan() with context cancellation, matching the pattern used in the bus package tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
# pico-echo-server
|
||||
|
||||
Minimal Pico Protocol WebSocket server for testing the `pico_client` channel.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
go run ./examples/pico-echo-server -addr :9090 -token secret
|
||||
```
|
||||
|
||||
### Flags
|
||||
|
||||
| Flag | Default | Description |
|
||||
|----------|---------|------------------------------------|
|
||||
| `-addr` | `:9090` | Listen address |
|
||||
| `-token` | (none) | Auth token; empty disables auth |
|
||||
|
||||
## How it works
|
||||
|
||||
- Listens for WebSocket connections at `/ws`
|
||||
- Authenticates via `Authorization: Bearer <token>` header or `?token=<token>` query param
|
||||
- Prints received `message.send` content to stdout
|
||||
- Responds to `ping` with `pong`
|
||||
- Lines typed into stdin are broadcast as `message.create` to all connected clients
|
||||
|
||||
## Testing with pico_client
|
||||
|
||||
1. Start the server:
|
||||
```bash
|
||||
go run ./examples/pico-echo-server -token mytoken
|
||||
```
|
||||
|
||||
2. Configure `pico_client` in your `config.json`:
|
||||
```json
|
||||
{
|
||||
"channels": {
|
||||
"pico_client": {
|
||||
"enabled": true,
|
||||
"url": "ws://localhost:9090/ws",
|
||||
"token": "mytoken",
|
||||
"session_id": "test-session"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
3. Start picoclaw — the client connects and you can exchange messages interactively via stdin/stdout.
|
||||
@@ -0,0 +1,160 @@
|
||||
// pico-echo-server is a minimal Pico Protocol WebSocket server for testing
|
||||
// the pico_client channel. It accepts connections, prints received messages
|
||||
// to stdout, and forwards stdin lines as message.create to all connected clients.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// go run ./examples/pico-echo-server -addr :9090 -token secret
|
||||
//
|
||||
// Then configure pico_client with url=ws://localhost:9090/ws&token=secret.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type picoMessage struct {
|
||||
Type string `json:"type"`
|
||||
ID string `json:"id,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
Payload map[string]any `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
|
||||
|
||||
type server struct {
|
||||
token string
|
||||
mu sync.Mutex
|
||||
conns map[*websocket.Conn]string // conn → sessionID
|
||||
}
|
||||
|
||||
func (s *server) handleWS(w http.ResponseWriter, r *http.Request) {
|
||||
if s.token != "" {
|
||||
auth := r.Header.Get("Authorization")
|
||||
if auth != "Bearer "+s.token {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
sessionID := r.URL.Query().Get("session_id")
|
||||
if sessionID == "" {
|
||||
sessionID = fmt.Sprintf("sess-%d", time.Now().UnixMilli())
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.conns[conn] = sessionID
|
||||
s.mu.Unlock()
|
||||
|
||||
log.Printf("[+] client connected (session=%s)", sessionID)
|
||||
|
||||
defer func() {
|
||||
s.mu.Lock()
|
||||
delete(s.conns, conn)
|
||||
s.mu.Unlock()
|
||||
conn.Close()
|
||||
log.Printf("[-] client disconnected (session=%s)", sessionID)
|
||||
}()
|
||||
|
||||
for {
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
log.Printf("read error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var msg picoMessage
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
log.Printf("bad json: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
switch msg.Type {
|
||||
case "ping":
|
||||
pong := picoMessage{Type: "pong", ID: msg.ID, Timestamp: time.Now().UnixMilli()}
|
||||
conn.WriteJSON(pong)
|
||||
|
||||
case "message.send":
|
||||
content, _ := msg.Payload["content"].(string)
|
||||
fmt.Printf("[%s] %s\n", sessionID, content)
|
||||
|
||||
case "typing.start":
|
||||
log.Printf("[%s] typing...", sessionID)
|
||||
|
||||
case "typing.stop":
|
||||
log.Printf("[%s] stopped typing", sessionID)
|
||||
|
||||
default:
|
||||
log.Printf("[%s] unknown type: %s", sessionID, msg.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) broadcast(content string) {
|
||||
msg := picoMessage{
|
||||
Type: "message.create",
|
||||
Timestamp: time.Now().UnixMilli(),
|
||||
Payload: map[string]any{"content": content},
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for conn, sid := range s.conns {
|
||||
msg.SessionID = sid
|
||||
if err := conn.WriteJSON(msg); err != nil {
|
||||
log.Printf("write to %s failed: %v", sid, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
addr := flag.String("addr", ":9090", "listen address")
|
||||
token := flag.String("token", "", "auth token (empty = no auth)")
|
||||
flag.Parse()
|
||||
|
||||
s := &server{
|
||||
token: *token,
|
||||
conns: make(map[*websocket.Conn]string),
|
||||
}
|
||||
|
||||
http.HandleFunc("/ws", s.handleWS)
|
||||
|
||||
log.Printf("listening on %s", *addr)
|
||||
log.Printf("connect with: ws://localhost%s/ws", *addr)
|
||||
fmt.Println("Type messages to send to connected clients (Ctrl+C to quit):")
|
||||
|
||||
go func() {
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
s.broadcast(line)
|
||||
log.Printf("[server] sent: %s", line)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Fatal(http.ListenAndServe(*addr, nil))
|
||||
}
|
||||
Reference in New Issue
Block a user