From 28cd8dac2e40711679b81139aa503e2619c56064 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 25 Jun 2026 13:06:43 -0700 Subject: [PATCH] fix: plug twelve runtime memory leaks across sidecar, kernel, VFS, V8 A leak audit of the VM runtime found tracking maps/registries populated on create but released only on a happy-path teardown (skipped on error '?' or never), plus per-timer threads with no cancellation. Each fix releases on every termination path; each ships a fast safeguard-firing test. sidecar: - dispose_vm_internal removes per-VM tracking on every exit path (VM removed before the fallible teardown half; cleanup runs unconditionally, error surfaced after) and the dispose_session/remove_connection loops attempt every item and aggregate errors instead of '?'-ing out (H1). - extension_process_output_buffers cleared on VM disposal, not just on successful handoff (M6). - disposed sessions are now untracked from the stdio active_sessions set (M5). - loopback-TLS endpoints remove their own registry entry on Drop instead of relying on the lazy retain() sweep (L4). - new additive Extension::on_session_disposed hook, fired on DisposeReason::ConnectionClosed, so extensions (e.g. ACP) can free per-session state on client disconnect (enables agent-os H4). sidecar-browser: vms/contexts maps cleared on every dispose path (H1). execution: bridge/kernel timer threads are delay-capped and cancellable via a generation check + clear-on-teardown, so guest timers can't exhaust OS threads or outlive their session (H2, M3). v8-runtime: VM_CONTEXTS slots evicted on context finalize, not only on error, so reused isolates don't hit MAX_VM_CONTEXTS (M1); pending promise-resolver Globals reset before the isolate is dropped on every run_event_loop exit (Shutdown/abort), fixing the leak and a V8 lifetime-contract violation (M2). kernel: socket ids allocated only after the backlog check passes, so a full-backlog connect failure no longer consumes an id (M4). vfs: rename rolls back staged snapshot entries on copy/rename failure (L2); InMemoryMetadataStore gains delete_snapshot() and a real gc() that reclaims unreferenced blocks (L3). L1 (bounded, documented Box::into_raw isolate handle) intentionally left as-is. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/execution/src/javascript.rs | 299 +++++++++++++--- crates/kernel/src/socket_table.rs | 147 +++++++- crates/sidecar-browser/src/service.rs | 368 ++++++++++++++++++-- crates/sidecar/src/execution.rs | 106 +++++- crates/sidecar/src/extension.rs | 12 + crates/sidecar/src/service.rs | 341 +++++++++++++++++- crates/sidecar/src/state.rs | 7 + crates/sidecar/src/stdio.rs | 60 +++- crates/sidecar/src/vm.rs | 96 ++--- crates/sidecar/tests/service.rs | 17 + crates/v8-runtime/src/bridge.rs | 157 ++++++++- crates/v8-runtime/src/session.rs | 137 +++++++- crates/vfs/src/engine/mem/metadata_store.rs | 145 +++++++- crates/vfs/src/posix/overlay_fs.rs | 210 ++++++++++- 14 files changed, 1949 insertions(+), 153 deletions(-) diff --git a/crates/execution/src/javascript.rs b/crates/execution/src/javascript.rs index a7466b6cb..d8dc13548 100644 --- a/crates/execution/src/javascript.rs +++ b/crates/execution/src/javascript.rs @@ -493,6 +493,23 @@ struct LocalBridgeState { module_reader: Option>, } +impl Drop for LocalBridgeState { + /// Tear down all tracked timers when the bridge state is dropped (which + /// happens when the event-bridge service loop exits on session termination — + /// success, error, or shutdown). Clearing the shared `timers` map cancels both + /// kernel and bridge timers: any in-flight timer thread that wakes afterwards + /// finds its entry gone and suppresses its callback via `timer_should_fire`, + /// so a destroyed session's timers do not fire and their thread stops touching + /// the session. Without this, kernel-timer threads (M3) and bridge-timer + /// threads (H2) would outlive the session, holding a session `Arc` and firing + /// after the fact. + fn drop(&mut self) { + if let Ok(mut timers) = self.timers.lock() { + timers.clear(); + } + } +} + #[derive(Debug, Default)] struct LocalKernelStdinBridge { state: Mutex, @@ -572,6 +589,14 @@ enum LocalBridgeCallResult { Deferred, } +/// Upper bound on guest-supplied timer delays, matching the JS `TIMEOUT_MAX` +/// ceiling (`2**31 - 1` ms, ~24.8 days). Guest code can pass a delay up to +/// `u64::MAX` ms; without a cap each `_scheduleTimer` / `kernelTimerCreate` call +/// spawns a thread that sleeps for an effectively unbounded duration while +/// pinning a clone of the session `Arc`, blocking session cleanup. Clamping the +/// delay bounds how long a timer thread can outlive its session. +const MAX_TIMER_DELAY_MS: u64 = 2_147_483_647; + fn timer_delay_ms(value: Option<&Value>) -> u64 { let delay = match value { Some(Value::Number(number)) => number.as_f64().unwrap_or(0.0), @@ -582,10 +607,40 @@ fn timer_delay_ms(value: Option<&Value>) -> u64 { if !delay.is_finite() || delay <= 0.0 { 0 } else { - delay.floor().min(u64::MAX as f64) as u64 + delay.floor().min(MAX_TIMER_DELAY_MS as f64) as u64 } } +/// Decide whether a woken timer thread should fire, and reclaim its tracking +/// entry. Returns `false` (suppressing the callback) when the timer is gone from +/// the map (cleared, or wiped on session teardown) or its generation no longer +/// matches the one captured at scheduling time (re-armed/cancelled). A one-shot +/// (`repeat == false`) timer that does fire is removed from the map so its id is +/// reclaimed. Shared by the kernel-timer and bridge-timer paths so both honor the +/// same cancellation semantics. +fn timer_should_fire( + timers: &Arc>>, + timer_id: u64, + generation: u64, +) -> bool { + timers + .lock() + .ok() + .and_then(|mut timers| { + let (current_generation, repeat) = timers + .get(&timer_id) + .map(|entry| (entry.generation, entry.repeat))?; + if current_generation != generation { + return Some(false); + } + if !repeat { + timers.remove(&timer_id); + } + Some(true) + }) + .unwrap_or(false) +} + impl GuestPathTranslator { fn from_host_context( env: &BTreeMap, @@ -917,17 +972,17 @@ impl ModuleResolutionTestHarness { ]; sort_guest_path_mappings(&mut mappings); - Self { - local_bridge: LocalBridgeState { - translator: GuestPathTranslator { - implicit_guest_cwd: String::from("/root"), - implicit_host_cwd: host_root, - sandbox_root: None, - mappings, - }, - ..LocalBridgeState::default() - }, - } + // Build via default + in-place assignment rather than `..default()`: + // LocalBridgeState implements Drop (to cancel timers on session teardown), + // and functional-record-update would move fields out of a Drop type (E0509). + let mut local_bridge = LocalBridgeState::default(); + local_bridge.translator = GuestPathTranslator { + implicit_guest_cwd: String::from("/root"), + implicit_host_cwd: host_root, + sandbox_root: None, + mappings, + }; + Self { local_bridge } } pub fn resolve_import(&mut self, specifier: &str, from_path: &str) -> Option { @@ -949,14 +1004,11 @@ pub fn handle_internal_bridge_call_from_host_context( method: &str, args: &[Value], ) -> Option { - let mut local_bridge = LocalBridgeState { - translator: GuestPathTranslator::from_host_context( - env, - host_cwd.to_path_buf(), - guest_cwd.to_owned(), - ), - ..LocalBridgeState::default() - }; + // default + in-place assign: LocalBridgeState is Drop, so `..default()` (E0509) + // is not allowed. + let mut local_bridge = LocalBridgeState::default(); + local_bridge.translator = + GuestPathTranslator::from_host_context(env, host_cwd.to_path_buf(), guest_cwd.to_owned()); match local_bridge.handle_internal_bridge_call(0, method, args) { Some(LocalBridgeCallResult::Immediate(value)) => Some(value), @@ -1828,19 +1880,20 @@ impl JavascriptExecutionEngine { let pending_sync_rpc = Arc::new(Mutex::new(None)); let kernel_stdin = Arc::new(LocalKernelStdinBridge::default()); let standalone_translator = translator.clone(); + // default + in-place assign: LocalBridgeState is Drop, so `..Default::default()` + // (E0509) is not allowed. + let mut local_bridge = LocalBridgeState::default(); + local_bridge.translator = translator; + local_bridge.kernel_stdin = kernel_stdin.clone(); + local_bridge.v8_session = Some(v8_session.clone()); + local_bridge.module_reader = module_reader; + local_bridge.module_resolution = GuestModuleResolution::from_env(&request.env); let events = spawn_v8_event_bridge( frame_receiver, pending_sync_rpc.clone(), sync_rpc_timeout, v8_session.clone(), - LocalBridgeState { - translator, - kernel_stdin: kernel_stdin.clone(), - v8_session: Some(v8_session.clone()), - module_reader, - module_resolution: GuestModuleResolution::from_env(&request.env), - ..Default::default() - }, + local_bridge, ); // Install the direct module reader on the session thread BEFORE the Execute @@ -3097,6 +3150,25 @@ impl LocalBridgeState { self.next_timer_id } + /// Allocate a fresh timer id and register a one-shot (`repeat == false`) + /// tracking entry at generation 0. Used by the bridge-timer path so the + /// spawned thread can be cancelled (its entry removed) on `clear`/teardown. + fn register_oneshot_timer(&mut self, delay_ms: u64) -> u64 { + self.next_timer_id += 1; + let timer_id = self.next_timer_id; + if let Ok(mut timers) = self.timers.lock() { + timers.insert( + timer_id, + LocalTimerEntry { + delay_ms, + generation: 0, + repeat: false, + }, + ); + } + timer_id + } + fn arm_kernel_timer(&self, timer_id: u64) { let Some(session) = self.v8_session.clone() else { return; @@ -3117,23 +3189,7 @@ impl LocalBridgeState { thread::sleep(Duration::from_millis(delay_ms)); } - let should_fire = timers - .lock() - .ok() - .and_then(|mut timers| { - let (current_generation, repeat) = timers - .get(&timer_id) - .map(|entry| (entry.generation, entry.repeat))?; - if current_generation != generation { - return Some(false); - } - if !repeat { - timers.remove(&timer_id); - } - Some(true) - }) - .unwrap_or(false); - if !should_fire { + if !timer_should_fire(&timers, timer_id, generation) { return; } @@ -3148,15 +3204,31 @@ impl LocalBridgeState { } } - fn schedule_bridge_timer_response(&self, call_id: u64, delay_ms: u64) { + fn schedule_bridge_timer_response(&mut self, call_id: u64, delay_ms: u64) { let Some(session) = self.v8_session.clone() else { return; }; + // Register the bridge timer in the shared `timers` map with a generation, + // mirroring the kernel-timer cancellation path. Previously this spawned a + // bare, untracked thread holding the session `Arc` with no cancellation and + // no generation check, so a cleared/destroyed session's timer would still + // fire (and pin the session) after the full delay. Tracking it means that + // when `LocalBridgeState` is dropped on session teardown (which clears the + // map) or the entry is otherwise removed, the woken thread observes the + // missing/mismatched generation via `timer_should_fire` and suppresses the + // response instead of touching the torn-down session. + let timer_id = self.register_oneshot_timer(delay_ms); + let generation = 0; + let timers = self.timers.clone(); + thread::spawn(move || { if delay_ms > 0 { thread::sleep(Duration::from_millis(delay_ms)); } + if !timer_should_fire(&timers, timer_id, generation) { + return; + } let _ = session.send_bridge_response(call_id, 0, Vec::new()); }); } @@ -6908,4 +6980,139 @@ mod tests { drop(receiver); host.unregister_session(&session_id); } + + // --- Timer cancellation / cap regression tests (U4: H2 bridge timers, M3 + // kernel timers). These assert the *safeguards firing* (delay clamped, timer + // entry reclaimed, callback suppressed) and never spawn unbounded threads. --- + + #[test] + fn timer_delay_is_clamped_to_the_cap() { + // A guest can pass an arbitrarily large delay (up to u64::MAX ms); without + // a cap each scheduled timer spawns a thread that sleeps essentially + // forever while pinning the session Arc. The cap bounds that lifetime. + assert_eq!( + timer_delay_ms(Some(&json!(u64::MAX))), + MAX_TIMER_DELAY_MS, + "a u64::MAX delay must be clamped to MAX_TIMER_DELAY_MS" + ); + assert_eq!( + timer_delay_ms(Some(&json!(1.0e308_f64))), + MAX_TIMER_DELAY_MS, + "an enormous float delay must be clamped to the cap" + ); + assert_eq!( + timer_delay_ms(Some(&json!(MAX_TIMER_DELAY_MS + 1))), + MAX_TIMER_DELAY_MS, + "a delay one past the cap must clamp down to the cap" + ); + // Below-cap values pass through unchanged so normal timers are unaffected. + assert_eq!(timer_delay_ms(Some(&json!(250))), 250); + assert_eq!(timer_delay_ms(Some(&json!(0))), 0); + } + + #[test] + fn cleared_timer_is_suppressed_and_entry_reclaimed() { + // Mirrors what a woken bridge/kernel timer thread does after sleeping: it + // consults the shared map via `timer_should_fire`. When the entry has been + // removed (clear or session teardown), the callback must be suppressed. + let timers: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + timers.lock().unwrap().insert( + 7, + LocalTimerEntry { + delay_ms: 1_000, + generation: 0, + repeat: false, + }, + ); + + // Simulate `kernelTimerClear` / teardown removing the entry before the + // thread wakes. + timers.lock().unwrap().remove(&7); + + assert!( + !timer_should_fire(&timers, 7, 0), + "a cleared timer must not fire" + ); + assert!( + timers.lock().unwrap().is_empty(), + "tracking map stays empty after a cleared timer is evaluated" + ); + } + + #[test] + fn rearmed_timer_generation_mismatch_suppresses_stale_thread() { + // The bridge/kernel timer thread captures the generation at schedule time. + // If the timer is re-armed (generation bumped) before the stale thread + // wakes, the stale thread must observe the mismatch and suppress, while the + // entry survives for the live generation. + let timers: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + timers.lock().unwrap().insert( + 3, + LocalTimerEntry { + delay_ms: 10, + generation: 1, + repeat: false, + }, + ); + + // Stale thread captured generation 0; current entry is at generation 1. + assert!( + !timer_should_fire(&timers, 3, 0), + "a stale generation must be suppressed" + ); + assert!( + timers.lock().unwrap().contains_key(&3), + "the live entry must survive a stale-generation evaluation" + ); + + // The matching (current) generation fires and reclaims the one-shot entry. + assert!( + timer_should_fire(&timers, 3, 1), + "the current generation must fire" + ); + assert!( + timers.lock().unwrap().is_empty(), + "a fired one-shot timer must reclaim its id from the map" + ); + } + + #[test] + fn bridge_timer_registration_is_tracked_and_drop_clears_timers() { + // H2: the bridge-timer path must register its timer (so it is cancellable) + // rather than spawning a bare untracked thread, and session teardown + // (dropping LocalBridgeState) must wipe the tracking map so in-flight timer + // threads are cancelled. + let mut state = LocalBridgeState::default(); + // Observe the same map the spawned threads would consult. + let timers = state.timers.clone(); + + let id_a = state.register_oneshot_timer(MAX_TIMER_DELAY_MS); + let id_b = state.register_oneshot_timer(500); + assert_ne!(id_a, id_b, "each bridge timer gets a fresh id"); + assert_eq!( + timers.lock().unwrap().len(), + 2, + "registered bridge timers are tracked in the shared map" + ); + // A registered timer would fire for its captured generation (proving the + // entry is real and consultable) ... + assert!(timer_should_fire(&timers, id_a, 0)); + // ... and seeding a still-pending one before teardown: + let id_c = state.register_oneshot_timer(1_000); + + // Session teardown: dropping the bridge state must clear every timer so any + // sleeping thread wakes to a missing entry and suppresses its callback. + drop(state); + + assert!( + timers.lock().unwrap().is_empty(), + "dropping LocalBridgeState must clear the timers map on teardown" + ); + assert!( + !timer_should_fire(&timers, id_c, 0), + "a pending bridge timer is suppressed after teardown" + ); + } } diff --git a/crates/kernel/src/socket_table.rs b/crates/kernel/src/socket_table.rs index 6a6afe51a..d7e09cdbc 100644 --- a/crates/kernel/src/socket_table.rs +++ b/crates/kernel/src/socket_table.rs @@ -948,25 +948,41 @@ impl SocketTable { .sockets .remove(&socket_id) .ok_or_else(|| SocketTableError::not_found(socket_id))?; - let accepted_socket_id = next_socket_id(&mut table); - let result = (|| { + // Validate the listener and confirm backlog capacity BEFORE consuming a + // socket id. The id counter is monotonic (saturating_add) and never + // reclaims, so allocating an id before this check leaks one on every + // rejected connect (for example when the backlog is full). + { + let listener = table + .sockets + .get(&listener_socket_id) + .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?; + validate_connect_to_listener(&client, listener)?; + + let listener_state = listener.listener_state.as_ref().ok_or_else(|| { + SocketTableError::invalid_argument(format!( + "socket {listener_socket_id} has no listener state" + )) + })?; + if listener_state.pending_accepts.len() >= listener_state.backlog { + return Err(SocketTableError::would_block(format!( + "listener {listener_socket_id} backlog is full" + ))); + } + } + + // Capacity confirmed: only now is it safe to consume a socket id. + let accepted_socket_id = next_socket_id(&mut table); let listener = table .sockets .get_mut(&listener_socket_id) .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?; - validate_connect_to_listener(&client, listener)?; - let listener_state = listener.listener_state.as_mut().ok_or_else(|| { SocketTableError::invalid_argument(format!( "socket {listener_socket_id} has no listener state" )) })?; - if listener_state.pending_accepts.len() >= listener_state.backlog { - return Err(SocketTableError::would_block(format!( - "listener {listener_socket_id} backlog is full" - ))); - } let accepted = SocketRecord { id: accepted_socket_id, @@ -1005,6 +1021,7 @@ impl SocketTable { match result { Ok(accepted) => { + let accepted_socket_id = accepted.id; table.sockets.insert(socket_id, client); table.sockets.insert(accepted_socket_id, accepted.clone()); table @@ -1055,25 +1072,41 @@ impl SocketTable { .sockets .remove(&socket_id) .ok_or_else(|| SocketTableError::not_found(socket_id))?; - let accepted_socket_id = next_socket_id(&mut table); - let result = (|| { + // Validate the listener and confirm backlog capacity BEFORE consuming a + // socket id. The id counter is monotonic (saturating_add) and never + // reclaims, so allocating an id before this check leaks one on every + // rejected connect (for example when the backlog is full). + { + let listener = table + .sockets + .get(&listener_socket_id) + .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?; + validate_connect_to_listener(&client, listener)?; + + let listener_state = listener.listener_state.as_ref().ok_or_else(|| { + SocketTableError::invalid_argument(format!( + "socket {listener_socket_id} has no listener state" + )) + })?; + if listener_state.pending_accepts.len() >= listener_state.backlog { + return Err(SocketTableError::would_block(format!( + "listener {listener_socket_id} backlog is full" + ))); + } + } + + // Capacity confirmed: only now is it safe to consume a socket id. + let accepted_socket_id = next_socket_id(&mut table); let listener = table .sockets .get_mut(&listener_socket_id) .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?; - validate_connect_to_listener(&client, listener)?; - let listener_state = listener.listener_state.as_mut().ok_or_else(|| { SocketTableError::invalid_argument(format!( "socket {listener_socket_id} has no listener state" )) })?; - if listener_state.pending_accepts.len() >= listener_state.backlog { - return Err(SocketTableError::would_block(format!( - "listener {listener_socket_id} backlog is full" - ))); - } let accepted = SocketRecord { id: accepted_socket_id, @@ -1112,6 +1145,7 @@ impl SocketTable { match result { Ok(accepted) => { + let accepted_socket_id = accepted.id; table.sockets.insert(socket_id, client); table.sockets.insert(accepted_socket_id, accepted.clone()); table @@ -1985,3 +2019,80 @@ fn lock_or_recover<'a, T>(mutex: &'a Mutex) -> MutexGuard<'a, T> { Err(poisoned) => poisoned.into_inner(), } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Reads the monotonic socket-id counter without advancing it, so a test can + /// observe whether a code path consumed an id. + fn peek_next_socket_id(table: &SocketTable) -> SocketId { + lock_or_recover(&table.inner.state).next_socket_id + } + + #[test] + fn full_backlog_unix_connect_does_not_consume_socket_id() { + let table = SocketTable::new(); + let path = "/tmp/leak-test/server.sock"; + + let listener = table.allocate(1, SocketSpec::unix_stream()); + table + .bind_unix(listener.id, path) + .expect("bind unix listener"); + table.listen(listener.id, 1).expect("listen with backlog 1"); + + // Fill the only backlog slot with one pending connection. + let first = table.allocate(2, SocketSpec::unix_stream()); + table + .connect_to_bound_unix_stream(first.id, path) + .expect("first connect fills the backlog"); + + // A second connect must be rejected because the backlog is full, and it + // must NOT consume a socket id (the counter is monotonic and never reclaims). + let second = table.allocate(2, SocketSpec::unix_stream()); + let before = peek_next_socket_id(&table); + let error = table + .connect_to_bound_unix_stream(second.id, path) + .expect_err("full-backlog connect must fail"); + assert_eq!(error.code(), "EAGAIN"); + let after = peek_next_socket_id(&table); + + assert_eq!( + before, after, + "full-backlog unix connect leaked a socket id (counter advanced from {before} to {after})" + ); + } + + #[test] + fn full_backlog_inet_connect_does_not_consume_socket_id() { + let table = SocketTable::new(); + let target = InetSocketAddress::new("127.0.0.1", 49222); + + let listener = table.allocate(1, SocketSpec::tcp()); + table + .bind_inet(listener.id, target.clone()) + .expect("bind inet listener"); + table.listen(listener.id, 1).expect("listen with backlog 1"); + + // Fill the only backlog slot with one pending connection. + let first = table.allocate(2, SocketSpec::tcp()); + table + .connect_to_bound_inet_stream(first.id, target.clone()) + .expect("first connect fills the backlog"); + + // A second connect must be rejected because the backlog is full, and it + // must NOT consume a socket id (the counter is monotonic and never reclaims). + let second = table.allocate(2, SocketSpec::tcp()); + let before = peek_next_socket_id(&table); + let error = table + .connect_to_bound_inet_stream(second.id, target) + .expect_err("full-backlog connect must fail"); + assert_eq!(error.code(), "EAGAIN"); + let after = peek_next_socket_id(&table); + + assert_eq!( + before, after, + "full-backlog inet connect leaked a socket id (counter advanced from {before} to {after})" + ); + } +} diff --git a/crates/sidecar-browser/src/service.rs b/crates/sidecar-browser/src/service.rs index 4e9844702..062bd36a3 100644 --- a/crates/sidecar-browser/src/service.rs +++ b/crates/sidecar-browser/src/service.rs @@ -466,42 +466,51 @@ where } pub fn dispose_vm(&mut self, vm_id: &str) -> Result<(), BrowserSidecarError> { - let Some(vm_state) = self.vms.get(vm_id) else { + // Remove the VM bookkeeping up front and take ownership of its state, so + // that EVERY exit path below — including a mid-dispose `?` failure while + // releasing executions or emitting lifecycle events — reclaims the + // VmState (and the BrowserKernel it owns) instead of stranding it in the + // `vms` map for the process lifetime. + let Some(vm_state) = self.vms.remove(vm_id) else { return Err(BrowserSidecarError::InvalidState(format!( "unknown browser sidecar VM: {vm_id}" ))); }; - let execution_ids = vm_state - .active_executions - .iter() - .cloned() - .collect::>(); - for execution_id in execution_ids { - self.release_execution(&execution_id, "browser.worker.disposed")?; + // Dropping per-context bookkeeping is infallible, so do it + // unconditionally; `contexts` can never retain an entry for a VM that + // has already been removed from `vms`. + for context_id in &vm_state.contexts { + self.contexts.remove(context_id); } - let context_ids = self - .vms - .get(vm_id) - .expect("VM should still exist while disposing contexts") - .contexts - .iter() - .cloned() - .collect::>(); - for context_id in context_ids { - self.contexts.remove(&context_id); + // Release every execution, attempting all of them and retaining only the + // first error. A single worker-termination failure must not abandon the + // remaining executions (their `ExecutionState`s would otherwise leak), + // and `release_execution` already removes each entry from `executions` + // before doing fallible bridge work, so the maps stay drained even when + // the bridge reports an error. + let mut first_error: Option = None; + for execution_id in &vm_state.active_executions { + if let Err(error) = self.release_execution(execution_id, "browser.worker.disposed") { + first_error.get_or_insert(error); + } } - self.vms.remove(vm_id); - self.emit_lifecycle( + // Emit the terminal lifecycle event regardless of the outcome above; the + // VM is already gone from the registry either way. + let terminated = self.emit_lifecycle( vm_id, LifecycleState::Terminated, Some(String::from( "browser sidecar VM disposed on the main thread", )), - )?; - Ok(()) + ); + + match first_error { + Some(error) => Err(error), + None => terminated, + } } pub fn create_javascript_context( @@ -1085,3 +1094,318 @@ fn execution_signal_to_kernel(signal: secure_exec_bridge::ExecutionSignal) -> i3 secure_exec_bridge::ExecutionSignal::Kill => 9, } } + +#[cfg(test)] +impl BrowserSidecar +where + B: BrowserSidecarBridge, + BridgeError: fmt::Debug, +{ + /// Test-only: number of entries still tracked in the global `contexts` map. + pub(crate) fn test_total_context_count(&self) -> usize { + self.contexts.len() + } + + /// Test-only: number of entries still tracked in the global `executions` map. + pub(crate) fn test_total_execution_count(&self) -> usize { + self.executions.len() + } + + /// Test-only: inject a context directly into both the global `contexts` map + /// and the owning VM's context set, bypassing the bridge round-trip so a + /// dispose-path test can exercise cleanup at the smallest seam. + pub(crate) fn test_insert_context(&mut self, vm_id: &str, context_id: &str) { + self.contexts.insert( + context_id.to_string(), + ContextState { + vm_id: vm_id.to_string(), + runtime: GuestRuntime::JavaScript, + entrypoint: BrowserWorkerEntrypoint::JavaScript { + bootstrap_module: None, + }, + }, + ); + if let Some(vm) = self.vms.get_mut(vm_id) { + vm.contexts.insert(context_id.to_string()); + } + } + + /// Test-only: inject an active execution directly into both the global + /// `executions` map and the owning VM's active-execution set. + pub(crate) fn test_insert_execution(&mut self, vm_id: &str, execution_id: &str) { + self.executions.insert( + execution_id.to_string(), + ExecutionState { + vm_id: vm_id.to_string(), + worker: BrowserWorkerHandle { + worker_id: format!("worker-{execution_id}"), + runtime: GuestRuntime::JavaScript, + }, + kernel_pid: 0, + stdin_write_fd: 0, + }, + ); + if let Some(vm) = self.vms.get_mut(vm_id) { + vm.active_executions.insert(execution_id.to_string()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use secure_exec_bridge::{ + ChmodRequest, ClockRequest, CommandPermissionRequest, CreateDirRequest, DiagnosticRecord, + DirectoryEntry, EnvironmentPermissionRequest, ExecutionHandleRequest, FileMetadata, + FilesystemPermissionRequest, FilesystemSnapshot, FlushFilesystemStateRequest, + LoadFilesystemStateRequest, LogRecord, NetworkPermissionRequest, PathRequest, + PermissionDecision, RandomBytesRequest, ReadDirRequest, ReadFileRequest, RenameRequest, + ScheduleTimerRequest, ScheduledTimer, SymlinkRequest, TruncateRequest, WriteFileRequest, + }; + use secure_exec_bridge::{ + ClockBridge, EventBridge, ExecutionBridge, FilesystemBridge, PermissionBridge, + PersistenceBridge, RandomBridge, + }; + use secure_exec_kernel::kernel::KernelVmConfig; + use std::time::SystemTime; + + #[derive(Debug, Clone, PartialEq, Eq)] + struct TestBridgeError(String); + + /// Minimal bridge whose `terminate_worker` can be forced to fail, used to + /// drive a mid-dispose error through `release_execution`. + #[derive(Default)] + struct TerminateFailingBridge { + fail_terminate: bool, + } + + impl BridgeTypes for TerminateFailingBridge { + type Error = TestBridgeError; + } + + impl FilesystemBridge for TerminateFailingBridge { + fn read_file(&mut self, _request: ReadFileRequest) -> Result, Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn write_file(&mut self, _request: WriteFileRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn stat(&mut self, _request: PathRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + fn lstat(&mut self, _request: PathRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + fn read_dir( + &mut self, + _request: ReadDirRequest, + ) -> Result, Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn create_dir(&mut self, _request: CreateDirRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn remove_file(&mut self, _request: PathRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn remove_dir(&mut self, _request: PathRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn rename(&mut self, _request: RenameRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn symlink(&mut self, _request: SymlinkRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn read_link(&mut self, _request: PathRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + fn chmod(&mut self, _request: ChmodRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn truncate(&mut self, _request: TruncateRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn exists(&mut self, _request: PathRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + } + + impl PermissionBridge for TerminateFailingBridge { + fn check_filesystem_access( + &mut self, + _request: FilesystemPermissionRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn check_network_access( + &mut self, + _request: NetworkPermissionRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn check_command_execution( + &mut self, + _request: CommandPermissionRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn check_environment_access( + &mut self, + _request: EnvironmentPermissionRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + } + + impl PersistenceBridge for TerminateFailingBridge { + fn load_filesystem_state( + &mut self, + _request: LoadFilesystemStateRequest, + ) -> Result, Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn flush_filesystem_state( + &mut self, + _request: FlushFilesystemStateRequest, + ) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + } + + impl ClockBridge for TerminateFailingBridge { + fn wall_clock(&mut self, _request: ClockRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + fn monotonic_clock(&mut self, _request: ClockRequest) -> Result { + unimplemented!("not exercised by dispose test") + } + fn schedule_timer( + &mut self, + _request: ScheduleTimerRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + } + + impl RandomBridge for TerminateFailingBridge { + fn fill_random_bytes( + &mut self, + _request: RandomBytesRequest, + ) -> Result, Self::Error> { + unimplemented!("not exercised by dispose test") + } + } + + impl EventBridge for TerminateFailingBridge { + fn emit_structured_event( + &mut self, + _event: StructuredEventRecord, + ) -> Result<(), Self::Error> { + Ok(()) + } + fn emit_diagnostic(&mut self, _event: DiagnosticRecord) -> Result<(), Self::Error> { + Ok(()) + } + fn emit_log(&mut self, _event: LogRecord) -> Result<(), Self::Error> { + Ok(()) + } + fn emit_lifecycle(&mut self, _event: LifecycleEventRecord) -> Result<(), Self::Error> { + Ok(()) + } + } + + impl ExecutionBridge for TerminateFailingBridge { + fn create_javascript_context( + &mut self, + _request: CreateJavascriptContextRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn create_wasm_context( + &mut self, + _request: CreateWasmContextRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn start_execution( + &mut self, + _request: StartExecutionRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + fn write_stdin(&mut self, _request: WriteExecutionStdinRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn close_stdin(&mut self, _request: ExecutionHandleRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn kill_execution(&mut self, _request: KillExecutionRequest) -> Result<(), Self::Error> { + unimplemented!("not exercised by dispose test") + } + fn poll_execution_event( + &mut self, + _request: PollExecutionEventRequest, + ) -> Result, Self::Error> { + unimplemented!("not exercised by dispose test") + } + } + + impl crate::BrowserWorkerBridge for TerminateFailingBridge { + fn create_worker( + &mut self, + _request: BrowserWorkerSpawnRequest, + ) -> Result { + unimplemented!("not exercised by dispose test") + } + + fn terminate_worker( + &mut self, + _request: BrowserWorkerHandleRequest, + ) -> Result<(), Self::Error> { + if self.fail_terminate { + Err(TestBridgeError(String::from("forced terminate failure"))) + } else { + Ok(()) + } + } + } + + // A mid-dispose worker-termination failure must still drain the VM, context, + // and execution bookkeeping for that id — otherwise the VmState (holding a + // BrowserKernel) and ContextState leak for the process lifetime. + #[test] + fn dispose_vm_drains_maps_even_when_worker_termination_fails() { + let bridge = TerminateFailingBridge { + fail_terminate: true, + }; + let mut sidecar = BrowserSidecar::new(bridge, BrowserSidecarConfig::default()); + + sidecar + .create_vm(KernelVmConfig::new("vm-leak")) + .expect("create vm"); + sidecar.test_insert_context("vm-leak", "ctx-leak"); + sidecar.test_insert_execution("vm-leak", "exec-leak"); + + assert_eq!(sidecar.vm_count(), 1); + assert_eq!(sidecar.test_total_context_count(), 1); + assert_eq!(sidecar.test_total_execution_count(), 1); + + // The forced terminate_worker failure surfaces as an error, but the + // dispose must still have reclaimed every entry for `vm-leak`. + let result = sidecar.dispose_vm("vm-leak"); + assert!(result.is_err(), "forced terminate failure should surface"); + + assert_eq!(sidecar.vm_count(), 0, "VmState leaked after failed dispose"); + assert_eq!( + sidecar.test_total_context_count(), + 0, + "ContextState leaked after failed dispose" + ); + assert_eq!( + sidecar.test_total_execution_count(), + 0, + "ExecutionState leaked after failed dispose" + ); + } +} diff --git a/crates/sidecar/src/execution.rs b/crates/sidecar/src/execution.rs index f4eea8a7a..80a974b81 100644 --- a/crates/sidecar/src/execution.rs +++ b/crates/sidecar/src/execution.rs @@ -633,6 +633,23 @@ fn loopback_tls_transport_registry( REGISTRY.get_or_init(|| Mutex::new(BTreeMap::new())) } +#[cfg(test)] +#[allow(dead_code)] +pub(crate) fn loopback_tls_registry_len() -> usize { + loopback_tls_transport_registry() + .lock() + .expect("loopback TLS transport registry lock poisoned") + .len() +} + +#[cfg(test)] +pub(crate) fn loopback_tls_registry_contains(key: &str) -> bool { + loopback_tls_transport_registry() + .lock() + .expect("loopback TLS transport registry lock poisoned") + .contains_key(key) +} + fn loopback_tls_transport_key( vm_id: &str, socket_id: SocketId, @@ -667,12 +684,13 @@ fn loopback_tls_endpoint( state: Mutex::new(crate::state::LoopbackTlsTransportPairState::default()), ready: std::sync::Condvar::new(), }); - transports.insert(key, Arc::downgrade(&pair)); + transports.insert(key.clone(), Arc::downgrade(&pair)); pair }); Ok(crate::state::LoopbackTlsEndpoint { pair, is_lower_socket: socket_id <= peer_socket_id, + registry_key: Some(key), }) } @@ -785,6 +803,39 @@ fn wait_for_loopback_peer_socket_id( impl Drop for crate::state::LoopbackTlsEndpoint { fn drop(&mut self) { let _ = self.close_endpoint(); + + // Eagerly prune this endpoint's registry entry once we are the last owner + // of the shared transport pair. Without this, the `Weak` entry survives + // until the next `loopback_tls_endpoint()` call runs its lazy `retain()`, + // so dead entries accumulate under intermittent use. We must NOT remove + // the entry while a peer endpoint still shares the pair, otherwise a later + // connection for the same socket pair would fail to find it and build a + // mismatched fresh pair. + let Some(key) = self.registry_key.take() else { + return; + }; + let Ok(mut transports) = loopback_tls_transport_registry().lock() else { + // Lock poisoned: leave the entry for the lazy `retain()` to reclaim. + return; + }; + let should_remove = match transports.get(&key) { + // Only prune when the registered entry still points at *our* pair and + // `self` is its last strong owner. During `Drop` `self.pair` is still + // alive, so a strong count of 1 (after dropping the temporary upgrade) + // means no other endpoint references it. + Some(weak) => match weak.upgrade() { + Some(existing) => { + let same_pair = Arc::ptr_eq(&existing, &self.pair); + drop(existing); + same_pair && Arc::strong_count(&self.pair) <= 1 + } + None => true, + }, + None => false, + }; + if should_remove { + transports.remove(&key); + } } } @@ -876,6 +927,59 @@ impl Write for crate::state::LoopbackTlsEndpoint { } } +#[cfg(test)] +mod loopback_tls_registry_tests { + use super::{ + loopback_tls_endpoint, loopback_tls_registry_contains, loopback_tls_transport_key, + }; + + // Each test uses a unique vm_id so the process-global registry stays + // partitioned across concurrently running tests. + + #[test] + fn dropping_endpoint_removes_its_registry_entry() { + let vm_id = "loopback-tls-drop-removes-entry"; + let key = loopback_tls_transport_key(vm_id, 1, 2); + + let endpoint = loopback_tls_endpoint(vm_id, 1, 2).expect("create endpoint"); + assert!( + loopback_tls_registry_contains(&key), + "registry should contain the key while the endpoint is alive" + ); + + drop(endpoint); + assert!( + !loopback_tls_registry_contains(&key), + "registry entry must be pruned in the endpoint's Drop, not left for the lazy retain()" + ); + } + + #[test] + fn registry_entry_survives_until_last_peer_endpoint_drops() { + let vm_id = "loopback-tls-shared-pair"; + let key = loopback_tls_transport_key(vm_id, 3, 4); + + // Both peers of a loopback connection share the same transport pair. + let lower = loopback_tls_endpoint(vm_id, 3, 4).expect("create lower endpoint"); + let higher = loopback_tls_endpoint(vm_id, 4, 3).expect("create higher endpoint"); + assert!(loopback_tls_registry_contains(&key)); + + // Dropping one peer must keep the entry, since the other peer still owns + // the shared pair and a later connection must be able to find it. + drop(lower); + assert!( + loopback_tls_registry_contains(&key), + "entry must survive while a peer endpoint still shares the pair" + ); + + drop(higher); + assert!( + !loopback_tls_registry_contains(&key), + "entry must be pruned once the last peer endpoint drops" + ); + } +} + // TCP types moved to crate::state struct ActiveTcpConnectRequest<'a, B> { diff --git a/crates/sidecar/src/extension.rs b/crates/sidecar/src/extension.rs index 0b322a722..9398ed27c 100644 --- a/crates/sidecar/src/extension.rs +++ b/crates/sidecar/src/extension.rs @@ -573,6 +573,18 @@ pub trait Extension: Send + Sync { Box::pin(async { Ok(()) }) } + /// Per-session teardown hook. The host invokes this for every registered + /// extension when a session is disposed because its connection closed + /// (`DisposeReason::ConnectionClosed`), giving the extension the disposed + /// session's ownership scope so it can release the per-session state it + /// keyed on that session. Default is a no-op. This is the only signal an + /// extension receives that a client has disconnected, so it is what lets an + /// ACP-style extension free per-session state instead of leaking it for the + /// process lifetime. + fn on_session_disposed<'a>(&'a self, _ctx: ExtensionSnapshot) -> ExtensionFuture<'a, ()> { + Box::pin(async { Ok(()) }) + } + fn is_blocking_request(&self, _payload: &[u8]) -> bool { false } diff --git a/crates/sidecar/src/service.rs b/crates/sidecar/src/service.rs index 9addda859..4bb86ade3 100644 --- a/crates/sidecar/src/service.rs +++ b/crates/sidecar/src/service.rs @@ -874,6 +874,10 @@ pub struct NativeSidecar { pub(crate) extension_sessions: BTreeMap<(String, String), ExtensionSessionResources>, pub(crate) extension_process_output_buffers: BTreeMap<(String, String), ExtensionBufferedProcessOutput>, + /// Session scopes (connection_id, session_id) disposed since the stdio + /// transport last drained them. Lets the transport remove dead sessions from + /// its active-session set instead of iterating them forever (M5). + pub(crate) disposed_sessions: Vec<(String, String)>, } #[derive(Debug)] @@ -979,6 +983,7 @@ where extensions: BTreeMap::new(), extension_sessions: BTreeMap::new(), extension_process_output_buffers: BTreeMap::new(), + disposed_sessions: Vec::new(), }) } @@ -1014,6 +1019,26 @@ where }); } + /// Reclaim every per-VM tracking entry owned by the sidecar for `vm_id`. + /// + /// Called unconditionally from `dispose_vm_internal` so that a fallible + /// teardown step (root-filesystem snapshot/flush, kernel dispose, permission + /// reset) erroring out with `?` can never strand these maps for the rest of + /// the process lifetime (H1). This also reclaims the ACP output-buffer map, + /// which was previously removed only on a successful handoff and leaked on VM + /// or session disposal (M6). + pub(crate) fn reclaim_vm_tracking(&mut self, session_id: &str, vm_id: &str) { + self.javascript_engine.dispose_vm(vm_id); + self.python_engine.dispose_vm(vm_id); + self.wasm_engine.dispose_vm(vm_id); + self.prune_extension_vm_resource(vm_id); + self.extension_process_output_buffers + .retain(|(buffer_vm_id, _process_id), _| buffer_vm_id != vm_id); + if let Some(session) = self.sessions.get_mut(session_id) { + session.vm_ids.remove(vm_id); + } + } + pub(crate) fn capture_extension_process_output_event( &mut self, vm_id: &str, @@ -1649,14 +1674,27 @@ where .collect::>(); let mut events = Vec::new(); + let mut first_error: Option = None; for session_id in session_ids { - events.extend( - self.dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed) - .await?, - ); + // Attempt EVERY session; aggregate errors instead of `?`-ing out on + // the first so one wedged session cannot abandon the rest (H1). + match self + .dispose_session(connection_id, &session_id, DisposeReason::ConnectionClosed) + .await + { + Ok(session_events) => events.extend(session_events), + Err(error) => { + if first_error.is_none() { + first_error = Some(error); + } + } + } } self.connections.remove(connection_id); + if let Some(error) = first_error { + return Err(error); + } Ok(events) } @@ -1793,20 +1831,92 @@ where .collect::>(); let mut events = Vec::new(); + let mut first_error: Option = None; for vm_id in vm_ids { - events.extend( - self.dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone()) - .await?, - ); + // Attempt EVERY VM; aggregate errors instead of `?`-ing out on the + // first so one stuck VM cannot strand the remaining VMs' teardown and + // leave the session permanently un-reclaimed (H1). + match self + .dispose_vm_internal(connection_id, session_id, &vm_id, reason.clone()) + .await + { + Ok(vm_events) => events.extend(vm_events), + Err(error) => { + if first_error.is_none() { + first_error = Some(error); + } + } + } + } + + // On client disconnect, give every registered extension a chance to free + // the per-session state it tracks (H4): the host owns the only signal an + // extension gets that a session has gone away. + if matches!(reason, DisposeReason::ConnectionClosed) { + if let Err(error) = self + .dispose_extension_session_state(connection_id, session_id) + .await + { + if first_error.is_none() { + first_error = Some(error); + } + } } self.sessions.remove(session_id); if let Some(connection) = self.connections.get_mut(connection_id) { connection.sessions.remove(session_id); } + // Tell the stdio transport this session is gone so it stops iterating a + // dead entry every event-pump tick and the set stops growing (M5). + self.disposed_sessions + .push((connection_id.to_owned(), session_id.to_owned())); + + if let Some(error) = first_error { + return Err(error); + } Ok(events) } + /// Invoke each registered extension's per-session teardown hook so it can + /// release the state it keyed on this host session. Errors are aggregated so + /// one misbehaving extension cannot prevent the others from cleaning up. + async fn dispose_extension_session_state( + &mut self, + connection_id: &str, + session_id: &str, + ) -> Result<(), SidecarError> { + let ownership = OwnershipScope::session(connection_id, session_id); + let extensions = self + .extensions + .values() + .cloned() + .collect::>>(); + let mut first_error: Option = None; + for extension in extensions { + let snapshot = ExtensionSnapshot::new( + extension.namespace().to_owned(), + ownership.clone(), + self.sidecar_requests.clone(), + ); + if let Err(error) = extension.on_session_disposed(snapshot).await { + if first_error.is_none() { + first_error = Some(error); + } + } + } + match first_error { + Some(error) => Err(error), + None => Ok(()), + } + } + + /// Drain the session scopes disposed since the last call so the stdio + /// transport can untrack them from its active-session set (M5). + pub(crate) fn take_disposed_sessions(&mut self) -> Vec<(String, String)> { + std::mem::take(&mut self.disposed_sessions) + } + // dispose_vm_internal, terminate_vm_processes, wait_for_vm_processes_to_exit moved to crate::vm // kill_process_internal, handle_execution_event, handle_python_vfs_rpc_request, @@ -3365,3 +3475,218 @@ mod structured_event_frame_tests { } } } + +#[cfg(test)] +mod dispose_lifecycle_tests { + use super::*; + use crate::extension::ExtensionResponse; + use crate::stdio::LocalBridge; + use std::sync::atomic::{AtomicUsize, Ordering}; + + fn block_on(future: F) -> F::Output { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("dispose lifecycle test runtime") + .block_on(future) + } + + fn test_sidecar() -> NativeSidecar { + NativeSidecar::new(LocalBridge::default()).expect("build test sidecar") + } + + // Register a connection + session directly so the dispose paths can be + // exercised without spinning up a V8-backed VM. + fn insert_session( + sidecar: &mut NativeSidecar, + connection_id: &str, + session_id: &str, + vm_ids: BTreeSet, + ) { + sidecar.connections.insert( + connection_id.to_string(), + ConnectionState { + auth_token: String::new(), + sessions: BTreeSet::from([session_id.to_string()]), + }, + ); + sidecar.sessions.insert( + session_id.to_string(), + SessionState { + connection_id: connection_id.to_string(), + placement: crate::protocol::SidecarPlacement::SidecarPlacementShared( + crate::protocol::SidecarPlacementShared { pool: None }, + ), + metadata: BTreeMap::new(), + vm_ids, + }, + ); + } + + struct RecordingExtension { + namespace: String, + session_disposed: Arc, + } + + impl Extension for RecordingExtension { + fn namespace(&self) -> &str { + &self.namespace + } + + fn handle_request<'a>( + &'a self, + _ctx: ExtensionContext<'a>, + _payload: Vec, + ) -> ExtensionFuture<'a, ExtensionResponse> { + Box::pin(async { Ok(ExtensionResponse::new(Vec::new())) }) + } + + fn on_session_disposed<'a>(&'a self, _ctx: ExtensionSnapshot) -> ExtensionFuture<'a, ()> { + let counter = self.session_disposed.clone(); + Box::pin(async move { + counter.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + } + } + + fn register_recording_extension(sidecar: &mut NativeSidecar) -> Arc { + let counter = Arc::new(AtomicUsize::new(0)); + sidecar + .register_extension(Box::new(RecordingExtension { + namespace: String::from("dev.test.dispose"), + session_disposed: counter.clone(), + })) + .expect("register recording extension"); + counter + } + + // H4: the extension per-session teardown hook fires on ConnectionClosed so an + // ACP-style extension can release per-session state on client disconnect. + #[test] + fn connection_closed_dispose_invokes_extension_session_teardown() { + let mut sidecar = test_sidecar(); + let counter = register_recording_extension(&mut sidecar); + insert_session(&mut sidecar, "conn-1", "session-1", BTreeSet::new()); + + block_on(sidecar.dispose_session("conn-1", "session-1", DisposeReason::ConnectionClosed)) + .expect("dispose session on connection close"); + + assert_eq!( + counter.load(Ordering::SeqCst), + 1, + "extension session-teardown hook must fire on ConnectionClosed" + ); + assert!( + !sidecar.sessions.contains_key("session-1"), + "the disposed session must be reclaimed" + ); + } + + // H4 (negative): a client-requested dispose is not a disconnect, so the + // teardown hook must not fire. + #[test] + fn requested_dispose_does_not_invoke_extension_session_teardown() { + let mut sidecar = test_sidecar(); + let counter = register_recording_extension(&mut sidecar); + insert_session(&mut sidecar, "conn-1", "session-1", BTreeSet::new()); + + block_on(sidecar.dispose_session("conn-1", "session-1", DisposeReason::Requested)) + .expect("dispose session on request"); + + assert_eq!( + counter.load(Ordering::SeqCst), + 0, + "the teardown hook is reserved for client disconnect" + ); + } + + // M5: disposing a session records its scope for the stdio transport to drain. + #[test] + fn dispose_session_records_disposed_scope() { + let mut sidecar = test_sidecar(); + insert_session(&mut sidecar, "conn-1", "session-1", BTreeSet::new()); + + block_on(sidecar.dispose_session("conn-1", "session-1", DisposeReason::Requested)) + .expect("dispose session"); + + assert_eq!( + sidecar.take_disposed_sessions(), + vec![(String::from("conn-1"), String::from("session-1"))], + "dispose must publish the session scope so stdio can untrack it" + ); + } + + // H1 + M6: every per-VM tracking map is reclaimed for a disposed VM. The + // output-buffer map (M6) was previously only removed on a successful handoff, + // and the engine/extension maps (H1) were only reclaimed after the fallible + // teardown steps' `?`, so any failure stranded them. + #[test] + fn reclaim_vm_tracking_clears_every_per_vm_map() { + let mut sidecar = test_sidecar(); + insert_session( + &mut sidecar, + "conn-1", + "session-1", + BTreeSet::from([String::from("vm-1")]), + ); + sidecar.extension_process_output_buffers.insert( + (String::from("vm-1"), String::from("proc-1")), + ExtensionBufferedProcessOutput::default(), + ); + sidecar.extension_sessions.insert( + (String::from("ns"), String::from("ext-sess-1")), + ExtensionSessionResources { + ownership: OwnershipScope::vm("conn-1", "session-1", "vm-1"), + process_ids: BTreeSet::new(), + vm_ids: BTreeSet::from([String::from("vm-1")]), + }, + ); + + sidecar.reclaim_vm_tracking("session-1", "vm-1"); + + assert!( + sidecar.extension_process_output_buffers.is_empty(), + "M6: the output-buffer map must be reclaimed on VM disposal" + ); + assert!( + sidecar.extension_sessions.is_empty(), + "H1: an extension session bound only to the VM must be reclaimed" + ); + assert!( + !sidecar + .sessions + .get("session-1") + .expect("session present") + .vm_ids + .contains("vm-1"), + "the VM id must be removed from its session" + ); + } + + // H1: a failing VM dispose inside the loop must not abandon the session. With + // unregistered VM ids, `dispose_vm_internal` fails on `require_owned_vm`; + // pre-fix the loop `?`-ed out and left the session in `self.sessions`. + #[test] + fn dispose_session_reclaims_session_even_when_a_vm_dispose_fails() { + let mut sidecar = test_sidecar(); + insert_session( + &mut sidecar, + "conn-1", + "session-1", + BTreeSet::from([String::from("vm-a"), String::from("vm-b")]), + ); + + let result = + block_on(sidecar.dispose_session("conn-1", "session-1", DisposeReason::Requested)); + + assert!( + result.is_err(), + "a failing VM dispose must still surface an error" + ); + assert!( + !sidecar.sessions.contains_key("session-1"), + "the session must be reclaimed even though VM dispose failed" + ); + } +} diff --git a/crates/sidecar/src/state.rs b/crates/sidecar/src/state.rs index d1b538287..62948e77e 100644 --- a/crates/sidecar/src/state.rs +++ b/crates/sidecar/src/state.rs @@ -726,6 +726,13 @@ pub(crate) struct LoopbackTlsTransportPairState { pub(crate) struct LoopbackTlsEndpoint { pub(crate) pair: Arc, pub(crate) is_lower_socket: bool, + /// Registry key (`vm_id:lower:higher`) under which this endpoint's transport + /// pair is registered in the loopback-TLS transport registry. Stored so the + /// endpoint's `Drop` can eagerly prune its own registry entry once it is the + /// last owner of the pair, instead of leaking a dead `Weak` entry until the + /// next lazy `retain()` in `loopback_tls_endpoint()`. `None` means the + /// endpoint was not registered (e.g. test-constructed) and Drop skips pruning. + pub(crate) registry_key: Option, } impl fmt::Debug for LoopbackTlsEndpoint { diff --git a/crates/sidecar/src/stdio.rs b/crates/sidecar/src/stdio.rs index 575a38a70..967b7ce78 100644 --- a/crates/sidecar/src/stdio.rs +++ b/crates/sidecar/src/stdio.rs @@ -321,7 +321,7 @@ async fn run_async(extensions: Vec>) -> Result<(), Box, +) { + for (connection_id, session_id) in disposed { + active_sessions.remove(&SessionScope { + connection_id: connection_id.clone(), + session_id: session_id.clone(), + }); + } +} + async fn dispatch_with_prompt_interrupt( sidecar: &mut NativeSidecar, request: RequestFrame, @@ -580,10 +599,12 @@ fn interrupted_extension_dispatch( async fn cleanup_connections( sidecar: &mut NativeSidecar, active_connections: &BTreeSet, + active_sessions: &mut BTreeSet, ) { for connection_id in active_connections { let _ = sidecar.remove_connection(connection_id).await; } + untrack_disposed_sessions(&sidecar.take_disposed_sessions(), active_sessions); } fn track_session_state( @@ -822,6 +843,39 @@ mod tests { assert_eq!(error.kind(), io::ErrorKind::BrokenPipe); } + // Regression (M5): the active-session set must shrink when a session is + // disposed. `track_session_state` is insert-only, so the transport relies on + // `untrack_disposed_sessions` draining the sidecar's disposed-session signal; + // without it a long-lived connection's set grows per session forever and the + // ~250us event pump iterates every dead entry. + #[test] + fn disposed_sessions_are_untracked_from_active_sessions() { + let mut active_sessions = BTreeSet::::new(); + let mut active_connections = BTreeSet::::new(); + track_session_state( + &ResponsePayload::SessionOpenedResponse(SessionOpenedResponse { + session_id: String::from("session-1"), + owner_connection_id: String::from("conn-1"), + }), + &mut active_sessions, + &mut active_connections, + ); + assert_eq!( + active_sessions.len(), + 1, + "opening a session should track it for the event pump" + ); + + untrack_disposed_sessions( + &[(String::from("conn-1"), String::from("session-1"))], + &mut active_sessions, + ); + assert!( + active_sessions.is_empty(), + "a disposed session must be removed from the active-session set" + ); + } + #[test] fn read_frame_decodes_wire_authenticate_request() { let codec = WireFrameCodec::new(wire::DEFAULT_MAX_FRAME_BYTES); @@ -1014,7 +1068,7 @@ mod tests { } #[derive(Debug, Clone)] -struct LocalBridge { +pub(crate) struct LocalBridge { started_at: Instant, next_timer_id: usize, snapshots: BTreeMap, @@ -1428,7 +1482,7 @@ impl SidecarRequestTransport for FrameSidecarRequestTransport { } #[derive(Debug, Clone, PartialEq, Eq)] -struct LocalBridgeError { +pub(crate) struct LocalBridgeError { message: String, } diff --git a/crates/sidecar/src/vm.rs b/crates/sidecar/src/vm.rs index b71fd8041..9c4a82f42 100644 --- a/crates/sidecar/src/vm.rs +++ b/crates/sidecar/src/vm.rs @@ -623,32 +623,66 @@ where vm_id, VmLifecycleState::Disposing, )]; - self.terminate_vm_processes(vm_id, &mut events).await?; - - { - let vm = self - .vms - .get_mut(vm_id) - .expect("owned VM should exist before disposal"); - shutdown_configured_mounts( - vm, - &MountPluginContext { - bridge: self.bridge.clone(), - connection_id: connection_id.to_owned(), - session_id: session_id.to_owned(), - vm_id: vm_id.to_owned(), - sidecar_requests: self.sidecar_requests.clone(), - max_pread_bytes: vm.kernel.resource_limits().max_pread_bytes, - }, - "dispose_vm", - true, - )?; - } - + // Process termination needs the VM live in `self.vms` (it looks up and + // signals the VM's active processes). Capture its result but keep tearing + // down: a process that refuses to die must not strand the VM's tracking + // entries for the process lifetime. + let terminate_result = self.terminate_vm_processes(vm_id, &mut events).await; + + // Detach the VM from `self.vms` BEFORE the remaining fallible teardown so + // no `?` below can leave the registry entry (or any per-VM map) behind. let mut vm = self .vms .remove(vm_id) .expect("owned VM should exist before disposal"); + + // `continue_on_error = true` => `shutdown_configured_mounts` never returns + // `Err` on the dispose path (it logs and presses on), so its result is + // intentionally discarded rather than `?`-ed. + let mount_context = MountPluginContext { + bridge: self.bridge.clone(), + connection_id: connection_id.to_owned(), + session_id: session_id.to_owned(), + vm_id: vm_id.to_owned(), + sidecar_requests: self.sidecar_requests.clone(), + max_pread_bytes: vm.kernel.resource_limits().max_pread_bytes, + }; + let _ = shutdown_configured_mounts(&mut vm, &mount_context, "dispose_vm", true); + + // Snapshot/flush/kernel-dispose/permission-reset can each fail; run them + // in a helper whose result is captured so cleanup below is unconditional. + let teardown_result = self.finish_vm_teardown(vm_id, &mut vm).await; + + // Reclaim EVERY per-VM tracking entry on EVERY exit path — even when a + // teardown step above errored. Pre-fix these ran only after the fallible + // steps' `?`, so any failure stranded the engine/extension maps (H1) and + // the output-buffer map was never reclaimed at all (M6). + self.reclaim_vm_tracking(session_id, vm_id); + let _ = fs::remove_dir_all(&vm.cwd); + + // Surface the first failure only AFTER cleanup has completed. + terminate_result?; + teardown_result?; + + events.push(self.vm_lifecycle_event( + connection_id, + session_id, + vm_id, + VmLifecycleState::Disposed, + )); + Ok(events) + } + + /// Run the fallible second half of VM disposal (root-filesystem snapshot + + /// flush, lifecycle event, kernel dispose, permission reset) against a VM + /// that has already been detached from `self.vms`. Kept separate so its + /// `?`-propagated errors are captured by the caller and the per-VM tracking + /// maps are still reclaimed afterward. + async fn finish_vm_teardown( + &mut self, + vm_id: &str, + vm: &mut VmState, + ) -> Result<(), SidecarError> { let snapshot = if vm.kernel.root_filesystem_mut().is_some() { Some(FilesystemSnapshot { format: String::from(ROOT_FILESYSTEM_SNAPSHOT_FORMAT), @@ -673,23 +707,7 @@ where })?; } self.bridge.clear_vm_permissions(vm_id)?; - self.javascript_engine.dispose_vm(vm_id); - self.python_engine.dispose_vm(vm_id); - self.wasm_engine.dispose_vm(vm_id); - self.prune_extension_vm_resource(vm_id); - let _ = fs::remove_dir_all(&vm.cwd); - - if let Some(session) = self.sessions.get_mut(session_id) { - session.vm_ids.remove(vm_id); - } - - events.push(self.vm_lifecycle_event( - connection_id, - session_id, - vm_id, - VmLifecycleState::Disposed, - )); - Ok(events) + Ok(()) } pub(crate) async fn terminate_vm_processes( diff --git a/crates/sidecar/tests/service.rs b/crates/sidecar/tests/service.rs index ca73a52a3..742e1dcdc 100644 --- a/crates/sidecar/tests/service.rs +++ b/crates/sidecar/tests/service.rs @@ -55,6 +55,20 @@ mod vm; #[path = "../src/wire.rs"] mod wire; +// The unit tests include!d from src/service.rs reference crate::stdio::LocalBridge, +// and stdio.rs in turn uses these crate-root re-exports (mirrored from lib.rs) so it +// compiles inside this integration-test crate too. +use extension::{ + Extension, ExtensionContext, ExtensionFuture, ExtensionInterruptRequest, + ExtensionInterruptResponse, ExtensionResponse, +}; +use service::NativeSidecarConfig; +use state::SidecarRequestTransport; + +#[allow(dead_code)] +#[path = "../src/stdio.rs"] +mod stdio; + mod service { include!("../src/service.rs"); @@ -2970,10 +2984,12 @@ ykAheWCsAteSEWVc0w==\n\ crate::state::LoopbackTlsEndpoint { pair: Arc::clone(&pair), is_lower_socket: true, + registry_key: None, }, crate::state::LoopbackTlsEndpoint { pair, is_lower_socket: false, + registry_key: None, }, ) } @@ -3143,6 +3159,7 @@ ykAheWCsAteSEWVc0w==\n\ let competing_reader = crate::state::LoopbackTlsEndpoint { pair: Arc::clone(&reader_endpoint.pair), is_lower_socket: reader_endpoint.is_lower_socket, + registry_key: None, }; let start = Arc::new(Barrier::new(3)); diff --git a/crates/v8-runtime/src/bridge.rs b/crates/v8-runtime/src/bridge.rs index 92cae7dce..400359f6a 100644 --- a/crates/v8-runtime/src/bridge.rs +++ b/crates/v8-runtime/src/bridge.rs @@ -1136,12 +1136,66 @@ fn remove_vm_context_slot(context_id: u32) { }); } -#[cfg(test)] -fn clear_vm_context_registry_for_test() { +/// Evict every `node:vm` context slot held by the current isolate thread and +/// reset the id counter. +/// +/// `VM_CONTEXTS` is a thread-local that lives for the lifetime of the (reused) +/// isolate thread, not a single execution. `reserve_vm_context_slot` adds a slot +/// for every `vm.createContext()`, but the success path +/// (`update_vm_context_slot`) never removes it — only the sandbox-mirroring error +/// path does. Without a teardown sweep, slots reserved by one execution survive +/// into every later execution on the same isolate, so the registry grows +/// monotonically until it hits the hard cap `MAX_VM_CONTEXTS` and all subsequent +/// `createContext()` calls fail (freed only on thread exit). This sweep releases +/// those slots at a session boundary so the registry returns to empty between +/// executions. See `VmContextRegistryGuard`. +fn reset_vm_context_registry() { VM_CONTEXTS.with(|contexts| contexts.borrow_mut().clear()); NEXT_VM_CONTEXT_ID.with(|next_id| next_id.set(1)); } +/// RAII owner of the thread-local `node:vm` context registry for one session. +/// +/// Mirrors the per-session ownership of [`PendingPromises`] (created fresh per +/// session, dropped at teardown): a session holds one guard, and dropping it +/// evicts every context slot the session reserved. Because `Drop` runs on *every* +/// termination path of the frame that holds the guard — normal return, `?` error, +/// early return, and panic unwinding — the slots are reclaimed unconditionally, +/// not only on the happy path. This prevents a reused isolate thread from +/// accumulating `vm.createContext()` slots across executions toward +/// `MAX_VM_CONTEXTS`. +#[must_use = "hold the guard for the session lifetime; dropping it evicts the vm context registry"] +pub struct VmContextRegistryGuard { + _private: (), +} + +impl VmContextRegistryGuard { + /// Begin a session's ownership of the vm context registry, sweeping any slots + /// a prior session left on this (reused) isolate thread so the new session + /// starts from an empty registry. + pub fn new() -> Self { + reset_vm_context_registry(); + VmContextRegistryGuard { _private: () } + } +} + +impl Default for VmContextRegistryGuard { + fn default() -> Self { + Self::new() + } +} + +impl Drop for VmContextRegistryGuard { + fn drop(&mut self) { + reset_vm_context_registry(); + } +} + +#[cfg(test)] +fn clear_vm_context_registry_for_test() { + reset_vm_context_registry(); +} + #[cfg(test)] fn fill_vm_context_registry_for_test<'s>( scope: &mut v8::HandleScope<'s>, @@ -2023,6 +2077,13 @@ pub fn replace_bridge_fns( sync_fns: &[&str], async_fns: &[&str], ) -> (BridgeFnStore, AsyncBridgeFnStore) { + // Per-session bridge installation runs once, before any user code executes in + // this context, so the only `node:vm` context slots present are leftovers from + // a prior session that reused this isolate thread. Sweep them here so a new + // session never inherits another session's contexts and the registry can never + // accumulate across executions toward `MAX_VM_CONTEXTS`. The session should + // also hold a `VmContextRegistryGuard` to evict its own slots at teardown. + reset_vm_context_registry(); let sync_store = register_sync_bridge_fns(scope, ctx, buffers, sync_fns); let async_store = register_async_bridge_fns(scope, ctx, pending, buffers, async_fns); (sync_store, async_store) @@ -2177,9 +2238,10 @@ mod tests { use super::{ bridge_error_code, clear_vm_context_registry_for_test, deserialize_cbor_value, fill_vm_context_registry_for_test, register_async_bridge_fns, register_sync_bridge_fns, - serialize_cbor_value, vm_context_capacity_error, vm_context_registry_len_for_test, - PendingPromises, SessionBuffers, MAX_CBOR_BRIDGE_CONTAINER_ITEMS, MAX_CBOR_BRIDGE_DEPTH, - MAX_PENDING_PROMISES, MAX_VM_CONTEXTS, + reserve_vm_context_slot, reset_vm_context_registry, serialize_cbor_value, + vm_context_capacity_error, vm_context_registry_len_for_test, PendingPromises, + SessionBuffers, VmContextRegistryGuard, MAX_CBOR_BRIDGE_CONTAINER_ITEMS, + MAX_CBOR_BRIDGE_DEPTH, MAX_PENDING_PROMISES, MAX_VM_CONTEXTS, }; use crate::host_call::BridgeCallContext; use crate::ipc_binary::{self, BinaryFrame}; @@ -2594,4 +2656,89 @@ mod tests { "unexpected error: {error}" ); } + + // Regression test for the `VM_CONTEXTS` leak: `reserve_vm_context_slot` adds a + // slot per `vm.createContext()`, but the success path never removes it. On a + // reused isolate thread that made the registry grow without bound across + // executions until it hit `MAX_VM_CONTEXTS` and every later `createContext()` + // failed. With the fix, a session's `VmContextRegistryGuard` evicts the slots + // at teardown, so the registry returns to empty between executions and the cap + // is never reached. This asserts the safeguard FIRING (slots reclaimed), it + // does not saturate any resource, so it stays in the default suite. + // + // Runs in a subprocess to match the V8-initializing test convention in this + // module (one isolate per process, no cross-test V8 interference). + #[test] + fn vm_context_registry_evicts_slots_on_finalize() { + const SUBPROCESS_ENV: &str = "AGENTOS_V8_VM_CONTEXT_FINALIZE_SUBPROCESS"; + if std::env::var_os(SUBPROCESS_ENV).is_none() { + let output = Command::new(std::env::current_exe().expect("current test binary")) + .arg("bridge::tests::vm_context_registry_evicts_slots_on_finalize") + .arg("--exact") + .arg("--nocapture") + .env(SUBPROCESS_ENV, "1") + .output() + .expect("spawn vm context finalize subprocess"); + assert!( + output.status.success(), + "vm context finalize subprocess failed with status {:?}\nstdout:\n{}\nstderr:\n{}", + output.status.code(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + return; + } + + isolate::init_v8_platform(); + let mut isolate = isolate::create_isolate(None); + let context = isolate::create_context(&mut isolate); + let scope = &mut v8::HandleScope::new(&mut isolate); + let context = v8::Local::new(scope, &context); + let scope = &mut v8::ContextScope::new(scope, context); + + reset_vm_context_registry(); + assert_eq!( + vm_context_registry_len_for_test(), + 0, + "registry starts empty" + ); + + // Simulate many executions on the same reused isolate. Each execution + // reserves several contexts and then finalizes (its session guard drops). + // Run far past the hard cap: a leaked slot per createContext would exhaust + // MAX_VM_CONTEXTS within the first ~341 executions and the `.expect` on the + // next reservation would panic. + const CONTEXTS_PER_EXECUTION: usize = 3; + let executions = MAX_VM_CONTEXTS * 4; + for execution in 0..executions { + { + let _guard = VmContextRegistryGuard::new(); + for _ in 0..CONTEXTS_PER_EXECUTION { + reserve_vm_context_slot(scope, context) + .expect("reserve vm context slot below cap; a leak would hit the cap"); + } + assert_eq!( + vm_context_registry_len_for_test(), + CONTEXTS_PER_EXECUTION, + "slots reserved during execution {execution} must be live" + ); + } + // Guard dropped above -> finalize. Asserting here, before the next + // execution's guard is constructed, proves the Drop sweep (not the + // start-of-session sweep) reclaimed the slots. + assert_eq!( + vm_context_registry_len_for_test(), + 0, + "registry must be empty after execution {execution} finalizes" + ); + } + + // After 4x the cap worth of reservations, a fresh createContext still + // succeeds because nothing leaked across executions. + let _guard = VmContextRegistryGuard::new(); + assert!( + reserve_vm_context_slot(scope, context).is_ok(), + "createContext must keep succeeding; leaked slots would have hit MAX_VM_CONTEXTS" + ); + } } diff --git a/crates/v8-runtime/src/session.rs b/crates/v8-runtime/src/session.rs index 35ae86123..e412db66a 100644 --- a/crates/v8-runtime/src/session.rs +++ b/crates/v8-runtime/src/session.rs @@ -780,7 +780,7 @@ fn session_thread( let mut from_snapshot = false; #[cfg(not(test))] - let pending = bridge::PendingPromises::new(); + let mut pending = bridge::PendingPromises::new(); // Store latest InjectGlobals V8 payload for re-injection into fresh contexts #[cfg(not(test))] @@ -902,6 +902,13 @@ fn session_thread( *isolate_handle .lock() .expect("session isolate handle lock poisoned") = None; + // Reset pending promise-resolver Globals BEFORE this + // isolate is dropped. The registry is reused across + // isolate rebuilds, and a prior execution that was + // terminated early (Shutdown / timeout-abort) can + // leave resolvers registered, so they would otherwise + // outlive the isolate that created them. + reset_pending_promises(&mut pending); drop(_v8_context.take()); drop(v8_isolate.take()); from_snapshot = false; @@ -1512,6 +1519,12 @@ fn session_thread( *isolate_handle .lock() .expect("session isolate handle lock poisoned") = None; + // Reset pending promise-resolver Globals BEFORE the isolate is dropped on + // thread teardown. run_event_loop can exit early (Shutdown / timeout-abort) + // with resolvers still registered, so without this the Globals would drop + // after their isolate — leaking across session create/destroy churn and + // violating the V8 lifetime contract. + reset_pending_promises(&mut pending); drop(_v8_context.take()); drop(v8_isolate.take()); } @@ -1723,6 +1736,25 @@ pub(crate) const ASYNC_BRIDGE_FNS: &[&str] = &[ "_fsUtimesAsync", ]; +/// Reset every pending promise-resolver `v8::Global` handle held by `pending`. +/// +/// `v8::Global` handles MUST be reset/dropped *before* the `v8::Isolate` that +/// created them is torn down. The session reuses a single `PendingPromises` +/// registry across executions and across isolate rebuilds, and `run_event_loop` +/// can exit early (Shutdown at the `SessionCommand::Shutdown` arm, or +/// timeout-abort via the `abort_rx` branch) while resolvers are still +/// registered. On those paths the registry can outlive an isolate. Call this +/// immediately before every isolate drop (rebuild and thread teardown) so the +/// `Global` handles are dropped while their isolate is still +/// alive — preventing both a leak across session create/destroy churn (bounded +/// by `MAX_PENDING_PROMISES`) and a V8 lifetime-contract violation. +#[doc(hidden)] +pub fn reset_pending_promises(pending: &mut crate::bridge::PendingPromises) { + // Swap in an empty registry and drop the populated one in place. Dropping a + // `PendingPromises` resets all of its `Global` handles. + drop(std::mem::take(pending)); +} + /// Run the session event loop: dispatch incoming messages to V8. /// /// Called after script/module execution when there are pending async promises. @@ -2502,4 +2534,107 @@ mod tests { mgr.destroy_session("late-bridge").expect("destroy session"); } + + /// Regression test for the pending-promise-resolver leak / V8 lifetime-contract + /// violation: when `run_event_loop` exits early (Shutdown or timeout-abort) the + /// `PendingPromises` registry can still hold `Global` handles, + /// and the session-thread teardown must reset them *before* dropping the isolate. + /// + /// This drives the real cleanup seam (`reset_pending_promises`) used on every + /// isolate-drop path. It populates the registry with live resolver Globals (as a + /// terminated execution would leave behind), runs the cleanup while the isolate + /// is still alive, and asserts the registry is empty (every Global dropped). + /// + /// Fast + bounded (a handful of resolvers, then the safeguard fires) — it asserts + /// the cleanup happens, it does not saturate `MAX_PENDING_PROMISES`. + #[test] + fn reset_pending_promises_drops_resolver_globals_before_isolate_teardown() { + use crate::bridge::{register_async_bridge_fns, PendingPromises, SessionBuffers}; + use crate::host_call::BridgeCallContext; + use crate::isolate; + use std::cell::RefCell; + use std::process::Command; + + // V8 isolates must be created in an isolated process: doing it inline in a + // parallel `cargo test` thread races the process-global V8 platform and + // segfaults. Re-exec this one test as a subprocess (matching the crate's + // bridge_v8_hardening_* / vm_context_registry convention). + const SUBPROCESS_ENV: &str = "AGENTOS_V8_RESET_PENDING_PROMISES_SUBPROCESS"; + if std::env::var_os(SUBPROCESS_ENV).is_none() { + let output = Command::new(std::env::current_exe().expect("current test binary")) + .arg("session::tests::reset_pending_promises_drops_resolver_globals_before_isolate_teardown") + .arg("--exact") + .arg("--nocapture") + .env(SUBPROCESS_ENV, "1") + .output() + .expect("spawn reset-pending-promises subprocess"); + assert!( + output.status.success(), + "reset-pending-promises subprocess failed with status {:?}\nstdout:\n{}\nstderr:\n{}", + output.status.code(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + return; + } + + isolate::init_v8_platform(); + + let mut v8_isolate = isolate::create_isolate(None); + let context = isolate::create_context(&mut v8_isolate); + let scope = &mut v8::HandleScope::new(&mut v8_isolate); + let context = v8::Local::new(scope, &context); + let scope = &mut v8::ContextScope::new(scope, context); + + let session_buffers = RefCell::new(SessionBuffers::new()); + let bridge_ctx = BridgeCallContext::new( + Box::new(std::io::sink()), + Box::new(std::io::empty()), + String::from("reset-pending-test"), + ); + let mut pending = PendingPromises::new(); + + // Each `_asyncFn(i)` call synchronously registers a pending promise + // resolver Global in `pending` and returns an unresolved Promise — + // exactly what remains registered when the event loop exits early on + // Shutdown / timeout-abort. + const REGISTERED: usize = 8; + let _async_fns = register_async_bridge_fns( + scope, + &bridge_ctx as *const BridgeCallContext, + &pending as *const PendingPromises, + &session_buffers as *const RefCell, + &["_asyncFn"], + ); + let source = format!("for (let i = 0; i < {REGISTERED}; i++) {{ _asyncFn(i); }}"); + { + let tc = &mut v8::TryCatch::new(scope); + let code = v8::String::new(tc, &source).unwrap(); + let script = v8::Script::compile(tc, code, None).unwrap(); + assert!( + script.run(tc).is_some(), + "async bridge calls should register resolvers, not throw" + ); + assert!(!tc.has_caught(), "async bridge calls should not throw"); + } + assert_eq!( + pending.len(), + REGISTERED, + "each _asyncFn call must register a pending resolver Global" + ); + + // The cleanup invoked on every session-thread isolate-drop path. It must + // empty the registry (resetting every Global) while the + // isolate is still alive. + reset_pending_promises(&mut pending); + + assert_eq!( + pending.len(), + 0, + "reset_pending_promises must drop all pending resolver Globals before isolate teardown" + ); + + // Isolate is still alive here: the Globals were reset above, so dropping + // the scope/isolate below honors the V8 lifetime contract. + } } diff --git a/crates/vfs/src/engine/mem/metadata_store.rs b/crates/vfs/src/engine/mem/metadata_store.rs index 76936b688..b4b3fc397 100644 --- a/crates/vfs/src/engine/mem/metadata_store.rs +++ b/crates/vfs/src/engine/mem/metadata_store.rs @@ -117,6 +117,37 @@ impl InMemoryMetadataStore { })), } } + + /// Drop a snapshot and release every block ref it pinned. + /// + /// Without this, `state.snapshots` grows forever: each snapshot pins cloned + /// inodes/dentries/chunks and holds an elevated `block_refs` count for every + /// block it references, which keeps those blocks from ever being reclaimed + /// by `gc()`. Removing the snapshot from the map first releases the cloned + /// metadata (the `Snapshot` is owned out of the map and dropped at end of + /// scope), and there is no fallible work after the removal, so the + /// per-chunk decrement always runs once the snapshot exists. + pub async fn delete_snapshot(&self, id: SnapshotId) -> VfsResult> { + let mut state = self.state.lock().expect("metadata mutex poisoned"); + let snapshot = state + .snapshots + .remove(&id) + .ok_or_else(|| VfsError::enoent(format!("snapshot {}", id.0)))?; + let mut freed = Vec::new(); + for chunk in snapshot.chunks.values() { + state.dec_block_ref(&chunk.key, &mut freed); + } + Ok(freed) + } + + #[cfg(test)] + pub(crate) fn snapshot_count(&self) -> usize { + self.state + .lock() + .expect("metadata mutex poisoned") + .snapshots + .len() + } } impl State { @@ -256,6 +287,23 @@ impl State { *self.block_refs.entry(key.clone()).or_insert(0) += 1; } + /// Recompute the *true* block refcounts from the current live inode chunks + /// plus every live snapshot's chunks. This is the authoritative reference + /// set used by `gc()`: any `block_refs` entry absent here is no longer + /// referenced by anything and may be reclaimed. + fn live_block_refs(&self) -> BTreeMap { + let mut counts: BTreeMap = BTreeMap::new(); + for chunk in self.chunks.values() { + *counts.entry(chunk.key.clone()).or_insert(0) += 1; + } + for snapshot in self.snapshots.values() { + for chunk in snapshot.chunks.values() { + *counts.entry(chunk.key.clone()).or_insert(0) += 1; + } + } + counts + } + fn drop_inode_content(&mut self, ino: u64, freed: &mut Vec) { let keys: Vec = self .chunks @@ -636,6 +684,101 @@ impl MetadataStore for InMemoryMetadataStore { } async fn gc(&self) -> VfsResult> { - Ok(Vec::new()) + let mut state = self.state.lock().expect("metadata mutex poisoned"); + // Mark: recompute the authoritative reference set from live inode chunks + // and every live snapshot's chunks. + let live = state.live_block_refs(); + // Sweep: any tracked block absent from the live set is referenced by + // nothing and is reclaimed; reconcile the surviving counts to truth so + // refcount drift cannot pin blocks indefinitely. + let freed: Vec = state + .block_refs + .keys() + .filter(|key| !live.contains_key(*key)) + .cloned() + .collect(); + state.block_refs = live; + Ok(freed) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::types::{ChunkEdit, CreateInodeAttrs, Storage}; + + fn block_key(tag: &str) -> BlockKey { + BlockKey::from_content(tag.as_bytes()) + } + + #[tokio::test] + async fn delete_snapshot_releases_pinned_block_refs() { + let store = InMemoryMetadataStore::new(); + let key = block_key("chunk-a"); + + // A chunked file under root referencing one block. + let file = store + .create( + InMemoryMetadataStore::ROOT_INO, + "f", + CreateInodeAttrs::file(0o644, 0, 0, Storage::Chunked { chunk_size: 4096 }), + ) + .await + .unwrap(); + store + .commit_write( + file.ino, + vec![ChunkEdit { + index: 0, + key: key.clone(), + len: 10, + }], + 10, + ) + .await + .unwrap(); + assert_eq!(store.refcount(&key), 1, "live inode holds one ref"); + + // Snapshotting pins a second ref to the same block. + let snap = store + .snapshot(InMemoryMetadataStore::ROOT_INO) + .await + .unwrap(); + assert_eq!(store.refcount(&key), 2, "snapshot pins block"); + assert_eq!(store.snapshot_count(), 1); + + // Removing the live file leaves the block pinned by the snapshot, so the + // block cannot be reclaimed while the snapshot is alive. + store + .remove(InMemoryMetadataStore::ROOT_INO, "f") + .await + .unwrap(); + assert_eq!(store.refcount(&key), 1, "snapshot still pins the block"); + + // Deleting the snapshot must release its pinned ref and free the block. + let freed = store.delete_snapshot(snap).await.unwrap(); + assert_eq!(store.refcount(&key), 0, "snapshot ref released"); + assert!(freed.contains(&key), "freed block reported by delete"); + assert_eq!(store.snapshot_count(), 0, "snapshot dropped from map"); + } + + #[tokio::test] + async fn gc_reclaims_unreferenced_blocks() { + let store = InMemoryMetadataStore::new(); + let orphan = block_key("orphan"); + + // A tracked block that no live inode or snapshot references (drift from a + // partially-applied edit). A correct gc() must reclaim it. + store + .state + .lock() + .unwrap() + .block_refs + .insert(orphan.clone(), 3); + assert_eq!(store.refcount(&orphan), 3); + + let freed = store.gc().await.unwrap(); + assert!(freed.contains(&orphan), "gc reclaims unreferenced block"); + assert_eq!(store.refcount(&orphan), 0, "orphan entry removed"); } } diff --git a/crates/vfs/src/posix/overlay_fs.rs b/crates/vfs/src/posix/overlay_fs.rs index 25aa86b9e..feaa13cf3 100644 --- a/crates/vfs/src/posix/overlay_fs.rs +++ b/crates/vfs/src/posix/overlay_fs.rs @@ -43,6 +43,34 @@ struct OverlaySnapshotEntry { kind: OverlaySnapshotKind, } +/// Records every upper-layer mutation performed while staging a rename so that a +/// failure between staging and the (successful) upper rename can be rolled back +/// without orphaning staged inodes / `path_index` entries. Unlike +/// `remove_snapshot_entries` (which finalizes a *successful* move and therefore +/// whiteouts lower-backed source paths), this captures only the entries/markers +/// that staging itself newly created so they can be removed verbatim — never +/// hiding a still-present lower source on the error path. +#[derive(Debug, Default)] +struct StagedRollback { + /// Upper paths newly created by staging, in creation order (a parent is + /// always recorded before any child created underneath it). `is_dir` mirrors + /// the removal split used by `remove_snapshot_entries`. + created_paths: Vec<(String, bool)>, + /// Overlay markers newly set during staging / metadata copy, recorded so the + /// marker files (themselves upper inodes) are cleared on rollback. + created_markers: Vec<(OverlayMarkerKind, String)>, +} + +impl StagedRollback { + fn record_path(&mut self, path: &str, is_dir: bool) { + self.created_paths.push((String::from(path), is_dir)); + } + + fn record_marker(&mut self, kind: OverlayMarkerKind, path: &str) { + self.created_markers.push((kind, String::from(path))); + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] struct OverlayCopyUpUsage { total_bytes: u64, @@ -892,6 +920,55 @@ impl OverlayFileSystem { Ok(()) } + /// Ancestor-materialization variant used by rename staging. It mirrors + /// `ensure_ancestor_directories_in_upper` but records every ancestor it newly + /// creates into `rollback`, so a later staging failure can remove them and + /// avoid orphaning the freshly created directory inodes. + fn ensure_ancestor_directories_in_upper_recording( + &mut self, + path: &str, + rollback: &mut StagedRollback, + ) -> VfsResult<()> { + if Self::is_internal_metadata_path(path) { + return Err(VfsError::permission_denied("mkdir", path)); + } + let normalized = Self::normalized(path); + let parts = normalized + .split('/') + .filter(|part| !part.is_empty()) + .collect::>(); + + let mut current = String::new(); + for part in parts.iter().take(parts.len().saturating_sub(1)) { + current.push('/'); + current.push_str(part); + + if self.exists_in_upper(¤t) { + continue; + } + + if let Some(index) = self.find_lower_by_exists(¤t) { + let stat = self.lowers[index].stat(¤t)?; + if !stat.is_directory { + return Err(Self::not_directory(¤t)); + } + + let upper = self.writable_upper(¤t)?; + upper.mkdir(¤t, false)?; + upper.chmod(¤t, stat.mode)?; + upper.chown(¤t, stat.uid, stat.gid)?; + rollback.record_path(¤t, true); + continue; + } + + let upper = self.writable_upper(¤t)?; + upper.mkdir(¤t, false)?; + rollback.record_path(¤t, true); + } + + Ok(()) + } + fn copy_up_path(&mut self, path: &str) -> VfsResult<()> { if self.has_entry_in_upper(path) { return Ok(()); @@ -1191,7 +1268,12 @@ impl OverlayFileSystem { Ok(()) } - fn copy_subtree_metadata(&mut self, old_root: &str, new_root: &str) -> VfsResult<()> { + fn copy_subtree_metadata( + &mut self, + old_root: &str, + new_root: &str, + rollback: &mut StagedRollback, + ) -> VfsResult<()> { let old_normalized = Self::normalized(old_root); let new_normalized = Self::normalized(new_root); @@ -1200,6 +1282,9 @@ impl OverlayFileSystem { if Self::path_in_subtree(&marker_path, &old_normalized) { let destination = Self::rebase_path(&marker_path, &old_normalized, &new_normalized); + if !self.marker_exists(kind, &destination) { + rollback.record_marker(kind, &destination); + } self.set_marker(kind, &destination, true)?; } } @@ -1211,13 +1296,15 @@ impl OverlayFileSystem { fn stage_snapshot_entries_in_upper( &mut self, entries: &[OverlaySnapshotEntry], + rollback: &mut StagedRollback, ) -> VfsResult<()> { for entry in entries { match &entry.kind { OverlaySnapshotKind::Directory => { if !self.has_entry_in_upper(&entry.path) { - self.ensure_ancestor_directories_in_upper(&entry.path)?; + self.ensure_ancestor_directories_in_upper_recording(&entry.path, rollback)?; self.writable_upper(&entry.path)?.create_dir(&entry.path)?; + rollback.record_path(&entry.path, true); } self.writable_upper(&entry.path)? .chmod(&entry.path, entry.stat.mode)?; @@ -1226,15 +1313,19 @@ impl OverlayFileSystem { entry.stat.uid, entry.stat.gid, )?; + if !self.marker_exists(OverlayMarkerKind::Opaque, &entry.path) { + rollback.record_marker(OverlayMarkerKind::Opaque, &entry.path); + } self.mark_opaque_directory(&entry.path)?; } OverlaySnapshotKind::File(data) => { if self.has_entry_in_upper(&entry.path) { continue; } - self.ensure_ancestor_directories_in_upper(&entry.path)?; + self.ensure_ancestor_directories_in_upper_recording(&entry.path, rollback)?; self.writable_upper(&entry.path)? .write_file(&entry.path, data.clone())?; + rollback.record_path(&entry.path, false); self.writable_upper(&entry.path)? .chmod(&entry.path, entry.stat.mode)?; self.writable_upper(&entry.path)?.chown( @@ -1247,15 +1338,38 @@ impl OverlayFileSystem { if self.has_entry_in_upper(&entry.path) { continue; } - self.ensure_ancestor_directories_in_upper(&entry.path)?; + self.ensure_ancestor_directories_in_upper_recording(&entry.path, rollback)?; self.writable_upper(&entry.path)? .symlink(target, &entry.path)?; + rollback.record_path(&entry.path, false); } } } Ok(()) } + + /// Best-effort undo of `stage_snapshot_entries_in_upper` / + /// `copy_subtree_metadata` for the error path. Removes only the upper entries + /// and markers that staging itself created — in reverse creation order so + /// children are removed before their parents — and never adds a whiteout, so + /// a still-present lower source remains visible after a failed rename. + fn rollback_staged_entries(&mut self, rollback: &StagedRollback) { + for (kind, marker_path) in &rollback.created_markers { + let _ = self.set_marker(*kind, marker_path, false); + } + + for (path, is_dir) in rollback.created_paths.iter().rev() { + let Some(upper) = self.upper.as_mut() else { + return; + }; + if *is_dir { + let _ = upper.remove_dir(path); + } else { + let _ = upper.remove_file(path); + } + } + } } fn sync_upper_root_metadata(upper: &mut MemoryFileSystem, lowers: &[MemoryFileSystem]) { @@ -1681,10 +1795,26 @@ impl VirtualFileSystem for OverlayFileSystem { self.clear_subtree_metadata(&resolved_new_normalized)?; } - self.stage_snapshot_entries_in_upper(&snapshot_entries)?; - self.copy_subtree_metadata(&old_normalized, &resolved_new_normalized)?; - self.writable_upper(&old_normalized)? - .rename(&old_normalized, &resolved_new_normalized)?; + // Stage the source subtree into the upper, copy its overlay metadata, and + // move it to the destination. Any failure between staging and a successful + // rename must not orphan the staged inodes / `path_index` entries, so the + // upper mutations are recorded and rolled back on the error path (the + // success path still finalizes via `remove_snapshot_entries`). A bare `?` + // here would leave the copied-up entries stranded until VM Drop. + let mut rollback = StagedRollback::default(); + let staged_result = (|| -> VfsResult<()> { + self.stage_snapshot_entries_in_upper(&snapshot_entries, &mut rollback)?; + self.copy_subtree_metadata(&old_normalized, &resolved_new_normalized, &mut rollback)?; + self.writable_upper(&old_normalized)? + .rename(&old_normalized, &resolved_new_normalized)?; + Ok(()) + })(); + + if let Err(error) = staged_result { + self.rollback_staged_entries(&rollback); + return Err(error); + } + self.remove_snapshot_entries(&snapshot_entries) } @@ -1861,9 +1991,71 @@ impl VirtualFileSystem for OverlayFileSystem { #[cfg(test)] mod tests { - use super::{OverlayFileSystem, OverlayMode}; + use super::{OverlayFileSystem, OverlayMode, OVERLAY_WHITEOUT_DIR}; use crate::posix::vfs::{MemoryFileSystem, VfsResult, VirtualFileSystem}; + /// Regression: a rename that fails *after* staging the source subtree into the + /// upper (here, `copy_subtree_metadata` aborts on a corrupt overlay marker) + /// must not orphan the staged inode / `path_index` entry. Before the fix the + /// `?` on `copy_subtree_metadata` short-circuited past + /// `remove_snapshot_entries`, leaving the copied-up source stranded in the + /// upper; the rollback now removes it without resurrecting a whiteout. + #[test] + fn rename_rolls_back_staged_entries_when_metadata_copy_fails() { + let mut lower = MemoryFileSystem::new(); + lower + .write_file("/src.txt", b"payload".to_vec()) + .expect("seed lower-only source file"); + + let mut overlay = OverlayFileSystem::with_upper(vec![lower], MemoryFileSystem::new()); + + // Plant a corrupt whiteout marker directly in the upper. `marker_paths_in_upper` + // parses every marker file as UTF-8, so this forces `copy_subtree_metadata` + // (the step after staging) to fail deterministically with a `?`. + { + let upper = overlay + .upper + .as_mut() + .expect("ephemeral overlay has an upper"); + upper + .mkdir(OVERLAY_WHITEOUT_DIR, true) + .expect("create whiteout marker directory"); + upper + .write_file(&format!("{OVERLAY_WHITEOUT_DIR}/corrupt"), vec![0xff, 0xfe]) + .expect("plant corrupt (non-UTF-8) marker"); + } + + // The rename must fail (corrupt marker), and staging must leave no residue. + let result = overlay.rename("/src.txt", "/dst.txt"); + assert!( + result.is_err(), + "rename should fail when overlay metadata copy aborts" + ); + + // The staged copy-up of the source must have been rolled out of the upper: + // no orphaned inode / path_index entry remains at the source or destination. + let upper = overlay.upper.as_ref().expect("overlay upper"); + assert!( + !upper.exists("/src.txt"), + "staged source copy must be removed from the upper on the error path" + ); + assert!( + !upper.exists("/dst.txt"), + "no destination entry should have been staged in the upper" + ); + + // The rollback must NOT whiteout the still-present lower source: a failed + // rename leaves the original visible in the merged view. + assert!( + overlay.exists("/src.txt"), + "lower-backed source must remain visible after a failed rename" + ); + assert!( + !overlay.exists("/dst.txt"), + "destination must not exist after a failed rename" + ); + } + #[test] fn symlink_into_metadata_namespace_cannot_read_or_resurrect_whiteouts() { let mut lower = MemoryFileSystem::new();