From b3d9f86a01e634cc0afa5b6d95520ac01dd29fc4 Mon Sep 17 00:00:00 2001 From: Hoshina Date: Sun, 26 Apr 2026 17:41:00 +0800 Subject: [PATCH] feat(events): add configurable runtime event logging --- config/config.example.json | 9 + docs/architecture/README.md | 1 + docs/architecture/runtime-events.md | 216 +++++++++++++++++ docs/architecture/runtime-events.zh.md | 216 +++++++++++++++++ docs/guides/configuration.zh.md | 37 +++ pkg/agent/agent.go | 10 +- pkg/agent/agent_event.go | 100 -------- pkg/agent/agent_init.go | 1 + pkg/agent/runtime_event_logger.go | 316 +++++++++++++++++++++++++ pkg/agent/runtime_event_logger_test.go | 99 ++++++++ pkg/config/config.go | 1 + pkg/config/defaults.go | 3 + pkg/config/events.go | 48 ++++ pkg/config/events_test.go | 103 ++++++++ 14 files changed, 1057 insertions(+), 103 deletions(-) create mode 100644 docs/architecture/runtime-events.md create mode 100644 docs/architecture/runtime-events.zh.md create mode 100644 pkg/agent/runtime_event_logger.go create mode 100644 pkg/agent/runtime_event_logger_test.go create mode 100644 pkg/config/events.go create mode 100644 pkg/config/events_test.go diff --git a/config/config.example.json b/config/config.example.json index 30460c231..a90d78e24 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -476,6 +476,15 @@ "approval_timeout_ms": 60000 } }, + "events": { + "logging": { + "enabled": true, + "include": ["agent.*"], + "exclude": [], + "min_severity": "info", + "include_payload": false + } + }, "gateway": { "_comment": "Default log level is set to 'fatal'. Other available options are 'debug', 'info', 'warn' and 'error'.", "host": "localhost", diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 6df7447a7..e5fc3b540 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -6,6 +6,7 @@ Internal architecture notes for major runtime mechanisms and subsystem design. - [SubTurn Mechanism](subturn.md): sub-agent coordination, concurrency control, and lifecycle handling. - [Session System](session-system.md): session scope allocation, JSONL persistence, alias compatibility, and migration. ([ZH](session-system.zh.md)) - [Routing System](routing-system.md): agent dispatch, session policy selection, and light/heavy model routing. ([ZH](routing-system.zh.md)) +- [Runtime Events](runtime-events.md): runtime event envelope, centralized event logging, filters, and examples. ([ZH](runtime-events.zh.md)) - [Hook System Guide](hooks/README.md): current hook architecture and protocol details. - [Agent Refactor](agent-refactor/README.md): notes and checkpoints for the agent refactor work. diff --git a/docs/architecture/runtime-events.md b/docs/architecture/runtime-events.md new file mode 100644 index 000000000..5d625a34b --- /dev/null +++ b/docs/architecture/runtime-events.md @@ -0,0 +1,216 @@ +# Runtime Events And Event Logging + +PicoClaw runtime events are the read-only observation surface for agent, channel, gateway, message bus, and MCP activity. Publishing events and printing logs are separate responsibilities: + +- Event publishing: components publish `pkg/events.Event` values to the runtime event bus for hooks, tests, diagnostics, and future UI consumers. +- Event logging: the built-in runtime event logger subscribes to the same bus and prints only the events selected by configuration. + +This keeps runtime code focused on publishing events while log policy stays centralized. + +## Default Behavior + +By default, only `agent.*` events are printed: + +```json +{ + "events": { + "logging": { + "enabled": true, + "include": ["agent.*"], + "min_severity": "info", + "include_payload": false + } + } +} +``` + +This preserves the previous behavior: agent turn, LLM, tool, steering, subturn, and error events appear in logs. Channel, gateway, bus, and MCP events are still published to the runtime event bus, but they are not printed unless configured. + +## Configuration + +The configuration lives under `events.logging` in `config.json`: + +| Field | Type | Default | Description | +| ----- | ---- | ------- | ----------- | +| `enabled` | bool | `true` | Enables the built-in event logger subscription | +| `include` | string[] | `["agent.*"]` | Event kinds to print; supports exact matches, `*`, and patterns such as `agent.*` | +| `exclude` | string[] | `[]` | Event kinds to suppress after include matching | +| `min_severity` | string | `info` | Minimum severity: `debug`, `info`, `warn`, or `error` | +| `include_payload` | bool | `false` | Adds raw event payloads to log fields | + +`include_payload` is disabled by default. Agent events print safe summary fields such as `user_len`, `args_count`, and `content_len` instead of full user messages or tool arguments. Enable raw payload logging only for short-lived diagnostics in a trusted log environment. + +## Matching Rules + +`include` and `exclude` match the `Event.Kind` string: + +```json +{ + "events": { + "logging": { + "include": ["gateway.*", "channel.lifecycle.*", "agent.error"], + "exclude": ["gateway.ready"], + "min_severity": "info" + } + } +} +``` + +Common patterns: + +- `["agent.*"]`: print agent events only. +- `["*"]`: print all runtime events. +- `["gateway.*", "channel.*"]`: print gateway and channel events only. +- `exclude: ["agent.llm.delta"]`: suppress high-volume streaming delta events. +- `min_severity: "warn"`: print warn and error events only. + +## Environment Variables + +The same settings can be overridden with environment variables: + +```bash +PICOCLAW_EVENTS_LOGGING_ENABLED=true +PICOCLAW_EVENTS_LOGGING_INCLUDE="gateway.*,channel.lifecycle.*" +PICOCLAW_EVENTS_LOGGING_EXCLUDE="gateway.ready" +PICOCLAW_EVENTS_LOGGING_MIN_SEVERITY=info +PICOCLAW_EVENTS_LOGGING_INCLUDE_PAYLOAD=false +``` + +`include` and `exclude` use comma-separated values. + +## Event Names And Triggers + +The table below lists the current runtime event kinds, when they are emitted, and the most useful event details. `Source`, `Scope`, and `Correlation` are shared envelope fields that may appear on every event. The "Details" column refers to useful payload fields or log summary fields. + +### Agent + +| Event | Trigger | Details | +| ----- | ------- | ------- | +| `agent.turn.start` | An agent starts processing one user or system input after the turn scope has been created. | `user_len`, `media_count`; scope usually includes `agent_id`, `session_key`, `turn_id`, `channel`, `chat_id`, `message_id` | +| `agent.turn.end` | A turn exits, whether it completed, errored, or was hard-aborted. | `status` (`completed`/`error`/`aborted`), `iterations_total`, `duration_ms`, `final_len` | +| `agent.llm.request` | Before each LLM provider request. | `model`, `messages`, `tools`, `max_tokens` | +| `agent.llm.delta` | Reserved for streaming LLM deltas; the kind is defined, but the current implementation has no natural emit site. | `content_delta_len`, `reasoning_delta_len` | +| `agent.llm.response` | After the LLM provider returns a complete response. | `content_len`, `tool_calls`, `has_reasoning` | +| `agent.llm.retry` | Before retrying an LLM request after context, rate-limit, transient provider, or fallback handling. | `attempt`, `max_retries`, `reason`, `error`, `backoff_ms` | +| `agent.context.compress` | Agent context history is compressed, for example during proactive budget checks or LLM retry handling. | `reason`, `dropped_messages`, `remaining_messages` | +| `agent.session.summarize` | Async session history summarization completes. | `summarized_messages`, `kept_messages`, `summary_len`, `omitted_oversized` | +| `agent.tool.exec_start` | Before the agent executes a tool call. | `tool`, `args_count`; full arguments are not logged by default | +| `agent.tool.exec_end` | After a tool call completes, including successful results, tool errors, and async results. | `tool`, `duration_ms`, `for_llm_len`, `for_user_len`, `is_error`, `async` | +| `agent.tool.exec_skipped` | A tool call is skipped because the tool is unavailable, arguments are invalid, or turn control logic requires skipping it. | `tool`, `reason` | +| `agent.steering.injected` | Queued steering messages are injected into the next LLM context. | `count`, `total_content_len` | +| `agent.follow_up.queued` | An async tool result is queued back into the inbound/follow-up flow. | `source_tool`, `content_len` | +| `agent.interrupt.received` | A turn accepts steering, graceful interrupt, or hard-abort input. | `interrupt_kind`, `role`, `content_len`, `queue_depth`, `hint_len` | +| `agent.subturn.spawn` | A parent turn creates a child turn/subagent. | `child_agent_id`, `label`, `parent_turn_id` | +| `agent.subturn.end` | A child turn ends. | `child_agent_id`, `status` | +| `agent.subturn.result_delivered` | A child turn result is delivered to the target channel/chat. | `target_channel`, `target_chat_id`, `content_len` | +| `agent.subturn.orphan` | A child turn result cannot be delivered or cannot be associated back to its parent turn. | `parent_turn_id`, `child_turn_id`, `reason` | +| `agent.error` | Agent execution reports an error. | `stage`, `error` | + +### Channel + +| Event | Trigger | Details | +| ----- | ------- | ------- | +| `channel.lifecycle.initialized` | The channel manager creates and registers a channel instance from config. | `type`; scope includes `channel` | +| `channel.lifecycle.started` | Channel `Start()` succeeds and worker goroutines have been started; added channels during hot reload also emit it. | `type` | +| `channel.lifecycle.start_failed` | Channel `Start()` fails. | `type`, `error`; severity is `error` | +| `channel.lifecycle.stopped` | Channel `Stop()` succeeds. | `type` | +| `channel.webhook.registered` | A channel webhook handler is registered on the shared HTTP mux. | `type`; scope includes `channel` | +| `channel.webhook.unregistered` | A channel webhook handler is removed from the shared HTTP mux. | `type`; scope includes `channel` | +| `channel.message.outbound_queued` | An outbound text or media message is queued into its channel worker. | `media`, `content_len`, `reply_to_message_id`; scope comes from the original inbound context | +| `channel.message.outbound_sent` | An outbound text or media message is sent successfully, or a placeholder edit handled the response. | `media`, `content_len`, `message_ids`, `reply_to_message_id` | +| `channel.message.outbound_failed` | An outbound text or media message exhausts retries or hits a permanent failure. | `media`, `content_len`, `retries`, `error`, `reply_to_message_id`; severity is `error` | +| `channel.rate_limited` | A channel worker is waiting for a rate-limit token and the context is canceled, interrupting this delivery. | `media`, `content_len`, `error`, `reply_to_message_id`; severity is `warn` | + +### Message Bus + +| Event | Trigger | Details | +| ----- | ------- | ------- | +| `bus.publish.failed` | Publishing inbound, outbound, media, audio, or voice-control data fails, or required context is missing. | `stream`, `error`; scope is derived from message context when possible | +| `bus.close.started` | Message bus shutdown begins. | `drained` is usually `0` | +| `bus.close.drained` | Shutdown waits for buffered messages to drain and at least one buffered message was drained. | `drained` | +| `bus.close.completed` | Message bus shutdown completes. | `drained` | + +### Gateway + +| Event | Trigger | Details | +| ----- | ------- | ------- | +| `gateway.start` | Gateway startup reaches the agent/runtime event bus/bootstrap binding point. | `duration_ms` | +| `gateway.ready` | Gateway services, channel manager, HTTP server, and other core services are ready. | `duration_ms` | +| `gateway.shutdown` | Gateway shutdown begins. | No fixed payload; envelope fields may be the only fields | +| `gateway.reload.started` | Hot reload execution starts. | `duration_ms` | +| `gateway.reload.completed` | Hot reload completes successfully. | `duration_ms` | +| `gateway.reload.failed` | Hot reload fails. | `duration_ms`, `error`; severity is `error` | + +### MCP + +| Event | Trigger | Details | +| ----- | ------- | ------- | +| `mcp.server.connecting` | The MCP manager is about to connect to a server. | `server`, `type`, `url`, `command` | +| `mcp.server.connected` | An MCP server connects and its tool list has been initialized. | `server`, `type`, `url`, `command`, `tool_count` | +| `mcp.server.failed` | An MCP server connection fails, or the manager is closed before connecting. | `server`, `type`, `url`, `command`, `error`; severity is `error` | +| `mcp.tool.discovered` | A tool from an MCP server is discovered and registered. | `server`, `type`, `url`, `command`, `tool` | +| `mcp.tool.call.start` | The MCP tool wrapper starts a remote tool call. | `server`, `tool`; when emitted inside an agent turn, scope includes turn/chat information | +| `mcp.tool.call.end` | The MCP tool wrapper finishes a remote tool call, including failures. | `server`, `tool`, `duration_ms`, `is_error`, `error` | + +## Log Fields + +Runtime event logs include stable envelope fields when available: + +- `event_id` +- `event_kind` +- `severity` +- `event_time` +- `source_component` +- `source_name` +- `agent_id` +- `session_key` +- `turn_id` +- `channel` +- `account` +- `chat_id` +- `topic_id` +- `space_id` +- `space_type` +- `chat_type` +- `sender_id` +- `message_id` +- `trace_id` +- `parent_turn_id` +- `request_id` +- `reply_to_id` + +Agent events add safe payload summaries: + +| Event | Summary fields | +| ----- | -------------- | +| `agent.turn.start` | `user_len`, `media_count` | +| `agent.turn.end` | `status`, `iterations_total`, `duration_ms`, `final_len` | +| `agent.llm.request` | `model`, `messages`, `tools`, `max_tokens` | +| `agent.llm.delta` | `content_delta_len`, `reasoning_delta_len` | +| `agent.llm.response` | `content_len`, `tool_calls`, `has_reasoning` | +| `agent.llm.retry` | `attempt`, `max_retries`, `reason`, `error`, `backoff_ms` | +| `agent.context.compress` | `reason`, `dropped_messages`, `remaining_messages` | +| `agent.session.summarize` | `summarized_messages`, `kept_messages`, `summary_len`, `omitted_oversized` | +| `agent.tool.exec_start` | `tool`, `args_count` | +| `agent.tool.exec_end` | `tool`, `duration_ms`, `for_llm_len`, `for_user_len`, `is_error`, `async` | +| `agent.tool.exec_skipped` | `tool`, `reason` | +| `agent.steering.injected` | `count`, `total_content_len` | +| `agent.follow_up.queued` | `source_tool`, `content_len` | +| `agent.interrupt.received` | `interrupt_kind`, `role`, `content_len`, `queue_depth`, `hint_len` | +| `agent.subturn.spawn` | `child_agent_id`, `label` | +| `agent.subturn.end` | `child_agent_id`, `status` | +| `agent.subturn.result_delivered` | `target_channel`, `target_chat_id`, `content_len` | +| `agent.subturn.orphan` | `parent_turn_id`, `child_turn_id`, `reason` | +| `agent.error` | `stage`, `error` | + +## Event Domains + +Runtime event kinds are defined in `pkg/events/kind.go`. Event logging can select these domains: + +- `agent.*`: agent turn, LLM, tool, context, steering, interrupt, subturn, and error events. +- `channel.*`: channel lifecycle, webhook registration, outbound queued/sent/failed, and rate limiting. +- `bus.*`: publish failures and close lifecycle. +- `gateway.*`: start, ready, shutdown, and reload lifecycle. +- `mcp.*`: MCP server connection, tool discovery, and tool call events. + +See [`../../config/config.example.json`](../../config/config.example.json) for the default event logging example. diff --git a/docs/architecture/runtime-events.zh.md b/docs/architecture/runtime-events.zh.md new file mode 100644 index 000000000..3ed384537 --- /dev/null +++ b/docs/architecture/runtime-events.zh.md @@ -0,0 +1,216 @@ +# Runtime Events 与事件日志 + +PicoClaw 的 runtime event 是运行时观察面,用来描述 agent、channel、gateway、message bus、MCP 等组件发生了什么。事件发布和日志打印是两件事: + +- 事件发布:组件把 `pkg/events.Event` 发布到 runtime event bus,供 hook、测试、调试工具或后续 UI 消费。 +- 事件日志:内置 runtime event logger 订阅同一个 bus,并按配置把匹配的事件打印到日志。 + +这样可以让业务流程继续只负责发布事件,日志策略统一收口到一个地方。 + +## 默认行为 + +默认配置只打印 `agent.*` 事件: + +```json +{ + "events": { + "logging": { + "enabled": true, + "include": ["agent.*"], + "min_severity": "info", + "include_payload": false + } + } +} +``` + +这个默认值保持了旧行为:agent turn、LLM、tool、steering、subturn、error 等事件会出现在日志中;channel、gateway、bus、MCP 事件仍会发布到 runtime event bus,但默认不打印,避免网关启动和消息投递日志过于嘈杂。 + +## 配置项 + +配置位于 `config.json` 的 `events.logging`: + +| 字段 | 类型 | 默认值 | 说明 | +| ---- | ---- | ------ | ---- | +| `enabled` | bool | `true` | 是否启用内置事件日志订阅器 | +| `include` | string[] | `["agent.*"]` | 允许打印的事件 kind,支持精确匹配、`*`、`agent.*` 这类 glob/prefix | +| `exclude` | string[] | `[]` | 在 include 命中后排除的事件 kind,匹配规则同 include | +| `min_severity` | string | `info` | 最低打印级别:`debug`、`info`、`warn`、`error` | +| `include_payload` | bool | `false` | 是否把原始 payload 放进日志字段 | + +`include_payload` 默认关闭。agent 事件日志会输出安全摘要字段,例如 `user_len`、`args_count`、`content_len`,不会默认输出完整用户消息或工具参数。只有在排查问题、并且确认日志存储环境可信时,才建议临时打开 `include_payload`。 + +## 匹配规则 + +`include` 和 `exclude` 都匹配 `Event.Kind` 字符串: + +```json +{ + "events": { + "logging": { + "include": ["gateway.*", "channel.lifecycle.*", "agent.error"], + "exclude": ["gateway.ready"], + "min_severity": "info" + } + } +} +``` + +常用写法: + +- `["agent.*"]`:只打印 agent 事件。 +- `["*"]`:打印所有 runtime events。 +- `["gateway.*", "channel.*"]`:只打印 gateway 和 channel 事件。 +- `exclude: ["agent.llm.delta"]`:排除高频流式 delta 事件。 +- `min_severity: "warn"`:只打印 warn/error 事件。 + +## 环境变量 + +同一组配置也可以通过环境变量覆盖,适合临时调试: + +```bash +PICOCLAW_EVENTS_LOGGING_ENABLED=true +PICOCLAW_EVENTS_LOGGING_INCLUDE="gateway.*,channel.lifecycle.*" +PICOCLAW_EVENTS_LOGGING_EXCLUDE="gateway.ready" +PICOCLAW_EVENTS_LOGGING_MIN_SEVERITY=info +PICOCLAW_EVENTS_LOGGING_INCLUDE_PAYLOAD=false +``` + +`include` 和 `exclude` 的环境变量使用逗号分隔。 + +## 事件名称与触发时机 + +下面列出当前 runtime event kind、触发时机和主要事件详情。`Source`、`Scope`、`Correlation` 是所有事件都可能携带的 envelope 字段;表里的“主要详情”指 payload 或日志摘要中最有用的字段。 + +### Agent + +| 事件名 | 触发时机 | 主要详情 | +| ------ | -------- | -------- | +| `agent.turn.start` | agent 开始处理一次用户输入或系统输入,turn scope 已创建时 | `user_len`, `media_count`; scope 通常包含 `agent_id`, `session_key`, `turn_id`, `channel`, `chat_id`, `message_id` | +| `agent.turn.end` | 一次 turn 退出时,无论完成、报错还是 hard abort | `status` (`completed`/`error`/`aborted`), `iterations_total`, `duration_ms`, `final_len` | +| `agent.llm.request` | 每次调用 LLM provider 前 | `model`, `messages`, `tools`, `max_tokens` | +| `agent.llm.delta` | 预留给流式 LLM delta;当前实现已定义但没有自然发送点 | `content_delta_len`, `reasoning_delta_len` | +| `agent.llm.response` | LLM provider 返回完整响应后 | `content_len`, `tool_calls`, `has_reasoning` | +| `agent.llm.retry` | LLM 请求因上下文、限流、临时错误等原因准备重试前 | `attempt`, `max_retries`, `reason`, `error`, `backoff_ms` | +| `agent.context.compress` | 上下文历史被压缩时,例如主动预算检查或 LLM retry 处理 | `reason`, `dropped_messages`, `remaining_messages` | +| `agent.session.summarize` | 会话历史异步摘要完成时 | `summarized_messages`, `kept_messages`, `summary_len`, `omitted_oversized` | +| `agent.tool.exec_start` | agent 准备执行一个工具调用前 | `tool`, `args_count`; 默认不打印完整参数 | +| `agent.tool.exec_end` | 工具调用完成后,包括成功、工具错误和 async 结果 | `tool`, `duration_ms`, `for_llm_len`, `for_user_len`, `is_error`, `async` | +| `agent.tool.exec_skipped` | 工具调用被跳过时,例如工具不可用、参数无效或 turn 控制逻辑要求跳过 | `tool`, `reason` | +| `agent.steering.injected` | queued steering message 被注入下一轮 LLM 上下文时 | `count`, `total_content_len` | +| `agent.follow_up.queued` | async 工具结果被重新排入 inbound/follow-up 流程时 | `source_tool`, `content_len` | +| `agent.interrupt.received` | turn 接受 steering、graceful interrupt 或 hard abort 指令时 | `interrupt_kind`, `role`, `content_len`, `queue_depth`, `hint_len` | +| `agent.subturn.spawn` | 父 turn 创建子 turn/subagent 时 | `child_agent_id`, `label`, `parent_turn_id` | +| `agent.subturn.end` | 子 turn 结束时 | `child_agent_id`, `status` | +| `agent.subturn.result_delivered` | 子 turn 结果成功投递到目标 channel/chat 时 | `target_channel`, `target_chat_id`, `content_len` | +| `agent.subturn.orphan` | 子 turn 结果无法投递或无法关联回父 turn 时 | `parent_turn_id`, `child_turn_id`, `reason` | +| `agent.error` | agent 执行流程报告错误时 | `stage`, `error` | + +### Channel + +| 事件名 | 触发时机 | 主要详情 | +| ------ | -------- | -------- | +| `channel.lifecycle.initialized` | channel manager 根据配置创建并注册 channel 实例后 | `type`; scope 包含 `channel` | +| `channel.lifecycle.started` | channel `Start()` 成功,worker 已启动时;热重载新增 channel 也会触发 | `type` | +| `channel.lifecycle.start_failed` | channel `Start()` 失败时 | `type`, `error`; severity 为 `error` | +| `channel.lifecycle.stopped` | channel `Stop()` 成功后 | `type` | +| `channel.webhook.registered` | channel 的 webhook handler 被注册到共享 HTTP mux 时 | `type`; scope 包含 `channel` | +| `channel.webhook.unregistered` | channel 的 webhook handler 从共享 HTTP mux 移除时 | `type`; scope 包含 `channel` | +| `channel.message.outbound_queued` | outbound 文本或媒体消息被放入对应 channel worker 队列时 | `media`, `content_len`, `reply_to_message_id`; scope 来自原 inbound context | +| `channel.message.outbound_sent` | outbound 文本或媒体消息成功发送,或 placeholder edit 已处理响应时 | `media`, `content_len`, `message_ids`, `reply_to_message_id` | +| `channel.message.outbound_failed` | outbound 文本或媒体消息重试耗尽或遇到永久失败时 | `media`, `content_len`, `retries`, `error`, `reply_to_message_id`; severity 为 `error` | +| `channel.rate_limited` | channel worker 等待 rate limiter token 时被 context 取消,导致本次发送被限流/中断 | `media`, `content_len`, `error`, `reply_to_message_id`; severity 为 `warn` | + +### Message Bus + +| 事件名 | 触发时机 | 主要详情 | +| ------ | -------- | -------- | +| `bus.publish.failed` | inbound、outbound、media、audio 或 voice control 发布失败,或缺少必要 context 时 | `stream`, `error`; scope 尽量来自消息 context | +| `bus.close.started` | message bus 开始关闭时 | `drained` 通常为 `0` | +| `bus.close.drained` | close 期间等待队列 drain,并且 drain 到至少一条 buffered message 时 | `drained` | +| `bus.close.completed` | message bus 完成关闭时 | `drained` | + +### Gateway + +| 事件名 | 触发时机 | 主要详情 | +| ------ | -------- | -------- | +| `gateway.start` | gateway 完成 agent/runtime event bus/bootstrap 绑定后 | `duration_ms` | +| `gateway.ready` | gateway 服务、channel manager、HTTP 等关键服务启动完成后 | `duration_ms` | +| `gateway.shutdown` | gateway 开始关闭流程时 | 无固定 payload,可能只有 envelope 字段 | +| `gateway.reload.started` | 热重载开始执行时 | `duration_ms` | +| `gateway.reload.completed` | 热重载成功完成时 | `duration_ms` | +| `gateway.reload.failed` | 热重载失败时 | `duration_ms`, `error`; severity 为 `error` | + +### MCP + +| 事件名 | 触发时机 | 主要详情 | +| ------ | -------- | -------- | +| `mcp.server.connecting` | MCP manager 准备连接某个 server 前 | `server`, `type`, `url`, `command` | +| `mcp.server.connected` | MCP server 连接成功并完成工具列表初始化后 | `server`, `type`, `url`, `command`, `tool_count` | +| `mcp.server.failed` | MCP server 连接失败,或 manager 已关闭导致无法连接时 | `server`, `type`, `url`, `command`, `error`; severity 为 `error` | +| `mcp.tool.discovered` | MCP server 的某个工具被发现并注册时 | `server`, `type`, `url`, `command`, `tool` | +| `mcp.tool.call.start` | MCP tool wrapper 开始执行一次远端工具调用前 | `server`, `tool`; 如果在 agent turn 内触发,scope 会带上对应 turn/chat 信息 | +| `mcp.tool.call.end` | MCP tool wrapper 完成一次远端工具调用后,包括失败结果 | `server`, `tool`, `duration_ms`, `is_error`, `error` | + +## 日志字段 + +所有事件日志都会尽量包含稳定 envelope 字段: + +- `event_id` +- `event_kind` +- `severity` +- `event_time` +- `source_component` +- `source_name` +- `agent_id` +- `session_key` +- `turn_id` +- `channel` +- `account` +- `chat_id` +- `topic_id` +- `space_id` +- `space_type` +- `chat_type` +- `sender_id` +- `message_id` +- `trace_id` +- `parent_turn_id` +- `request_id` +- `reply_to_id` + +agent 事件还会追加 payload 摘要字段: + +| 事件 | 摘要字段 | +| ---- | -------- | +| `agent.turn.start` | `user_len`, `media_count` | +| `agent.turn.end` | `status`, `iterations_total`, `duration_ms`, `final_len` | +| `agent.llm.request` | `model`, `messages`, `tools`, `max_tokens` | +| `agent.llm.delta` | `content_delta_len`, `reasoning_delta_len` | +| `agent.llm.response` | `content_len`, `tool_calls`, `has_reasoning` | +| `agent.llm.retry` | `attempt`, `max_retries`, `reason`, `error`, `backoff_ms` | +| `agent.context.compress` | `reason`, `dropped_messages`, `remaining_messages` | +| `agent.session.summarize` | `summarized_messages`, `kept_messages`, `summary_len`, `omitted_oversized` | +| `agent.tool.exec_start` | `tool`, `args_count` | +| `agent.tool.exec_end` | `tool`, `duration_ms`, `for_llm_len`, `for_user_len`, `is_error`, `async` | +| `agent.tool.exec_skipped` | `tool`, `reason` | +| `agent.steering.injected` | `count`, `total_content_len` | +| `agent.follow_up.queued` | `source_tool`, `content_len` | +| `agent.interrupt.received` | `interrupt_kind`, `role`, `content_len`, `queue_depth`, `hint_len` | +| `agent.subturn.spawn` | `child_agent_id`, `label` | +| `agent.subturn.end` | `child_agent_id`, `status` | +| `agent.subturn.result_delivered` | `target_channel`, `target_chat_id`, `content_len` | +| `agent.subturn.orphan` | `parent_turn_id`, `child_turn_id`, `reason` | +| `agent.error` | `stage`, `error` | + +## 可打印的事件域 + +当前 runtime event kind 定义在 `pkg/events/kind.go`。事件日志配置可以选择这些域: + +- `agent.*`:agent turn、LLM、tool、context、steering、interrupt、subturn、error。 +- `channel.*`:channel lifecycle、webhook 注册、outbound queued/sent/failed、rate limited。 +- `bus.*`:publish failed、close started/drained/completed。 +- `gateway.*`:start、ready、shutdown、reload started/completed/failed。 +- `mcp.*`:server connecting/connected/failed、tool discovered、tool call start/end。 + +默认事件日志示例见 [`../../config/config.example.json`](../../config/config.example.json)。 diff --git a/docs/guides/configuration.zh.md b/docs/guides/configuration.zh.md index dbc853d98..c41c3dae0 100644 --- a/docs/guides/configuration.zh.md +++ b/docs/guides/configuration.zh.md @@ -753,6 +753,42 @@ PicoClaw 按协议族路由提供商: +### 事件日志 + +PicoClaw 的 runtime events 会覆盖 agent、channel、gateway、message bus 和 MCP 等运行时组件。默认只打印 `agent.*` 事件,其他事件仍会发布到 runtime event bus,但不会进入日志。 + +```json +{ + "events": { + "logging": { + "enabled": true, + "include": ["agent.*"], + "exclude": [], + "min_severity": "info", + "include_payload": false + } + } +} +``` + +常用配置: + +```json +{ + "events": { + "logging": { + "include": ["*"], + "exclude": ["agent.llm.delta"], + "min_severity": "warn" + } + } +} +``` + +`include` / `exclude` 支持精确事件名和 `gateway.*`、`channel.lifecycle.*` 这类模式。`include_payload` 默认关闭,避免把完整用户消息或工具参数写入日志;agent 事件会默认输出长度、计数、状态等摘要字段。 + +更多字段说明和示例见 [Runtime Events 与事件日志](../architecture/runtime-events.zh.md)。 + ### 定时任务 / 提醒 PicoClaw 通过 `cron` 工具支持 cron 风格的定时任务。Agent 可以设置、列出和取消在指定时间触发的提醒或周期性任务。 @@ -775,6 +811,7 @@ PicoClaw 通过 `cron` 工具支持 cron 风格的定时任务。Agent 可以设 | 主题 | 说明 | | ---- | ---- | | [敏感数据过滤](../security/sensitive_data_filtering.zh.md) | 在发送给 LLM 前,从工具结果中过滤 API 密钥和令牌 | +| [Runtime Events 与事件日志](../architecture/runtime-events.zh.md) | 统一运行时事件、日志过滤和调试配置 | | [Hook 系统](../architecture/hooks/README.zh.md) | 事件驱动 Hook:观察者、拦截器、审批 Hook | | [Steering](../architecture/steering.md) | 在工具调用间向运行中的 Agent 注入消息 | | [SubTurn](../architecture/subturn.md) | 子 Agent 协调、并发控制、生命周期管理 | diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 7a9384a23..a394a6e97 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -39,9 +39,10 @@ type AgentLoop struct { state *state.Manager // Runtime event system - runtimeEvents runtimeevents.Bus - ownsRuntimeEvents bool - hooks *HookManager + runtimeEvents runtimeevents.Bus + ownsRuntimeEvents bool + runtimeEventLogSub runtimeevents.Subscription + hooks *HookManager // Runtime state running atomic.Bool @@ -285,6 +286,9 @@ func (al *AgentLoop) Close() { if al.hooks != nil { al.hooks.Close() } + if al.runtimeEventLogSub != nil { + _ = al.runtimeEventLogSub.Close() + } if al.runtimeEvents != nil && al.ownsRuntimeEvents { if err := al.runtimeEvents.Close(); err != nil { logger.ErrorCF("agent", "Failed to close runtime event bus", diff --git a/pkg/agent/agent_event.go b/pkg/agent/agent_event.go index d4174ced5..99ea2a18e 100644 --- a/pkg/agent/agent_event.go +++ b/pkg/agent/agent_event.go @@ -6,7 +6,6 @@ import ( "fmt" runtimeevents "github.com/sipeed/picoclaw/pkg/events" - "github.com/sipeed/picoclaw/pkg/logger" ) func (al *AgentLoop) newTurnEventScope(agentID, sessionKey string, turnCtx *TurnContext) turnEventScope { @@ -48,108 +47,9 @@ func (al *AgentLoop) emitEvent(kind runtimeevents.Kind, meta HookMeta, payload a return } - al.logEvent(evt, clonedMeta, eventCtx) - al.publishRuntimeEvent(evt) } -func (al *AgentLoop) logEvent(evt runtimeevents.Event, meta HookMeta, eventCtx *TurnContext) { - fields := map[string]any{ - "event_kind": evt.Kind.String(), - "agent_id": meta.AgentID, - "turn_id": meta.TurnID, - "session_key": meta.SessionKey, - "iteration": meta.Iteration, - } - - if meta.TracePath != "" { - fields["trace"] = meta.TracePath - } - if meta.Source != "" { - fields["source"] = meta.Source - } - - appendEventContextFields(fields, eventCtx) - - switch payload := evt.Payload.(type) { - case TurnStartPayload: - fields["user_len"] = len(payload.UserMessage) - fields["media_count"] = payload.MediaCount - case TurnEndPayload: - fields["status"] = payload.Status - fields["iterations_total"] = payload.Iterations - fields["duration_ms"] = payload.Duration.Milliseconds() - fields["final_len"] = payload.FinalContentLen - case LLMRequestPayload: - fields["model"] = payload.Model - fields["messages"] = payload.MessagesCount - fields["tools"] = payload.ToolsCount - fields["max_tokens"] = payload.MaxTokens - case LLMDeltaPayload: - fields["content_delta_len"] = payload.ContentDeltaLen - fields["reasoning_delta_len"] = payload.ReasoningDeltaLen - case LLMResponsePayload: - fields["content_len"] = payload.ContentLen - fields["tool_calls"] = payload.ToolCalls - fields["has_reasoning"] = payload.HasReasoning - case LLMRetryPayload: - fields["attempt"] = payload.Attempt - fields["max_retries"] = payload.MaxRetries - fields["reason"] = payload.Reason - fields["error"] = payload.Error - fields["backoff_ms"] = payload.Backoff.Milliseconds() - case ContextCompressPayload: - fields["reason"] = payload.Reason - fields["dropped_messages"] = payload.DroppedMessages - fields["remaining_messages"] = payload.RemainingMessages - case SessionSummarizePayload: - fields["summarized_messages"] = payload.SummarizedMessages - fields["kept_messages"] = payload.KeptMessages - fields["summary_len"] = payload.SummaryLen - fields["omitted_oversized"] = payload.OmittedOversized - case ToolExecStartPayload: - fields["tool"] = payload.Tool - fields["args_count"] = len(payload.Arguments) - case ToolExecEndPayload: - fields["tool"] = payload.Tool - fields["duration_ms"] = payload.Duration.Milliseconds() - fields["for_llm_len"] = payload.ForLLMLen - fields["for_user_len"] = payload.ForUserLen - fields["is_error"] = payload.IsError - fields["async"] = payload.Async - case ToolExecSkippedPayload: - fields["tool"] = payload.Tool - fields["reason"] = payload.Reason - case SteeringInjectedPayload: - fields["count"] = payload.Count - fields["total_content_len"] = payload.TotalContentLen - case FollowUpQueuedPayload: - fields["source_tool"] = payload.SourceTool - fields["content_len"] = payload.ContentLen - case InterruptReceivedPayload: - fields["interrupt_kind"] = payload.Kind - fields["role"] = payload.Role - fields["content_len"] = payload.ContentLen - fields["queue_depth"] = payload.QueueDepth - fields["hint_len"] = payload.HintLen - case SubTurnSpawnPayload: - fields["child_agent_id"] = payload.AgentID - fields["label"] = payload.Label - case SubTurnEndPayload: - fields["child_agent_id"] = payload.AgentID - fields["status"] = payload.Status - case SubTurnResultDeliveredPayload: - fields["target_channel"] = payload.TargetChannel - fields["target_chat_id"] = payload.TargetChatID - fields["content_len"] = payload.ContentLen - case ErrorPayload: - fields["stage"] = payload.Stage - fields["error"] = payload.Message - } - - logger.InfoCF("eventbus", fmt.Sprintf("Agent event: %s", evt.Kind.String()), fields) -} - // MountHook registers an in-process hook on the agent loop. func (al *AgentLoop) MountHook(reg HookRegistration) error { if al == nil || al.hooks == nil { diff --git a/pkg/agent/agent_init.go b/pkg/agent/agent_init.go index 7ad884cfe..b8e535c36 100644 --- a/pkg/agent/agent_init.go +++ b/pkg/agent/agent_init.go @@ -75,6 +75,7 @@ func NewAgentLoop( al.runtimeEvents = runtimeevents.NewBus() al.ownsRuntimeEvents = true } + al.runtimeEventLogSub = subscribeRuntimeEventLogger(cfg, al.runtimeEvents) al.providerFactory = providers.CreateProviderFromConfig al.hooks = NewHookManager(al.runtimeEvents.Channel()) configureHookManagerFromConfig(al.hooks, cfg) diff --git a/pkg/agent/runtime_event_logger.go b/pkg/agent/runtime_event_logger.go new file mode 100644 index 000000000..4922c9021 --- /dev/null +++ b/pkg/agent/runtime_event_logger.go @@ -0,0 +1,316 @@ +package agent + +import ( + "context" + "fmt" + "path" + "strings" + "time" + + "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" + "github.com/sipeed/picoclaw/pkg/logger" +) + +const runtimeEventLoggerBuffer = 256 + +type runtimeEventLogger struct { + cfg config.EventLoggingConfig +} + +func subscribeRuntimeEventLogger(cfg *config.Config, eventBus runtimeevents.Bus) runtimeevents.Subscription { + eventLogger := newRuntimeEventLogger(cfg) + sub, err := eventLogger.subscribe(context.Background(), eventBus) + if err != nil { + logger.WarnCF("events", "Failed to subscribe runtime event logger", map[string]any{"error": err.Error()}) + return nil + } + return sub +} + +func newRuntimeEventLogger(cfg *config.Config) *runtimeEventLogger { + logCfg := config.EffectiveEventLoggingConfig(cfg) + if !logCfg.Enabled { + return nil + } + return &runtimeEventLogger{cfg: logCfg} +} + +func (l *runtimeEventLogger) subscribe( + ctx context.Context, + eventBus runtimeevents.Bus, +) (runtimeevents.Subscription, error) { + if l == nil || eventBus == nil { + return nil, nil + } + return eventBus.Channel().Subscribe(ctx, runtimeevents.SubscribeOptions{ + Name: "runtime-event-logger", + Buffer: runtimeEventLoggerBuffer, + Concurrency: runtimeevents.Locked, + Backpressure: runtimeevents.DropNewest, + PanicPolicy: runtimeevents.RecoverAndLog, + }, l.handle) +} + +func (l *runtimeEventLogger) handle(_ context.Context, evt runtimeevents.Event) error { + if l == nil || !l.shouldLog(evt) { + return nil + } + + fields := runtimeEventLogFields(evt) + if l.cfg.IncludePayload && evt.Payload != nil { + fields["payload"] = evt.Payload + } + + logRuntimeEvent(evt, fields) + return nil +} + +func (l *runtimeEventLogger) shouldLog(evt runtimeevents.Event) bool { + if l == nil || !l.cfg.Enabled { + return false + } + if runtimeEventSeverityRank(evt.Severity) < runtimeEventSeverityRank(parseRuntimeEventSeverity(l.cfg.MinSeverity)) { + return false + } + + kind := evt.Kind.String() + if !matchAnyRuntimeEventPattern(l.cfg.Include, kind, true) { + return false + } + return !matchAnyRuntimeEventPattern(l.cfg.Exclude, kind, false) +} + +func logRuntimeEvent(evt runtimeevents.Event, fields map[string]any) { + message := fmt.Sprintf("Runtime event: %s", evt.Kind.String()) + switch normalizeRuntimeEventSeverity(evt.Severity) { + case runtimeevents.SeverityDebug: + logger.DebugCF("events", message, fields) + case runtimeevents.SeverityWarn: + logger.WarnCF("events", message, fields) + case runtimeevents.SeverityError: + logger.ErrorCF("events", message, fields) + default: + logger.InfoCF("events", message, fields) + } +} + +func runtimeEventLogFields(evt runtimeevents.Event) map[string]any { + fields := map[string]any{ + "event_id": evt.ID, + "event_kind": evt.Kind.String(), + "severity": string(normalizeRuntimeEventSeverity(evt.Severity)), + } + if !evt.Time.IsZero() { + fields["event_time"] = evt.Time.Format(time.RFC3339Nano) + } + appendRuntimeEventSourceFields(fields, evt.Source) + appendRuntimeEventScopeFields(fields, evt.Scope) + appendRuntimeEventCorrelationFields(fields, evt.Correlation) + appendRuntimeEventAttrs(fields, evt.Attrs) + appendRuntimeEventPayloadSummary(fields, evt.Payload) + return fields +} + +func appendRuntimeEventSourceFields(fields map[string]any, source runtimeevents.Source) { + if source.Component != "" { + fields["source_component"] = source.Component + } + if source.Name != "" { + fields["source_name"] = source.Name + } +} + +func appendRuntimeEventScopeFields(fields map[string]any, scope runtimeevents.Scope) { + setStringField(fields, "runtime_id", scope.RuntimeID) + setStringField(fields, "agent_id", scope.AgentID) + setStringField(fields, "session_key", scope.SessionKey) + setStringField(fields, "turn_id", scope.TurnID) + setStringField(fields, "channel", scope.Channel) + setStringField(fields, "account", scope.Account) + setStringField(fields, "chat_id", scope.ChatID) + setStringField(fields, "topic_id", scope.TopicID) + setStringField(fields, "space_id", scope.SpaceID) + setStringField(fields, "space_type", scope.SpaceType) + setStringField(fields, "chat_type", scope.ChatType) + setStringField(fields, "sender_id", scope.SenderID) + setStringField(fields, "message_id", scope.MessageID) +} + +func appendRuntimeEventCorrelationFields(fields map[string]any, correlation runtimeevents.Correlation) { + setStringField(fields, "trace_id", correlation.TraceID) + setStringField(fields, "parent_turn_id", correlation.ParentTurnID) + setStringField(fields, "request_id", correlation.RequestID) + setStringField(fields, "reply_to_id", correlation.ReplyToID) +} + +func appendRuntimeEventAttrs(fields map[string]any, attrs map[string]any) { + for key, value := range attrs { + if key == "" || value == nil { + continue + } + if _, exists := fields[key]; exists { + fields["attr_"+key] = value + continue + } + fields[key] = value + } +} + +func appendRuntimeEventPayloadSummary(fields map[string]any, payload any) { + switch payload := payload.(type) { + case TurnStartPayload: + fields["user_len"] = len(payload.UserMessage) + fields["media_count"] = payload.MediaCount + case TurnEndPayload: + fields["status"] = payload.Status + fields["iterations_total"] = payload.Iterations + fields["duration_ms"] = payload.Duration.Milliseconds() + fields["final_len"] = payload.FinalContentLen + case LLMRequestPayload: + fields["model"] = payload.Model + fields["messages"] = payload.MessagesCount + fields["tools"] = payload.ToolsCount + fields["max_tokens"] = payload.MaxTokens + case LLMDeltaPayload: + fields["content_delta_len"] = payload.ContentDeltaLen + fields["reasoning_delta_len"] = payload.ReasoningDeltaLen + case LLMResponsePayload: + fields["content_len"] = payload.ContentLen + fields["tool_calls"] = payload.ToolCalls + fields["has_reasoning"] = payload.HasReasoning + case LLMRetryPayload: + fields["attempt"] = payload.Attempt + fields["max_retries"] = payload.MaxRetries + fields["reason"] = payload.Reason + fields["error"] = payload.Error + fields["backoff_ms"] = payload.Backoff.Milliseconds() + case ContextCompressPayload: + fields["reason"] = payload.Reason + fields["dropped_messages"] = payload.DroppedMessages + fields["remaining_messages"] = payload.RemainingMessages + case SessionSummarizePayload: + fields["summarized_messages"] = payload.SummarizedMessages + fields["kept_messages"] = payload.KeptMessages + fields["summary_len"] = payload.SummaryLen + fields["omitted_oversized"] = payload.OmittedOversized + case ToolExecStartPayload: + fields["tool"] = payload.Tool + fields["args_count"] = len(payload.Arguments) + case ToolExecEndPayload: + fields["tool"] = payload.Tool + fields["duration_ms"] = payload.Duration.Milliseconds() + fields["for_llm_len"] = payload.ForLLMLen + fields["for_user_len"] = payload.ForUserLen + fields["is_error"] = payload.IsError + fields["async"] = payload.Async + case ToolExecSkippedPayload: + fields["tool"] = payload.Tool + fields["reason"] = payload.Reason + case SteeringInjectedPayload: + fields["count"] = payload.Count + fields["total_content_len"] = payload.TotalContentLen + case FollowUpQueuedPayload: + fields["source_tool"] = payload.SourceTool + fields["content_len"] = payload.ContentLen + case InterruptReceivedPayload: + fields["interrupt_kind"] = payload.Kind + fields["role"] = payload.Role + fields["content_len"] = payload.ContentLen + fields["queue_depth"] = payload.QueueDepth + fields["hint_len"] = payload.HintLen + case SubTurnSpawnPayload: + fields["child_agent_id"] = payload.AgentID + fields["label"] = payload.Label + case SubTurnEndPayload: + fields["child_agent_id"] = payload.AgentID + fields["status"] = payload.Status + case SubTurnResultDeliveredPayload: + fields["target_channel"] = payload.TargetChannel + fields["target_chat_id"] = payload.TargetChatID + fields["content_len"] = payload.ContentLen + case SubTurnOrphanPayload: + fields["parent_turn_id"] = payload.ParentTurnID + fields["child_turn_id"] = payload.ChildTurnID + fields["reason"] = payload.Reason + case ErrorPayload: + fields["stage"] = payload.Stage + fields["error"] = payload.Message + } +} + +func setStringField(fields map[string]any, key, value string) { + if value != "" { + fields[key] = value + } +} + +func matchAnyRuntimeEventPattern(patterns []string, kind string, emptyMatches bool) bool { + if len(patterns) == 0 { + return emptyMatches + } + for _, pattern := range patterns { + if matchRuntimeEventPattern(pattern, kind) { + return true + } + } + return false +} + +func matchRuntimeEventPattern(pattern, kind string) bool { + pattern = strings.TrimSpace(pattern) + if pattern == "" { + return false + } + if pattern == "*" { + return true + } + if strings.HasSuffix(pattern, ".*") { + return strings.HasPrefix(kind, strings.TrimSuffix(pattern, "*")) + } + matched, err := path.Match(pattern, kind) + if err == nil { + return matched + } + return pattern == kind +} + +func parseRuntimeEventSeverity(severity string) runtimeevents.Severity { + switch strings.ToLower(strings.TrimSpace(severity)) { + case "debug": + return runtimeevents.SeverityDebug + case "warn", "warning": + return runtimeevents.SeverityWarn + case "error": + return runtimeevents.SeverityError + default: + return runtimeevents.SeverityInfo + } +} + +func normalizeRuntimeEventSeverity(severity runtimeevents.Severity) runtimeevents.Severity { + switch severity { + case runtimeevents.SeverityDebug, + runtimeevents.SeverityInfo, + runtimeevents.SeverityWarn, + runtimeevents.SeverityError: + return severity + default: + return runtimeevents.SeverityInfo + } +} + +func runtimeEventSeverityRank(severity runtimeevents.Severity) int { + switch normalizeRuntimeEventSeverity(severity) { + case runtimeevents.SeverityDebug: + return 0 + case runtimeevents.SeverityInfo: + return 1 + case runtimeevents.SeverityWarn: + return 2 + case runtimeevents.SeverityError: + return 3 + default: + return 1 + } +} diff --git a/pkg/agent/runtime_event_logger_test.go b/pkg/agent/runtime_event_logger_test.go new file mode 100644 index 000000000..d056eb15c --- /dev/null +++ b/pkg/agent/runtime_event_logger_test.go @@ -0,0 +1,99 @@ +package agent + +import ( + "testing" + + "github.com/sipeed/picoclaw/pkg/config" + runtimeevents "github.com/sipeed/picoclaw/pkg/events" +) + +func TestRuntimeEventLoggerFiltering(t *testing.T) { + cfg := config.DefaultConfig() + eventLogger := newRuntimeEventLogger(cfg) + if eventLogger == nil { + t.Fatal("default runtime event logger is nil") + } + + if !eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindAgentTurnStart, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("default config should log agent events") + } + if eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindChannelLifecycleStarted, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("default config should not log non-agent events") + } + + cfg.Events.Logging.Include = []string{"*"} + cfg.Events.Logging.Exclude = []string{"mcp.*"} + eventLogger = newRuntimeEventLogger(cfg) + if !eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindGatewayReady, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("include * should log gateway events") + } + if eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindMCPServerConnected, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("exclude mcp.* should suppress MCP events") + } + + cfg.Events.Logging.Exclude = nil + cfg.Events.Logging.MinSeverity = "warn" + eventLogger = newRuntimeEventLogger(cfg) + if eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindGatewayReady, + Severity: runtimeevents.SeverityInfo, + }) { + t.Fatal("min severity warn should suppress info events") + } + if !eventLogger.shouldLog(runtimeevents.Event{ + Kind: runtimeevents.KindGatewayReloadFailed, + Severity: runtimeevents.SeverityError, + }) { + t.Fatal("min severity warn should allow error events") + } + + cfg.Events.Logging.Enabled = false + if newRuntimeEventLogger(cfg) != nil { + t.Fatal("disabled config should not create runtime event logger") + } +} + +func TestRuntimeEventLogFieldsSummarizeAgentPayload(t *testing.T) { + fields := runtimeEventLogFields(runtimeevents.Event{ + ID: "evt-test", + Kind: runtimeevents.KindAgentToolExecStart, + Severity: runtimeevents.SeverityInfo, + Source: runtimeevents.Source{ + Component: "agent", + Name: "main", + }, + Scope: runtimeevents.Scope{ + AgentID: "main", + SessionKey: "session-1", + TurnID: "turn-1", + }, + Payload: ToolExecStartPayload{ + Tool: "exec", + Arguments: map[string]any{ + "secret": "should-not-be-logged-by-default", + }, + }, + }) + + if fields["event_id"] != "evt-test" || fields["source_component"] != "agent" { + t.Fatalf("missing common event fields: %#v", fields) + } + if fields["tool"] != "exec" || fields["args_count"] != 1 { + t.Fatalf("missing safe agent payload summary fields: %#v", fields) + } + if _, ok := fields["payload"]; ok { + t.Fatalf("raw payload should not be included by runtimeEventLogFields: %#v", fields) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 6bb8d3ce6..35e217e78 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -39,6 +39,7 @@ type Config struct { Channels ChannelsConfig `json:"channel_list" yaml:"channel_list"` ModelList SecureModelList `json:"model_list" yaml:"model_list"` // New model-centric provider configuration Gateway GatewayConfig `json:"gateway" yaml:"-"` + Events EventsConfig `json:"events,omitempty" yaml:"-"` Hooks HooksConfig `json:"hooks,omitempty" yaml:"-"` Tools ToolsConfig `json:"tools" yaml:",inline"` Heartbeat HeartbeatConfig `json:"heartbeat" yaml:"-"` diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index f3aaca7ab..f7490dfed 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -294,6 +294,9 @@ func DefaultConfig() *Config { HotReload: false, LogLevel: DefaultGatewayLogLevel, }, + Events: EventsConfig{ + Logging: defaultEventLoggingConfig(), + }, Tools: ToolsConfig{ FilterSensitiveData: true, FilterMinLength: 8, diff --git a/pkg/config/events.go b/pkg/config/events.go new file mode 100644 index 000000000..54a2ce709 --- /dev/null +++ b/pkg/config/events.go @@ -0,0 +1,48 @@ +package config + +// EventsConfig groups runtime event configuration. +type EventsConfig struct { + Logging EventLoggingConfig `json:"logging,omitempty" envPrefix:"PICOCLAW_EVENTS_LOGGING_"` +} + +// EventLoggingConfig controls centralized runtime event logging. +type EventLoggingConfig struct { + // Enabled controls whether runtime events are printed by the built-in logger. + Enabled bool `json:"enabled" env:"ENABLED"` + // Include contains exact event kinds or glob patterns such as "agent.*" or "*". + Include []string `json:"include,omitempty" env:"INCLUDE"` + // Exclude contains exact event kinds or glob patterns to suppress after Include matches. + Exclude []string `json:"exclude,omitempty" env:"EXCLUDE"` + // MinSeverity filters out events below the configured severity: debug, info, warn, or error. + MinSeverity string `json:"min_severity,omitempty" env:"MIN_SEVERITY"` + // IncludePayload adds the raw payload to logs. Leave disabled unless detailed diagnostics are needed. + IncludePayload bool `json:"include_payload,omitempty" env:"INCLUDE_PAYLOAD"` +} + +// DefaultEventLoggingInclude keeps the pre-existing behavior where agent events +// are printed, while non-agent runtime events are published for subscribers only. +var DefaultEventLoggingInclude = []string{"agent.*"} + +// EffectiveEventLoggingConfig returns a logging config with stable defaults. +func EffectiveEventLoggingConfig(cfg *Config) EventLoggingConfig { + if cfg == nil { + return defaultEventLoggingConfig() + } + + out := cfg.Events.Logging + if out.MinSeverity == "" { + out.MinSeverity = "info" + } + if len(out.Include) == 0 { + out.Include = append([]string(nil), DefaultEventLoggingInclude...) + } + return out +} + +func defaultEventLoggingConfig() EventLoggingConfig { + return EventLoggingConfig{ + Enabled: true, + Include: append([]string(nil), DefaultEventLoggingInclude...), + MinSeverity: "info", + } +} diff --git a/pkg/config/events_test.go b/pkg/config/events_test.go new file mode 100644 index 000000000..6cd410492 --- /dev/null +++ b/pkg/config/events_test.go @@ -0,0 +1,103 @@ +package config + +import ( + "os" + "path/filepath" + "reflect" + "testing" +) + +func TestDefaultEventLoggingConfig(t *testing.T) { + cfg := DefaultConfig() + logCfg := EffectiveEventLoggingConfig(cfg) + + if !logCfg.Enabled { + t.Fatal("default event logging should be enabled") + } + if !reflect.DeepEqual(logCfg.Include, []string{"agent.*"}) { + t.Fatalf("default include = %#v, want agent.*", logCfg.Include) + } + if logCfg.MinSeverity != "info" { + t.Fatalf("default min severity = %q, want info", logCfg.MinSeverity) + } + if logCfg.IncludePayload { + t.Fatal("default event logging should not include raw payloads") + } +} + +func TestLoadConfigEventLoggingOverrides(t *testing.T) { + path := filepath.Join(t.TempDir(), "config.json") + data := []byte(`{ + "version": 3, + "events": { + "logging": { + "enabled": false, + "include": ["gateway.*"], + "exclude": ["gateway.ready"], + "min_severity": "warn", + "include_payload": true + } + } + }`) + if err := os.WriteFile(path, data, 0o600); err != nil { + t.Fatalf("write config: %v", err) + } + + cfg, err := LoadConfig(path) + if err != nil { + t.Fatalf("LoadConfig() error: %v", err) + } + logCfg := EffectiveEventLoggingConfig(cfg) + + if logCfg.Enabled { + t.Fatal("loaded event logging enabled = true, want false") + } + if !reflect.DeepEqual(logCfg.Include, []string{"gateway.*"}) { + t.Fatalf("loaded include = %#v, want gateway.*", logCfg.Include) + } + if !reflect.DeepEqual(logCfg.Exclude, []string{"gateway.ready"}) { + t.Fatalf("loaded exclude = %#v, want gateway.ready", logCfg.Exclude) + } + if logCfg.MinSeverity != "warn" { + t.Fatalf("loaded min severity = %q, want warn", logCfg.MinSeverity) + } + if !logCfg.IncludePayload { + t.Fatal("loaded include_payload = false, want true") + } +} + +func TestLoadConfigEventLoggingEnvOverrides(t *testing.T) { + t.Setenv("PICOCLAW_EVENTS_LOGGING_ENABLED", "false") + t.Setenv("PICOCLAW_EVENTS_LOGGING_INCLUDE", "gateway.*,channel.lifecycle.*") + t.Setenv("PICOCLAW_EVENTS_LOGGING_EXCLUDE", "gateway.ready") + t.Setenv("PICOCLAW_EVENTS_LOGGING_MIN_SEVERITY", "error") + t.Setenv("PICOCLAW_EVENTS_LOGGING_INCLUDE_PAYLOAD", "true") + + path := filepath.Join(t.TempDir(), "config.json") + data := []byte(`{"version": 3}`) + if err := os.WriteFile(path, data, 0o600); err != nil { + t.Fatalf("write config: %v", err) + } + + cfg, err := LoadConfig(path) + if err != nil { + t.Fatalf("LoadConfig() error: %v", err) + } + logCfg := EffectiveEventLoggingConfig(cfg) + + if logCfg.Enabled { + t.Fatal("env enabled override = true, want false") + } + if !reflect.DeepEqual(logCfg.Include, []string{"gateway.*", "channel.lifecycle.*"}) { + t.Fatalf("env include = %#v, want gateway/channel lifecycle", logCfg.Include) + } + if !reflect.DeepEqual(logCfg.Exclude, []string{"gateway.ready"}) { + t.Fatalf("env exclude = %#v, want gateway.ready", logCfg.Exclude) + } + if logCfg.MinSeverity != "error" { + t.Fatalf("env min severity = %q, want error", logCfg.MinSeverity) + } + if !logCfg.IncludePayload { + t.Fatal("env include_payload = false, want true") + } +}