Merge branch 'main' into t3

This commit is contained in:
Cytown
2026-03-29 16:48:56 +08:00
50 changed files with 1940 additions and 62 deletions
+3 -3
View File
@@ -461,7 +461,7 @@ func (al *AgentLoop) Run(ctx context.Context) error {
if target == nil {
cancelDrain()
if finalResponse != "" {
al.publishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse)
al.PublishResponseIfNeeded(ctx, msg.Channel, msg.ChatID, finalResponse)
}
return
}
@@ -521,7 +521,7 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}
if finalResponse != "" {
al.publishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse)
al.PublishResponseIfNeeded(ctx, target.Channel, target.ChatID, finalResponse)
}
}()
}
@@ -603,7 +603,7 @@ func (al *AgentLoop) Stop() {
al.running.Store(false)
}
func (al *AgentLoop) publishResponseIfNeeded(ctx context.Context, channel, chatID, response string) {
func (al *AgentLoop) PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string) {
if response == "" {
return
}
+1 -1
View File
@@ -15,7 +15,7 @@ import (
"github.com/stretchr/testify/require"
)
// Test JSON unmarshal of private fields
// Test JSON unmarshal of private fields (unexported fields are never filled, with or without json tag).
func TestJSONUnmarshalPrivateFields(t *testing.T) {
type testStruct struct {
PublicField string `json:"public"`
+1
View File
@@ -25,6 +25,7 @@ type CronSchedule struct {
type CronPayload struct {
Kind string `json:"kind"`
Type string `json:"type"`
Message string `json:"message"`
Command string `json:"command,omitempty"`
Deliver bool `json:"deliver"`
+14 -3
View File
@@ -276,14 +276,25 @@ func (m *Manager) ConnectServer(
if cfg.URL == "" {
return fmt.Errorf("URL is required for SSE/HTTP transport")
}
// Configure DisableStandaloneSSE based on transport type.
// - "http": Request-response only mode. Disable the standalone SSE stream
// to avoid compatibility issues with servers that don't support GET /mcp.
// - "sse": Bidirectional mode. Enable the standalone SSE stream to receive
// server-initiated notifications (e.g., ToolListChangedNotification).
// - Empty or auto-detected: Defaults to "sse" behavior (standalone SSE enabled).
disableStandaloneSSE := (cfg.Type == "http")
logger.DebugCF("mcp", "Using SSE/HTTP transport",
map[string]any{
"server": name,
"url": cfg.URL,
"server": name,
"url": cfg.URL,
"disableStandaloneSSE": disableStandaloneSSE,
})
sseTransport := &mcp.StreamableClientTransport{
Endpoint: cfg.URL,
Endpoint: cfg.URL,
DisableStandaloneSSE: disableStandaloneSSE,
}
// Add custom headers if provided
+46 -8
View File
@@ -16,6 +16,9 @@ import (
// JobExecutor is the interface for executing cron jobs through the agent
type JobExecutor interface {
ProcessDirectWithChannel(ctx context.Context, content, sessionKey, channel, chatID string) (string, error)
// PublishResponseIfNeeded sends response to the outbound bus only when the
// agent did not already deliver content through the message tool in this round.
PublishResponseIfNeeded(ctx context.Context, channel, chatID, response string)
}
// CronTool provides scheduling capabilities for the agent
@@ -111,6 +114,11 @@ func (t *CronTool) Parameters() map[string]any {
"type": "string",
"description": "Job ID (for remove/enable/disable)",
},
"type": map[string]any{
"type": "string",
"enum": []string{"message", "directive"},
"description": "Message generation strategy. 'message' (default): content is sent directly as-is. 'directive': content is treated as instructions for an AI agent to execute before delivery.",
},
"deliver": map[string]any{
"type": "boolean",
"description": "If true, send message directly to channel. If false, let agent process message (for complex tasks). Default: false",
@@ -197,6 +205,12 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
deliver = d
}
// Validate type parameter (server-side whitelist, not just LLM schema hint)
msgType, _ := args["type"].(string)
if msgType != "" && msgType != "message" && msgType != "directive" {
return ErrorResult(fmt.Sprintf("invalid type %q, must be 'message' or 'directive'", msgType))
}
// GHSA-pv8c-p6jf-3fpp: command scheduling requires internal channel. When
// allow_command is disabled, explicit confirmation is required as an override.
// Non-command reminders remain open to all channels.
@@ -230,9 +244,17 @@ func (t *CronTool) addJob(ctx context.Context, args map[string]any) *ToolResult
return ErrorResult(fmt.Sprintf("Error adding job: %v", err))
}
// Apply optional payload fields and persist in a single UpdateJob call
needsUpdate := false
if command != "" {
job.Payload.Command = command
// Need to save the updated payload
needsUpdate = true
}
if msgType != "" {
job.Payload.Type = msgType
needsUpdate = true
}
if needsUpdate {
t.cronService.UpdateJob(job)
}
@@ -347,8 +369,13 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
return "ok"
}
// If deliver=true, send message directly without agent processing
if job.Payload.Deliver {
// Determine message generation strategy
// Type="directive": treat message as instructions for AI agent to execute
// Type="" or "message" (default): static message content
isDirective := job.Payload.Type == "directive"
// If deliver=true and not directive, send message directly without agent processing
if job.Payload.Deliver && !isDirective {
pubCtx, pubCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer pubCancel()
t.msgBus.PublishOutbound(pubCtx, bus.OutboundMessage{
@@ -359,13 +386,23 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
return "ok"
}
// For deliver=false, process through agent (for complex tasks)
// For deliver=false OR directive mode, process through agent
sessionKey := fmt.Sprintf("cron-%s", job.ID)
// Call agent with job's message
// Prepare the prompt based on type
prompt := job.Payload.Message
if isDirective {
// For directive type, prefix to clarify this is an instruction
prompt = fmt.Sprintf(
"Please execute the following directive and provide the result:\n\n%s",
job.Payload.Message,
)
}
// Call agent with the prepared prompt
response, err := t.executor.ProcessDirectWithChannel(
ctx,
job.Payload.Message,
prompt,
sessionKey,
channel,
chatID,
@@ -374,7 +411,8 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
return fmt.Sprintf("Error: %v", err)
}
// Response is automatically sent via MessageBus by AgentLoop
_ = response // Will be sent by AgentLoop
if response != "" {
t.executor.PublishResponseIfNeeded(ctx, channel, chatID, response)
}
return "ok"
}
+259 -2
View File
@@ -2,6 +2,7 @@ package tools
import (
"context"
"fmt"
"path/filepath"
"strings"
"testing"
@@ -12,18 +13,59 @@ import (
"github.com/sipeed/picoclaw/pkg/cron"
)
func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool {
type stubJobExecutor struct {
response string
err error
alreadySent bool // simulate message tool having already sent in this round
lastPrompt string
lastKey string
lastChan string
lastChatID string
publishedResp string
publishedChan string
publishedChatID string
}
func (s *stubJobExecutor) ProcessDirectWithChannel(
_ context.Context,
content, sessionKey, channel, chatID string,
) (string, error) {
s.lastPrompt = content
s.lastKey = sessionKey
s.lastChan = channel
s.lastChatID = chatID
return s.response, s.err
}
func (s *stubJobExecutor) PublishResponseIfNeeded(
_ context.Context,
channel, chatID, response string,
) {
if s.alreadySent {
return
}
s.publishedResp = response
s.publishedChan = channel
s.publishedChatID = chatID
}
func newTestCronToolWithExecutorAndConfig(t *testing.T, executor JobExecutor, cfg *config.Config) *CronTool {
t.Helper()
storePath := filepath.Join(t.TempDir(), "cron.json")
cronService := cron.NewCronService(storePath, nil)
msgBus := bus.NewMessageBus()
tool, err := NewCronTool(cronService, nil, msgBus, t.TempDir(), true, 0, cfg)
tool, err := NewCronTool(cronService, executor, msgBus, t.TempDir(), true, 0, cfg)
if err != nil {
t.Fatalf("NewCronTool() error: %v", err)
}
return tool
}
func newTestCronToolWithConfig(t *testing.T, cfg *config.Config) *CronTool {
t.Helper()
return newTestCronToolWithExecutorAndConfig(t, nil, cfg)
}
func newTestCronTool(t *testing.T) *CronTool {
t.Helper()
return newTestCronToolWithConfig(t, config.DefaultConfig())
@@ -237,3 +279,218 @@ func TestCronTool_ExecuteJobPublishesErrorWhenExecDisabled(t *testing.T) {
t.Fatalf("expected exec disabled message, got: %s", msg.Content)
}
}
func TestCronTool_ExecuteJobPublishesAgentResponse(t *testing.T) {
executor := &stubJobExecutor{response: "generated reply"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-1"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "send me a poem"
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.lastKey != "cron-job-1" {
t.Fatalf("sessionKey = %q, want cron-job-1", executor.lastKey)
}
if executor.lastChan != "telegram" || executor.lastChatID != "chat-1" {
t.Fatalf("executor target = %s/%s, want telegram/chat-1", executor.lastChan, executor.lastChatID)
}
if executor.lastPrompt != "send me a poem" {
t.Fatalf("prompt = %q, want original message", executor.lastPrompt)
}
if executor.publishedResp != "generated reply" {
t.Fatalf("published response = %q, want generated reply", executor.publishedResp)
}
if executor.publishedChan != "telegram" || executor.publishedChatID != "chat-1" {
t.Fatalf("published target = %s/%s, want telegram/chat-1", executor.publishedChan, executor.publishedChatID)
}
}
func TestCronTool_ExecuteJobSkipsEmptyAgentResponse(t *testing.T) {
executor := &stubJobExecutor{}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-empty"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "say nothing"
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.publishedResp != "" {
t.Fatalf("unexpected published response: %q", executor.publishedResp)
}
}
func TestCronTool_ExecuteJobSkipsWhenMessageToolAlreadySent(t *testing.T) {
executor := &stubJobExecutor{response: "Sent.", alreadySent: true}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-msg-sent"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "send weather"
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.publishedResp != "" {
t.Fatalf("expected no published response when message tool already sent, got: %q", executor.publishedResp)
}
}
func TestCronTool_ExecuteJobDirectiveAddsPromptPrefix(t *testing.T) {
executor := &stubJobExecutor{response: "directive result"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
originalMsg := "check the weather and summarize"
job := &cron.CronJob{ID: "job-dir-1"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = originalMsg
job.Payload.Type = "directive"
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
wantPrompt := "Please execute the following directive and provide the result:\n\n" + originalMsg
if executor.lastPrompt != wantPrompt {
t.Fatalf("prompt = %q, want exact %q", executor.lastPrompt, wantPrompt)
}
if executor.publishedResp != "directive result" {
t.Fatalf("published response = %q, want %q", executor.publishedResp, "directive result")
}
}
func TestCronTool_ExecuteJobDirectiveWithDeliverRoutesToAgent(t *testing.T) {
executor := &stubJobExecutor{response: "agent processed"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-dir-deliver"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "generate daily report"
job.Payload.Type = "directive"
job.Payload.Deliver = true
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.lastPrompt == "" {
t.Fatal("expected agent to be called for directive+deliver, but ProcessDirectWithChannel was not invoked")
}
if executor.publishedResp != "agent processed" {
t.Fatalf("published response = %q, want %q", executor.publishedResp, "agent processed")
}
// Verify no direct publish happened on the bus (agent path, not direct path)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
select {
case msg := <-tool.msgBus.OutboundChan():
t.Fatalf("unexpected direct bus message: %+v", msg)
case <-ctx.Done():
// expected: no direct bus message
}
}
func TestCronTool_ExecuteJobDeliverMessageDirectlyToBus(t *testing.T) {
executor := &stubJobExecutor{response: "should not be called"}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-deliver"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "hello world"
job.Payload.Deliver = true
if got := tool.ExecuteJob(context.Background(), job); got != "ok" {
t.Fatalf("ExecuteJob() = %q, want ok", got)
}
if executor.lastPrompt != "" {
t.Fatal("expected agent NOT to be invoked for deliver=true message type")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
select {
case msg := <-tool.msgBus.OutboundChan():
if msg.Content != "hello world" {
t.Fatalf("bus content = %q, want %q", msg.Content, "hello world")
}
case <-ctx.Done():
t.Fatal("timeout waiting for direct bus message")
}
}
func TestCronTool_ExecuteJobReturnsErrorWithoutPublish(t *testing.T) {
executor := &stubJobExecutor{
response: "this response must not be published",
err: fmt.Errorf("agent failure"),
}
tool := newTestCronToolWithExecutorAndConfig(t, executor, config.DefaultConfig())
job := &cron.CronJob{ID: "job-err"}
job.Payload.Channel = "telegram"
job.Payload.To = "chat-1"
job.Payload.Message = "do something"
got := tool.ExecuteJob(context.Background(), job)
if !strings.Contains(got, "agent failure") {
t.Fatalf("ExecuteJob() = %q, want error message", got)
}
if executor.publishedResp != "" {
t.Fatalf("unexpected publish on error path: %q", executor.publishedResp)
}
}
func TestCronTool_AddJobRejectsInvalidType(t *testing.T) {
tool := newTestCronTool(t)
ctx := WithToolContext(context.Background(), "cli", "direct")
result := tool.Execute(ctx, map[string]any{
"action": "add",
"message": "test",
"at_seconds": float64(60),
"type": "invalid_type",
})
if !result.IsError {
t.Fatal("expected error for invalid type parameter")
}
if !strings.Contains(result.ForLLM, "invalid type") {
t.Errorf("expected 'invalid type' error, got: %s", result.ForLLM)
}
}
func TestCronTool_AddJobAcceptsValidTypes(t *testing.T) {
for _, msgType := range []string{"", "message", "directive"} {
t.Run("type="+msgType, func(t *testing.T) {
tool := newTestCronTool(t)
ctx := WithToolContext(context.Background(), "cli", "direct")
args := map[string]any{
"action": "add",
"message": "test",
"at_seconds": float64(60),
}
if msgType != "" {
args["type"] = msgType
}
result := tool.Execute(ctx, args)
if result.IsError {
t.Fatalf("expected valid type %q to succeed, got: %s", msgType, result.ForLLM)
}
})
}
}