From 0cdd583c3c15314e76123903060ecaab85cffa9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=B6=AD=E5=93=B2?= Date: Wed, 6 May 2026 09:28:05 +0800 Subject: [PATCH 1/2] feat(acp): surface agent stderr and add startup health check - Pipe agent stderr instead of discarding it; each line is logged at WARN level (openab::agent target) and buffered in a 8-line ring buffer - On connection close, append the buffered stderr tail to the error message so "connection closed; agent stderr: ..." is self-contained - Add SessionPool::health_check() that runs initialize + session/new at startup and logs a clear ERROR if the agent cannot be reached, instead of failing silently on the first user message Co-Authored-By: Claude Sonnet 4.6 --- src/acp/connection.rs | 44 +++++++++++++++++++++++++++++++++++++++---- src/acp/pool.rs | 17 +++++++++++++++++ src/main.rs | 8 ++++++++ 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index f49c0f50..8f0c6368 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -8,7 +8,8 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; -use tracing::{debug, error, info}; +use std::collections::VecDeque; +use tracing::{debug, error, info, warn}; /// Pick the most permissive selectable permission option from ACP options. @@ -162,7 +163,7 @@ impl AcpConnection { cmd.args(args) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()) .current_dir(working_dir); // Create a new process group so we can kill the entire tree. // SAFETY: setpgid is async-signal-safe (POSIX.1-2008) and called @@ -230,6 +231,30 @@ impl AcpConnection { let stdin = proc.stdin.take().ok_or_else(|| anyhow!("no stdin"))?; let stdin = Arc::new(Mutex::new(stdin)); + // Stderr reader: log each line at warn level and keep a tail for error messages. + let stderr_tail: Arc>> = + Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(8))); + if let Some(stderr) = proc.stderr.take() { + let tail = stderr_tail.clone(); + tokio::spawn(async move { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) | Err(_) => break, + Ok(_) => {} + } + let trimmed = line.trim().to_string(); + if trimmed.is_empty() { continue; } + warn!(target: "openab::agent", "[stderr] {trimmed}"); + let mut t = tail.lock().unwrap(); + if t.len() >= 8 { t.pop_front(); } + t.push_back(trimmed); + } + }); + } + let pending: Arc>>> = Arc::new(Mutex::new(HashMap::new())); let notify_tx: Arc>>> = @@ -239,6 +264,7 @@ impl AcpConnection { let pending = pending.clone(); let notify_tx = notify_tx.clone(); let stdin_clone = stdin.clone(); + let stderr_tail_reader = stderr_tail.clone(); tokio::spawn(async move { let mut reader = BufReader::new(stdout); let mut line = String::new(); @@ -309,7 +335,17 @@ impl AcpConnection { } } - // Connection closed — resolve all pending with error + // Connection closed — build error message, optionally with last stderr lines + let stderr_hint = { + let tail = stderr_tail_reader.lock().unwrap(); + if tail.is_empty() { + String::new() + } else { + format!("; agent stderr: {}", tail.iter().cloned().collect::>().join(" | ")) + } + }; + let close_msg = format!("connection closed{stderr_hint}"); + let mut map = pending.lock().await; for (_, tx) in map.drain() { let _ = tx.send(JsonRpcMessage { @@ -318,7 +354,7 @@ impl AcpConnection { result: None, error: Some(crate::acp::protocol::JsonRpcError { code: -1, - message: "connection closed".into(), + message: close_msg.clone(), }), params: None, }); diff --git a/src/acp/pool.rs b/src/acp/pool.rs index a146abb0..812320a4 100644 --- a/src/acp/pool.rs +++ b/src/acp/pool.rs @@ -85,6 +85,23 @@ impl SessionPool { } } + /// Spawn a temporary agent connection, run initialize + session/new, then drop it. + /// Returns Ok on success or Err with a description of what failed. + pub async fn health_check(&self) -> Result<()> { + let mut conn = AcpConnection::spawn( + &self.config.command, + &self.config.args, + &self.config.working_dir, + &self.config.env, + &self.config.inherit_env, + ) + .await?; + conn.initialize().await?; + conn.session_new(&self.config.working_dir).await?; + Ok(()) + // conn dropped here → kill_process_group() + } + fn load_mapping(path: &Path) -> HashMap { match std::fs::read_to_string(path) { Ok(data) => serde_json::from_str(&data).unwrap_or_else(|e| { diff --git a/src/main.rs b/src/main.rs index 04a0937f..9add173a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,6 +123,14 @@ async fn main() -> anyhow::Result<()> { let pool = Arc::new(acp::SessionPool::new(cfg.agent, cfg.pool.max_sessions)); let ttl_secs = cfg.pool.session_ttl_hours * 3600; + // Agent health check — verify the agent can spawn, initialize, and create a session + // before accepting any messages. Failure is logged but non-fatal so the process stays + // alive (launchd KeepAlive handles restarts); the error will surface on first message too. + match pool.health_check().await { + Ok(()) => info!("agent health check passed"), + Err(e) => error!("agent health check FAILED: {e} — all messages will fail until the agent is fixed"), + } + // Resolve STT config (auto-detect GROQ_API_KEY from env) if cfg.stt.enabled { if cfg.stt.api_key.is_empty() && cfg.stt.base_url.contains("groq.com") { From d0a398670d6ee00c5a943037e2f42583a39870d9 Mon Sep 17 00:00:00 2001 From: shaun-agent Date: Wed, 6 May 2026 13:12:18 +0000 Subject: [PATCH 2/2] fix(acp): wait for stderr drain on close --- src/acp/connection.rs | 154 +++++++++++++++++++++++++++++++----------- src/acp/pool.rs | 95 +++++++++++++++++++++----- 2 files changed, 194 insertions(+), 55 deletions(-) diff --git a/src/acp/connection.rs b/src/acp/connection.rs index 8f0c6368..59df0efc 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -1,16 +1,20 @@ -use crate::acp::protocol::{ConfigOption, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, parse_config_options}; +use crate::acp::protocol::{ + parse_config_options, ConfigOption, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, +}; use anyhow::{anyhow, Result}; use serde_json::{json, Value}; use std::collections::HashMap; +use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; -use std::collections::VecDeque; use tracing::{debug, error, info, warn}; +const STDERR_TAIL_CAPACITY: usize = 8; +const STDERR_DRAIN_TIMEOUT_MS: u64 = 250; /// Pick the most permissive selectable permission option from ACP options. fn pick_best_option(options: &[Value]) -> Option { @@ -84,6 +88,17 @@ fn expand_env(val: &str) -> String { } use tokio::time::Instant; +fn build_connection_closed_message(stderr_tail: &VecDeque) -> String { + if stderr_tail.is_empty() { + "connection closed".into() + } else { + format!( + "connection closed; agent stderr: {}", + stderr_tail.iter().cloned().collect::>().join(" | ") + ) + } +} + /// A content block for the ACP prompt — either text or image. #[derive(Debug, Clone)] pub enum ContentBlock { @@ -188,20 +203,39 @@ impl AcpConnection { // Preserve the real HOME so agents can find OAuth/auth files (~/.codex, // ~/.claude, ~/.config/gh, etc.). working_dir is already set via // current_dir() above and is not necessarily the user's home directory. - cmd.env("HOME", std::env::var("HOME").unwrap_or_else(|_| working_dir.into())); - cmd.env("PATH", std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into())); + cmd.env( + "HOME", + std::env::var("HOME").unwrap_or_else(|_| working_dir.into()), + ); + cmd.env( + "PATH", + std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into()), + ); #[cfg(unix)] { - cmd.env("USER", std::env::var("USER").unwrap_or_else(|_| "agent".into())); + cmd.env( + "USER", + std::env::var("USER").unwrap_or_else(|_| "agent".into()), + ); } #[cfg(windows)] { // Windows requires SystemRoot for DLL loading and basic OS functionality. // USERPROFILE is the Windows equivalent of HOME. - cmd.env("USERPROFILE", std::env::var("USERPROFILE").unwrap_or_else(|_| working_dir.into())); - cmd.env("USERNAME", std::env::var("USERNAME").unwrap_or_else(|_| "agent".into())); - if let Ok(v) = std::env::var("SystemRoot") { cmd.env("SystemRoot", v); } - if let Ok(v) = std::env::var("SystemDrive") { cmd.env("SystemDrive", v); } + cmd.env( + "USERPROFILE", + std::env::var("USERPROFILE").unwrap_or_else(|_| working_dir.into()), + ); + cmd.env( + "USERNAME", + std::env::var("USERNAME").unwrap_or_else(|_| "agent".into()), + ); + if let Ok(v) = std::env::var("SystemRoot") { + cmd.env("SystemRoot", v); + } + if let Ok(v) = std::env::var("SystemDrive") { + cmd.env("SystemDrive", v); + } } for (k, v) in env { cmd.env(k, expand_env(v)); @@ -224,35 +258,48 @@ impl AcpConnection { let mut proc = cmd .spawn() .map_err(|e| anyhow!("failed to spawn {command}: {e}"))?; - let child_pgid = proc.id() - .and_then(|pid| i32::try_from(pid).ok()); + let child_pgid = proc.id().and_then(|pid| i32::try_from(pid).ok()); let stdout = proc.stdout.take().ok_or_else(|| anyhow!("no stdout"))?; let stdin = proc.stdin.take().ok_or_else(|| anyhow!("no stdin"))?; let stdin = Arc::new(Mutex::new(stdin)); // Stderr reader: log each line at warn level and keep a tail for error messages. - let stderr_tail: Arc>> = - Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(8))); + let stderr_tail: Arc>> = Arc::new(std::sync::Mutex::new( + VecDeque::with_capacity(STDERR_TAIL_CAPACITY), + )); + let (stderr_done_tx, stderr_done_rx) = oneshot::channel(); if let Some(stderr) = proc.stderr.take() { let tail = stderr_tail.clone(); + let done_tx = stderr_done_tx; tokio::spawn(async move { let mut reader = BufReader::new(stderr); let mut line = String::new(); loop { line.clear(); match reader.read_line(&mut line).await { - Ok(0) | Err(_) => break, + Ok(0) => break, + Err(e) => { + error!("stderr reader error: {e}"); + break; + } Ok(_) => {} } let trimmed = line.trim().to_string(); - if trimmed.is_empty() { continue; } + if trimmed.is_empty() { + continue; + } warn!(target: "openab::agent", "[stderr] {trimmed}"); let mut t = tail.lock().unwrap(); - if t.len() >= 8 { t.pop_front(); } + if t.len() >= STDERR_TAIL_CAPACITY { + t.pop_front(); + } t.push_back(trimmed); } + let _ = done_tx.send(()); }); + } else { + let _ = stderr_done_tx.send(()); } let pending: Arc>>> = @@ -265,6 +312,7 @@ impl AcpConnection { let notify_tx = notify_tx.clone(); let stdin_clone = stdin.clone(); let stderr_tail_reader = stderr_tail.clone(); + let stderr_done_rx = stderr_done_rx; tokio::spawn(async move { let mut reader = BufReader::new(stdout); let mut line = String::new(); @@ -335,16 +383,19 @@ impl AcpConnection { } } + // Wait briefly for stderr to drain so fast-fail startup errors + // are consistently surfaced on the synthesized close message. + let _ = tokio::time::timeout( + std::time::Duration::from_millis(STDERR_DRAIN_TIMEOUT_MS), + stderr_done_rx, + ) + .await; + // Connection closed — build error message, optionally with last stderr lines - let stderr_hint = { + let close_msg = { let tail = stderr_tail_reader.lock().unwrap(); - if tail.is_empty() { - String::new() - } else { - format!("; agent stderr: {}", tail.iter().cloned().collect::>().join(" | ")) - } + build_connection_closed_message(&tail) }; - let close_msg = format!("connection closed{stderr_hint}"); let mut map = pending.lock().await; for (_, tx) in map.drain() { @@ -439,19 +490,22 @@ impl AcpConnection { .and_then(|c| c.get("loadSession")) .and_then(|v| v.as_bool()) .unwrap_or(false); - info!(agent = agent_name, load_session = self.supports_load_session, "initialized"); + info!( + agent = agent_name, + load_session = self.supports_load_session, + "initialized" + ); Ok(()) } pub async fn session_new(&mut self, cwd: &str) -> Result { let resp = self - .send_request( - "session/new", - Some(json!({"cwd": cwd, "mcpServers": []})), - ) + .send_request("session/new", Some(json!({"cwd": cwd, "mcpServers": []}))) .await?; - let session_id = resp.result.as_ref() + let session_id = resp + .result + .as_ref() .and_then(|r| r.get("sessionId")) .and_then(|s| s.as_str()) .ok_or_else(|| anyhow!("no sessionId in session/new response"))? @@ -470,7 +524,11 @@ impl AcpConnection { /// Set a config option (e.g. model, mode) via ACP session/set_config_option. /// Returns the updated list of all config options. - pub async fn set_config_option(&mut self, config_id: &str, value: &str) -> Result> { + pub async fn set_config_option( + &mut self, + config_id: &str, + value: &str, + ) -> Result> { let session_id = self .acp_session_id .as_ref() @@ -498,7 +556,10 @@ impl AcpConnection { Err(_) => { // Fall back: send as a slash command (e.g. "/model claude-sonnet-4") let cmd = format!("/{config_id} {value}"); - info!(cmd, "set_config_option not supported, falling back to prompt"); + info!( + cmd, + "set_config_option not supported, falling back to prompt" + ); let _resp = self .send_request( "session/prompt", @@ -539,10 +600,7 @@ impl AcpConnection { let id = self.next_id(); // Convert content blocks to JSON - let prompt_json: Vec = content_blocks - .iter() - .map(|b| b.to_json()) - .collect(); + let prompt_json: Vec = content_blocks.iter().map(|b| b.to_json()).collect(); let req = JsonRpcRequest::new( id, @@ -608,11 +666,15 @@ impl AcpConnection { #[cfg(unix)] { // Stage 1: SIGTERM the process group - unsafe { libc::kill(-pgid, libc::SIGTERM); } + unsafe { + libc::kill(-pgid, libc::SIGTERM); + } // Stage 2: SIGKILL after brief grace (std::thread survives runtime shutdown) std::thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(1500)); - unsafe { libc::kill(-pgid, libc::SIGKILL); } + unsafe { + libc::kill(-pgid, libc::SIGKILL); + } }); } #[cfg(not(unix))] @@ -630,8 +692,12 @@ impl Drop for AcpConnection { #[cfg(test)] mod tests { - use super::{build_agent_env, build_permission_response, pick_best_option}; + use super::{ + build_agent_env, build_connection_closed_message, build_permission_response, + pick_best_option, + }; use serde_json::json; + use std::collections::VecDeque; #[test] fn picks_allow_always_over_other_options() { @@ -763,4 +829,16 @@ mod tests { assert!(!result.contains_key("OAB_TEST_NONEXISTENT_VAR_12345")); assert!(inherited.is_empty()); } + + #[test] + fn connection_closed_message_includes_stderr_tail() { + let mut tail = VecDeque::new(); + tail.push_back("first".to_string()); + tail.push_back("second".to_string()); + + assert_eq!( + build_connection_closed_message(&tail), + "connection closed; agent stderr: first | second" + ); + } } diff --git a/src/acp/pool.rs b/src/acp/pool.rs index 812320a4..160f28ba 100644 --- a/src/acp/pool.rs +++ b/src/acp/pool.rs @@ -32,12 +32,7 @@ pub struct SessionPool { mapping_path: PathBuf, } -type EvictionCandidate = ( - String, - Arc>, - Instant, - Option, -); +type EvictionCandidate = (String, Arc>, Instant, Option); fn remove_if_same_handle( map: &mut HashMap>>, @@ -54,10 +49,7 @@ fn remove_if_same_handle( } } -fn get_or_insert_gate( - map: &mut HashMap>>, - key: &str, -) -> Arc> { +fn get_or_insert_gate(map: &mut HashMap>>, key: &str) -> Arc> { map.entry(key.to_string()) .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() @@ -121,7 +113,9 @@ impl SessionPool { } }; let tmp = self.mapping_path.with_extension("json.tmp"); - if let Err(e) = std::fs::write(&tmp, &data).and_then(|_| std::fs::rename(&tmp, &self.mapping_path)) { + if let Err(e) = + std::fs::write(&tmp, &data).and_then(|_| std::fs::rename(&tmp, &self.mapping_path)) + { warn!(path = %self.mapping_path.display(), error = %e, "failed to persist thread mapping"); } } @@ -174,7 +168,12 @@ impl SessionPool { skipped_locked_candidates += 1; continue; }; - let candidate = (key, conn_handle, conn.last_active, conn.acp_session_id.clone()); + let candidate = ( + key, + conn_handle, + conn.last_active, + conn.acp_session_id.clone(), + ); match &eviction_candidate { Some((_, _, oldest_last_active, _)) if candidate.2 >= *oldest_last_active => {} _ => eviction_candidate = Some(candidate), @@ -267,7 +266,9 @@ impl SessionPool { state.active.insert(thread_id.to_string(), new_conn); self.save_mapping(&state.suspended); if !cancel_session_id.is_empty() { - state.cancel_handles.insert(thread_id.to_string(), (cancel_handle, cancel_session_id)); + state + .cancel_handles + .insert(thread_id.to_string(), (cancel_handle, cancel_session_id)); } Ok(()) } @@ -277,7 +278,9 @@ impl SessionPool { where F: for<'a> FnOnce( &'a mut AcpConnection, - ) -> std::pin::Pin> + Send + 'a>>, + ) -> std::pin::Pin< + Box> + Send + 'a>, + >, { let conn = { let state = self.state.read().await; @@ -328,7 +331,10 @@ impl SessionPool { pub async fn cancel_session(&self, thread_id: &str) -> Result<()> { let (stdin, session_id) = { let state = self.state.read().await; - state.cancel_handles.get(thread_id).cloned() + state + .cancel_handles + .get(thread_id) + .cloned() .ok_or_else(|| anyhow!("no session for thread {thread_id}"))? }; let data = serde_json::to_string(&serde_json::json!({ @@ -431,7 +437,11 @@ impl SessionPool { // awaiting a connection lock). let snapshot: Vec<(String, Arc>)> = { let state = self.state.read().await; - state.active.iter().map(|(k, v)| (k.clone(), Arc::clone(v))).collect() + state + .active + .iter() + .map(|(k, v)| (k.clone(), Arc::clone(v))) + .collect() }; let mut session_ids: Vec<(String, String)> = Vec::new(); @@ -455,8 +465,11 @@ impl SessionPool { #[cfg(test)] mod tests { - use super::{get_or_insert_gate, remove_if_same_handle}; + use super::{get_or_insert_gate, remove_if_same_handle, SessionPool}; + use crate::config::AgentConfig; use std::collections::HashMap; + use std::fs; + use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; @@ -494,4 +507,52 @@ mod tests { assert!(Arc::ptr_eq(&first, &second)); assert_eq!(map.len(), 1); } + + #[cfg(unix)] + fn temp_working_dir(name: &str) -> PathBuf { + let path = std::env::temp_dir().join(format!( + "openab-{name}-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before epoch") + .as_nanos() + )); + fs::create_dir_all(&path).expect("create temp working dir"); + path + } + + #[cfg(unix)] + #[tokio::test] + async fn health_check_surfaces_fast_fail_stderr() { + let working_dir = temp_working_dir("health-check"); + let pool = SessionPool::new( + AgentConfig { + command: "/bin/sh".into(), + args: vec![ + "-c".into(), + "printf 'env: node: No such file or directory\\n' >&2; exit 1".into(), + ], + working_dir: working_dir + .to_str() + .expect("temp path should be utf-8") + .to_string(), + env: HashMap::new(), + inherit_env: vec![], + }, + 1, + ); + + let err = pool + .health_check() + .await + .expect_err("health check should fail") + .to_string(); + + assert!( + err.contains("connection closed; agent stderr: env: node: No such file or directory") + ); + + let _ = fs::remove_dir_all(working_dir); + } }