diff --git a/src/acp/connection.rs b/src/acp/connection.rs index 90c0eae2..7994d4d3 100644 --- a/src/acp/connection.rs +++ b/src/acp/connection.rs @@ -4,13 +4,17 @@ use crate::acp::protocol::{ use anyhow::{anyhow, Result}; use serde_json::{json, Value}; use std::collections::HashMap; +use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; + +const STDERR_TAIL_CAPACITY: usize = 8; +const STDERR_DRAIN_TIMEOUT_MS: u64 = 1000; /// Pick the most permissive selectable permission option from ACP options. fn pick_best_option(options: &[Value]) -> Option { @@ -84,6 +88,22 @@ fn expand_env(val: &str) -> String { } use tokio::time::Instant; +fn build_connection_closed_message(stderr_tail: &VecDeque) -> String { + if stderr_tail.is_empty() { + "connection closed".into() + } else { + format!( + "connection closed; agent stderr: {}", + stderr_tail.iter().cloned().collect::>().join(" | ") + ) + } +} + +struct CloseContext { + stderr_tail: Arc>>, + stderr_done_rx: oneshot::Receiver<()>, +} + /// A content block for the ACP prompt — either text or image. #[derive(Debug, Clone)] pub enum ContentBlock { @@ -154,11 +174,12 @@ fn build_agent_env( /// and forwards notifications + stale id-bearing messages to the active /// subscriber. Extracted as a free generic function so unit tests can drive /// it with `tokio::io::duplex()` halves instead of a real child process. -pub(crate) async fn run_reader_loop( +async fn run_reader_loop( reader: R, writer: Arc>, pending: Arc>>>, notify_tx: Arc>>>, + close_context: Option, ) where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, @@ -236,6 +257,23 @@ pub(crate) async fn run_reader_loop( } } + let close_msg = if let Some(CloseContext { + stderr_tail, + stderr_done_rx, + }) = close_context + { + let _ = tokio::time::timeout( + std::time::Duration::from_millis(STDERR_DRAIN_TIMEOUT_MS), + stderr_done_rx, + ) + .await; + + let tail = stderr_tail.lock().unwrap(); + build_connection_closed_message(&tail) + } else { + "connection closed".to_string() + }; + // Connection closed — resolve all pending with error let mut map = pending.lock().await; for (_, tx) in map.drain() { @@ -245,7 +283,7 @@ pub(crate) async fn run_reader_loop( result: None, error: Some(crate::acp::protocol::JsonRpcError { code: -1, - message: "connection closed".into(), + message: close_msg.clone(), }), params: None, }); @@ -269,7 +307,7 @@ impl AcpConnection { cmd.args(args) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()) .current_dir(working_dir); // Create a new process group so we can kill the entire tree. // SAFETY: setpgid is async-signal-safe (POSIX.1-2008) and called @@ -355,6 +393,43 @@ impl AcpConnection { let stdin = proc.stdin.take().ok_or_else(|| anyhow!("no stdin"))?; let stdin = Arc::new(Mutex::new(stdin)); + let stderr_tail: Arc>> = Arc::new(std::sync::Mutex::new( + VecDeque::with_capacity(STDERR_TAIL_CAPACITY), + )); + let (stderr_done_tx, stderr_done_rx) = oneshot::channel(); + if let Some(stderr) = proc.stderr.take() { + let tail = stderr_tail.clone(); + let done_tx = stderr_done_tx; + tokio::spawn(async move { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, + Err(e) => { + error!("stderr reader error: {e}"); + break; + } + Ok(_) => {} + } + let trimmed = line.trim().to_string(); + if trimmed.is_empty() { + continue; + } + warn!(target: "openab::agent", "[stderr] {trimmed}"); + let mut t = tail.lock().unwrap(); + if t.len() >= STDERR_TAIL_CAPACITY { + t.pop_front(); + } + t.push_back(trimmed); + } + let _ = done_tx.send(()); + }); + } else { + let _ = stderr_done_tx.send(()); + } + let pending: Arc>>> = Arc::new(Mutex::new(HashMap::new())); let notify_tx: Arc>>> = @@ -365,6 +440,10 @@ impl AcpConnection { stdin.clone(), pending.clone(), notify_tx.clone(), + Some(CloseContext { + stderr_tail: stderr_tail.clone(), + stderr_done_rx, + }), )); Ok(Self { @@ -404,7 +483,25 @@ impl AcpConnection { let (tx, rx) = oneshot::channel(); self.pending.lock().await.insert(id, tx); - self.send_raw(&data).await?; + if let Err(send_err) = self.send_raw(&data).await { + let close_resp = tokio::time::timeout( + std::time::Duration::from_millis(STDERR_DRAIN_TIMEOUT_MS), + rx, + ) + .await + .ok() + .and_then(|resp| resp.ok()); + + if let Some(resp) = close_resp { + if let Some(err) = &resp.error { + return Err(anyhow!("{err}")); + } + return Ok(resp); + } + + self.pending.lock().await.remove(&id); + return Err(send_err.into()); + } let timeout_secs = if method == "session/new" { 120 } else { 30 }; let resp = tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), rx) @@ -663,7 +760,11 @@ impl Drop for AcpConnection { #[cfg(test)] mod tests { - use super::{build_agent_env, build_permission_response, pick_best_option}; + use super::{ + build_agent_env, build_connection_closed_message, build_permission_response, + pick_best_option, + }; + use std::collections::VecDeque; use serde_json::json; #[test] @@ -796,6 +897,18 @@ mod tests { assert!(!result.contains_key("OAB_TEST_NONEXISTENT_VAR_12345")); assert!(inherited.is_empty()); } + + #[test] + fn connection_closed_message_includes_stderr_tail() { + let mut tail = VecDeque::new(); + tail.push_back("first".to_string()); + tail.push_back("second".to_string()); + + assert_eq!( + build_connection_closed_message(&tail), + "connection closed; agent stderr: first | second" + ); + } } #[cfg(test)] @@ -830,6 +943,7 @@ mod reader_loop_tests { writer, pending.clone(), notify_tx.clone(), + None, )); let stale = b"{\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"stopReason\":\"ok\"}}\n"; @@ -876,6 +990,7 @@ mod reader_loop_tests { writer, pending.clone(), notify_tx.clone(), + None, )); let payload = b"{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":{\"stopReason\":\"end_turn\"}}\n"; diff --git a/src/acp/pool.rs b/src/acp/pool.rs index 6ccd3631..9d1da987 100644 --- a/src/acp/pool.rs +++ b/src/acp/pool.rs @@ -77,6 +77,23 @@ impl SessionPool { } } + /// Spawn a temporary agent connection, run initialize + session/new, then drop it. + /// Returns Ok on success or Err with a description of what failed. + pub async fn health_check(&self) -> Result<()> { + let mut conn = AcpConnection::spawn( + &self.config.command, + &self.config.args, + &self.config.working_dir, + &self.config.env, + &self.config.inherit_env, + ) + .await?; + conn.initialize().await?; + conn.session_new(&self.config.working_dir).await?; + Ok(()) + // conn dropped here → kill_process_group() + } + fn load_mapping(path: &Path) -> HashMap { match std::fs::read_to_string(path) { Ok(data) => serde_json::from_str(&data).unwrap_or_else(|e| { @@ -448,8 +465,11 @@ impl SessionPool { #[cfg(test)] mod tests { - use super::{get_or_insert_gate, remove_if_same_handle}; + use super::{get_or_insert_gate, remove_if_same_handle, SessionPool}; + use crate::config::AgentConfig; use std::collections::HashMap; + use std::fs; + use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; @@ -487,4 +507,53 @@ mod tests { assert!(Arc::ptr_eq(&first, &second)); assert_eq!(map.len(), 1); } + + #[cfg(unix)] + fn temp_working_dir(name: &str) -> PathBuf { + let path = std::env::temp_dir().join(format!( + "openab-{name}-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before epoch") + .as_nanos() + )); + fs::create_dir_all(&path).expect("create temp working dir"); + path + } + + #[cfg(unix)] + #[tokio::test] + async fn health_check_surfaces_fast_fail_stderr() { + let working_dir = temp_working_dir("health-check"); + let pool = SessionPool::new( + AgentConfig { + command: "/bin/sh".into(), + args: vec![ + "-c".into(), + "printf 'env: node: No such file or directory\\n' >&2; exit 1".into(), + ], + working_dir: working_dir + .to_str() + .expect("temp path should be utf-8") + .to_string(), + env: HashMap::new(), + inherit_env: vec![], + }, + 1, + ); + + let err = pool + .health_check() + .await + .expect_err("health check should fail") + .to_string(); + + assert!( + err.contains("connection closed; agent stderr: env: node: No such file or directory"), + "{err}" + ); + + let _ = fs::remove_dir_all(working_dir); + } } diff --git a/src/main.rs b/src/main.rs index 706079b6..c6ed156a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -127,6 +127,14 @@ async fn main() -> anyhow::Result<()> { let pool = Arc::new(acp::SessionPool::new(cfg.agent, cfg.pool.max_sessions)); let ttl_secs = cfg.pool.session_ttl_hours * 3600; + // Agent health check — verify the agent can spawn, initialize, and create a session + // before accepting any messages. Failure is logged but non-fatal so the process stays + // alive (launchd KeepAlive handles restarts); the error will surface on first message too. + match pool.health_check().await { + Ok(()) => info!("agent health check passed"), + Err(e) => error!("agent health check FAILED: {e} — all messages will fail until the agent is fixed"), + } + // Resolve STT config (auto-detect GROQ_API_KEY from env) if cfg.stt.enabled { if cfg.stt.api_key.is_empty() && cfg.stt.base_url.contains("groq.com") {