From 6e8590900b99fb4b05ee34ad8adfdfa964580a6b Mon Sep 17 00:00:00 2001 From: afjcjsbx Date: Thu, 7 May 2026 19:24:02 +0200 Subject: [PATCH] fix(mcp): support streamable HTTP alias and request-response mode --- .github/workflows/build.yml | 11 + .github/workflows/pr.yml | 10 + CONTRIBUTING.md | 3 + Makefile | 6 +- cmd/picoclaw/internal/mcp/add.go | 2 +- cmd/picoclaw/internal/mcp/command_test.go | 19 + cmd/picoclaw/internal/mcp/helpers.go | 14 +- docs/reference/mcp-cli.md | 6 +- integration/README.md | 79 +++++ integration/docker-compose.runner.yml | 19 + .../fixtures/mcp-streamable-server/Dockerfile | 23 ++ .../fixtures/mcp-streamable-server/main.go | 71 ++++ .../suites/mcp-streamable/docker-compose.yml | 19 + integration/suites/mcp-streamable/suite.env | 1 + pkg/config/config.go | 6 +- pkg/config/mcp_transport.go | 32 ++ pkg/mcp/events.go | 11 +- pkg/mcp/manager.go | 21 +- pkg/mcp/manager_integration_test.go | 326 ++++++++++++++++++ .../manager_real_server_integration_test.go | 148 ++++++++ pkg/mcp/manager_test.go | 127 +++++++ scripts/run-integration-tests.sh | 117 +++++++ 22 files changed, 1032 insertions(+), 39 deletions(-) create mode 100644 integration/README.md create mode 100644 integration/docker-compose.runner.yml create mode 100644 integration/fixtures/mcp-streamable-server/Dockerfile create mode 100644 integration/fixtures/mcp-streamable-server/main.go create mode 100644 integration/suites/mcp-streamable/docker-compose.yml create mode 100644 integration/suites/mcp-streamable/suite.env create mode 100644 pkg/config/mcp_transport.go create mode 100644 pkg/mcp/manager_integration_test.go create mode 100644 pkg/mcp/manager_real_server_integration_test.go create mode 100644 scripts/run-integration-tests.sh diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index def19c3e5..1cd6ed5ea 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,7 +5,18 @@ on: branches: [ "main" ] jobs: + integration: + name: Integration Tests + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Run Docker-backed integration suites + run: bash ./scripts/run-integration-tests.sh + build: + needs: integration runs-on: ubuntu-latest steps: - name: Checkout diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 795fa5eba..f8706efee 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -64,3 +64,13 @@ jobs: - name: Run go test run: go test -tags goolm,stdjson ./... + + integration: + name: Integration Tests + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Run Docker-backed integration suites + run: bash ./scripts/run-integration-tests.sh diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a78c41c36..5780bfd93 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -73,10 +73,13 @@ make check # Full pre-commit check: deps + fmt + vet + test + docs consist ```bash make test # Run all tests +make integration-test # Run Docker-backed integration suites go test -run TestName -v ./pkg/session/ # Run a single test go test -bench=. -benchmem -run='^$' ./... # Run benchmarks ``` +Docker-backed integration suites are auto-discovered from [`integration/suites/`](integration/suites/). See [`integration/README.md`](integration/README.md) for the suite layout and the conventions used by CI. + ### Code Style ```bash diff --git a/Makefile b/Makefile index 3fa41bc24..81ef32f6f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all build install uninstall clean help test build-all lint-docs +.PHONY: all build install uninstall clean help test integration-test build-all lint-docs # Build variables BINARY_NAME=picoclaw @@ -379,6 +379,10 @@ test: generate @$(GO) test $(GOFLAGS) $$($(GO) list $(GOFLAGS) ./... | grep -v github.com/sipeed/picoclaw/web/) @cd web && make test +## integration-test: Run Docker-backed integration test suites +integration-test: + @bash ./scripts/run-integration-tests.sh + ## fmt: Format Go code fmt: @$(GOLANGCI_LINT) fmt diff --git a/cmd/picoclaw/internal/mcp/add.go b/cmd/picoclaw/internal/mcp/add.go index 8ad68571f..bf2edf89c 100644 --- a/cmd/picoclaw/internal/mcp/add.go +++ b/cmd/picoclaw/internal/mcp/add.go @@ -173,7 +173,7 @@ func parseAddArgs(args []string) (addOptions, string, string, []string, bool, er } func buildServerConfig(target string, args []string, opts addOptions) (config.MCPServerConfig, error) { - transport := strings.ToLower(strings.TrimSpace(opts.Transport)) + transport := config.NormalizeMCPTransportType(opts.Transport) if transport == "" { transport = "stdio" } diff --git a/cmd/picoclaw/internal/mcp/command_test.go b/cmd/picoclaw/internal/mcp/command_test.go index be1c9763e..cae4a2c32 100644 --- a/cmd/picoclaw/internal/mcp/command_test.go +++ b/cmd/picoclaw/internal/mcp/command_test.go @@ -296,6 +296,25 @@ func TestMCPAddHTTPServer(t *testing.T) { assert.Empty(t, server.Command) } +func TestMCPAddSupportsStreamableHTTPAlias(t *testing.T) { + configPath := setupMCPConfigEnv(t) + + cmd := NewMCPCommand() + _, err := executeCommand(cmd, []string{ + "add", + "context7", + "--transport", + "streamable-http", + "https://mcp.context7.com/mcp", + }, "") + require.NoError(t, err) + + cfg := readMCPConfig(t, configPath) + server := cfg.Tools.MCP.Servers["context7"] + assert.Equal(t, "http", server.Type) + assert.Equal(t, "https://mcp.context7.com/mcp", server.URL) +} + func TestMCPRemoveRemovesLastServerAndDisablesMCP(t *testing.T) { configPath := setupMCPConfigEnv(t) writeMCPConfig(t, configPath, &config.Config{ diff --git a/cmd/picoclaw/internal/mcp/helpers.go b/cmd/picoclaw/internal/mcp/helpers.go index 0fb0b245c..39124c787 100644 --- a/cmd/picoclaw/internal/mcp/helpers.go +++ b/cmd/picoclaw/internal/mcp/helpers.go @@ -156,17 +156,11 @@ func loadMCPConfigSchema() (*jsonschema.Resolved, error) { } func inferTransportType(server config.MCPServerConfig) string { - switch server.Type { - case "stdio", "http", "sse": - return server.Type + transport := config.EffectiveMCPTransportType(server) + if transport == "" { + return "unknown" } - if server.URL != "" { - return "sse" - } - if server.Command != "" { - return "stdio" - } - return "unknown" + return transport } func renderServerTarget(server config.MCPServerConfig) string { diff --git a/docs/reference/mcp-cli.md b/docs/reference/mcp-cli.md index 18b2b4c1c..d530815ab 100644 --- a/docs/reference/mcp-cli.md +++ b/docs/reference/mcp-cli.md @@ -117,7 +117,7 @@ Supported flags: | `--env`, `-e` | Add a stdio environment variable in `KEY=value` format. Repeatable. Values are saved to config. | | `--env-file` | Attach an env file path to a stdio server. Recommended for secrets you do not want stored inline in `config.json`. | | `--header`, `-H` | Add an HTTP header in `Name: Value` or `Name=Value` format. Repeatable. | -| `--transport`, `-t` | Transport type: `stdio` (default), `http`, or `sse`. | +| `--transport`, `-t` | Transport type: `stdio` (default), `http` / `streamable-http`, or `sse`. | | `--force`, `-f` | Overwrite an existing server entry without confirmation. | | `--deferred` | Mark the server as deferred: tools are hidden and discoverable on demand. | | `--no-deferred` | Mark the server as non-deferred: tools are always loaded into context. | @@ -198,13 +198,15 @@ For `stdio`: - `--header` is rejected - `-- [args...]` is supported and recommended for unambiguous parsing -For `http` / `sse`: +For `http` / `streamable-http` / `sse`: - `` must be a valid URL - extra command args are rejected - `--env` is rejected - `--env-file` is rejected - `--header` is supported and stored in `headers` +- `http` and `streamable-http` use streamable HTTP request-response mode +- `sse` uses the same streamable HTTP transport, but also enables the optional standalone SSE listener for server-initiated notifications Overwrite behavior: diff --git a/integration/README.md b/integration/README.md new file mode 100644 index 000000000..1cb2a617a --- /dev/null +++ b/integration/README.md @@ -0,0 +1,79 @@ +# Integration Test Suites + +This directory contains Docker-backed integration test suites that are auto-discovered by CI. + +## How It Works + +- The shared runner is defined in [`integration/docker-compose.runner.yml`](docker-compose.runner.yml). +- Each suite lives in `integration/suites//`. +- CI and local runs use [`scripts/run-integration-tests.sh`](../scripts/run-integration-tests.sh). +- The runner discovers every suite automatically, so adding a new suite does not require editing the GitHub Actions workflow. + +## Suite Layout + +Each suite directory must contain: + +- `suite.env` +- at least one `docker-compose.yml` or `docker-compose.*.yml` + +Example: + +```text +integration/suites/my-suite/ +├── docker-compose.yml +└── suite.env +``` + +## Required Manifest Fields + +`suite.env` is sourced by the runner script and must define: + +- `TEST_COMMAND`: shell command executed inside the integration runner container + +Optional fields: + +- `RUNNER_SERVICE`: override the default runner service name (`integration-runner`) + +Example: + +```bash +TEST_COMMAND='go test ./pkg/mcp -run TestIntegration_RealConfiguredServer -v' +``` + +## Docker Conventions + +Suite compose files can: + +- define dependency services needed by the tests +- extend or override the shared `integration-runner` service +- inject environment variables into the runner for the tests to consume + +The provided `mcp-streamable` suite is a good reference: + +- it starts a local streamable HTTP MCP server +- wires the runner to that service through Docker networking +- runs the Go integration test against the containerized server + +## Running Locally + +Run all suites: + +```bash +bash ./scripts/run-integration-tests.sh +``` + +Run one suite: + +```bash +bash ./scripts/run-integration-tests.sh mcp-streamable +``` + +## Adding a New Suite + +1. Create `integration/suites//docker-compose.yml`. +2. Create `integration/suites//suite.env`. +3. Put any fixture service code under `integration/fixtures/` if it is reusable. +4. Make sure the suite is self-contained and deterministic. +5. Validate it locally with `bash ./scripts/run-integration-tests.sh `. + +Once committed, the suite will be picked up automatically by the CI integration job. diff --git a/integration/docker-compose.runner.yml b/integration/docker-compose.runner.yml new file mode 100644 index 000000000..1de2f7922 --- /dev/null +++ b/integration/docker-compose.runner.yml @@ -0,0 +1,19 @@ +services: + integration-runner: + image: golang:1.25-bookworm + working_dir: /workspace + entrypoint: ["bash", "-lc"] + volumes: + - .:/workspace + - picoclaw-integration-gocache:/go-build-cache + - picoclaw-integration-gomodcache:/go-mod-cache + environment: + GOCACHE: /go-build-cache + GOMODCACHE: /go-mod-cache + GOTOOLCHAIN: local + CGO_ENABLED: "0" + GOFLAGS: -tags=goolm,stdjson,integration + +volumes: + picoclaw-integration-gocache: + picoclaw-integration-gomodcache: diff --git a/integration/fixtures/mcp-streamable-server/Dockerfile b/integration/fixtures/mcp-streamable-server/Dockerfile new file mode 100644 index 000000000..8fa920178 --- /dev/null +++ b/integration/fixtures/mcp-streamable-server/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.25-bookworm AS build + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 go build -o /out/mcp-streamable-server ./integration/fixtures/mcp-streamable-server + +FROM alpine:3.22 + +RUN adduser -D -u 10001 appuser + +COPY --from=build /out/mcp-streamable-server /usr/local/bin/mcp-streamable-server + +USER appuser +EXPOSE 8080 + +HEALTHCHECK --interval=5s --timeout=3s --retries=12 CMD wget -qO- http://127.0.0.1:8080/healthz || exit 1 + +ENTRYPOINT ["/usr/local/bin/mcp-streamable-server"] diff --git a/integration/fixtures/mcp-streamable-server/main.go b/integration/fixtures/mcp-streamable-server/main.go new file mode 100644 index 000000000..213837072 --- /dev/null +++ b/integration/fixtures/mcp-streamable-server/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + "strings" + "time" + + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +func main() { + server := mcp.NewServer(&mcp.Implementation{ + Name: "picoclaw-integration-streamable-server", + Version: "1.0.0", + }, nil) + + mcp.AddTool(server, &mcp.Tool{ + Name: "echo", + Description: "Echo back the provided message", + }, func(ctx context.Context, req *mcp.CallToolRequest, args map[string]any) (*mcp.CallToolResult, any, error) { + message, _ := args["message"].(string) + return &mcp.CallToolResult{ + Content: []mcp.Content{ + &mcp.TextContent{Text: message}, + }, + }, nil, nil + }) + + streamable := mcp.NewStreamableHTTPHandler(func(*http.Request) *mcp.Server { + return server + }, &mcp.StreamableHTTPOptions{ + JSONResponse: envBool("STREAMABLE_JSON_RESPONSE", true), + }) + + mux := http.NewServeMux() + mux.Handle("/mcp", streamable) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + srv := &http.Server{ + Addr: ":8080", + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + log.Printf("streamable MCP integration server listening on %s", srv.Addr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } +} + +func envBool(name string, fallback bool) bool { + value := strings.TrimSpace(strings.ToLower(os.Getenv(name))) + if value == "" { + return fallback + } + + switch value { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return fallback + } +} diff --git a/integration/suites/mcp-streamable/docker-compose.yml b/integration/suites/mcp-streamable/docker-compose.yml new file mode 100644 index 000000000..195737eb2 --- /dev/null +++ b/integration/suites/mcp-streamable/docker-compose.yml @@ -0,0 +1,19 @@ +services: + integration-runner: + depends_on: + mcp-streamable-server: + condition: service_healthy + environment: + PICOCLAW_MCP_REAL_SERVER_JSON: >- + {"enabled":true,"type":"http","url":"http://mcp-streamable-server:8080/mcp"} + PICOCLAW_MCP_REAL_TOOL_NAME: echo + PICOCLAW_MCP_REAL_TOOL_ARGS_JSON: >- + {"message":"hello from docker integration suite"} + PICOCLAW_MCP_REAL_EXPECT_SUBSTRING: hello from docker integration suite + + mcp-streamable-server: + build: + context: . + dockerfile: integration/fixtures/mcp-streamable-server/Dockerfile + environment: + STREAMABLE_JSON_RESPONSE: "true" diff --git a/integration/suites/mcp-streamable/suite.env b/integration/suites/mcp-streamable/suite.env new file mode 100644 index 000000000..c86b1b84d --- /dev/null +++ b/integration/suites/mcp-streamable/suite.env @@ -0,0 +1 @@ +TEST_COMMAND='go test ./pkg/mcp -run TestIntegration_RealConfiguredServer -v' diff --git a/pkg/config/config.go b/pkg/config/config.go index acceee4d5..6bcac7c30 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -958,7 +958,11 @@ type MCPServerConfig struct { Env map[string]string `json:"env,omitempty"` // EnvFile is the path to a file containing environment variables (stdio only) EnvFile string `json:"env_file,omitempty"` - // Type is "stdio", "sse", or "http" (default: stdio if command is set, sse if url is set) + // Type is "stdio", "sse", "http", or "streamable-http". + // "http" and "streamable-http" both select streamable HTTP request-response + // mode, while "sse" keeps the standalone SSE listener enabled for + // server-initiated notifications. Defaults: stdio if command is set, sse if + // url is set. Type string `json:"type,omitempty"` // URL is used for SSE/HTTP transport URL string `json:"url,omitempty"` diff --git a/pkg/config/mcp_transport.go b/pkg/config/mcp_transport.go new file mode 100644 index 000000000..475f4dfdb --- /dev/null +++ b/pkg/config/mcp_transport.go @@ -0,0 +1,32 @@ +package config + +import "strings" + +// NormalizeMCPTransportType canonicalizes MCP transport names used in config. +// "http" is PicoClaw's streamable HTTP request-response mode, and +// "streamable-http" is accepted as an explicit alias for the same transport. +func NormalizeMCPTransportType(transport string) string { + normalized := strings.ToLower(strings.TrimSpace(transport)) + + switch normalized { + case "streamable-http", "streamable_http", "streamablehttp": + return "http" + default: + return normalized + } +} + +// EffectiveMCPTransportType returns the normalized configured transport, or the +// inferred default when the config leaves Type empty. +func EffectiveMCPTransportType(server MCPServerConfig) string { + if transport := NormalizeMCPTransportType(server.Type); transport != "" { + return transport + } + if server.URL != "" { + return "sse" + } + if server.Command != "" { + return "stdio" + } + return "" +} diff --git a/pkg/mcp/events.go b/pkg/mcp/events.go index 3b7f53f96..30bb76971 100644 --- a/pkg/mcp/events.go +++ b/pkg/mcp/events.go @@ -79,14 +79,5 @@ func setMCPAttrString(attrs map[string]any, key, value string) { } func mcpTransportType(cfg config.MCPServerConfig) string { - if cfg.Type != "" { - return cfg.Type - } - if cfg.URL != "" { - return "sse" - } - if cfg.Command != "" { - return "stdio" - } - return "" + return config.EffectiveMCPTransportType(cfg) } diff --git a/pkg/mcp/manager.go b/pkg/mcp/manager.go index 958927767..a5ab10a32 100644 --- a/pkg/mcp/manager.go +++ b/pkg/mcp/manager.go @@ -342,17 +342,9 @@ func connectServer( // Create transport based on configuration // Auto-detect transport type if not explicitly specified var transport mcp.Transport - transportType := cfg.Type - - // Auto-detect: if URL is provided, use SSE; if command is provided, use stdio + transportType := config.EffectiveMCPTransportType(cfg) if transportType == "" { - if cfg.URL != "" { - transportType = "sse" - } else if cfg.Command != "" { - transportType = "stdio" - } else { - return nil, fmt.Errorf("either URL or command must be provided") - } + return nil, fmt.Errorf("either URL or command must be provided") } switch transportType { @@ -362,12 +354,13 @@ func connectServer( } // Configure DisableStandaloneSSE based on transport type. - // - "http": Request-response only mode. Disable the standalone SSE stream - // to avoid compatibility issues with servers that don't support GET /mcp. + // - "http": Streamable HTTP request-response mode. Disable the standalone + // SSE stream to avoid compatibility issues with servers that don't + // support the optional GET listener. // - "sse": Bidirectional mode. Enable the standalone SSE stream to receive // server-initiated notifications (e.g., ToolListChangedNotification). // - Empty or auto-detected: Defaults to "sse" behavior (standalone SSE enabled). - disableStandaloneSSE := (cfg.Type == "http") + disableStandaloneSSE := transportType == "http" logger.DebugCF("mcp", "Using SSE/HTTP transport", map[string]any{ @@ -452,7 +445,7 @@ func connectServer( transport = &isolatedCommandTransport{Command: cmd} default: return nil, fmt.Errorf( - "unsupported transport type: %s (supported: stdio, sse, http)", + "unsupported transport type: %s (supported: stdio, sse, http, streamable-http)", transportType, ) } diff --git a/pkg/mcp/manager_integration_test.go b/pkg/mcp/manager_integration_test.go new file mode 100644 index 000000000..2fa9a31f1 --- /dev/null +++ b/pkg/mcp/manager_integration_test.go @@ -0,0 +1,326 @@ +//go:build integration + +package mcp + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + sdkmcp "github.com/modelcontextprotocol/go-sdk/mcp" + + "github.com/sipeed/picoclaw/pkg/config" +) + +// Run with: go test -tags=integration ./pkg/mcp +func TestIntegration_StreamableHTTPCompatibility(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + tests := []struct { + name string + transportType string + jsonResponse bool + rejectStandaloneGET bool + wantResponseContentType string + }{ + { + name: "http/json-only-without-get-listener", + transportType: "http", + jsonResponse: true, + rejectStandaloneGET: true, + wantResponseContentType: "application/json", + }, + { + name: "http/streaming-post-responses", + transportType: "http", + jsonResponse: false, + rejectStandaloneGET: false, + wantResponseContentType: "text/event-stream", + }, + { + name: "streamable-http-alias/json-only-without-get-listener", + transportType: "streamable-http", + jsonResponse: true, + rejectStandaloneGET: true, + wantResponseContentType: "application/json", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server, recorder := newRecordedGoSDKStreamableServer(t, tt.jsonResponse, tt.rejectStandaloneGET) + defer server.Close() + + mgr := NewManager() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := mgr.ConnectServer(ctx, "compat", config.MCPServerConfig{ + Enabled: true, + Type: tt.transportType, + URL: server.URL, + Headers: map[string]string{ + "Authorization": "Bearer integration-token", + }, + }) + if err != nil { + t.Fatalf("ConnectServer() error = %v", err) + } + + tools := mgr.GetAllTools() + if got := len(tools["compat"]); got != 1 { + t.Fatalf("len(GetAllTools()[\"compat\"]) = %d, want 1", got) + } + + result, err := mgr.CallTool(ctx, "compat", "echo", map[string]any{ + "message": "hello from integration", + }) + if err != nil { + t.Fatalf("CallTool() error = %v", err) + } + if got, want := extractTextResult(t, result), "hello from integration"; got != want { + t.Fatalf("CallTool() text = %q, want %q", got, want) + } + + if err := mgr.Close(); err != nil { + t.Fatalf("Manager.Close() error = %v", err) + } + + assertRecordedCompatibility(t, recorder.snapshot(), tt.wantResponseContentType) + }) + } +} + +type recordedRequest struct { + Method string + Path string + JSONRPCMethod string + RequestSessionID string + Authorization string + ResponseStatusCode int + ResponseContentType string +} + +type requestRecorder struct { + mu sync.Mutex + requests []recordedRequest +} + +func (r *requestRecorder) add(req recordedRequest) { + r.mu.Lock() + defer r.mu.Unlock() + r.requests = append(r.requests, req) +} + +func (r *requestRecorder) snapshot() []recordedRequest { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]recordedRequest, len(r.requests)) + copy(out, r.requests) + return out +} + +func newRecordedGoSDKStreamableServer( + t *testing.T, + jsonResponse bool, + rejectStandaloneGET bool, +) (*httptest.Server, *requestRecorder) { + t.Helper() + + server := sdkmcp.NewServer(&sdkmcp.Implementation{ + Name: "streamable-integration-server", + Version: "1.0.0", + }, nil) + sdkmcp.AddTool(server, &sdkmcp.Tool{ + Name: "echo", + Description: "Echo a message", + }, func(ctx context.Context, req *sdkmcp.CallToolRequest, args map[string]any) (*sdkmcp.CallToolResult, any, error) { + message, _ := args["message"].(string) + return &sdkmcp.CallToolResult{ + Content: []sdkmcp.Content{ + &sdkmcp.TextContent{Text: message}, + }, + }, nil, nil + }) + + recorder := &requestRecorder{} + handler := sdkmcp.NewStreamableHTTPHandler(func(*http.Request) *sdkmcp.Server { + return server + }, &sdkmcp.StreamableHTTPOptions{ + JSONResponse: jsonResponse, + }) + + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if rejectStandaloneGET && r.Method == http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + recorder.add(recordedRequest{ + Method: r.Method, + Path: r.URL.Path, + RequestSessionID: r.Header.Get("Mcp-Session-Id"), + Authorization: r.Header.Get("Authorization"), + ResponseStatusCode: http.StatusMethodNotAllowed, + ResponseContentType: normalizeContentType(w.Header().Get("Content-Type")), + }) + return + } + + recorded := recordedRequest{ + Method: r.Method, + Path: r.URL.Path, + RequestSessionID: r.Header.Get("Mcp-Session-Id"), + Authorization: r.Header.Get("Authorization"), + } + if r.Method == http.MethodPost { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("reading request body: %v", err) + } + r.Body = io.NopCloser(bytes.NewReader(body)) + + var envelope struct { + Method string `json:"method"` + } + if err := json.Unmarshal(body, &envelope); err == nil { + recorded.JSONRPCMethod = envelope.Method + } + } + + rw := &recordingResponseWriter{ResponseWriter: w} + handler.ServeHTTP(rw, r) + + recorded.ResponseStatusCode = rw.statusCode() + recorded.ResponseContentType = normalizeContentType(rw.Header().Get("Content-Type")) + recorder.add(recorded) + })) + + return httpServer, recorder +} + +type recordingResponseWriter struct { + http.ResponseWriter + status int +} + +func (w *recordingResponseWriter) WriteHeader(status int) { + if w.status == 0 { + w.status = status + } + w.ResponseWriter.WriteHeader(status) +} + +func (w *recordingResponseWriter) Write(p []byte) (int, error) { + if w.status == 0 { + w.status = http.StatusOK + } + return w.ResponseWriter.Write(p) +} + +func (w *recordingResponseWriter) Flush() { + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } +} + +func (w *recordingResponseWriter) statusCode() int { + if w.status != 0 { + return w.status + } + return http.StatusOK +} + +func extractTextResult(t *testing.T, result *sdkmcp.CallToolResult) string { + t.Helper() + if result == nil || len(result.Content) != 1 { + t.Fatalf("unexpected CallToolResult: %#v", result) + } + text, ok := result.Content[0].(*sdkmcp.TextContent) + if !ok { + t.Fatalf("CallToolResult content type = %T, want *sdkmcp.TextContent", result.Content[0]) + } + return text.Text +} + +func assertRecordedCompatibility( + t *testing.T, + requests []recordedRequest, + wantResponseContentType string, +) { + t.Helper() + + var ( + getCount int + deleteCount int + postWithSession int + deleteWithSession int + requestsMissingAuth []string + observedContentTypesByMethod = map[string]string{} + ) + + for _, req := range requests { + switch req.Method { + case http.MethodGet: + getCount++ + case http.MethodPost: + if req.RequestSessionID != "" { + postWithSession++ + } + if req.JSONRPCMethod != "" && observedContentTypesByMethod[req.JSONRPCMethod] == "" { + observedContentTypesByMethod[req.JSONRPCMethod] = req.ResponseContentType + } + case http.MethodDelete: + deleteCount++ + if req.RequestSessionID != "" { + deleteWithSession++ + } + } + + if req.Authorization != "Bearer integration-token" { + requestsMissingAuth = append(requestsMissingAuth, req.Method+" "+req.Path) + } + } + + if getCount != 0 { + t.Fatalf("expected no standalone GET requests for streamable HTTP mode, saw %d", getCount) + } + if deleteCount != 1 { + t.Fatalf("DELETE count = %d, want 1", deleteCount) + } + if postWithSession == 0 { + t.Fatal("expected at least one POST request with Mcp-Session-Id") + } + if deleteWithSession != 1 { + t.Fatalf("expected exactly one DELETE with Mcp-Session-Id, got %d", deleteWithSession) + } + if len(requestsMissingAuth) > 0 { + t.Fatalf("Authorization header missing on requests: %v", requestsMissingAuth) + } + + for _, method := range []string{"initialize", "tools/list", "tools/call"} { + if observedContentTypesByMethod[method] == "" { + t.Fatalf("did not observe POST response for JSON-RPC method %q", method) + } + if observedContentTypesByMethod[method] != wantResponseContentType { + t.Fatalf( + "response content-type for %s = %q, want %q", + method, + observedContentTypesByMethod[method], + wantResponseContentType, + ) + } + } +} + +func normalizeContentType(value string) string { + return strings.TrimSpace(strings.SplitN(value, ";", 2)[0]) +} diff --git a/pkg/mcp/manager_real_server_integration_test.go b/pkg/mcp/manager_real_server_integration_test.go new file mode 100644 index 000000000..742a4fb2b --- /dev/null +++ b/pkg/mcp/manager_real_server_integration_test.go @@ -0,0 +1,148 @@ +//go:build integration + +package mcp + +import ( + "context" + "encoding/json" + "os" + "strconv" + "strings" + "testing" + "time" + + sdkmcp "github.com/modelcontextprotocol/go-sdk/mcp" + + "github.com/sipeed/picoclaw/pkg/config" +) + +// TestIntegration_RealConfiguredServer is an opt-in smoke test for a real MCP +// server configured via environment variables. +// +// Run with: +// +// go test -tags=integration ./pkg/mcp -run TestIntegration_RealConfiguredServer -v +// +// Minimum configuration: +// +// PICOCLAW_MCP_REAL_SERVER_JSON='{"enabled":true,"type":"http","url":"http://127.0.0.1:8080/mcp"}' +// +// Optional tool invocation: +// +// PICOCLAW_MCP_REAL_TOOL_NAME=echo +// PICOCLAW_MCP_REAL_TOOL_ARGS_JSON='{"message":"hello"}' +// PICOCLAW_MCP_REAL_EXPECT_SUBSTRING=hello +// +// Stdio subprocess example: +// +// PICOCLAW_MCP_REAL_SERVER_JSON='{"enabled":true,"type":"stdio","command":"npx","args":["-y","@modelcontextprotocol/server-filesystem","."]}' +func TestIntegration_RealConfiguredServer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + serverJSON := strings.TrimSpace(os.Getenv("PICOCLAW_MCP_REAL_SERVER_JSON")) + if serverJSON == "" { + t.Skip("skipping integration test (set PICOCLAW_MCP_REAL_SERVER_JSON to enable)") + } + + serverCfg, err := loadRealServerConfig(serverJSON) + if err != nil { + t.Fatalf("loadRealServerConfig() error = %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + mgr := NewManager() + if err := mgr.ConnectServer(ctx, "real", serverCfg); err != nil { + t.Fatalf("ConnectServer() error = %v", err) + } + defer func() { + if err := mgr.Close(); err != nil { + t.Errorf("Manager.Close() error = %v", err) + } + }() + + tools := mgr.GetAllTools()["real"] + if len(tools) == 0 { + t.Fatal("expected at least one discovered tool from real MCP server") + } + + t.Logf("connected to real MCP server via %s with %d tool(s)", config.EffectiveMCPTransportType(serverCfg), len(tools)) + for _, tool := range tools { + if tool != nil { + t.Logf("discovered tool: %s", tool.Name) + } + } + + if expectedCountRaw := strings.TrimSpace(os.Getenv("PICOCLAW_MCP_REAL_EXPECT_TOOL_COUNT")); expectedCountRaw != "" { + expectedCount, err := strconv.Atoi(expectedCountRaw) + if err != nil { + t.Fatalf("invalid PICOCLAW_MCP_REAL_EXPECT_TOOL_COUNT %q: %v", expectedCountRaw, err) + } + if len(tools) != expectedCount { + t.Fatalf("tool count = %d, want %d", len(tools), expectedCount) + } + } + + toolName := strings.TrimSpace(os.Getenv("PICOCLAW_MCP_REAL_TOOL_NAME")) + if toolName == "" { + return + } + + toolArgs, err := loadRealToolArgs(os.Getenv("PICOCLAW_MCP_REAL_TOOL_ARGS_JSON")) + if err != nil { + t.Fatalf("loadRealToolArgs() error = %v", err) + } + + result, err := mgr.CallTool(ctx, "real", toolName, toolArgs) + if err != nil { + t.Fatalf("CallTool(%q) error = %v", toolName, err) + } + + textPayload := joinTextContents(result) + t.Logf("tool %q returned text payload: %q", toolName, textPayload) + + if want := os.Getenv("PICOCLAW_MCP_REAL_EXPECT_SUBSTRING"); want != "" && !strings.Contains(textPayload, want) { + t.Fatalf("tool result %q does not contain expected substring %q", textPayload, want) + } +} + +func loadRealServerConfig(raw string) (config.MCPServerConfig, error) { + var cfg config.MCPServerConfig + if err := json.Unmarshal([]byte(raw), &cfg); err != nil { + return config.MCPServerConfig{}, err + } + if !cfg.Enabled { + cfg.Enabled = true + } + return cfg, nil +} + +func loadRealToolArgs(raw string) (map[string]any, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return map[string]any{}, nil + } + + var args map[string]any + if err := json.Unmarshal([]byte(raw), &args); err != nil { + return nil, err + } + return args, nil +} + +func joinTextContents(result *sdkmcp.CallToolResult) string { + if result == nil || len(result.Content) == 0 { + return "" + } + + parts := make([]string, 0, len(result.Content)) + for _, content := range result.Content { + if text, ok := content.(*sdkmcp.TextContent); ok && text != nil { + parts = append(parts, text.Text) + } + } + return strings.Join(parts, "\n") +} diff --git a/pkg/mcp/manager_test.go b/pkg/mcp/manager_test.go index 5789a37a9..70932f371 100644 --- a/pkg/mcp/manager_test.go +++ b/pkg/mcp/manager_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "io" + "net/http" + "net/http/httptest" "os" "path/filepath" "strings" @@ -408,6 +410,131 @@ func TestCallTool_ErrorsForClosedOrMissingServer(t *testing.T) { }) } +func TestConnectServer_StreamableHTTPRequestResponseMode(t *testing.T) { + t.Parallel() + + for _, transportType := range []string{"http", "streamable-http"} { + t.Run(transportType, func(t *testing.T) { + server := sdkmcp.NewServer(&sdkmcp.Implementation{ + Name: "streamable-test-server", + Version: "1.0.0", + }, nil) + sdkmcp.AddTool(server, &sdkmcp.Tool{ + Name: "echo", + Description: "Echo test tool", + }, func(ctx context.Context, req *sdkmcp.CallToolRequest, args map[string]any) (*sdkmcp.CallToolResult, any, error) { + return &sdkmcp.CallToolResult{ + Content: []sdkmcp.Content{ + &sdkmcp.TextContent{Text: "ok"}, + }, + }, nil, nil + }) + + type observedRequest struct { + Method string + SessionID string + Authorization string + } + + var ( + mu sync.Mutex + observed []observedRequest + ) + + handler := sdkmcp.NewStreamableHTTPHandler(func(*http.Request) *sdkmcp.Server { + return server + }, nil) + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + observed = append(observed, observedRequest{ + Method: r.Method, + SessionID: r.Header.Get("Mcp-Session-Id"), + Authorization: r.Header.Get("Authorization"), + }) + mu.Unlock() + handler.ServeHTTP(w, r) + })) + defer httpServer.Close() + + conn, err := connectServer(context.Background(), "streamable", config.MCPServerConfig{ + Enabled: true, + Type: transportType, + URL: httpServer.URL, + Headers: map[string]string{ + "Authorization": "Bearer test-token", + }, + }) + if err != nil { + t.Fatalf("connectServer(%q) error = %v", transportType, err) + } + if got := len(conn.Tools); got != 1 { + t.Fatalf("len(conn.Tools) = %d, want 1", got) + } + if got := conn.Session.ID(); got == "" { + t.Fatal("expected non-empty streamable session ID") + } + if err := conn.Session.Close(); err != nil { + t.Fatalf("Session.Close() error = %v", err) + } + + mu.Lock() + defer mu.Unlock() + + var ( + getCount int + postCount int + deleteCount int + postWithSession bool + deleteWithSession bool + requestsWithAuth int + requestsWithoutAuth []string + ) + + for _, req := range observed { + switch req.Method { + case http.MethodGet: + getCount++ + case http.MethodPost: + postCount++ + if req.SessionID != "" { + postWithSession = true + } + case http.MethodDelete: + deleteCount++ + if req.SessionID != "" { + deleteWithSession = true + } + } + + if req.Authorization == "Bearer test-token" { + requestsWithAuth++ + } else { + requestsWithoutAuth = append(requestsWithoutAuth, req.Method) + } + } + + if getCount != 0 { + t.Fatalf("expected no standalone GET requests for %q transport, saw %d", transportType, getCount) + } + if postCount == 0 { + t.Fatal("expected POST requests during streamable HTTP handshake") + } + if deleteCount != 1 { + t.Fatalf("DELETE count = %d, want 1", deleteCount) + } + if !postWithSession { + t.Fatal("expected at least one POST request with Mcp-Session-Id") + } + if !deleteWithSession { + t.Fatal("expected DELETE request with Mcp-Session-Id") + } + if requestsWithAuth != len(observed) { + t.Fatalf("Authorization header missing on requests: %v", requestsWithoutAuth) + } + }) + } +} + func TestCallTool_ReconnectsWhenHTTPServerLosesSession(t *testing.T) { originalConnectServerFunc := connectServerFunc t.Cleanup(func() { diff --git a/scripts/run-integration-tests.sh b/scripts/run-integration-tests.sh new file mode 100644 index 000000000..bdb3b787d --- /dev/null +++ b/scripts/run-integration-tests.sh @@ -0,0 +1,117 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +BASE_COMPOSE="$ROOT_DIR/integration/docker-compose.runner.yml" +SUITES_DIR="$ROOT_DIR/integration/suites" + +if [[ ! -f "$BASE_COMPOSE" ]]; then + echo "missing base compose file: $BASE_COMPOSE" >&2 + exit 1 +fi + +if [[ ! -d "$SUITES_DIR" ]]; then + echo "missing integration suites directory: $SUITES_DIR" >&2 + exit 1 +fi + +sanitize_project_name() { + printf '%s' "$1" | tr '[:upper:]' '[:lower:]' | tr -cs 'a-z0-9' '-' +} + +collect_suite_dirs() { + if [[ "$#" -gt 0 ]]; then + local suite + for suite in "$@"; do + printf '%s\n' "$SUITES_DIR/$suite" + done + return + fi + + find "$SUITES_DIR" -mindepth 1 -maxdepth 1 -type d | sort +} + +run_suite() { + local suite_dir="$1" + local suite_name + suite_name="$(basename "$suite_dir")" + local manifest="$suite_dir/suite.env" + + if [[ ! -f "$manifest" ]]; then + echo "suite $suite_name is missing manifest: $manifest" >&2 + return 1 + fi + + local compose_args=() + compose_args+=(--project-directory "$ROOT_DIR" -p "picoclaw-int-$(sanitize_project_name "$suite_name")") + compose_args+=(-f "$BASE_COMPOSE") + + local compose_files=() + while IFS= read -r compose_file; do + compose_files+=("$compose_file") + compose_args+=(-f "$compose_file") + done < <(find "$suite_dir" -maxdepth 1 -type f \( -name 'docker-compose.yml' -o -name 'docker-compose.*.yml' \) | sort) + + if [[ "${#compose_files[@]}" -eq 0 ]]; then + echo "suite $suite_name has no docker-compose file" >&2 + return 1 + fi + + ( + set -a + # shellcheck disable=SC1090 + source "$manifest" + set +a + + : "${TEST_COMMAND:?suite $suite_name must define TEST_COMMAND in $manifest}" + runner_service="${RUNNER_SERVICE:-integration-runner}" + + cleanup() { + docker compose "${compose_args[@]}" down -v --remove-orphans >/dev/null 2>&1 || true + } + trap cleanup EXIT + + echo "==> [$suite_name] resolving services" + mapfile -t services < <(docker compose "${compose_args[@]}" config --services) + + local dependency_services=() + local service + for service in "${services[@]}"; do + if [[ "$service" != "$runner_service" ]]; then + dependency_services+=("$service") + fi + done + + if [[ "${#dependency_services[@]}" -gt 0 ]]; then + echo "==> [$suite_name] starting docker services: ${dependency_services[*]}" + docker compose "${compose_args[@]}" up -d --build --wait "${dependency_services[@]}" + fi + + echo "==> [$suite_name] running: $TEST_COMMAND" + docker compose "${compose_args[@]}" run --rm "$runner_service" "$TEST_COMMAND" + ) +} + +main() { + local suite_dirs=() + while IFS= read -r suite_dir; do + suite_dirs+=("$suite_dir") + done < <(collect_suite_dirs "$@") + + if [[ "${#suite_dirs[@]}" -eq 0 ]]; then + echo "no integration suites found" >&2 + exit 1 + fi + + local suite_dir + for suite_dir in "${suite_dirs[@]}"; do + if [[ ! -d "$suite_dir" ]]; then + echo "unknown integration suite: $suite_dir" >&2 + exit 1 + fi + run_suite "$suite_dir" + done +} + +main "$@"