Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 253 additions & 46 deletions crates/execution/src/javascript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,23 @@ struct LocalBridgeState {
module_reader: Option<Box<dyn ModuleFsReader + Send>>,
}

impl Drop for LocalBridgeState {
/// Tear down all tracked timers when the bridge state is dropped (which
/// happens when the event-bridge service loop exits on session termination —
/// success, error, or shutdown). Clearing the shared `timers` map cancels both
/// kernel and bridge timers: any in-flight timer thread that wakes afterwards
/// finds its entry gone and suppresses its callback via `timer_should_fire`,
/// so a destroyed session's timers do not fire and their thread stops touching
/// the session. Without this, kernel-timer threads (M3) and bridge-timer
/// threads (H2) would outlive the session, holding a session `Arc` and firing
/// after the fact.
fn drop(&mut self) {
if let Ok(mut timers) = self.timers.lock() {
timers.clear();
}
}
}

#[derive(Debug, Default)]
struct LocalKernelStdinBridge {
state: Mutex<LocalKernelStdinState>,
Expand Down Expand Up @@ -572,6 +589,14 @@ enum LocalBridgeCallResult {
Deferred,
}

/// Upper bound on guest-supplied timer delays, matching the JS `TIMEOUT_MAX`
/// ceiling (`2**31 - 1` ms, ~24.8 days). Guest code can pass a delay up to
/// `u64::MAX` ms; without a cap each `_scheduleTimer` / `kernelTimerCreate` call
/// spawns a thread that sleeps for an effectively unbounded duration while
/// pinning a clone of the session `Arc`, blocking session cleanup. Clamping the
/// delay bounds how long a timer thread can outlive its session.
const MAX_TIMER_DELAY_MS: u64 = 2_147_483_647;

fn timer_delay_ms(value: Option<&Value>) -> u64 {
let delay = match value {
Some(Value::Number(number)) => number.as_f64().unwrap_or(0.0),
Expand All @@ -582,10 +607,40 @@ fn timer_delay_ms(value: Option<&Value>) -> u64 {
if !delay.is_finite() || delay <= 0.0 {
0
} else {
delay.floor().min(u64::MAX as f64) as u64
delay.floor().min(MAX_TIMER_DELAY_MS as f64) as u64
}
}

/// Decide whether a woken timer thread should fire, and reclaim its tracking
/// entry. Returns `false` (suppressing the callback) when the timer is gone from
/// the map (cleared, or wiped on session teardown) or its generation no longer
/// matches the one captured at scheduling time (re-armed/cancelled). A one-shot
/// (`repeat == false`) timer that does fire is removed from the map so its id is
/// reclaimed. Shared by the kernel-timer and bridge-timer paths so both honor the
/// same cancellation semantics.
fn timer_should_fire(
timers: &Arc<Mutex<HashMap<u64, LocalTimerEntry>>>,
timer_id: u64,
generation: u64,
) -> bool {
timers
.lock()
.ok()
.and_then(|mut timers| {
let (current_generation, repeat) = timers
.get(&timer_id)
.map(|entry| (entry.generation, entry.repeat))?;
if current_generation != generation {
return Some(false);
}
if !repeat {
timers.remove(&timer_id);
}
Some(true)
})
.unwrap_or(false)
}

impl GuestPathTranslator {
fn from_host_context(
env: &BTreeMap<String, String>,
Expand Down Expand Up @@ -917,17 +972,17 @@ impl ModuleResolutionTestHarness {
];
sort_guest_path_mappings(&mut mappings);

Self {
local_bridge: LocalBridgeState {
translator: GuestPathTranslator {
implicit_guest_cwd: String::from("/root"),
implicit_host_cwd: host_root,
sandbox_root: None,
mappings,
},
..LocalBridgeState::default()
},
}
// Build via default + in-place assignment rather than `..default()`:
// LocalBridgeState implements Drop (to cancel timers on session teardown),
// and functional-record-update would move fields out of a Drop type (E0509).
let mut local_bridge = LocalBridgeState::default();
local_bridge.translator = GuestPathTranslator {
implicit_guest_cwd: String::from("/root"),
implicit_host_cwd: host_root,
sandbox_root: None,
mappings,
};
Self { local_bridge }
}

pub fn resolve_import(&mut self, specifier: &str, from_path: &str) -> Option<String> {
Expand All @@ -949,14 +1004,11 @@ pub fn handle_internal_bridge_call_from_host_context(
method: &str,
args: &[Value],
) -> Option<Value> {
let mut local_bridge = LocalBridgeState {
translator: GuestPathTranslator::from_host_context(
env,
host_cwd.to_path_buf(),
guest_cwd.to_owned(),
),
..LocalBridgeState::default()
};
// default + in-place assign: LocalBridgeState is Drop, so `..default()` (E0509)
// is not allowed.
let mut local_bridge = LocalBridgeState::default();
local_bridge.translator =
GuestPathTranslator::from_host_context(env, host_cwd.to_path_buf(), guest_cwd.to_owned());

match local_bridge.handle_internal_bridge_call(0, method, args) {
Some(LocalBridgeCallResult::Immediate(value)) => Some(value),
Expand Down Expand Up @@ -1828,19 +1880,20 @@ impl JavascriptExecutionEngine {
let pending_sync_rpc = Arc::new(Mutex::new(None));
let kernel_stdin = Arc::new(LocalKernelStdinBridge::default());
let standalone_translator = translator.clone();
// default + in-place assign: LocalBridgeState is Drop, so `..Default::default()`
// (E0509) is not allowed.
let mut local_bridge = LocalBridgeState::default();
local_bridge.translator = translator;
local_bridge.kernel_stdin = kernel_stdin.clone();
local_bridge.v8_session = Some(v8_session.clone());
local_bridge.module_reader = module_reader;
local_bridge.module_resolution = GuestModuleResolution::from_env(&request.env);
let events = spawn_v8_event_bridge(
frame_receiver,
pending_sync_rpc.clone(),
sync_rpc_timeout,
v8_session.clone(),
LocalBridgeState {
translator,
kernel_stdin: kernel_stdin.clone(),
v8_session: Some(v8_session.clone()),
module_reader,
module_resolution: GuestModuleResolution::from_env(&request.env),
..Default::default()
},
local_bridge,
);

// Install the direct module reader on the session thread BEFORE the Execute
Expand Down Expand Up @@ -3097,6 +3150,25 @@ impl LocalBridgeState {
self.next_timer_id
}

/// Allocate a fresh timer id and register a one-shot (`repeat == false`)
/// tracking entry at generation 0. Used by the bridge-timer path so the
/// spawned thread can be cancelled (its entry removed) on `clear`/teardown.
fn register_oneshot_timer(&mut self, delay_ms: u64) -> u64 {
self.next_timer_id += 1;
let timer_id = self.next_timer_id;
if let Ok(mut timers) = self.timers.lock() {
timers.insert(
timer_id,
LocalTimerEntry {
delay_ms,
generation: 0,
repeat: false,
},
);
}
timer_id
}

fn arm_kernel_timer(&self, timer_id: u64) {
let Some(session) = self.v8_session.clone() else {
return;
Expand All @@ -3117,23 +3189,7 @@ impl LocalBridgeState {
thread::sleep(Duration::from_millis(delay_ms));
}

let should_fire = timers
.lock()
.ok()
.and_then(|mut timers| {
let (current_generation, repeat) = timers
.get(&timer_id)
.map(|entry| (entry.generation, entry.repeat))?;
if current_generation != generation {
return Some(false);
}
if !repeat {
timers.remove(&timer_id);
}
Some(true)
})
.unwrap_or(false);
if !should_fire {
if !timer_should_fire(&timers, timer_id, generation) {
return;
}

Expand All @@ -3148,15 +3204,31 @@ impl LocalBridgeState {
}
}

fn schedule_bridge_timer_response(&self, call_id: u64, delay_ms: u64) {
fn schedule_bridge_timer_response(&mut self, call_id: u64, delay_ms: u64) {
let Some(session) = self.v8_session.clone() else {
return;
};

// Register the bridge timer in the shared `timers` map with a generation,
// mirroring the kernel-timer cancellation path. Previously this spawned a
// bare, untracked thread holding the session `Arc` with no cancellation and
// no generation check, so a cleared/destroyed session's timer would still
// fire (and pin the session) after the full delay. Tracking it means that
// when `LocalBridgeState` is dropped on session teardown (which clears the
// map) or the entry is otherwise removed, the woken thread observes the
// missing/mismatched generation via `timer_should_fire` and suppresses the
// response instead of touching the torn-down session.
let timer_id = self.register_oneshot_timer(delay_ms);
let generation = 0;
let timers = self.timers.clone();

thread::spawn(move || {
if delay_ms > 0 {
thread::sleep(Duration::from_millis(delay_ms));
}
if !timer_should_fire(&timers, timer_id, generation) {
return;
}
let _ = session.send_bridge_response(call_id, 0, Vec::new());
});
}
Expand Down Expand Up @@ -6908,4 +6980,139 @@ mod tests {
drop(receiver);
host.unregister_session(&session_id);
}

// --- Timer cancellation / cap regression tests (U4: H2 bridge timers, M3
// kernel timers). These assert the *safeguards firing* (delay clamped, timer
// entry reclaimed, callback suppressed) and never spawn unbounded threads. ---

#[test]
fn timer_delay_is_clamped_to_the_cap() {
// A guest can pass an arbitrarily large delay (up to u64::MAX ms); without
// a cap each scheduled timer spawns a thread that sleeps essentially
// forever while pinning the session Arc. The cap bounds that lifetime.
assert_eq!(
timer_delay_ms(Some(&json!(u64::MAX))),
MAX_TIMER_DELAY_MS,
"a u64::MAX delay must be clamped to MAX_TIMER_DELAY_MS"
);
assert_eq!(
timer_delay_ms(Some(&json!(1.0e308_f64))),
MAX_TIMER_DELAY_MS,
"an enormous float delay must be clamped to the cap"
);
assert_eq!(
timer_delay_ms(Some(&json!(MAX_TIMER_DELAY_MS + 1))),
MAX_TIMER_DELAY_MS,
"a delay one past the cap must clamp down to the cap"
);
// Below-cap values pass through unchanged so normal timers are unaffected.
assert_eq!(timer_delay_ms(Some(&json!(250))), 250);
assert_eq!(timer_delay_ms(Some(&json!(0))), 0);
}

#[test]
fn cleared_timer_is_suppressed_and_entry_reclaimed() {
// Mirrors what a woken bridge/kernel timer thread does after sleeping: it
// consults the shared map via `timer_should_fire`. When the entry has been
// removed (clear or session teardown), the callback must be suppressed.
let timers: Arc<Mutex<HashMap<u64, LocalTimerEntry>>> =
Arc::new(Mutex::new(HashMap::new()));
timers.lock().unwrap().insert(
7,
LocalTimerEntry {
delay_ms: 1_000,
generation: 0,
repeat: false,
},
);

// Simulate `kernelTimerClear` / teardown removing the entry before the
// thread wakes.
timers.lock().unwrap().remove(&7);

assert!(
!timer_should_fire(&timers, 7, 0),
"a cleared timer must not fire"
);
assert!(
timers.lock().unwrap().is_empty(),
"tracking map stays empty after a cleared timer is evaluated"
);
}

#[test]
fn rearmed_timer_generation_mismatch_suppresses_stale_thread() {
// The bridge/kernel timer thread captures the generation at schedule time.
// If the timer is re-armed (generation bumped) before the stale thread
// wakes, the stale thread must observe the mismatch and suppress, while the
// entry survives for the live generation.
let timers: Arc<Mutex<HashMap<u64, LocalTimerEntry>>> =
Arc::new(Mutex::new(HashMap::new()));
timers.lock().unwrap().insert(
3,
LocalTimerEntry {
delay_ms: 10,
generation: 1,
repeat: false,
},
);

// Stale thread captured generation 0; current entry is at generation 1.
assert!(
!timer_should_fire(&timers, 3, 0),
"a stale generation must be suppressed"
);
assert!(
timers.lock().unwrap().contains_key(&3),
"the live entry must survive a stale-generation evaluation"
);

// The matching (current) generation fires and reclaims the one-shot entry.
assert!(
timer_should_fire(&timers, 3, 1),
"the current generation must fire"
);
assert!(
timers.lock().unwrap().is_empty(),
"a fired one-shot timer must reclaim its id from the map"
);
}

#[test]
fn bridge_timer_registration_is_tracked_and_drop_clears_timers() {
// H2: the bridge-timer path must register its timer (so it is cancellable)
// rather than spawning a bare untracked thread, and session teardown
// (dropping LocalBridgeState) must wipe the tracking map so in-flight timer
// threads are cancelled.
let mut state = LocalBridgeState::default();
// Observe the same map the spawned threads would consult.
let timers = state.timers.clone();

let id_a = state.register_oneshot_timer(MAX_TIMER_DELAY_MS);
let id_b = state.register_oneshot_timer(500);
assert_ne!(id_a, id_b, "each bridge timer gets a fresh id");
assert_eq!(
timers.lock().unwrap().len(),
2,
"registered bridge timers are tracked in the shared map"
);
// A registered timer would fire for its captured generation (proving the
// entry is real and consultable) ...
assert!(timer_should_fire(&timers, id_a, 0));
// ... and seeding a still-pending one before teardown:
let id_c = state.register_oneshot_timer(1_000);

// Session teardown: dropping the bridge state must clear every timer so any
// sleeping thread wakes to a missing entry and suppresses its callback.
drop(state);

assert!(
timers.lock().unwrap().is_empty(),
"dropping LocalBridgeState must clear the timers map on teardown"
);
assert!(
!timer_should_fire(&timers, id_c, 0),
"a pending bridge timer is suppressed after teardown"
);
}
}
Loading
Loading