From 1f5e488ae3634a54897f8f8aa1573ae48c670f22 Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Wed, 6 May 2026 14:35:52 +0000 Subject: [PATCH 1/7] fix(acp): clean up pending + cancel agent on abandoned prompts (#732) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flat 600s recv_timeout in adapter.rs:386 fires "Agent stopped responding" without removing pending[id] or sending session/cancel. The agent keeps running the abandoned prompt and eventually emits its final response with the original id. The reader at connection.rs:284 looks up pending[id], sees the now-stale entry, and forwards the message to the *current* notify_tx subscriber — which belongs to the next prompt. The next prompt's loop sees notification.id.is_some() and breaks immediately with empty text_buf, returning "(no response)". Each new prompt sent before the agent drains its backlog inherits the previous prompt's stale id and the cascade persists. Fix follows the issue's recommended A+B+C: (A) Replace flat 600s timeout with a tokio::select! loop in stream_prompt_blocks. Recv arm + 30s liveness arm. Liveness arm checks conn.alive() (cheap, just !reader_handle.is_finished()) and a configurable hard ceiling. Default ceiling is 30 min via pool.prompt_hard_timeout_secs. Long-running tools no longer trip the timeout — only a dead reader task or the hard ceiling abandon the prompt. (B) Add AcpConnection::abandon_request(request_id) called on every abandon path: drops pending[request_id] so a late response cannot route to a future subscriber, and best-effort writes session/cancel so the agent stops working on a request the broker has given up on. (C) Capture request_id from session_prompt() (was discarded as `_`) and skip notifications whose id doesn't match. Defense-in-depth at the routing layer; complements (B)'s cleanup if any future abandon path forgets to call abandon_request. 
No unit test for abandon_request — the connection has no test seam without spawning a real subprocess. Behavior is exercised end-to-end via the adapter loop on real ACP backends. Refs: - #76 (Assumption 2: prompts always complete) - #307 (sibling: same family, different visible symptom) - #470 (added the 600s recv timeout this issue exposes) Co-Authored-By: Claude Opus 4.7 --- config.toml.example | 2 ++ src/acp/connection.rs | 18 ++++++++++++++++ src/adapter.rs | 49 +++++++++++++++++++++++++++++++++---------- src/config.rs | 10 +++++++++ src/dispatch.rs | 1 + src/main.rs | 1 + 6 files changed, 70 insertions(+), 11 deletions(-) diff --git a/config.toml.example b/config.toml.example index cf99f6bd..5c3289c2 100644 --- a/config.toml.example +++ b/config.toml.example @@ -104,6 +104,8 @@ working_dir = "/home/agent" [pool] max_sessions = 10 session_ttl_hours = 24 +# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min). +# prompt_hard_timeout_secs = 1800 [markdown] tables = "code" # "code" (default) | "bullets" | "off" diff --git a/src/acp/connection.rs b/src/acp/connection.rs index c1b36a47..d2b37804 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -557,6 +557,24 @@ impl AcpConnection { self.last_active = Instant::now(); } + /// Drop the pending entry for `request_id` and best-effort send + /// `session/cancel`. Errors are swallowed: the agent process may already + /// be dead, in which case the stdin write fails harmlessly. See #732. + pub async fn abandon_request(&self, request_id: u64) { + self.pending.lock().await.remove(&request_id); + let Some(session_id) = self.acp_session_id.as_deref() else { + return; + }; + let req = json!({ + "jsonrpc": "2.0", + "method": "session/cancel", + "params": {"sessionId": session_id}, + }); + if let Ok(data) = serde_json::to_string(&req) { + let _ = self.send_raw(&data).await; + } + } + /// Return a clone of the stdin handle for lock-free cancel. 
pub fn cancel_handle(&self) -> Arc> { Arc::clone(&self.stdin) diff --git a/src/adapter.rs b/src/adapter.rs index 106cd47b..156e5f26 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -153,12 +153,16 @@ pub trait ChatAdapter: Send + Sync + 'static { // --- AdapterRouter --- +/// Polling cadence for the recv-loop liveness check (#732). +const LIVENESS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); + /// Shared logic for routing messages to ACP agents, managing sessions, /// streaming edits, and controlling reactions. Platform-independent. pub struct AdapterRouter { pool: Arc, reactions_config: ReactionsConfig, table_mode: TableMode, + prompt_hard_timeout: std::time::Duration, } impl AdapterRouter { @@ -166,11 +170,13 @@ impl AdapterRouter { pool: Arc, reactions_config: ReactionsConfig, table_mode: TableMode, + prompt_hard_timeout_secs: u64, ) -> Self { Self { pool, reactions_config, table_mode, + prompt_hard_timeout: std::time::Duration::from_secs(prompt_hard_timeout_secs), } } @@ -335,6 +341,7 @@ impl AdapterRouter { let streaming = adapter.use_streaming(other_bot_present); let table_mode = self.table_mode; let tool_display = self.reactions_config.tool_display; + let prompt_hard_timeout = self.prompt_hard_timeout; self.pool .with_connection(thread_key, |conn| { @@ -343,7 +350,7 @@ impl AdapterRouter { let reset = conn.session_reset; conn.session_reset = false; - let (mut rx, _) = conn.session_prompt(content_blocks).await?; + let (mut rx, request_id) = conn.session_prompt(content_blocks).await?; reactions.set_thinking().await; let mut text_buf = String::new(); @@ -396,20 +403,40 @@ impl AdapterRouter { (None, None) }; - // Process ACP notifications + // (#732) Liveness-aware recv loop. Filters stale id-bearing + // messages and abandons cleanly on dead agent / hard ceiling + // so late responses cannot leak into the next prompt. 
let mut response_error: Option = None; - let recv_timeout = std::time::Duration::from_secs(600); + let prompt_start = std::time::Instant::now(); loop { - let notification = match tokio::time::timeout(recv_timeout, rx.recv()).await - { - Ok(Some(n)) => n, - Ok(None) => break, // channel closed - Err(_) => { - response_error = Some("Agent stopped responding".into()); - break; + let notification = tokio::select! { + msg = rx.recv() => match msg { + Some(n) => n, + // Reader saw EOF and already drained pending; nothing to abandon. + None => break, + }, + _ = tokio::time::sleep(LIVENESS_CHECK_INTERVAL) => { + if !conn.alive() { + response_error = Some("Agent process died".into()); + conn.abandon_request(request_id).await; + break; + } + if prompt_start.elapsed() > prompt_hard_timeout { + response_error = Some(format!( + "Agent exceeded hard timeout ({}m)", + prompt_hard_timeout.as_secs() / 60, + )); + conn.abandon_request(request_id).await; + break; + } + continue; } }; - if notification.id.is_some() { + if let Some(notification_id) = notification.id { + if notification_id != request_id { + // Stale response from a previously-abandoned prompt. + continue; + } if let Some(ref err) = notification.error { response_error = Some(format_coded_error(err.code, &err.message)); } diff --git a/src/config.rs b/src/config.rs index dd56484d..c633a08b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -313,6 +313,12 @@ pub struct PoolConfig { pub max_sessions: usize, #[serde(default = "default_ttl_hours")] pub session_ttl_hours: u64, + /// Hard ceiling for a single prompt (#732). Once exceeded, the broker + /// abandons the in-flight request, sends `session/cancel` to the agent, + /// and clears the pending entry so late responses cannot leak into the + /// next prompt's subscriber. 
+ #[serde(default = "default_prompt_hard_timeout_secs")] + pub prompt_hard_timeout_secs: u64, } #[derive(Debug, Clone, Deserialize)] @@ -434,6 +440,9 @@ fn default_max_sessions() -> usize { fn default_ttl_hours() -> u64 { 4 } +pub(crate) fn default_prompt_hard_timeout_secs() -> u64 { + 30 * 60 +} fn default_true() -> bool { true } @@ -481,6 +490,7 @@ impl Default for PoolConfig { Self { max_sessions: default_max_sessions(), session_ttl_hours: default_ttl_hours(), + prompt_hard_timeout_secs: default_prompt_hard_timeout_secs(), } } } diff --git a/src/dispatch.rs b/src/dispatch.rs index a3fbec88..d30c5f60 100644 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -1072,6 +1072,7 @@ mod tests { pool, crate::config::ReactionsConfig::default(), crate::markdown::TableMode::Off, + crate::config::default_prompt_hard_timeout_secs(), )); Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT) } diff --git a/src/main.rs b/src/main.rs index 3cfce2db..076dfd92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -147,6 +147,7 @@ async fn main() -> anyhow::Result<()> { pool.clone(), cfg.reactions, cfg.markdown.tables, + cfg.pool.prompt_hard_timeout_secs, )); // Shutdown signal for Slack adapter From c19371a4673cc151d3b02a6f772961f40aa972fd Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Wed, 6 May 2026 17:12:18 +0000 Subject: [PATCH 2/7] chore(acp): apply chaodu-agent NITs from PR #760 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pool.liveness_check_secs: hoist the recv-loop poll cadence out of a hard-coded const onto PoolConfig so deployments can tune it. Default remains 30s. - adapter: change hard-timeout error message from ({}m) to ({}s) so non-multiple-of-60 ceilings render correctly (e.g. 90s → "(90s)"). - acp/connection: emit a tracing::trace! line when an id-bearing message arrives whose pending entry was already abandoned. 
Behaviour is unchanged — the adapter recv loop still filters by request_id; this just makes the stale-response path observable at trace level. cargo check + cargo clippy -- -D warnings + cargo test --bin openab all clean (238 passed). --- config.toml.example | 2 ++ src/acp/connection.rs | 6 +++++- src/adapter.rs | 14 ++++++++------ src/config.rs | 9 +++++++++ src/dispatch.rs | 1 + src/main.rs | 1 + 6 files changed, 26 insertions(+), 7 deletions(-) diff --git a/config.toml.example b/config.toml.example index 5c3289c2..a363dd32 100644 --- a/config.toml.example +++ b/config.toml.example @@ -106,6 +106,8 @@ max_sessions = 10 session_ttl_hours = 24 # Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min). # prompt_hard_timeout_secs = 1800 +# Liveness-check cadence (sec) for the recv loop; see #732. Default: 30. +# liveness_check_secs = 30 [markdown] tables = "code" # "code" (default) | "bullets" | "off" diff --git a/src/acp/connection.rs b/src/acp/connection.rs index d2b37804..5889f615 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -10,7 +10,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; /// Pick the most permissive selectable permission option from ACP options. fn pick_best_option(options: &[Value]) -> Option { @@ -319,6 +319,10 @@ impl AcpConnection { let _ = tx.send(msg); continue; } + // Stale id (#732): pending was already abandoned. Falls through + // to subscriber forwarding; the adapter recv loop filters by + // request_id so it can't leak into the next prompt. 
+ trace!(request_id = id, "stale id-bearing message after abandon"); } // Notification → forward to subscriber diff --git a/src/adapter.rs b/src/adapter.rs index 156e5f26..eb72dfc9 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -153,9 +153,6 @@ pub trait ChatAdapter: Send + Sync + 'static { // --- AdapterRouter --- -/// Polling cadence for the recv-loop liveness check (#732). -const LIVENESS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); - /// Shared logic for routing messages to ACP agents, managing sessions, /// streaming edits, and controlling reactions. Platform-independent. pub struct AdapterRouter { @@ -163,6 +160,8 @@ pub struct AdapterRouter { reactions_config: ReactionsConfig, table_mode: TableMode, prompt_hard_timeout: std::time::Duration, + /// Polling cadence for the recv-loop liveness check (#732). + liveness_check_interval: std::time::Duration, } impl AdapterRouter { @@ -171,12 +170,14 @@ impl AdapterRouter { reactions_config: ReactionsConfig, table_mode: TableMode, prompt_hard_timeout_secs: u64, + liveness_check_secs: u64, ) -> Self { Self { pool, reactions_config, table_mode, prompt_hard_timeout: std::time::Duration::from_secs(prompt_hard_timeout_secs), + liveness_check_interval: std::time::Duration::from_secs(liveness_check_secs), } } @@ -342,6 +343,7 @@ impl AdapterRouter { let table_mode = self.table_mode; let tool_display = self.reactions_config.tool_display; let prompt_hard_timeout = self.prompt_hard_timeout; + let liveness_check_interval = self.liveness_check_interval; self.pool .with_connection(thread_key, |conn| { @@ -415,7 +417,7 @@ impl AdapterRouter { // Reader saw EOF and already drained pending; nothing to abandon. 
None => break, }, - _ = tokio::time::sleep(LIVENESS_CHECK_INTERVAL) => { + _ = tokio::time::sleep(liveness_check_interval) => { if !conn.alive() { response_error = Some("Agent process died".into()); conn.abandon_request(request_id).await; @@ -423,8 +425,8 @@ impl AdapterRouter { } if prompt_start.elapsed() > prompt_hard_timeout { response_error = Some(format!( - "Agent exceeded hard timeout ({}m)", - prompt_hard_timeout.as_secs() / 60, + "Agent exceeded hard timeout ({}s)", + prompt_hard_timeout.as_secs(), )); conn.abandon_request(request_id).await; break; diff --git a/src/config.rs b/src/config.rs index c633a08b..546f45ac 100644 --- a/src/config.rs +++ b/src/config.rs @@ -319,6 +319,11 @@ pub struct PoolConfig { /// next prompt's subscriber. #[serde(default = "default_prompt_hard_timeout_secs")] pub prompt_hard_timeout_secs: u64, + /// Polling cadence (seconds) for the recv-loop liveness check (#732). + /// Lower = faster reaction to a dead agent / hard ceiling at the cost of + /// more wakeups while the agent is streaming normally. 
+ #[serde(default = "default_liveness_check_secs")] + pub liveness_check_secs: u64, } #[derive(Debug, Clone, Deserialize)] @@ -443,6 +448,9 @@ fn default_ttl_hours() -> u64 { pub(crate) fn default_prompt_hard_timeout_secs() -> u64 { 30 * 60 } +pub(crate) fn default_liveness_check_secs() -> u64 { + 30 +} fn default_true() -> bool { true } @@ -491,6 +499,7 @@ impl Default for PoolConfig { max_sessions: default_max_sessions(), session_ttl_hours: default_ttl_hours(), prompt_hard_timeout_secs: default_prompt_hard_timeout_secs(), + liveness_check_secs: default_liveness_check_secs(), } } } diff --git a/src/dispatch.rs b/src/dispatch.rs index d30c5f60..013ee3d8 100644 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -1073,6 +1073,7 @@ mod tests { crate::config::ReactionsConfig::default(), crate::markdown::TableMode::Off, crate::config::default_prompt_hard_timeout_secs(), + crate::config::default_liveness_check_secs(), )); Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT) } diff --git a/src/main.rs b/src/main.rs index 076dfd92..1f2a0639 100644 --- a/src/main.rs +++ b/src/main.rs @@ -148,6 +148,7 @@ async fn main() -> anyhow::Result<()> { cfg.reactions, cfg.markdown.tables, cfg.pool.prompt_hard_timeout_secs, + cfg.pool.liveness_check_secs, )); // Shutdown signal for Slack adapter From 0a8fc0a897a25707d701233a06866d693f2efedf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Wed, 6 May 2026 20:13:01 +0000 Subject: [PATCH 3/7] fix(acp): add precision doc + id to session/cancel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ±liveness_check_secs precision note to prompt_hard_timeout_secs doc - Add JSON-RPC id field to session/cancel in abandon_request Co-authored-by: 超渡法師 --- src/acp/connection.rs | 1 + src/config.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index 5889f615..ef524fd1 100644 --- 
a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -571,6 +571,7 @@ impl AcpConnection { }; let req = json!({ "jsonrpc": "2.0", + "id": self.next_id(), "method": "session/cancel", "params": {"sessionId": session_id}, }); diff --git a/src/config.rs b/src/config.rs index 546f45ac..94a0aaa4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -317,6 +317,9 @@ pub struct PoolConfig { /// abandons the in-flight request, sends `session/cancel` to the agent, /// and clears the pending entry so late responses cannot leak into the /// next prompt's subscriber. + /// + /// Precision: checked every `liveness_check_secs`, so the actual cutoff + /// lands up to `liveness_check_secs` after this value, never before it. #[serde(default = "default_prompt_hard_timeout_secs")] pub prompt_hard_timeout_secs: u64, /// Polling cadence (seconds) for the recv-loop liveness check (#732). From 1a65244058be66931b6af0dab9ff63aa72abdfaa Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Thu, 7 May 2026 00:13:57 +0000 Subject: [PATCH 4/7] chore(acp): apply chaodu-agent round-2 NITs from PR #760 - adapter::AdapterRouter::new: emit a tracing::warn! when liveness_check_secs >= prompt_hard_timeout_secs, since in that case the hard ceiling can only fire on the next liveness tick and may be effectively bypassed. Operator-visible warning, not a silent clamp. - adapter: switch prompt_start from std::time::Instant to tokio::time::Instant so the timer shares tokio's clock with the tokio::time::sleep in the same select! arm (cohesive with future tokio::time::pause()-based tests). - adapter + acp/connection: extend the stale-id filter / fall-through comments to note that the path is only exercised against a live subprocess and is covered by manual repro, not a unit test. Note: chaodu-agent NIT 2 (cancel response noise) requires no code change. 
abandon_request emits a JSON-RPC notification (no id field) per the ACP spec, so a spec-compliant agent must not respond, and even a non-compliant reply with no id would not hit the stale-id trace path. PR comment to follow. cargo check + cargo clippy -- -D warnings + cargo test --bin openab all clean (238 passed). --- src/acp/connection.rs | 5 ++++- src/adapter.rs | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index ef524fd1..a3a4549a 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -321,7 +321,10 @@ impl AcpConnection { } // Stale id (#732): pending was already abandoned. Falls through // to subscriber forwarding; the adapter recv loop filters by - // request_id so it can't leak into the next prompt. + // request_id so it can't leak into the next prompt. No automated + // test seam — the path only triggers when a real subprocess + // delivers a late response after abandon_request, which is + // covered by manual repro against a live agent. trace!(request_id = id, "stale id-bearing message after abandon"); } diff --git a/src/adapter.rs b/src/adapter.rs index eb72dfc9..5a80f29e 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -2,7 +2,7 @@ use anyhow::Result; use async_trait::async_trait; use serde::Serialize; use std::sync::Arc; -use tracing::error; +use tracing::{error, warn}; use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool}; use crate::config::{ReactionsConfig, ToolDisplay}; @@ -172,6 +172,15 @@ impl AdapterRouter { prompt_hard_timeout_secs: u64, liveness_check_secs: u64, ) -> Self { + if liveness_check_secs >= prompt_hard_timeout_secs { + warn!( + liveness_check_secs, + prompt_hard_timeout_secs, + "pool.liveness_check_secs >= pool.prompt_hard_timeout_secs; \ + the hard ceiling will only fire after the next liveness tick \ + and may be effectively bypassed. Lower liveness_check_secs." 
+ ); + } Self { pool, reactions_config, @@ -409,7 +418,7 @@ impl AdapterRouter { // messages and abandons cleanly on dead agent / hard ceiling // so late responses cannot leak into the next prompt. let mut response_error: Option = None; - let prompt_start = std::time::Instant::now(); + let prompt_start = tokio::time::Instant::now(); loop { let notification = tokio::select! { msg = rx.recv() => match msg { @@ -437,6 +446,10 @@ impl AdapterRouter { if let Some(notification_id) = notification.id { if notification_id != request_id { // Stale response from a previously-abandoned prompt. + // No automated test seam: this path only triggers when a + // real subprocess emits a late response after the broker + // already called abandon_request — covered by manual + // repro against a live agent (see #732 PR description). continue; } if let Some(ref err) = notification.error { From ccef788aa446a5ec131cd58a092e298b62067854 Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Thu, 7 May 2026 00:27:39 +0000 Subject: [PATCH 5/7] test(acp): add reader-loop unit tests for stale-id path (#732) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the reader-loop body in AcpConnection::spawn into a free generic function `run_reader_loop` so tests can drive it with `tokio::io::duplex()` halves instead of a real subprocess. Production path is unchanged — spawn() now calls `tokio::spawn(run_reader_loop(...))` with the same args. 
Two new tests cover: - stale-id response forwarded to subscriber when `pending` is empty (the #732 fall-through path that the adapter recv loop filters by request_id) - matched-id response resolves the pending oneshot AND forwards a copy to the subscriber (regression guard for the dual branch) Co-Authored-By: Claude Opus 4.7 --- src/acp/connection.rs | 316 ++++++++++++++++++++++++++++-------------- 1 file changed, 215 insertions(+), 101 deletions(-) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index a3a4549a..445b09c1 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -6,7 +6,7 @@ use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; @@ -149,6 +149,112 @@ fn build_agent_env( (result, inherited) } +/// Reader loop body: reads JSON-RPC messages from `reader`, auto-replies +/// `session/request_permission` via `writer`, resolves pending responses, +/// and forwards notifications + stale id-bearing messages to the active +/// subscriber. Extracted as a free generic function so unit tests can drive +/// it with `tokio::io::duplex()` halves instead of a real child process. 
+pub(crate) async fn run_reader_loop( + reader: R, + writer: Arc>, + pending: Arc>>>, + notify_tx: Arc>>>, +) where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, +{ + let mut reader = BufReader::new(reader); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, // EOF + Ok(_) => {} + Err(e) => { + error!("reader error: {e}"); + break; + } + } + let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) { + Ok(m) => m, + Err(_) => continue, + }; + debug!(line = line.trim(), "acp_recv"); + + // Auto-reply session/request_permission + if msg.method.as_deref() == Some("session/request_permission") { + if let Some(id) = msg.id { + let title = msg + .params + .as_ref() + .and_then(|p| p.get("toolCall")) + .and_then(|t| t.get("title")) + .and_then(|t| t.as_str()) + .unwrap_or("?"); + + let outcome = build_permission_response(msg.params.as_ref()); + info!(title, %outcome, "auto-respond permission"); + let reply = JsonRpcResponse::new(id, outcome); + if let Ok(data) = serde_json::to_string(&reply) { + let mut w = writer.lock().await; + let _ = w.write_all(format!("{data}\n").as_bytes()).await; + let _ = w.flush().await; + } + } + continue; + } + + // Response (has id) → resolve pending AND forward to subscriber + if let Some(id) = msg.id { + let mut map = pending.lock().await; + if let Some(tx) = map.remove(&id) { + // Forward to subscriber so they see the completion + let sub = notify_tx.lock().await; + if let Some(ntx) = sub.as_ref() { + // Clone the essential fields for the subscriber + let _ = ntx.send(JsonRpcMessage { + id: Some(id), + method: None, + result: msg.result.clone(), + error: msg.error.clone(), + params: None, + }); + } + let _ = tx.send(msg); + continue; + } + // Stale id (#732): pending was already abandoned. Falls through + // to subscriber forwarding; the adapter recv loop filters by + // request_id so it can't leak into the next prompt. 
+ trace!(request_id = id, "stale id-bearing message after abandon"); + } + + // Notification → forward to subscriber + let sub = notify_tx.lock().await; + if let Some(tx) = sub.as_ref() { + let _ = tx.send(msg); + } + } + + // Connection closed — resolve all pending with error + let mut map = pending.lock().await; + for (_, tx) in map.drain() { + let _ = tx.send(JsonRpcMessage { + id: None, + method: None, + result: None, + error: Some(crate::acp::protocol::JsonRpcError { + code: -1, + message: "connection closed".into(), + }), + params: None, + }); + } + // Close the notify channel so rx.recv() returns None + let mut sub = notify_tx.lock().await; + *sub = None; +} + impl AcpConnection { pub async fn spawn( command: &str, @@ -254,106 +360,12 @@ impl AcpConnection { let notify_tx: Arc>>> = Arc::new(Mutex::new(None)); - let reader_handle = { - let pending = pending.clone(); - let notify_tx = notify_tx.clone(); - let stdin_clone = stdin.clone(); - tokio::spawn(async move { - let mut reader = BufReader::new(stdout); - let mut line = String::new(); - loop { - line.clear(); - match reader.read_line(&mut line).await { - Ok(0) => break, // EOF - Ok(_) => {} - Err(e) => { - error!("reader error: {e}"); - break; - } - } - let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) { - Ok(m) => m, - Err(_) => continue, - }; - debug!(line = line.trim(), "acp_recv"); - - // Auto-reply session/request_permission - if msg.method.as_deref() == Some("session/request_permission") { - if let Some(id) = msg.id { - let title = msg - .params - .as_ref() - .and_then(|p| p.get("toolCall")) - .and_then(|t| t.get("title")) - .and_then(|t| t.as_str()) - .unwrap_or("?"); - - let outcome = build_permission_response(msg.params.as_ref()); - info!(title, %outcome, "auto-respond permission"); - let reply = JsonRpcResponse::new(id, outcome); - if let Ok(data) = serde_json::to_string(&reply) { - let mut w = stdin_clone.lock().await; - let _ = w.write_all(format!("{data}\n").as_bytes()).await; 
- let _ = w.flush().await; - } - } - continue; - } - - // Response (has id) → resolve pending AND forward to subscriber - if let Some(id) = msg.id { - let mut map = pending.lock().await; - if let Some(tx) = map.remove(&id) { - // Forward to subscriber so they see the completion - let sub = notify_tx.lock().await; - if let Some(ntx) = sub.as_ref() { - // Clone the essential fields for the subscriber - let _ = ntx.send(JsonRpcMessage { - id: Some(id), - method: None, - result: msg.result.clone(), - error: msg.error.clone(), - params: None, - }); - } - let _ = tx.send(msg); - continue; - } - // Stale id (#732): pending was already abandoned. Falls through - // to subscriber forwarding; the adapter recv loop filters by - // request_id so it can't leak into the next prompt. No automated - // test seam — the path only triggers when a real subprocess - // delivers a late response after abandon_request, which is - // covered by manual repro against a live agent. - trace!(request_id = id, "stale id-bearing message after abandon"); - } - - // Notification → forward to subscriber - let sub = notify_tx.lock().await; - if let Some(tx) = sub.as_ref() { - let _ = tx.send(msg); - } - } - - // Connection closed — resolve all pending with error - let mut map = pending.lock().await; - for (_, tx) in map.drain() { - let _ = tx.send(JsonRpcMessage { - id: None, - method: None, - result: None, - error: Some(crate::acp::protocol::JsonRpcError { - code: -1, - message: "connection closed".into(), - }), - params: None, - }); - } - // Close the notify channel so rx.recv() returns None - let mut sub = notify_tx.lock().await; - *sub = None; - }) - }; + let reader_handle = tokio::spawn(run_reader_loop( + stdout, + stdin.clone(), + pending.clone(), + notify_tx.clone(), + )); Ok(Self { _proc: proc, @@ -784,3 +796,105 @@ mod tests { assert!(inherited.is_empty()); } } + +#[cfg(test)] +mod reader_loop_tests { + use super::*; + use std::collections::HashMap; + use std::sync::Arc; + use 
tokio::io::{duplex, AsyncWriteExt}; + use tokio::sync::{mpsc, oneshot, Mutex}; + + /// #732 stale-id path: when a response arrives for an id the broker has + /// already abandoned, the reader must (a) not crash, (b) leave `pending` + /// untouched, and (c) still forward the message to whoever is currently + /// subscribed — the adapter recv loop is responsible for filtering by + /// request_id so the stray response never leaks into the next prompt. + #[tokio::test] + async fn stale_id_response_is_forwarded_without_pending_entry() { + let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024); + let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024); + + let pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let notify_tx: Arc>>> = + Arc::new(Mutex::new(None)); + + let (sub_tx, mut sub_rx) = mpsc::unbounded_channel(); + *notify_tx.lock().await = Some(sub_tx); + + let writer = Arc::new(Mutex::new(agent_stdin_writer)); + let handle = tokio::spawn(run_reader_loop( + agent_stdout_reader, + writer, + pending.clone(), + notify_tx.clone(), + )); + + let stale = b"{\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"stopReason\":\"ok\"}}\n"; + agent_stdout_writer.write_all(stale).await.unwrap(); + agent_stdout_writer.flush().await.unwrap(); + + let forwarded = tokio::time::timeout( + std::time::Duration::from_secs(2), + sub_rx.recv(), + ) + .await + .expect("subscriber should receive stale message before timeout") + .expect("subscriber channel should not be closed"); + assert_eq!(forwarded.id, Some(42)); + assert!(pending.lock().await.is_empty()); + + drop(agent_stdout_writer); + handle.await.unwrap(); + } + + /// Matched-id path: when a response's id is in `pending`, the loop must + /// resolve the oneshot AND forward a copy to the subscriber so the + /// adapter's recv loop sees the completion. Guards against regressions + /// that would suppress the forward branch while keeping resolve. 
+ #[tokio::test] + async fn matched_id_response_resolves_pending_and_forwards() { + let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024); + let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024); + + let pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let notify_tx: Arc>>> = + Arc::new(Mutex::new(None)); + + let (resp_tx, resp_rx) = oneshot::channel(); + pending.lock().await.insert(7, resp_tx); + + let (sub_tx, mut sub_rx) = mpsc::unbounded_channel(); + *notify_tx.lock().await = Some(sub_tx); + + let writer = Arc::new(Mutex::new(agent_stdin_writer)); + let handle = tokio::spawn(run_reader_loop( + agent_stdout_reader, + writer, + pending.clone(), + notify_tx.clone(), + )); + + let payload = b"{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":{\"stopReason\":\"end_turn\"}}\n"; + agent_stdout_writer.write_all(payload).await.unwrap(); + agent_stdout_writer.flush().await.unwrap(); + + let resolved = tokio::time::timeout(std::time::Duration::from_secs(2), resp_rx) + .await + .expect("oneshot should resolve") + .expect("oneshot should not be cancelled"); + assert_eq!(resolved.id, Some(7)); + + let forwarded = tokio::time::timeout(std::time::Duration::from_secs(2), sub_rx.recv()) + .await + .expect("subscriber should receive forwarded copy") + .expect("subscriber channel should not be closed"); + assert_eq!(forwarded.id, Some(7)); + assert!(pending.lock().await.is_empty()); + + drop(agent_stdout_writer); + handle.await.unwrap(); + } +} From 5183143e682e20ac70fba929913efadc474b061f Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Thu, 7 May 2026 01:12:07 +0000 Subject: [PATCH 6/7] docs(acp): clarify abandon_request stale-id intent (#732 NIT 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit session/cancel carries a fresh JSON-RPC id but is intentionally not registered in `pending`, so the agent's reply lands in the stale-id branch of run_reader_loop and only 
emits a trace! log. We never wait on the cancel response; the adapter recv loop's request_id filter is the actual safety net against leakage into the next prompt. Doc-only — no behavioural change. Co-Authored-By: Claude Opus 4.7 --- src/acp/connection.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index 445b09c1..b671da94 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -579,6 +579,12 @@ impl AcpConnection { /// Drop the pending entry for `request_id` and best-effort send /// `session/cancel`. Errors are swallowed: the agent process may already /// be dead, in which case the stdin write fails harmlessly. See #732. + /// + /// `session/cancel` carries a fresh JSON-RPC id but is not registered in + /// `pending`, so the agent's reply lands in the stale-id branch of + /// `run_reader_loop` and only emits a `trace!`. That is intentional: we + /// never wait on the cancel response, and the adapter recv loop's + /// request_id filter prevents leakage into the next prompt. pub async fn abandon_request(&self, request_id: u64) { self.pending.lock().await.remove(&request_id); let Some(session_id) = self.acp_session_id.as_deref() else { From a162061850b3dd85ec9c754fce9ef6021c49c0c4 Mon Sep 17 00:00:00 2001 From: Brett Chien <1193046+brettchien@users.noreply.github.com> Date: Thu, 7 May 2026 06:28:44 +0000 Subject: [PATCH 7/7] chore(acp): apply chaodu-agent round-3 NITs from PR #760 NIT 1: `abandon_request` was sending `session/cancel` with a JSON-RPC id, making it request-shaped. Per ACP spec, `session/cancel` is a client notification (no id, no response expected). Pool-side `cancel_session` and `reset_session` were already notification-style; this aligns `abandon_request` with both spec and existing convention. Doc comment reverted to notification semantics. NIT 2: Reject `pool.liveness_check_secs = 0` in `parse_config`. 
Zero would make the `tokio::time::sleep` arm in the recv `select!` loop immediately ready, spinning the loop while the prompt is still under the hard timeout. --- src/acp/connection.rs | 13 ++++--------- src/config.rs | 4 ++++ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index b671da94..90c0eae2 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -577,14 +577,10 @@ impl AcpConnection { } /// Drop the pending entry for `request_id` and best-effort send - /// `session/cancel`. Errors are swallowed: the agent process may already - /// be dead, in which case the stdin write fails harmlessly. See #732. - /// - /// `session/cancel` carries a fresh JSON-RPC id but is not registered in - /// `pending`, so the agent's reply lands in the stale-id branch of - /// `run_reader_loop` and only emits a `trace!`. That is intentional: we - /// never wait on the cancel response, and the adapter recv loop's - /// request_id filter prevents leakage into the next prompt. + /// `session/cancel` as a JSON-RPC notification (no id; per ACP spec the + /// agent does not reply). Errors are swallowed: the agent process may + /// already be dead, in which case the stdin write fails harmlessly. + /// See #732. 
pub async fn abandon_request(&self, request_id: u64) { self.pending.lock().await.remove(&request_id); let Some(session_id) = self.acp_session_id.as_deref() else { @@ -592,7 +588,6 @@ }; let req = json!({ "jsonrpc": "2.0", - "id": self.next_id(), "method": "session/cancel", "params": {"sessionId": session_id}, }); diff --git a/src/config.rs b/src/config.rs index 94a0aaa4..6d4e103a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -639,6 +639,10 @@ fn parse_config(raw: &str, source: &str) -> anyhow::Result<Config> { "gateway.max_batch_tokens must be > 0" ); } + anyhow::ensure!( + config.pool.liveness_check_secs > 0, + "pool.liveness_check_secs must be > 0 (zero would spin the recv loop)" + ); Ok(config) }