diff --git a/Cargo.lock b/Cargo.lock index 89dffa918ae..b8b19256f47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,7 +34,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.13", "once_cell", "version_check", ] @@ -276,6 +276,12 @@ dependencies = [ "syn 2.0.107", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -2552,9 +2558,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.16" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +checksum = "a06fddc2749e0528d2813f95e050e87e52c8cbbae56223b9babf73b3e53b0cc6" dependencies = [ "cfg-if", "js-sys", @@ -6114,7 +6120,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.13", ] [[package]] @@ -6188,7 +6194,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.13", "libredox", "thiserror 1.0.69", ] @@ -6427,7 +6433,7 @@ checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", - "getrandom 0.2.16", + "getrandom 0.2.13", "libc", "untrusted", "windows-sys 0.52.0", @@ -7660,7 +7666,7 @@ checksum = "db18cb19c7499ba4a65b1504442179a7e4aba487dc35978d90966c5ca02ee16b" dependencies = [ "bytemuck", "derive_more 0.99.20", - "getrandom 0.2.16", + 
"getrandom 0.2.13", "log", "rand 0.8.5", "scoped-tls", @@ -7679,7 +7685,7 @@ dependencies = [ "bytemuck", "bytes", "derive_more 0.99.20", - "getrandom 0.2.16", + "getrandom 0.2.13", "http 1.3.1", "insta", "log", @@ -8104,6 +8110,7 @@ dependencies = [ "spacetimedb-physical-plan", "spacetimedb-primitives 2.2.0", "spacetimedb-query", + "spacetimedb-runtime", "spacetimedb-sats 2.2.0", "spacetimedb-schema", "spacetimedb-snapshot", @@ -8201,6 +8208,7 @@ dependencies = [ "spacetimedb-commitlog", "spacetimedb-fs-utils", "spacetimedb-paths", + "spacetimedb-runtime", "spacetimedb-sats 2.2.0", "tempfile", "thiserror 1.0.69", @@ -8469,6 +8477,17 @@ dependencies = [ "spacetimedb-lib 2.2.0", ] +[[package]] +name = "spacetimedb-runtime" +version = "2.2.0" +dependencies = [ + "async-task", + "futures", + "libc", + "spin", + "tokio", +] + [[package]] name = "spacetimedb-sats" version = "1.9.0" @@ -8649,6 +8668,7 @@ dependencies = [ "spacetimedb-lib 2.2.0", "spacetimedb-paths", "spacetimedb-primitives 2.2.0", + "spacetimedb-runtime", "spacetimedb-sats 2.2.0", "spacetimedb-schema", "spacetimedb-table", diff --git a/Cargo.toml b/Cargo.toml index d1488e186df..f4f74204ea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "crates/physical-plan", "crates/primitives", "crates/query", + "crates/runtime", "crates/sats", "crates/schema", "crates/smoketests", @@ -139,6 +140,7 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.2.0" } spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.2.0" } spacetimedb-primitives = { path = "crates/primitives", version = "=2.2.0" } spacetimedb-query = { path = "crates/query", version = "=2.2.0" } +spacetimedb-runtime = { path = "crates/runtime", version = "=2.2.0" } spacetimedb-sats = { path = "crates/sats", version = "=2.2.0" } spacetimedb-schema = { path = "crates/schema", version = "=2.2.0" } spacetimedb-standalone = { path = "crates/standalone", version = "=2.2.0" } diff --git a/crates/core/Cargo.toml 
b/crates/core/Cargo.toml index acdc578080d..6e7075536c2 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -28,6 +28,7 @@ spacetimedb-primitives.workspace = true spacetimedb-paths.workspace = true spacetimedb-physical-plan.workspace = true spacetimedb-query.workspace = true +spacetimedb-runtime = { workspace = true, features = ["tokio"] } spacetimedb-sats = { workspace = true, features = ["serde"] } spacetimedb-schema.workspace = true spacetimedb-table.workspace = true diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index c17a10e9f63..f749f72850a 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -9,9 +9,9 @@ use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; use spacetimedb_durability::Transaction; use spacetimedb_lib::Identity; use spacetimedb_sats::ProductValue; -use tokio::{runtime, time::timeout}; use crate::db::persistence::Durability; +use spacetimedb_runtime::Handle; pub(super) fn request_durability( durability: &Durability, @@ -32,11 +32,11 @@ pub(super) fn request_durability( })); } -pub(super) fn spawn_close(durability: Arc, runtime: &runtime::Handle, database_identity: Identity) { - let rt = runtime.clone(); - rt.spawn(async move { - let label = format!("[{database_identity}]"); - match timeout(Duration::from_secs(10), durability.close()).await { +pub(super) fn spawn_close(durability: Arc, runtime: &Handle, database_identity: Identity) { + let label = format!("[{database_identity}]"); + let runtime = runtime.clone(); + runtime.clone().spawn(async move { + match runtime.timeout(Duration::from_secs(10), durability.close()).await { Err(_elapsed) => { error!("{label} timeout waiting for durability shutdown"); } diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 5b0daa5145c..ce3ef5d6841 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -7,6 +7,7 @@ use 
spacetimedb_paths::server::ServerDataDir; use spacetimedb_snapshot::DynSnapshotRepo; use crate::{messages::control_db::Database, util::asyncify}; +use spacetimedb_runtime::Handle; use super::{ relational_db::{self, Txdata}, @@ -41,8 +42,8 @@ pub struct Persistence { /// persistent (as opposed to in-memory) databases. This is enforced by /// this type. pub snapshots: Option, - /// The tokio runtime onto which durability-related tasks shall be spawned. - pub runtime: tokio::runtime::Handle, + /// Runtime onto which durability-related tasks shall be spawned. + pub runtime: Handle, } impl Persistence { @@ -52,6 +53,15 @@ impl Persistence { disk_size: impl Fn() -> io::Result + Send + Sync + 'static, snapshots: Option, runtime: tokio::runtime::Handle, + ) -> Self { + Self::new_with_runtime(durability, disk_size, snapshots, Handle::tokio(runtime)) + } + + pub fn new_with_runtime( + durability: impl spacetimedb_durability::Durability + 'static, + disk_size: impl Fn() -> io::Result + Send + Sync + 'static, + snapshots: Option, + runtime: Handle, ) -> Self { Self { durability: Arc::new(durability), @@ -91,7 +101,7 @@ impl Persistence { Option>, Option, Option, - Option, + Option, ) { this.map( |Self { @@ -143,13 +153,15 @@ impl PersistenceProvider for LocalPersistenceProvider { async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result { let replica_dir = self.data_dir.replica(replica_id); let snapshot_dir = replica_dir.snapshots(); + let runtime = Handle::tokio_current(); let database_identity = database.database_identity; let snapshot_worker = asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id)) .await - .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?; - let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?; + .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?; + let (durability, disk_size) = 
+ relational_db::local_durability(replica_dir, runtime.clone(), Some(&snapshot_worker)).await?; tokio::spawn(relational_db::snapshot_watching_commitlog_compressor( snapshot_worker.subscribe(), @@ -162,7 +174,7 @@ impl PersistenceProvider for LocalPersistenceProvider { durability, disk_size, snapshots: Some(snapshot_worker), - runtime: tokio::runtime::Handle::current(), + runtime, }) } } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9f041c92ccb..57230e8866b 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -42,6 +42,7 @@ use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; +use spacetimedb_runtime::Handle; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; @@ -99,7 +100,7 @@ pub struct RelationalDB { inner: Locking, durability: Option>, - durability_runtime: Option, + durability_runtime: Option, snapshot_worker: Option, row_count_fn: RowCountFn, @@ -1669,9 +1670,9 @@ pub type LocalDurability = Arc>; /// of the commitlog. pub async fn local_durability( replica_dir: ReplicaDir, + runtime: Handle, snapshot_worker: Option<&SnapshotWorker>, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { - let rt = tokio::runtime::Handle::current(); let on_new_segment = snapshot_worker.map(|snapshot_worker| { let snapshot_worker = snapshot_worker.clone(); Arc::new(move || { @@ -1683,7 +1684,7 @@ pub async fn local_durability( let local = asyncify(move || { durability::Local::open( replica_dir.clone(), - rt, + runtime, <_>::default(), // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. 
@@ -1949,19 +1950,22 @@ pub mod tests_utils { ) -> Result<(RelationalDB, Arc>), DBError> { let snapshots = want_snapshot_repo .then(|| { - open_snapshot_repo(root.snapshots(), db_identity, replica_id) - .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled)) + open_snapshot_repo(root.snapshots(), db_identity, replica_id).map(|repo| { + SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone())) + }) }) .transpose()?; - let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?; + let runtime = Handle::tokio(rt.clone()); + let (local, disk_size_fn) = + rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?; let history = local.as_history(); let persistence = Persistence { durability: local.clone(), disk_size: disk_size_fn, snapshots, - runtime: rt, + runtime, }; let (db, _) = RelationalDB::open( @@ -2074,17 +2078,20 @@ pub mod tests_utils { ) -> Result<(RelationalDB, Arc>), DBError> { let snapshots = want_snapshot_repo .then(|| { - open_snapshot_repo(root.snapshots(), Identity::ZERO, 0) - .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled)) + open_snapshot_repo(root.snapshots(), Identity::ZERO, 0).map(|repo| { + SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone())) + }) }) .transpose()?; - let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?; + let runtime = Handle::tokio(rt.clone()); + let (local, disk_size_fn) = + rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?; let history = local.as_history(); let persistence = Persistence { durability: local.clone(), disk_size: disk_size_fn, snapshots, - runtime: rt, + runtime, }; let db = Self::open_db(history, Some(persistence), None, 0)?; diff --git a/crates/core/src/db/snapshot.rs b/crates/core/src/db/snapshot.rs index 26e3d8373cf..178bbda3d72 100644 --- a/crates/core/src/db/snapshot.rs +++ 
b/crates/core/src/db/snapshot.rs @@ -17,7 +17,8 @@ use spacetimedb_lib::Identity; use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo}; use tokio::sync::watch; -use crate::{util::asyncify, worker_metrics::WORKER_METRICS}; +use crate::worker_metrics::WORKER_METRICS; +use spacetimedb_runtime::Handle; pub type SnapshotDatabaseState = Arc>; @@ -69,7 +70,7 @@ impl SnapshotWorker { /// The handle is only partially initialized, as it is lacking the /// [SnapshotDatabaseState]. This allows control code to [Self::subscribe] /// to future snapshots before handing off the worker to the database. - pub fn new(snapshot_repository: Arc, compression: Compression) -> Self { + pub fn new(snapshot_repository: Arc, compression: Compression, rt: Handle) -> Self { let database = snapshot_repository.database_identity(); let latest_snapshot = snapshot_repository.latest_snapshot().ok().flatten().unwrap_or(0); let (snapshot_created, _) = watch::channel(latest_snapshot); @@ -80,13 +81,15 @@ impl SnapshotWorker { snapshot_repo: snapshot_repository.clone(), snapshot_created: snapshot_created.clone(), metrics: SnapshotMetrics::new(database), + rt: rt.clone(), compression: compression.is_enabled().then(|| Compressor { snapshot_repo: snapshot_repository.clone(), metrics: CompressionMetrics::new(database), stats: <_>::default(), + rt: rt.clone(), }), }; - tokio::spawn(actor.run()); + rt.spawn(actor.run()); Self { snapshot_created, @@ -169,6 +172,7 @@ struct SnapshotWorkerActor { snapshot_repo: Arc, snapshot_created: watch::Sender, metrics: SnapshotMetrics, + rt: Handle, compression: Option, } @@ -220,21 +224,24 @@ impl SnapshotWorkerActor { let inner_timer = self.metrics.snapshot_timing_inner.clone(); let snapshot_repo = self.snapshot_repo.clone(); + let runtime = self.rt.clone(); let database_identity = self.snapshot_repo.database_identity(); - let maybe_snapshot = asyncify(move || { - let _timer = inner_timer.start_timer(); - Locking::take_snapshot_internal(&state, 
snapshot_repo.as_ref()) - }) - .await - .with_context(|| format!("error capturing snapshot of database {}", database_identity))?; - let (snapshot_offset, unflushed_snapshot) = maybe_snapshot.with_context(|| { - format!( - "refusing to take snapshot of database {} at TX offset -1", - database_identity - ) - })?; + let maybe_snapshot = runtime + .spawn_blocking(move || { + let _timer = inner_timer.start_timer(); + Locking::take_snapshot_internal(&state, snapshot_repo.as_ref()) + }) + .await + .with_context(|| format!("error capturing snapshot of database {}", database_identity))? + .with_context(|| { + format!( + "refusing to take snapshot of database {} at TX offset -1", + database_identity + ) + })?; + let (snapshot_offset, unflushed_snapshot) = maybe_snapshot; self.metrics .snapshot_timing_fsync .observe_closure_duration(|| unflushed_snapshot.sync_all())?; @@ -310,6 +317,7 @@ struct Compressor { snapshot_repo: Arc, metrics: CompressionMetrics, stats: Option, + rt: Handle, } impl Compressor { @@ -341,15 +349,17 @@ impl Compressor { let range = start..latest_snapshot; let mut stats = self.stats.take().unwrap_or_default(); - let (mut stats, res) = asyncify({ - let range = range.clone(); - move || { - let _timer = inner_timer.start_timer(); - let res = snapshot_repo.compress_snapshots(&mut stats, range); - (stats, res) - } - }) - .await; + let rt = self.rt.clone(); + let (mut stats, res) = rt + .spawn_blocking({ + let range = range.clone(); + move || { + let _timer = inner_timer.start_timer(); + let res = snapshot_repo.compress_snapshots(&mut stats, range); + (stats, res) + } + }) + .await; let elapsed = Duration::from_secs_f64(timer.stop_and_record()); self.metrics.report_and_reset(&mut stats); // Store stats for reuse. 
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index d748fdd09ab..4c94df74ab8 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -2103,7 +2103,7 @@ mod tests { durability: durability.clone(), disk_size: Arc::new(|| Ok(<_>::default())), snapshots: None, - runtime: rt, + runtime: spacetimedb_runtime::Handle::tokio(rt), }), None, 0, diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 0ea8022fcbe..4eaa3870001 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -21,6 +21,7 @@ scopeguard.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true +spacetimedb-runtime = { workspace = true, features = ["tokio"] } spacetimedb-sats.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 04d46d8f634..e3eca56e5d9 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -19,11 +19,9 @@ use spacetimedb_commitlog::{ }; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; +use spacetimedb_runtime::{Handle, JoinHandle}; use thiserror::Error; -use tokio::{ - sync::watch, - task::{spawn_blocking, JoinHandle}, -}; +use tokio::sync::watch; use tracing::{instrument, Span}; use crate::{Close, Durability, DurableOffset, History, PreparedTx, TxOffset}; @@ -119,13 +117,13 @@ impl Local { /// /// `replica_dir` must already exist. /// - /// Background tasks are spawned onto the provided tokio runtime. + /// Background tasks are spawned onto the provided runtime. /// /// We will send a message down the `on_new_segment` channel whenever we begin a new commitlog segment. 
/// This is used to capture a snapshot each new segment. pub fn open( replica_dir: ReplicaDir, - rt: tokio::runtime::Handle, + rt: Handle, opts: Options, on_new_segment: Option>, ) -> Result { @@ -150,7 +148,7 @@ where R: RepoWithoutLockFile + Send + Sync + 'static, { /// Create a [`Local`] instance backed by the provided commitlog repo. - pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { + pub fn open_with_repo(repo: R, rt: Handle, opts: Options) -> Result { info!("open local durability"); let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); Self::open_inner(clog, rt, opts, None) @@ -164,7 +162,7 @@ where { fn open_inner( clog: Arc, R>>, - rt: tokio::runtime::Handle, + rt: Handle, opts: Options, lock: Option, ) -> Result { @@ -172,16 +170,13 @@ where let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); - let actor = rt.spawn( Actor { clog: clog.clone(), - durable_offset: durable_tx, queue_depth: queue_depth.clone(), - batch_capacity: opts.batch_capacity, - + runtime: rt.clone(), lock, } .run(txdata_rx), @@ -246,6 +241,7 @@ where queue_depth: Arc, batch_capacity: NonZeroUsize, + runtime: Handle, #[allow(unused)] lock: Option, @@ -281,15 +277,16 @@ where let clog = self.clog.clone(); let ready_len = tx_buf.len(); self.queue_depth.fetch_sub(ready_len as u64, Relaxed); - tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx.into_transaction()])?; - } - Ok(tx_buf) - }) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); + let runtime = self.runtime.clone(); + tx_buf = runtime + .spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) 
{ + clog.commit([tx.into_transaction()])?; + } + Ok(tx_buf) + }) + .await + .expect("commitlog write failed"); if self.flush_and_sync().await.is_err() { sync_on_exit = false; break; @@ -318,21 +315,22 @@ where let clog = self.clog.clone(); let span = Span::current(); - spawn_blocking(move || { - let _span = span.enter(); - clog.flush_and_sync() - }) - .await - .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) - .inspect(|maybe_offset| { - if let Some(new_offset) = maybe_offset { - trace!("synced to offset {new_offset}"); - self.durable_offset.send_modify(|val| { - val.replace(*new_offset); - }); - } - }) + let runtime = self.runtime.clone(); + runtime + .spawn_blocking(move || { + let _span = span.enter(); + clog.flush_and_sync() + }) + .await + .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) + .inspect(|maybe_offset| { + if let Some(new_offset) = maybe_offset { + trace!("synced to offset {new_offset}"); + self.durable_offset.send_modify(|val| { + val.replace(*new_offset); + }); + } + }) } } diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 64e50faf4cc..2783b2178ec 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -161,7 +161,7 @@ async fn local_durability( ) -> Result, spacetimedb_durability::local::OpenError> { spacetimedb_durability::Local::open( dir, - tokio::runtime::Handle::current(), + spacetimedb_runtime::Runtime::tokio_current(), spacetimedb_durability::local::Options { commitlog: spacetimedb_commitlog::Options { max_segment_size, diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml new file mode 100644 index 00000000000..4cd0af60869 --- /dev/null +++ b/crates/runtime/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "spacetimedb-runtime" +version.workspace = true +edition.workspace = true +license-file = "LICENSE" +description = "Runtime and deterministic 
simulation utilities for SpacetimeDB" +rust-version.workspace = true + +[lints] +workspace = true + +[dependencies] +tokio = { workspace = true, optional = true } +async-task = { version = "4.4", default-features = false, optional = true } +spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex"], optional = true } +libc = { version = "0.2", optional = true } + +[dev-dependencies] +futures.workspace = true + +[features] +default = ["tokio"] +tokio = ["dep:tokio"] +simulation = ["dep:async-task", "dep:spin", "dep:libc"] diff --git a/crates/runtime/DETERMINISM_COVERAGE.md b/crates/runtime/DETERMINISM_COVERAGE.md new file mode 100644 index 00000000000..076efb865e4 --- /dev/null +++ b/crates/runtime/DETERMINISM_COVERAGE.md @@ -0,0 +1,50 @@ +# Determinism Coverage + +This document tracks which sources of nondeterminism are under control in `spacetimedb-runtime`, which ones are only constrained by current architecture, and which ones still escape the simulator boundary. + +It is meant to serve two purposes: + +1. Make the current determinism boundary explicit for runtime code, core crates, and DST harnesses. +2. Provide a place to record and review assumptions when a PR changes that boundary. + +## Status Definitions + +- `Controlled` + The simulator or runtime owns this source of nondeterminism directly. Given the same seed and the same simulated inputs, behavior should replay the same way. + +- `Constrained` + This surface is not fully simulator-controlled, but the current architecture limits how it is used. Replay should remain stable if those constraints continue to hold. + +- `Audited` + This surface is not mechanically controlled. Current usage has been reviewed and is believed not to affect replay, but that guarantee depends on call patterns and can regress. + +- `Known Leak` + This source can currently escape simulator control and affect replay. It should be treated as explicit technical debt or a documented exception. 
+ +- `Out of Scope` + This crate does not try to control this surface. If it matters for DST, it must be modeled by a higher-level abstraction or test harness. + +## Control Matrix + +| Surface | Status | Boundary | Current control or assumption | Failure mode if violated | Required direction | +| --- | --- | --- | --- | --- | --- | +| Executor scheduling | Controlled | `runtime::sim::executor` | Runnable selection is driven by seeded simulator RNG | Replay diverges across runs | - | +| Virtual time and timers | Controlled | `runtime::sim::time` | Simulated time advances only through explicit advance or next-timer jump | Timeouts and ordering become host-timing dependent | - | +| Runtime RNG and buggify | Controlled | `runtime::sim::rng` | Runtime RNG drives scheduler and probabilistic fault-injection decisions | RNG and fault decisions are not replayable | - | +| OS thread creation during simulation | Controlled | `runtime::sim_std` | Unix thread hook rejects `std::thread::spawn` while simulation is active | Host scheduler escapes simulator control | - | +| OS entropy | Known Leak | `runtime::sim_std` | Randomness requests warn and then delegate to the OS | Same seed can produce different traces | Add backtrace to warnings, remove call sites, eventually fail closed or fully model the source | +| `HashMap` randomized iteration | Audited | Runtime and caller code | Runtime does not force deterministic hash seeding; correctness must not depend on iteration order | Hidden ordering dependencies cause flaky replay | Prefer ordered maps or explicit sorting where observable order matters | +| `tokio::sync` primitives | Constrained | Core crates above runtime | These can be replay-compatible only when all participating tasks remain simulator-owned and progress stays on simulator-controlled async paths | Wake ordering or blocking semantics diverge once code depends on a real runtime or host-driven progress | Audit per primitive and push deep-core paths toward runtime-owned 
or single-threaded structures | +| `parking_lot::{}` and `std::sync::{}` | Constrained | Core crates, especially datastore | Safe only where access stays single-threaded or non-contended under DST | Host synchronization leaks nondeterministic acquisition order | Keep out of deep-core execution paths; prefer runtime-owned or single-threaded structures | +| File and network I/O | Out of Scope | Runtime crate | Runtime does not simulate filesystem or network behavior | Real I/O timing, ordering, and errors are not replayable | Model via domain-specific DST abstractions | +| Heap allocation and OOM | Known Leak | Broad, especially deep-core direction | Allocation happens through normal Rust paths; deterministic allocation failure is not modeled | Resource-exhaustion behavior is not reproducible | Move the simulation core and eventually deep-core paths toward `no_std + alloc` with explicit allocation boundaries | +| Snapshot / commitlog / datastore host effects | Out of Scope | Higher-level durability and storage layers | Runtime only provides scheduling, time, and fault-decision primitives | Storage semantics depend on real host behavior unless wrapped | Model durable behavior through domain-specific DST abstractions | + +## Update Rule + +A PR should update this document if it: + +- introduces a new source of nondeterminism, +- changes the control status of an existing surface, +- adds a new assumption about single-threading, iteration order, runtime ownership, or host behavior, or +- removes a leak or upgrades a surface from `Audited` or `Constrained` to `Controlled`. 
diff --git a/crates/runtime/LICENSE b/crates/runtime/LICENSE new file mode 120000 index 00000000000..8540cf8a991 --- /dev/null +++ b/crates/runtime/LICENSE @@ -0,0 +1 @@ +../../licenses/BSL.txt \ No newline at end of file diff --git a/crates/runtime/README.md b/crates/runtime/README.md new file mode 100644 index 00000000000..68037d752bf --- /dev/null +++ b/crates/runtime/README.md @@ -0,0 +1,61 @@ +> Welcome to the Matrix! + +# spacetimedb-runtime + +`spacetimedb-runtime` is a runtime boundary that lets SpacetimeDB core code run under deterministic simulation testing (DST). + +DST runs code inside a deterministic simulator that controls nondeterministic inputs instead of letting them come directly from the OS and real runtime environment. Given the same seed, the simulator should produce the same trace. When it finds a bug, the seed should be enough to reproduce that bug exactly. + +For this to work, code under test must not read clocks, randomness, scheduling, I/O, or network behavior directly from the outer environment. Those effects need interfaces that production can implement with real runtime-backed services and DST can replace with simulated ones. + +This crate provides the execution-control part of that boundary: spawning, timeouts, virtual time, deterministic randomness, task scheduling, and fault decisions. Storage, networking, and replication should be modeled through higher-level abstractions. + +For a tracked view of what is currently under simulator control, what is only constrained by convention, and what still leaks host behavior, see [DETERMINISM_COVERAGE.md](./DETERMINISM_COVERAGE.md). + +## Architecture + +[src/lib.rs](./src/lib.rs) exposes `Handle`, a small runtime handle shared code carries. It has two variants: + +- `Handle::Tokio(TokioHandle)` for real runtime execution. +- `Handle::Simulation(sim::Handle)` for deterministic simulation. + +[src/sim](./src/sim) contains the simulation core. It is single-threaded and targets `no_std + alloc`. 
The module includes: + +- `executor`: single-threaded task scheduler with deterministic runnable selection. +- `time`: virtual clock, sleeps, and timeouts. +- `rng`: seeded deterministic randomness for scheduler and workload decisions. +- `buggify`: fault-injection surface. Calls rng to decide probabilistically whether to inject failures into simulated operations. +- `node`: node builders and node-local scheduling handles. + +[src/sim_std.rs](./src/sim_std.rs) contains `std`/OS glue around the simulator: + +- `block_on` installs simulation guards for tests running in a normal process. +- `check_determinism` replays the same seeded workload twice and compares traces. +- libc randomness hooks warn and delegate if code reaches OS entropy. +- Unix thread hooks reject accidental `std::thread::spawn` while simulation is active. + +Tokio integration is intentionally small and lives directly in [src/lib.rs](./src/lib.rs). + +Feature flags: + +- `tokio`: enables the Tokio runtime backend and remains in the default feature set. +- `simulation`: enables the deterministic simulation runtime and `sim_std` helpers. + +## Design Principles + +- **Single-threaded runtime.** The simulator exposes interleaving and timeout bugs, but not bugs that require true parallel execution. The direction is to keep deep-core code single-threaded or close to thread-per-core; simulating real parallelism is out of scope. + +- **No built-in network, storage, or I/O simulation.** This crate provides deterministic execution primitives only. Higher-level harnesses should model message delivery, disk behavior, and failures. + +- **Not a Tokio replacement.** This crate does not aim to simulate APIs like `tokio::net` or `tokio::fs`. Code that depends on them needs a higher-level abstraction boundary. + +- **Zero dependency.** The simulation core in `sim/` is already `no_std + alloc`. The `sim_std` module is a thin OS-facing wrapper — the std dependency lives there, not in the simulation core itself. 
It stays until the application logic above this crate also moves to `no_std`. + +## Current Limitations + + +- **One shared virtual clock.** All simulated nodes share a single clock. This masks bugs related to timing mismatch across machines. + +- **No good alternative for blocking APIs.** The simulation backend has no `spawn_blocking` pool or OS thread escape hatch. API like `spawn_blocking` or `Handle::block_on` delegate to the single executor thread, so blocking inside them stalls all simulated tasks. The direction is to avoid relying on blocking semantics inside the simulation boundary. + +- **OS randomness is not controlled.** `sim_std` warns if code reaches OS entropy. The direction is to keep application code and testing harnesses off OS randomness entirely. diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs new file mode 100644 index 00000000000..6fdeeb81dfb --- /dev/null +++ b/crates/runtime/src/lib.rs @@ -0,0 +1,338 @@ +#[cfg(feature = "simulation")] +extern crate alloc; + +use core::{ + fmt, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +#[cfg(feature = "simulation")] +pub mod sim; +#[cfg(feature = "simulation")] +pub mod sim_std; + +#[cfg(feature = "tokio")] +pub type TokioHandle = tokio::runtime::Handle; + +#[derive(Clone)] +pub enum Handle { + #[cfg(feature = "tokio")] + Tokio(TokioHandle), + #[cfg(feature = "simulation")] + Simulation(sim::Handle), +} + +pub struct JoinHandle { + inner: JoinHandleInner, +} + +pub struct AbortHandle { + inner: AbortHandleInner, +} + +enum JoinHandleInner { + #[cfg(feature = "tokio")] + Tokio(tokio::task::JoinHandle), + #[cfg(feature = "simulation")] + Simulation(sim::JoinHandle), + Detached(PhantomData), +} + +enum AbortHandleInner { + #[cfg(feature = "tokio")] + Tokio(tokio::task::AbortHandle), + #[cfg(feature = "simulation")] + Simulation(sim::AbortHandle), +} + +#[derive(Debug)] +pub struct JoinError { + inner: JoinErrorInner, +} + 
+#[derive(Debug)] +enum JoinErrorInner { + #[cfg(feature = "tokio")] + Tokio(tokio::task::JoinError), + #[cfg(feature = "simulation")] + Simulation(sim::JoinError), +} + +impl AbortHandle { + pub fn abort(&self) { + match &self.inner { + #[cfg(feature = "tokio")] + AbortHandleInner::Tokio(handle) => handle.abort(), + #[cfg(feature = "simulation")] + AbortHandleInner::Simulation(handle) => handle.abort(), + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + _ => unreachable!("runtime abort handle has no enabled backend"), + } + } +} + +impl JoinErrorInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + #[cfg(feature = "tokio")] + Self::Tokio(err) => fmt::Display::fmt(err, f), + #[cfg(feature = "simulation")] + Self::Simulation(err) => fmt::Display::fmt(err, f), + } + } +} + +impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + let _ = f; + #[cfg(any(feature = "tokio", feature = "simulation"))] + return self.inner.fmt(f); + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + unreachable!("runtime join error has no enabled backend") + } +} + +#[cfg(any(feature = "tokio", feature = "simulation"))] +impl std::error::Error for JoinError {} + +impl JoinHandleInner { + fn abort_handle(&self) -> AbortHandle { + match self { + #[cfg(feature = "tokio")] + Self::Tokio(handle) => AbortHandle { + inner: AbortHandleInner::Tokio(handle.abort_handle()), + }, + #[cfg(feature = "simulation")] + Self::Simulation(handle) => AbortHandle { + inner: AbortHandleInner::Simulation(handle.abort_handle()), + }, + Self::Detached(_) => unreachable!("abort_handle called on a completed handle"), + } + } + + fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { + match self { + #[cfg(feature = "tokio")] + Self::Tokio(handle) => match Pin::new(handle).poll(cx) { + Poll::Ready(Ok(output)) => Poll::Ready(Ok(output)), + Poll::Ready(Err(err)) 
=> Poll::Ready(Err(JoinError { + inner: JoinErrorInner::Tokio(err), + })), + Poll::Pending => Poll::Pending, + }, + #[cfg(feature = "simulation")] + Self::Simulation(handle) => match Pin::new(handle).poll_join(cx) { + Poll::Ready(Ok(output)) => Poll::Ready(Ok(output)), + Poll::Ready(Err(err)) => Poll::Ready(Err(JoinError { + inner: JoinErrorInner::Simulation(err), + })), + Poll::Pending => Poll::Pending, + }, + Self::Detached(_) => unreachable!("poll_result called on a completed handle"), + } + } +} + +impl JoinHandle { + pub fn abort_handle(&self) -> AbortHandle { + self.inner.abort_handle() + } +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + let _ = cx; + match self.inner.poll_result(cx) { + Poll::Ready(Ok(output)) => { + self.inner = JoinHandleInner::Detached(PhantomData); + Poll::Ready(Ok(output)) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + let inner = core::mem::replace(&mut self.inner, JoinHandleInner::Detached(PhantomData)); + #[cfg(feature = "simulation")] + if let JoinHandleInner::Simulation(handle) = inner { + handle.detach(); + return; + } + // For Tokio (and Detached), dropping the handle does not cancel the task. 
+ drop(inner); + } +} + +impl Unpin for JoinHandle {} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RuntimeTimeout; + +impl fmt::Display for RuntimeTimeout { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("runtime operation timed out") + } +} + +#[cfg(any(feature = "tokio", feature = "simulation"))] +impl std::error::Error for RuntimeTimeout {} + +#[cfg(feature = "tokio")] +impl Handle { + pub fn tokio(handle: TokioHandle) -> Self { + Self::Tokio(handle) + } + + pub fn tokio_current() -> Self { + Self::tokio(TokioHandle::current()) + } +} + +#[cfg(feature = "simulation")] +impl Handle { + pub fn simulation(handle: sim::Handle) -> Self { + Self::Simulation(handle) + } +} + +impl Handle { + pub fn spawn(&self, future: impl Future + Send + 'static) -> JoinHandle { + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + let _ = future; + match self { + #[cfg(feature = "tokio")] + Self::Tokio(handle) => JoinHandle { + inner: JoinHandleInner::Tokio(handle.spawn(future)), + }, + #[cfg(feature = "simulation")] + Self::Simulation(handle) => JoinHandle { + inner: JoinHandleInner::Simulation(handle.spawn_on(sim::NodeId::MAIN, future)), + }, + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + _ => unreachable!("runtime dispatch has no enabled backend"), + } + } + + pub async fn spawn_blocking(&self, f: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + let _ = &f; + match self { + #[cfg(feature = "tokio")] + Self::Tokio(_) => tokio::task::spawn_blocking(f) + .await + .unwrap_or_else(|e| match e.try_into_panic() { + Ok(panic_payload) => std::panic::resume_unwind(panic_payload), + Err(e) => panic!("Unexpected JoinError: {e}"), + }), + // This is only a facade placeholder for simulation today. 
It + // delegates to a normal simulated task, so the closure still runs + // on the single executor thread and can block overall runtime + // progress. Callers should not expect blocking-pool semantics on + // the simulation backend. + #[cfg(feature = "simulation")] + Self::Simulation(handle) => handle + .spawn_on(sim::NodeId::MAIN, async move { f() }) + .await + .expect("simulation spawn_blocking task should not be cancelled"), + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + _ => unreachable!("runtime dispatch has no enabled backend"), + } + } + + pub async fn timeout( + &self, + timeout_after: Duration, + future: impl Future, + ) -> Result { + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + let _ = (timeout_after, future); + match self { + #[cfg(feature = "tokio")] + Self::Tokio(_) => tokio::time::timeout(timeout_after, future) + .await + .map_err(|_| RuntimeTimeout), + #[cfg(feature = "simulation")] + Self::Simulation(handle) => handle.timeout(timeout_after, future).await.map_err(|_| RuntimeTimeout), + #[cfg(not(any(feature = "tokio", feature = "simulation")))] + _ => unreachable!("runtime dispatch has no enabled backend"), + } + } +} + +#[cfg(test)] +mod tests { + #[allow(unused_imports)] + use super::*; + #[allow(unused_imports)] + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + + #[cfg(feature = "simulation")] + #[test] + fn dropping_joinhandle_does_not_cancel_task_in_simulation() { + use crate::sim::Runtime; + let mut rt = Runtime::new(4); + let handle = Handle::simulation(rt.handle()); + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = flag.clone(); + + rt.block_on(async { + let jh = handle.spawn(async move { + flag_clone.store(true, Ordering::Release); + }); + drop(jh); + + // Yield so the spawned task gets polled. 
+ handle + .timeout(std::time::Duration::from_millis(50), async {}) + .await + .ok(); + }); + + assert!(flag.load(Ordering::Acquire)); + } + + #[cfg(feature = "simulation")] + #[test] + fn abort_cancels_task_in_simulation() { + use crate::sim::Runtime; + let mut rt = Runtime::new(4); + let handle = Handle::simulation(rt.handle()); + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = flag.clone(); + let handle_for_spawn = handle.clone(); + + rt.block_on(async move { + let jh = handle.spawn(async move { + handle_for_spawn + .timeout(std::time::Duration::from_millis(100), async {}) + .await + .ok(); + flag_clone.store(true, Ordering::Release); + }); + jh.abort_handle().abort(); + + let result = jh.await; + let _ = handle.timeout(std::time::Duration::from_millis(500), async {}).await; + assert!(result.is_err()); + assert!(!flag.load(Ordering::Acquire)); + }); + } +} diff --git a/crates/runtime/src/sim/buggify.rs b/crates/runtime/src/sim/buggify.rs new file mode 100644 index 00000000000..07188c6c207 --- /dev/null +++ b/crates/runtime/src/sim/buggify.rs @@ -0,0 +1,51 @@ +use crate::sim::Runtime; + +/// Probabilistic fault-injection helpers for simulation code. +/// +/// Reference: . +/// +/// Buggify is tied to a specific simulation runtime. Callers toggle it on that +/// runtime, then ask whether a fault should be injected at a particular point. +pub fn enable(runtime: &Runtime) { + runtime.enable_buggify(); +} + +/// Disable probabilistic fault injection for the given simulation runtime. +pub fn disable(runtime: &Runtime) { + runtime.disable_buggify(); +} + +/// Returns whether buggify is enabled for the given simulation runtime. +pub fn is_enabled(runtime: &Runtime) -> bool { + runtime.is_buggify_enabled() +} + +/// Returns whether the runtime should inject a fault at this point using the +/// default deterministic probability. 
+pub fn should_inject_fault(runtime: &Runtime) -> bool { + runtime.buggify() +} + +/// Returns whether the runtime should inject a fault at this point using the +/// provided deterministic probability. +pub fn should_inject_fault_with_prob(runtime: &Runtime, probability: f64) -> bool { + runtime.buggify_with_prob(probability) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn runtime_owned_buggify_controls_fault_injection() { + let runtime = Runtime::new(7); + + assert!(!is_enabled(&runtime)); + enable(&runtime); + assert!(is_enabled(&runtime)); + assert!(should_inject_fault_with_prob(&runtime, 1.0)); + disable(&runtime); + assert!(!is_enabled(&runtime)); + assert!(!should_inject_fault_with_prob(&runtime, 1.0)); + } +} diff --git a/crates/runtime/src/sim/executor/mod.rs b/crates/runtime/src/sim/executor/mod.rs new file mode 100644 index 00000000000..ff75cba0aef --- /dev/null +++ b/crates/runtime/src/sim/executor/mod.rs @@ -0,0 +1,793 @@ +use alloc::{collections::BTreeMap, sync::Arc, vec::Vec}; +use core::{ + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, + task::{Context, Poll, Waker}, + time::Duration, +}; + +use spin::Mutex; + +use crate::sim::{time::TimeHandle, Rng}; + +mod task; +use task::Abortable; +pub use task::{AbortHandle, JoinError, JoinHandle}; + +type Runnable = async_task::Runnable; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RuntimeConfig { + pub seed: u64, +} + +impl RuntimeConfig { + pub const fn new(seed: u64) -> Self { + Self { seed } + } +} + +impl Default for RuntimeConfig { + fn default() -> Self { + Self::new(0) + } +} + +/// A unique identifier for a simulated node. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct NodeId(u64); + +impl NodeId { + /// The default node for single-node simulation or top-level runtime work. 
+ pub const MAIN: Self = Self(0); +} + +impl fmt::Display for NodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// Immutable metadata attached to one simulated node. +#[derive(Clone, Debug, Default, Eq, PartialEq)] +struct NodeConfig { + name: Option, +} + +/// Builder for configuring a simulated node before it is created. +pub struct NodeBuilder { + handle: Handle, + config: NodeConfig, +} + +impl NodeBuilder { + /// Assign a human-readable name to the node. + pub fn name(mut self, name: impl Into) -> Self { + self.config.name = Some(name.into()); + self + } + + /// Create the node with the accumulated configuration. + pub fn build(self) -> Node { + self.handle.build_node(self.config) + } +} + +/// Handle to one simulated node in the runtime. +#[derive(Clone)] +pub struct Node { + id: NodeId, + handle: Handle, + config: Arc, +} + +impl Node { + /// Return the stable identifier for this simulated node. + pub fn id(&self) -> NodeId { + self.id + } + + /// Return the optional human-readable name for this node. + pub fn name(&self) -> Option<&str> { + self.config.name.as_deref() + } + + /// Pause scheduling for this node. + pub fn pause(&self) { + self.handle.pause(self.id); + } + + /// Resume scheduling for this node. + pub fn resume(&self) { + self.handle.resume(self.id); + } + + /// Spawn a `Send` future onto this simulated node. + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn_on(self.id, future) + } + + /// Spawn a non-`Send` future onto this simulated node. + pub fn spawn_local(&self, future: F) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + self.handle.spawn_local_on(self.id, future) + } +} + +/// A small single-threaded runtime for DST's top-level future. 
+/// +/// futures are scheduled as runnables, the ready queue +/// is sampled by deterministic RNG, and pending execution without future events +/// is considered a test hang. +pub struct Runtime { + executor: Arc, +} + +impl Runtime { + /// Create a simulation runtime seeded for deterministic scheduling and RNG. + pub fn new(seed: u64) -> Self { + Self::with_config(RuntimeConfig::new(seed)) + } + + /// Create a simulation runtime from an explicit runtime configuration. + pub fn with_config(config: RuntimeConfig) -> Self { + Self { + executor: Arc::new(Executor::new(config)), + } + } + + /// Drive a top-level future to completion on the simulation executor. + /// + /// While the future runs, spawned tasks share the same deterministic + /// scheduler, timer wheel, and runtime RNG. + pub fn block_on(&mut self, future: F) -> F::Output { + self.executor.block_on(future) + } + + /// Return the amount of virtual time elapsed in this runtime. + pub fn elapsed(&self) -> Duration { + self.executor.elapsed() + } + + /// Get a cloneable handle for spawning tasks and accessing runtime services. + pub fn handle(&self) -> Handle { + Handle { + executor: Arc::clone(&self.executor), + } + } + + /// Create a new simulated node. + /// + /// Nodes are a scheduling/pausing boundary rather than separate executors: + /// all nodes still run on the same single-threaded runtime. + pub fn create_node(&self) -> NodeBuilder { + self.handle().create_node() + } + + /// Pause scheduling for a node. + /// + /// Tasks already queued for the node are retained and will run only after + /// the node is resumed. + pub fn pause(&self, node: NodeId) { + self.handle().pause(node); + } + + /// Resume scheduling for a previously paused node. + pub fn resume(&self, node: NodeId) { + self.handle().resume(node); + } + + /// Spawn a `Send` future onto a specific simulated node. 
+ pub fn spawn_on(&self, node: NodeId, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle().spawn_on(node, future) + } + + pub fn enable_buggify(&self) { + self.executor.enable_buggify(); + } + + /// Disable probabilistic fault injection for this runtime. + pub fn disable_buggify(&self) { + self.executor.disable_buggify(); + } + + /// Return whether buggify is enabled for this runtime. + pub fn is_buggify_enabled(&self) -> bool { + self.executor.is_buggify_enabled() + } + + /// Sample the default runtime buggify probability. + pub fn buggify(&self) -> bool { + self.executor.buggify() + } + + /// Sample a caller-provided runtime buggify probability. + pub fn buggify_with_prob(&self, probability: f64) -> bool { + self.executor.buggify_with_prob(probability) + } + + #[allow(dead_code)] + pub(crate) fn enable_determinism_log(&self) { + self.executor.rng.enable_determinism_log(); + } + + #[allow(dead_code)] + pub(crate) fn enable_determinism_check(&self, log: crate::sim::DeterminismLog) { + self.executor.rng.enable_determinism_check(log); + } + + #[allow(dead_code)] + pub(crate) fn take_determinism_log(&self) -> Option { + self.executor.rng.take_determinism_log() + } + + #[allow(dead_code)] + pub(crate) fn finish_determinism_check(&self) -> Result<(), alloc::string::String> { + self.executor.rng.finish_determinism_check() + } +} + +/// Cloneable access to the simulation executor. +#[derive(Clone)] +pub struct Handle { + executor: Arc, +} + +impl Handle { + /// Create a new simulated node owned by this runtime. + pub fn create_node(&self) -> NodeBuilder { + NodeBuilder { + handle: self.clone(), + config: NodeConfig::default(), + } + } + + fn build_node(&self, config: NodeConfig) -> Node { + let id = self.executor.create_node(config.clone()); + let config = self.executor.node_config(id); + Node { + id, + handle: self.clone(), + config, + } + } + + /// Pause scheduling for a node. 
+ pub fn pause(&self, node: NodeId) { + self.executor.pause(node); + } + + /// Resume scheduling for a node and requeue any buffered tasks for it. + pub fn resume(&self, node: NodeId) { + self.executor.resume(node); + } + + /// Spawn a `Send` future onto a specific simulated node. + pub fn spawn_on(&self, node: NodeId, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.executor.spawn_on(node, future) + } + + /// Spawn a non-`Send` future onto a specific simulated node. + /// + /// This is only valid because the simulation executor is single-threaded. + pub fn spawn_local_on(&self, node: NodeId, future: F) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + self.executor.spawn_local_on(node, future) + } + + /// Return the current virtual time for this runtime. + pub fn now(&self) -> Duration { + self.executor.time.now() + } + + /// Move virtual time forward explicitly. + pub fn advance(&self, duration: Duration) { + self.executor.time.advance(duration); + } + + /// Create a future that becomes ready after `duration` of virtual time. + pub fn sleep(&self, duration: Duration) -> crate::sim::time::Sleep { + self.executor.time.sleep(duration) + } + + /// Race a future against a virtual-time timeout. + pub async fn timeout( + &self, + duration: Duration, + future: impl Future, + ) -> Result { + self.executor.time.timeout(duration, future).await + } + + pub fn enable_buggify(&self) { + self.executor.enable_buggify(); + } + + /// Disable probabilistic fault injection for this runtime. + pub fn disable_buggify(&self) { + self.executor.disable_buggify(); + } + + /// Return whether buggify is enabled for this runtime. + pub fn is_buggify_enabled(&self) -> bool { + self.executor.is_buggify_enabled() + } + + /// Sample the default runtime buggify probability. + pub fn buggify(&self) -> bool { + self.executor.buggify() + } + + /// Sample a caller-provided runtime buggify probability. 
+ pub fn buggify_with_prob(&self, probability: f64) -> bool { + self.executor.buggify_with_prob(probability) + } +} + +/// Core single-threaded scheduler backing a simulation [`Runtime`]. +/// +/// The executor owns the runnable queue, per-node pause state, deterministic +/// RNG, and virtual time. Tasks are selected from the queue using the runtime +/// RNG so the schedule is reproducible for a given seed. +struct Executor { + queue: Receiver, + sender: Sender, + nodes: spin::Mutex>>, + next_node: AtomicU64, + rng: Rng, + time: TimeHandle, +} + +impl Executor { + /// Construct a fresh executor with one default `MAIN` node. + fn new(config: RuntimeConfig) -> Self { + let queue = Queue::new(); + let mut nodes = BTreeMap::new(); + nodes.insert(NodeId::MAIN, Arc::new(NodeRecord::default())); + Self { + queue: queue.receiver(), + sender: queue.sender(), + nodes: spin::Mutex::new(nodes), + next_node: AtomicU64::new(1), + rng: Rng::new(config.seed), + time: TimeHandle::new(), + } + } + + fn elapsed(&self) -> Duration { + self.time.now() + } + + fn enable_buggify(&self) { + self.rng.enable_buggify(); + } + + fn disable_buggify(&self) { + self.rng.disable_buggify(); + } + + fn is_buggify_enabled(&self) -> bool { + self.rng.is_buggify_enabled() + } + + fn buggify(&self) -> bool { + self.rng.buggify() + } + + fn buggify_with_prob(&self, probability: f64) -> bool { + self.rng.buggify_with_prob(probability) + } + + fn create_node(&self, config: NodeConfig) -> NodeId { + let id = NodeId(self.next_node.fetch_add(1, Ordering::Relaxed)); + self.nodes.lock().insert( + id, + Arc::new(NodeRecord { + config: Arc::new(config), + state: NodeState::default(), + }), + ); + id + } + + fn node_config(&self, node: NodeId) -> Arc { + self.node_record(node).config.clone() + } + + /// Mark a node as paused so newly selected runnables are buffered. 
+ fn pause(&self, node: NodeId) { + self.node_record(node).state.paused.store(true, Ordering::Relaxed); + } + + /// Mark a node as runnable again and requeue any buffered tasks for it. + fn resume(&self, node: NodeId) { + let record = self.node_record(node); + record.state.paused.store(false, Ordering::Relaxed); + + let mut paused = record.state.paused_queue.lock(); + for runnable in paused.drain(..) { + self.sender.send(runnable); + } + } + + /// Spawn a `Send` task and enqueue its runnable on the shared runtime queue. + fn spawn_on(&self, node: NodeId, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.assert_known_node(node); + + let abort = AbortHandle::new(); + let abortable = Abortable::new(future, abort.clone()); + let sender = self.sender.clone(); + let (runnable, task) = async_task::Builder::new() + .metadata(node) + .spawn(move |_| abortable, move |runnable| sender.send(runnable)); + runnable.schedule(); + + JoinHandle { task, abort } + } + + /// Spawn a non-`Send` task on the single-threaded runtime. + fn spawn_local_on(&self, node: NodeId, future: F) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + self.assert_known_node(node); + + let abort = AbortHandle::new(); + let abortable = Abortable::new(future, abort.clone()); + let sender = self.sender.clone(); + let (runnable, task) = unsafe { + async_task::Builder::new() + .metadata(node) + .spawn_unchecked(move |_| abortable, move |runnable| sender.send(runnable)) + }; + runnable.schedule(); + + JoinHandle { task, abort } + } + + #[track_caller] + /// Run the top-level future until completion. + /// + /// The executor repeatedly drains runnable tasks, then advances virtual + /// time to the next timer when the queue is empty. If neither runnable work + /// nor timers remain, the simulation is considered deadlocked. 
+ fn block_on(&self, future: F) -> F::Output { + let sender = self.sender.clone(); + let (runnable, mut task) = unsafe { + async_task::Builder::new() + .metadata(NodeId::MAIN) + .spawn_unchecked(move |_| future, move |runnable| sender.send(runnable)) + }; + runnable.schedule(); + + loop { + self.run_all_ready(); + if task.is_finished() { + let waker = Waker::noop(); + return match Pin::new(&mut task).poll(&mut Context::from_waker(waker)) { + Poll::Ready(output) => output, + Poll::Pending => unreachable!("task.is_finished() was true"), + }; + } + + if self.time.wake_next_timer() { + continue; + } + + panic!("no runnable tasks; all simulated tasks are blocked"); + } + } + + /// Drain the runnable queue, selecting tasks in deterministic RNG order. + /// + /// Paused-node tasks are diverted into that node's paused buffer instead of + /// being polled immediately. + fn run_all_ready(&self) { + while let Some(runnable) = self.queue.try_recv_random(&self.rng) { + let node = *runnable.metadata(); + let record = self.node_record(node); + if record.state.paused.load(Ordering::Relaxed) { + record.state.paused_queue.lock().push(runnable); + continue; + } + // TODO: Do some time advance here too + runnable.run(); + } + } + + /// Look up the record for a node, panicking if the node is unknown. + fn node_record(&self, node: NodeId) -> Arc { + self.nodes + .lock() + .get(&node) + .cloned() + .unwrap_or_else(|| panic!("unknown simulated node {node}")) + } + + fn assert_known_node(&self, node: NodeId) { + let _ = self.node_record(node); + } +} + +/// One simulated node's immutable metadata plus scheduler state. +#[derive(Clone, Default)] +struct NodeRecord { + config: Arc, + state: NodeState, +} + +/// Per-node scheduler state shared by tasks assigned to that node. +#[derive(Clone, Default)] +struct NodeState { + paused: Arc, + paused_queue: Arc>>, +} + +/// Yield back to the scheduler once. 
+/// +/// This is the smallest explicit interleaving point available to simulated +/// tasks when they need to give other runnables a chance to execute. +pub async fn yield_now() { + YieldNow { yielded: false }.await +} + +/// One-shot future backing [`yield_now`]. +struct YieldNow { + yielded: bool, +} + +impl Future for YieldNow { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.yielded { + Poll::Ready(()) + } else { + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +/// Shared runnable queue used by the simulation executor. +/// TODO: Make it generic over T +struct Queue { + inner: Arc, +} + +/// Sending end of the runnable queue. +#[derive(Clone)] +struct Sender { + inner: Arc, +} + +/// Receiving end of the runnable queue. +#[derive(Clone)] +struct Receiver { + inner: Arc, +} + +/// Queue storage for runnables awaiting scheduling. +struct QueueInner { + queue: Mutex>, +} + +impl Queue { + fn new() -> Self { + Self { + inner: Arc::new(QueueInner { + queue: Mutex::new(Vec::new()), + }), + } + } + + fn sender(&self) -> Sender { + Sender { + inner: self.inner.clone(), + } + } + + fn receiver(&self) -> Receiver { + Receiver { + inner: self.inner.clone(), + } + } +} + +impl Sender { + /// Push a runnable onto the shared queue. + fn send(&self, runnable: Runnable) { + self.inner.queue.lock().push(runnable); + } +} + +impl Receiver { + /// Remove one runnable using the runtime RNG to choose among ready tasks. 
+ fn try_recv_random(&self, rng: &Rng) -> Option { + let mut queue = self.inner.queue.lock(); + if queue.is_empty() { + return None; + } + let idx = rng.index(queue.len()); + Some(queue.swap_remove(idx)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }; + + use super::*; + use crate::sim::RuntimeConfig; + + #[test] + fn paused_node_does_not_run_until_resumed() { + let mut runtime = Runtime::new(1); + let node = runtime.create_node().name("paused").build(); + node.pause(); + + let runs = Arc::new(AtomicUsize::new(0)); + let task_runs = Arc::clone(&runs); + let task = node.spawn(async move { + task_runs.fetch_add(1, Ordering::SeqCst); + 7 + }); + + runtime.block_on(async { + yield_now().await; + }); + assert_eq!(runs.load(Ordering::SeqCst), 0); + + node.resume(); + assert_eq!(runtime.block_on(task).expect("paused task should complete"), 7); + assert_eq!(runs.load(Ordering::SeqCst), 1); + } + + #[test] + fn handle_can_spawn_onto_node_from_simulated_task() { + let mut runtime = Runtime::new(2); + let handle = runtime.handle(); + + let value = runtime.block_on(async move { + let node = handle.create_node().name("spawned").build(); + node.spawn(async { 11 }).await.expect("spawned task should complete") + }); + + assert_eq!(value, 11); + } + + #[test] + fn runtime_config_sets_seed() { + let runtime = Runtime::with_config(RuntimeConfig::new(77)); + let handle = runtime.handle(); + handle.enable_buggify(); + + let actual = (0..8).map(|_| handle.buggify_with_prob(0.5)).collect::>(); + + let expected = { + let rng = Rng::new(77); + rng.enable_buggify(); + (0..8).map(|_| rng.buggify_with_prob(0.5)).collect::>() + }; + + assert_eq!(actual, expected); + } + + #[test] + fn runtime_and_handle_share_buggify_state() { + let runtime = Runtime::new(6); + let handle = runtime.handle(); + + assert!(!runtime.is_buggify_enabled()); + runtime.enable_buggify(); + assert!(handle.is_buggify_enabled()); + 
assert!(handle.buggify_with_prob(1.0)); + handle.disable_buggify(); + assert!(!runtime.is_buggify_enabled()); + } + + #[test] + fn aborted_task_returns_join_error_when_awaited() { + let mut runtime = Runtime::new(8); + let node = runtime.create_node().name("abort").build(); + let task = node.spawn(async move { + yield_now().await; + 99 + }); + task.abort_handle().abort(); + + let err = runtime + .block_on(task) + .expect_err("aborted task should surface JoinError instead of panicking"); + assert_eq!(err, JoinError); + } + + #[cfg(feature = "simulation")] + #[test] + fn sim_std_block_on_can_spawn_local_task_with_explicit_handle() { + let mut runtime = Runtime::new(5); + let handle = runtime.handle(); + let node = handle.create_node().name("local").build(); + let value = crate::sim_std::block_on(&mut runtime, async move { + let captured = std::rc::Rc::new(17); + node.spawn_local(async move { + yield_now().await; + *captured + }) + .await + .expect("spawned local task should complete") + }); + + assert_eq!(value, 17); + } + + #[test] + fn node_builder_sets_name() { + let runtime = Runtime::new(9); + let unnamed = runtime.create_node().build(); + let named = runtime.create_node().name("replica-1").build(); + + assert_eq!(unnamed.name(), None); + assert_eq!(named.name(), Some("replica-1")); + assert_ne!(unnamed.id(), named.id()); + } + + #[cfg(feature = "simulation")] + #[test] + fn check_determinism_runs_future_twice() { + static CALLS: AtomicUsize = AtomicUsize::new(0); + CALLS.store(0, Ordering::SeqCst); + + let value = crate::sim_std::check_determinism(3, || async { + CALLS.fetch_add(1, Ordering::SeqCst); + yield_now().await; + 13 + }); + + assert_eq!(value, 13); + assert_eq!(CALLS.load(Ordering::SeqCst), 2); + } + + #[cfg(feature = "simulation")] + #[test] + #[should_panic(expected = "non-determinism detected")] + fn check_determinism_rejects_different_scheduler_sequence() { + static FIRST_RUN: AtomicBool = AtomicBool::new(true); + FIRST_RUN.store(true, 
Ordering::SeqCst); + + crate::sim_std::check_determinism(4, || async { + if FIRST_RUN.swap(false, Ordering::SeqCst) { + yield_now().await; + } + }); + } +} diff --git a/crates/runtime/src/sim/executor/task.rs b/crates/runtime/src/sim/executor/task.rs new file mode 100644 index 00000000000..bf03a8293d3 --- /dev/null +++ b/crates/runtime/src/sim/executor/task.rs @@ -0,0 +1,159 @@ +use alloc::sync::Arc; +use core::{ + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{Context, Poll, Waker}, +}; + +use spin::Mutex; + +use super::NodeId; + +/// A spawned simulated task. +/// +/// Two handles reference the same underlying allocation: +/// - `JoinHandle` awaits the output and holds an `AbortHandle` for cancellation. +/// - The executor holds the `Runnable` (not visible here). +pub struct JoinHandle { + // async_task::Task owns a shared heap-allocated cell that holds the future, + // its output, metadata (NodeId), and waker. Polling it drives the future + // to completion. Dropping it without detach cancels the future. + pub(crate) task: async_task::Task, NodeId>, + // Clone of the same AbortHandle that Abortable holds inside the task. + pub(crate) abort: AbortHandle, +} + +impl JoinHandle { + /// Return a handle that can cancel this task. + pub fn abort_handle(&self) -> AbortHandle { + self.abort.clone() + } + + /// Drop the join handle without cancelling the task. + pub fn detach(self) { + // async_task::Task::detach makes Drop a no-op — the future keeps running. + self.task.detach(); + } + + /// Poll the underlying async_task::Task for its output. + pub(crate) fn poll_join(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // async_task::Task implements Future. Polling it drives the wrapped + // Abortable future inside the executor. 
+ Pin::new(&mut self.task).poll(cx) + } +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().poll_join(cx) + } +} + +/// Two-phase cancellation for a simulated task. +/// +/// [`AbortHandle`] and [`Abortable`] work together: +/// - `abort()` sets an atomic flag and wakes the task so it gets polled. +/// - On the next poll, `Abortable` checks the flag and returns `Err(JoinError)`. +/// - `JoinHandle::poll` reads that error and surfaces it to the awaiting code. +/// - The task's future is dropped naturally when `Abortable` returns `Err`. +/// +/// `abort()` is thread-safe — it can be called from any task or node, and the +/// waker ensures the target task is re-scheduled even if it was blocked on I/O +/// or a timer. +#[derive(Clone)] +pub struct AbortHandle { + state: Arc, +} + +impl AbortHandle { + pub(crate) fn new() -> Self { + Self { + state: Arc::new(AbortState::new()), + } + } + + pub fn abort(&self) { + // Step 1: atomically mark the task as aborted. + self.state.aborted.store(true, Ordering::Relaxed); + // Step 2: wake the task so the executor re-schedules it for polling. + // If the task is blocked on a timer, the waker cancels that wait. + if let Some(waker) = self.state.waker.lock().take() { + waker.wake(); + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct JoinError; + +impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("task was cancelled") + } +} + +#[cfg(feature = "simulation")] +impl std::error::Error for JoinError {} + +// Shared state between AbortHandle and Abortable. +struct AbortState { + // Set to true by AbortHandle::abort(), read by Abortable::poll(). + aborted: AtomicBool, + // The executor's waker, registered by Abortable on every poll. + // Stored so abort() can wake the task even if it's waiting on I/O. 
+ waker: Mutex>, +} + +impl AbortState { + fn new() -> Self { + Self { + aborted: AtomicBool::new(false), + waker: Mutex::new(None), + } + } +} + +/// Wraps a future so it can be cancelled via an [`AbortHandle`]. +/// +/// The executor wraps every spawned future in `Abortable`. On each poll it +/// checks the cancellation flag before progressing the inner future. +pub(crate) struct Abortable { + future: F, + abort: AbortHandle, +} + +impl Abortable { + pub(crate) fn new(future: F, abort: AbortHandle) -> Self { + Self { future, abort } + } +} + +impl Future for Abortable { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check cancellation before doing any work. + if self.abort.state.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Err(JoinError)); + } + + // Register the waker so abort() can wake this task. + self.abort.state.waker.lock().replace(cx.waker().clone()); + + // SAFETY: The `Abortable` struct is `#[repr(transparent)]`-like in its + // pin projection: `future` is behind the cancellation fields (`abort`) + // that are never moved once pinned. We use `map_unchecked_mut` to project + // through the struct layout, which is safe because: + // 1. `future` is a direct field of `Abortable` — no indirection. + // 2. `abort` is never moved or modified in ways that would change the + // address of `future` relative to `self`. + // 3. The caller guarantees `self` stays pinned for the lifetime of the + // future. 
+ let mut future = unsafe { self.map_unchecked_mut(|this| &mut this.future) }; + future.as_mut().poll(cx).map(Ok) + } +} diff --git a/crates/runtime/src/sim/mod.rs b/crates/runtime/src/sim/mod.rs new file mode 100644 index 00000000000..ccdcc104991 --- /dev/null +++ b/crates/runtime/src/sim/mod.rs @@ -0,0 +1,10 @@ +pub mod buggify; +mod executor; +mod rng; +pub mod time; + +pub use executor::{ + yield_now, AbortHandle, Handle, JoinError, JoinHandle, Node, NodeBuilder, NodeId, Runtime, RuntimeConfig, +}; +pub(crate) use rng::DeterminismLog; +pub use rng::{GlobalRng, Rng}; diff --git a/crates/runtime/src/sim/rng.rs b/crates/runtime/src/sim/rng.rs new file mode 100644 index 00000000000..c210ff8b781 --- /dev/null +++ b/crates/runtime/src/sim/rng.rs @@ -0,0 +1,202 @@ +use alloc::{format, string::String}; +use alloc::{sync::Arc, vec::Vec}; +use spin::Mutex; + +pub type Rng = GlobalRng; + +/// Shared deterministic RNG for the simulation core. +/// +/// The simulator owns one runtime-wide RNG handle and uses it for scheduler +/// choices, probabilistic fault injection, and determinism checks. Hosted +/// conveniences such as thread-local current-RNG access and libc random hooks +/// live in `crate::sim_std`, not here. +#[derive(Clone, Debug)] +pub struct GlobalRng { + inner: Arc>, +} + +#[derive(Debug)] +struct Inner { + /// Seed used to initialize the runtime RNG, carried for diagnostics and replay. + seed: u64, + /// Deterministic generator used for scheduler choices and fault injection decisions. + rng: SplitMix64, + /// Checkpoints recorded during the first determinism run. + log: Option>, + /// Expected checkpoints plus the number already consumed during replay. + check: Option<(Vec, usize)>, + /// Whether probabilistic fault injection is currently enabled for this runtime. 
+ buggify_enabled: bool, +} + +const GAMMA: u64 = 0x9e37_79b9_7f4a_7c15; + +/// Reference for SplitMix64 algorithm: https://rosettacode.org/wiki/Pseudo-random_numbers/Splitmix64 +/// Splitmix64 is the default pseudo-random number generator algorithm. +/// It uses a fairly simple algorithm that, though it is considered +/// to be poor for cryptographic purposes, is very fast to calculate, +/// and is "good enough" for many random number needs. +/// It passes several fairly rigorous PRNG "fitness" tests that some more complex algorithms fail. +#[derive(Clone, Debug)] +struct SplitMix64 { + state: u64, +} + +impl SplitMix64 { + fn new(seed: u64) -> Self { + Self { state: seed } + } + + fn next_u64(&mut self) -> u64 { + self.state = self.state.wrapping_add(GAMMA); + mix64(self.state) + } + + fn fill_bytes(&mut self, dest: &mut [u8]) { + for chunk in dest.chunks_mut(core::mem::size_of::()) { + let bytes = self.next_u64().to_ne_bytes(); + chunk.copy_from_slice(&bytes[..chunk.len()]); + } + } +} + +fn mix64(mut x: u64) -> u64 { + x = (x ^ (x >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9); + x = (x ^ (x >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb); + x ^ (x >> 31) +} + +impl GlobalRng { + /// Create a new deterministic RNG for a simulation runtime. 
+ pub fn new(seed: u64) -> Self { + Self { + inner: Arc::new(Mutex::new(Inner { + seed, + rng: SplitMix64::new(seed), + log: None, + check: None, + buggify_enabled: false, + })), + } + } + + pub fn next_u64(&self) -> u64 { + self.with_inner(|inner| inner.rng.next_u64()) + } + + pub fn index(&self, len: usize) -> usize { + assert!(len > 0, "len must be non-zero"); + (self.next_u64() as usize) % len + } + + pub fn sample_probability(&self, probability: f64) -> bool { + probability_sample(self.next_u64(), probability) + } + + pub fn enable_buggify(&self) { + self.inner.lock().buggify_enabled = true; + } + + pub fn disable_buggify(&self) { + self.inner.lock().buggify_enabled = false; + } + + pub fn is_buggify_enabled(&self) -> bool { + self.inner.lock().buggify_enabled + } + + pub fn buggify(&self) -> bool { + self.buggify_with_prob(0.25) + } + + pub fn buggify_with_prob(&self, probability: f64) -> bool { + self.is_buggify_enabled() && self.sample_probability(probability) + } + + #[allow(dead_code)] + pub(crate) fn seed(&self) -> u64 { + self.inner.lock().seed + } + + fn with_inner(&self, f: impl FnOnce(&mut Inner) -> T) -> T { + let mut inner = self.inner.lock(); + let output = f(&mut inner); + if inner.log.is_some() || inner.check.is_some() { + let checkpoint = checksum(inner.rng.clone().next_u64()); + if let Some(log) = &mut inner.log { + log.push(checkpoint); + } + let seed = inner.seed; + if let Some((expected, consumed)) = &mut inner.check { + if expected.get(*consumed) != Some(&checkpoint) { + panic!("non-determinism detected for seed {} at checkpoint {consumed}", seed); + } + *consumed += 1; + } + } + output + } + + #[allow(dead_code)] + pub(crate) fn fill_bytes(&self, dest: &mut [u8]) { + self.with_inner(|inner| inner.rng.fill_bytes(dest)); + } + + #[allow(dead_code)] + pub(crate) fn enable_determinism_log(&self) { + let mut inner = self.inner.lock(); + inner.log = Some(Vec::new()); + inner.check = None; + } + + #[allow(dead_code)] + pub(crate) fn 
enable_determinism_check(&self, log: DeterminismLog) { + let mut inner = self.inner.lock(); + inner.check = Some((log.0, 0)); + inner.log = None; + } + + #[allow(dead_code)] + pub(crate) fn take_determinism_log(&self) -> Option { + let mut inner = self.inner.lock(); + inner + .log + .take() + .or_else(|| inner.check.take().map(|(log, _)| log)) + .map(DeterminismLog) + } + + #[allow(dead_code)] + pub(crate) fn finish_determinism_check(&self) -> Result<(), String> { + let inner = self.inner.lock(); + if let Some((log, consumed)) = &inner.check + && *consumed != log.len() + { + return Err(format!( + "non-determinism detected for seed {}: consumed {consumed} of {} checkpoints", + inner.seed, + log.len() + )); + } + Ok(()) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct DeterminismLog(Vec); + +fn probability_sample(value: u64, probability: f64) -> bool { + if probability <= 0.0 { + return false; + } + if probability >= 1.0 { + return true; + } + + let unit = (value >> 11) as f64 * (1.0 / ((1u64 << 53) as f64)); + unit < probability +} + +fn checksum(value: u64) -> u8 { + value.to_ne_bytes().into_iter().fold(0, |acc, byte| acc ^ byte) +} diff --git a/crates/runtime/src/sim/time/mod.rs b/crates/runtime/src/sim/time/mod.rs new file mode 100644 index 00000000000..56f707201be --- /dev/null +++ b/crates/runtime/src/sim/time/mod.rs @@ -0,0 +1,308 @@ +mod sleep; + +use alloc::{collections::BTreeMap, sync::Arc, vec::Vec}; +use core::{ + fmt, + future::Future, + pin::pin, + task::{Poll, Waker}, + time::Duration, +}; +use sleep::wake_all; +use spin::Mutex; + +pub use sleep::Sleep; + +/// Shared virtual clock and timer registry for one simulation runtime. +/// +/// Virtual clock that only advances when explicitly driven — no wall-clock +/// progression, like Tokio's time-pause mode. +/// +/// All cloned handles observe the same virtual `now`, pending timers, and +/// timer-id sequence. 
The executor uses this handle both for explicit +/// time-travel operations and for jumping directly to the next pending timer +/// when the runnable queue is empty. +#[derive(Clone, Debug)] +pub struct TimeHandle { + inner: Arc>, +} + +impl TimeHandle { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(TimeState::default())), + } + } + + pub fn now(&self) -> Duration { + self.inner.lock().now + } + + /// Move virtual time forward by an explicit amount. + /// + /// This is the direct "advance the clock" operation used by tests and + /// higher-level simulation code. It updates `now`, removes any timers that + /// became due at the new instant, and wakes the corresponding tasks after + /// releasing the lock. + pub fn advance(&self, duration: Duration) { + if duration.is_zero() { + return; + } + + let wakers = { + let mut state = self.inner.lock(); + state.now = state.now.saturating_add(duration); + state.take_due_wakers() + }; + wake_all(wakers); + } + + /// Jump virtual time to the earliest outstanding timer and wake it. + /// + /// The executor calls this when there are no runnable tasks left. Instead + /// of incrementing time in wall-clock steps, simulation time jumps + /// directly to the minimum timer deadline. Returns `false` if there are no + /// timers to wake. + pub fn wake_next_timer(&self) -> bool { + let wakers = { + let mut state = self.inner.lock(); + let Some(next_deadline) = state.timers.values().map(|timer| timer.deadline).min() else { + return false; + }; + if next_deadline > state.now { + state.now = next_deadline; + } + state.take_due_wakers() + }; + let woke = !wakers.is_empty(); + wake_all(wakers); + woke + } + + /// Register or refresh a timer entry for a sleeping future. + /// + /// Sleep futures keep a stable `TimerId` across polls. Re-registering with + /// the same id updates the stored waker without creating duplicate timers. 
+ fn register_timer(&self, id: TimerId, deadline: Duration, waker: &Waker) { + let mut state = self.inner.lock(); + state.timers.insert( + id, + TimerEntry { + deadline, + waker: waker.clone(), + }, + ); + } + + /// Remove a timer entry if it is still present. + /// + /// Cancellation is best-effort because the timer may already have been + /// removed by a wakeup path before the caller reaches this point. + fn cancel_timer(&self, id: TimerId) { + self.inner.lock().timers.remove(&id); + } + + /// Allocate a fresh timer id for a new sleep future. + /// + /// Stable timer ids are what let a `Sleep` future re-register itself + /// across polls while still mapping back to a single timer entry. + fn next_timer_id(&self) -> TimerId { + let mut state = self.inner.lock(); + let id = TimerId(state.next_timer_id); + state.next_timer_id = state.next_timer_id.saturating_add(1); + id + } + + /// Create a future that becomes ready after `duration` of virtual time. + /// + /// The returned future is lazy: it does not allocate a timer entry until + /// the first poll, when it can anchor its deadline to the current virtual + /// time. + pub fn sleep(&self, duration: Duration) -> Sleep { + Sleep::new(self.clone(), duration) + } + + /// Race a future against a virtual-time sleep. + /// + /// Uses a biased `poll_fn` that polls `future` before `sleep`. If both are + /// ready in the same step, the main future wins — completion beats timeout + /// deterministically. 
+ pub async fn timeout(&self, duration: Duration, future: impl Future) -> Result { + let sleep = self.sleep(duration); + let mut future = pin!(future); + let mut sleep = pin!(sleep); + + core::future::poll_fn(|cx| { + if let Poll::Ready(output) = future.as_mut().poll(cx) { + return Poll::Ready(Ok(output)); + } + if let Poll::Ready(()) = sleep.as_mut().poll(cx) { + return Poll::Ready(Err(TimeoutElapsed { duration })); + } + Poll::Pending + }) + .await + } +} + +impl Default for TimeHandle { + fn default() -> Self { + Self::new() + } +} + +/// Mutable state behind a [`TimeHandle`]. +/// +/// `timers` is keyed by stable `TimerId` so a `Sleep` future can refresh its +/// waker across polls without accumulating duplicate entries. A `BTreeMap` is +/// used to keep due-timer iteration deterministic. +#[derive(Debug, Default)] +struct TimeState { + now: Duration, + next_timer_id: u64, + timers: BTreeMap, +} + +impl TimeState { + /// Remove every timer whose deadline is at or before the current virtual + /// time and return their wakers. + fn take_due_wakers(&mut self) -> Vec { + let due = self + .timers + .iter() + .filter_map(|(id, timer)| (timer.deadline <= self.now).then_some(*id)) + .collect::>(); + due.into_iter() + .filter_map(|id| self.timers.remove(&id).map(|timer| timer.waker)) + .collect() + } +} + +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct TimerId(u64); + +/// Stored metadata for one pending timer. 
+#[derive(Debug)] +struct TimerEntry { + deadline: Duration, + waker: Waker, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutElapsed { + duration: Duration, +} + +impl TimeoutElapsed { + pub fn duration(self) -> Duration { + self.duration + } +} + +impl fmt::Display for TimeoutElapsed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "simulated timeout elapsed after {:?}", self.duration) + } +} + +#[cfg(any(feature = "tokio", feature = "simulation"))] +impl std::error::Error for TimeoutElapsed {} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use crate::sim; + use spin::Mutex; + + #[test] + fn sleep_fast_forwards_virtual_time() { + let mut runtime = sim::Runtime::new(101); + let handle = runtime.handle(); + + runtime.block_on(async move { + assert_eq!(handle.now(), Duration::ZERO); + handle.sleep(Duration::from_millis(5)).await; + assert_eq!(handle.now(), Duration::from_millis(5)); + }); + } + + #[test] + fn shorter_timer_wakes_first() { + let mut runtime = sim::Runtime::new(102); + let handle = runtime.handle(); + let order = Arc::new(Mutex::new(Vec::new())); + + runtime.block_on({ + let order = Arc::clone(&order); + async move { + let slow_order = Arc::clone(&order); + let slow_handle = handle.clone(); + let slow = handle.spawn_on(sim::NodeId::MAIN, async move { + slow_handle.sleep(Duration::from_millis(10)).await; + slow_order.lock().push(10); + }); + + let fast_order = Arc::clone(&order); + let fast_handle = handle.clone(); + let fast = handle.spawn_on(sim::NodeId::MAIN, async move { + fast_handle.sleep(Duration::from_millis(3)).await; + fast_order.lock().push(3); + }); + + fast.await.expect("fast timer task should complete"); + slow.await.expect("slow timer task should complete"); + } + }); + + assert_eq!(*order.lock(), vec![3, 10]); + assert_eq!(runtime.elapsed(), Duration::from_millis(10)); + } + + #[test] + fn explicit_advance_moves_virtual_time() { + let mut runtime = 
sim::Runtime::new(103); + let handle = runtime.handle(); + + runtime.block_on(async move { + handle.advance(Duration::from_millis(7)); + assert_eq!(handle.now(), Duration::from_millis(7)); + }); + } + + #[test] + fn timeout_returns_future_output_before_deadline() { + let mut runtime = sim::Runtime::new(104); + let handle = runtime.handle(); + + let output = runtime.block_on(async move { + handle + .timeout(Duration::from_millis(10), async { + handle.sleep(Duration::from_millis(3)).await; + 9 + }) + .await + }); + + assert_eq!(output, Ok(9)); + assert_eq!(runtime.elapsed(), Duration::from_millis(3)); + } + + #[test] + fn timeout_expires_at_virtual_deadline() { + let mut runtime = sim::Runtime::new(105); + let handle = runtime.handle(); + + let output = runtime.block_on(async move { + handle + .timeout(Duration::from_millis(4), async { + handle.sleep(Duration::from_millis(20)).await; + 9 + }) + .await + }); + + assert_eq!(output.unwrap_err().duration(), Duration::from_millis(4)); + assert_eq!(runtime.elapsed(), Duration::from_millis(4)); + } +} diff --git a/crates/runtime/src/sim/time/sleep.rs b/crates/runtime/src/sim/time/sleep.rs new file mode 100644 index 00000000000..53d5555ffc3 --- /dev/null +++ b/crates/runtime/src/sim/time/sleep.rs @@ -0,0 +1,108 @@ +use alloc::vec::Vec; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, + time::Duration, +}; + +use super::{TimeHandle, TimerId}; + +/// Future returned by [`TimeHandle::sleep`]. +/// +/// Three-state machine: +/// +/// 1. **Unregistered** — first poll. Converts the relative `duration` into an +/// absolute `deadline` using the current virtual time and registers with the +/// time handle's timer table. Transitions to `Registered`. +/// +/// 2. **Registered** — subsequent polls. If virtual time has reached the +/// deadline, the timer is cancelled and the future returns `Ready`. +/// Otherwise, the waker is refreshed in the timer entry and the future +/// returns `Pending`. +/// +/// 3. 
**Done** — any later poll returns `Ready(()`) immediately. +/// +/// On drop while `Registered`, the timer entry is cancelled to prevent stale +/// wakers from firing after the future is abandoned. +pub struct Sleep { + duration: Duration, + state: SleepState, +} + +impl Sleep { + pub(super) fn new(handle: TimeHandle, duration: Duration) -> Self { + Self { + duration, + state: SleepState::Unregistered { handle }, + } + } +} + +/// Internal state machine for [`Sleep`]. +enum SleepState { + Unregistered { + handle: TimeHandle, + }, + Registered { + handle: TimeHandle, + id: TimerId, + deadline: Duration, + }, + Done, +} + +impl Future for Sleep { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if matches!(self.state, SleepState::Done) { + return Poll::Ready(()); + } + + if let SleepState::Unregistered { handle } = &self.state { + let handle = handle.clone(); + let deadline = handle.now().saturating_add(self.duration); + let id = handle.next_timer_id(); + self.state = SleepState::Registered { handle, id, deadline }; + } + + let SleepState::Registered { handle, id, deadline } = &self.state else { + unreachable!("sleep state should be registered or done"); + }; + + if handle.now() >= *deadline { + let handle = handle.clone(); + let id = *id; + handle.cancel_timer(id); + self.state = SleepState::Done; + Poll::Ready(()) + } else { + handle.register_timer(*id, *deadline, cx.waker()); + Poll::Pending + } + } +} + +impl Drop for Sleep { + /// Remove a pending timer entry when the future is dropped early. + /// + /// This prevents stale wakers from remaining in the runtime after the + /// corresponding task has been cancelled or a timeout race has completed. + fn drop(&mut self) { + if let SleepState::Registered { handle, id, .. } = &self.state { + handle.cancel_timer(*id); + } + } +} + +/// Wake every task collected from a due-timer scan. 
+/// +/// Waking happens only after the time-state mutex has been released so resumed +/// tasks can inspect or mutate timer state without deadlocking on the same +/// lock. +pub(super) fn wake_all(wakers: Vec) { + for waker in wakers { + waker.wake(); + } +} diff --git a/crates/runtime/src/sim_std.rs b/crates/runtime/src/sim_std.rs new file mode 100644 index 00000000000..5305c6ea166 --- /dev/null +++ b/crates/runtime/src/sim_std.rs @@ -0,0 +1,207 @@ +//! Std-hosted entry points for running the deterministic simulator in tests. +//! +//! The portable simulator lives in [`crate::sim`]. This module is deliberately +//! host-specific: it installs thread-local context while a simulation is +//! running, checks determinism by replaying a seed in fresh OS threads, and +//! intercepts a few libc calls so std code cannot silently escape determinism. + +#![allow(clippy::disallowed_macros)] + +use alloc::boxed::Box; +use core::{cell::Cell, future::Future}; +use std::sync::OnceLock; + +use crate::sim; + +// Public entry points. + +/// Run a future to completion with std-hosted determinism guards installed. +/// +/// This wraps [`sim::Runtime::block_on`] and is the normal entry point for DST +/// tests that execute inside a hosted process. While the future runs, this +/// marks the thread as inside simulation so OS thread spawns can be rejected. +pub fn block_on(runtime: &mut sim::Runtime, future: F) -> F::Output { + let _system_thread_context = enter_simulation_thread(); + runtime.block_on(future) +} + +/// Run the same future factory twice and assert that both runs consume the same +/// deterministic RNG/scheduler trace. +/// +/// Each pass runs on a fresh OS thread so thread-local std state is not shared +/// between the recording and replay passes. 
+pub fn check_determinism(seed: u64, make_future: M) -> F::Output +where + M: Fn() -> F + Clone + Send + 'static, + F: Future + 'static, + F::Output: Send + 'static, +{ + let first = make_future.clone(); + let log = std::thread::spawn(move || { + let mut runtime = sim::Runtime::new(seed); + runtime.enable_determinism_log(); + block_on(&mut runtime, first()); + runtime + .take_determinism_log() + .expect("determinism log should be enabled") + }) + .join() + .map_err(|payload| panic_with_seed(seed, payload)) + .unwrap(); + + std::thread::spawn(move || { + let mut runtime = sim::Runtime::new(seed); + runtime.enable_determinism_check(log); + let output = block_on(&mut runtime, make_future()); + runtime.finish_determinism_check().unwrap_or_else(|err| panic!("{err}")); + output + }) + .join() + .map_err(|payload| panic_with_seed(seed, payload)) + .unwrap() +} + +fn panic_with_seed(seed: u64, payload: Box) -> ! { + eprintln!("note: run with --seed {seed} to reproduce this error"); + std::panic::resume_unwind(payload); +} + +// Simulation thread context. + +// Ambient state used only while `sim_std::block_on` is driving a simulation. +// +// The simulator itself stays explicit-handle based. This thread-local only +// marks whether the current OS thread is owned by a running simulation so +// host thread creation can be rejected. +thread_local! { + // Marks the current OS thread as simulation-owned so thread creation hooks + // can reject accidental escapes to the host scheduler. + static IN_SIMULATION: Cell = const { Cell::new(false) }; +} + +struct SimulationThreadGuard { + previous: bool, +} + +fn enter_simulation_thread() -> SimulationThreadGuard { + let previous = IN_SIMULATION.with(|state| state.replace(true)); + SimulationThreadGuard { previous } +} + +fn in_simulation() -> bool { + IN_SIMULATION.with(Cell::get) +} + +impl Drop for SimulationThreadGuard { + fn drop(&mut self) { + IN_SIMULATION.with(|state| { + state.set(self.previous); + }); + } +} + +// Thread hook. 
+ +// Hook Unix thread creation by interposing `pthread_attr_init`. +// +// `std::thread::Builder::spawn` initializes pthread attributes before creating +// the thread. Returning an error here while simulation is active makes hidden +// OS thread creation fail early, before host scheduling can affect replay. +// Outside simulation, this delegates to the real libc symbol through `RTLD_NEXT`. +#[cfg(unix)] +#[unsafe(no_mangle)] +#[inline(never)] +unsafe extern "C" fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int { + // std::thread enters libc through pthread_attr_init on Unix. Refusing that + // call while in simulation keeps hidden OS scheduling out of DST. + if in_simulation() { + eprintln!("attempt to spawn a system thread in simulation."); + eprintln!("note: use simulator tasks instead."); + return -1; + } + + type PthreadAttrInit = unsafe extern "C" fn(*mut libc::pthread_attr_t) -> libc::c_int; + static PTHREAD_ATTR_INIT: OnceLock = OnceLock::new(); + let original = PTHREAD_ATTR_INIT.get_or_init(|| unsafe { + // `RTLD_NEXT` skips this interposed function and finds the libc + // implementation that would have been called without the simulator. + let ptr = libc::dlsym(libc::RTLD_NEXT, c"pthread_attr_init".as_ptr().cast()); + assert!(!ptr.is_null(), "failed to resolve original pthread_attr_init"); + std::mem::transmute(ptr) + }); + unsafe { original(attr) } +} + +// Randomness syscall hooks. + +// Hook OS randomness by interposing `getrandom`. +// +// This crate no longer tries to make host randomness deterministic. Any such +// request is surfaced with a warning and then delegated to the host OS. 
+#[unsafe(no_mangle)] +#[inline(never)] +unsafe extern "C" fn getrandom(buf: *mut u8, buflen: usize, flags: u32) -> isize { + if in_simulation() { + eprintln!("warning: randomness requested; delegating to host OS"); + eprintln!("{}", std::backtrace::Backtrace::force_capture()); + } + unsafe { real_getrandom()(buf, buflen, flags) } +} + +#[cfg(target_os = "linux")] +fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize { + type GetrandomFn = unsafe extern "C" fn(*mut u8, usize, u32) -> isize; + static GETRANDOM: OnceLock = OnceLock::new(); + *GETRANDOM.get_or_init(|| unsafe { + let ptr = libc::dlsym(libc::RTLD_NEXT, c"getrandom".as_ptr().cast()); + assert!(!ptr.is_null(), "failed to resolve original getrandom"); + std::mem::transmute(ptr) + }) +} + +#[cfg(not(target_os = "linux"))] +fn real_getrandom() -> unsafe extern "C" fn(*mut u8, usize, u32) -> isize { + compile_error!("unsupported OS for DST getrandom override"); +} + +// Hook `getentropy` and route it through the same deterministic path as +// `getrandom`. +// +// The 256-byte limit is part of the getentropy contract. Keeping this wrapper +// small means all entropy decisions stay centralized in `getrandom`. 
+#[unsafe(no_mangle)] +#[inline(never)] +unsafe extern "C" fn getentropy(buf: *mut u8, buflen: usize) -> i32 { + if buflen > 256 { + return -1; + } + match unsafe { getrandom(buf, buflen, 0) } { + -1 => -1, + _ => 0, + } +} + +#[cfg(test)] +mod tests { + use crate::sim; + + use super::getentropy; + + #[test] + #[cfg(unix)] + fn runtime_forbids_system_thread_spawn() { + let mut runtime = sim::Runtime::new(200); + super::block_on(&mut runtime, async { + let result = std::panic::catch_unwind(|| std::thread::Builder::new().spawn(|| {})); + assert!(result.is_err()); + }); + } + + #[test] + fn getentropy_delegates_to_host_randomness_outside_simulation() { + let mut actual = [0u8; 24]; + unsafe { + assert_eq!(getentropy(actual.as_mut_ptr(), actual.len()), 0); + } + } +} diff --git a/crates/runtime/tests/sim_e2e.rs b/crates/runtime/tests/sim_e2e.rs new file mode 100644 index 00000000000..53c218da729 --- /dev/null +++ b/crates/runtime/tests/sim_e2e.rs @@ -0,0 +1,367 @@ +#![cfg(feature = "simulation")] +#![allow(clippy::disallowed_macros)] + +use std::{sync::Arc, time::Duration}; + +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use spacetimedb_runtime::sim::{buggify, Rng, Runtime}; +use spin::Mutex; + +/// One reply produced by the simulated server. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Response { + id: u64, + value: u64, + at: Duration, +} + +/// Trace entries recorded by the server so tests can assert schedule/fault outcomes. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ServerEvent { + Received { id: u64, at: Duration }, + Dropped { id: u64, at: Duration }, + Replied { id: u64, at: Duration }, +} + +/// A client request submitted to the simulated server. +struct Request { + id: u64, + input: u64, + respond_to: oneshot::Sender, +} + +/// Complete result of the client/server workload for one seed. 
+#[derive(Debug, Eq, PartialEq)] +struct ClientServerRun { + responses: Vec<(u64, Option)>, + server_events: Vec, + elapsed: Duration, +} + +/// Checks the "same seed, same trace" side of the client/server workload. +/// Both the client-visible results and the server-side event trace should stay +/// stable for one fixed seed. +#[test] +fn client_server_buggify_injects_deterministic_faults() { + let run = run_buggified_client_server(404); + + assert_eq!( + run.responses, + vec![ + (0, None), + ( + 1, + Some(Response { + id: 1, + value: 50, + at: Duration::from_millis(2), + }), + ), + ( + 2, + Some(Response { + id: 2, + value: 70, + at: Duration::from_millis(3), + }), + ), + (3, None), + ( + 4, + Some(Response { + id: 4, + value: 110, + at: Duration::from_millis(5), + }), + ), + ] + ); + assert_eq!( + run.server_events, + vec![ + ServerEvent::Received { + id: 3, + at: Duration::ZERO, + }, + ServerEvent::Received { + id: 0, + at: Duration::ZERO, + }, + ServerEvent::Received { + id: 2, + at: Duration::ZERO, + }, + ServerEvent::Received { + id: 4, + at: Duration::ZERO, + }, + ServerEvent::Received { + id: 1, + at: Duration::ZERO, + }, + ServerEvent::Dropped { + id: 0, + at: Duration::from_millis(1), + }, + ServerEvent::Replied { + id: 1, + at: Duration::from_millis(2), + }, + ServerEvent::Replied { + id: 2, + at: Duration::from_millis(3), + }, + ServerEvent::Dropped { + id: 3, + at: Duration::from_millis(4), + }, + ServerEvent::Replied { + id: 4, + at: Duration::from_millis(5), + }, + ] + ); + assert_eq!(run.elapsed, Duration::from_millis(5)); +} + +/// Checks the "different seed, different exploration" side of the same +/// client/server workload. The full run result should differ across seeds. 
+#[test] +fn client_server_buggify_differs_across_seeds() { + let seed_404 = run_buggified_client_server(404); + let seed_405 = run_buggified_client_server(405); + + eprintln!("seed 404: {seed_404:#?}"); + eprintln!("seed 405: {seed_405:#?}"); + assert_ne!(seed_404, seed_405); +} + +/// Fixed request set used by the client workload. +const CLIENT_REQUESTS: [(u64, u64); 5] = [(0, 4), (1, 5), (2, 7), (3, 9), (4, 11)]; + +/// Run a small concurrent client/server workload under one seed. +/// +/// The client submits every request from its own simulated task. The server +/// receives requests in scheduler order, then spawns one worker per request. +/// Each worker sleeps for deterministic virtual latency and may drop the reply +/// based on buggify. +fn run_buggified_client_server(seed: u64) -> ClientServerRun { + // --- setup: runtime, buggify, two nodes, and communication channels --- + let mut runtime = Runtime::new(seed); + buggify::enable(&runtime); + let handle = runtime.handle(); + let client_node = runtime.create_node().name("client").build(); + let server_node = runtime.create_node().name("server").build(); + // mpsc channel: client tasks send Request messages to the server task + let (request_tx, mut request_rx) = mpsc::unbounded::(); + let server_events = Arc::new(Mutex::new(Vec::new())); + + let (responses, server_events) = runtime.block_on(async move { + // --- server: receive 5 requests, spawn one worker per request --- + let server_handle = handle.clone(); + let server_events_for_server = Arc::clone(&server_events); + let server = server_node.clone().spawn(async move { + let mut workers = Vec::new(); + // Receive all 5 requests before processing any replies + for _ in 0..5 { + let request = request_rx.next().await.expect("client should send request"); + server_events_for_server.lock().push(ServerEvent::Received { + id: request.id, + at: server_handle.now(), + }); + + // --- server worker: simulate latency, then drop or reply based on buggify --- + let 
worker_handle = server_handle.clone(); + let worker_events = Arc::clone(&server_events_for_server); + workers.push(server_node.clone().spawn(async move { + // Deterministic virtual latency: each request id has a distinct sleep + worker_handle.sleep(Duration::from_millis(request.id + 1)).await; + // buggify decides whether to drop this request (40% probability) + if worker_handle.buggify_with_prob(0.4) { + worker_events.lock().push(ServerEvent::Dropped { + id: request.id, + at: worker_handle.now(), + }); + return; + } + + // No fault injected: send the reply + let response = Response { + id: request.id, + value: request.input * 10, + at: worker_handle.now(), + }; + worker_events.lock().push(ServerEvent::Replied { + id: request.id, + at: response.at, + }); + request + .respond_to + .send(response) + .expect("client should wait for response"); + })); + } + + // Wait for all server workers to complete + for worker in workers { + worker.await.expect("server worker should complete"); + } + }); + + // --- client: spawn one task per request, send them to server, collect responses --- + let client_outer_node = client_node.clone(); + let client = client_node.spawn(async move { + let mut requests = Vec::new(); + // Spawn a task for each request so they submit concurrently + for (id, input) in CLIENT_REQUESTS { + let request_tx = request_tx.clone(); + let client_request_node = client_outer_node.clone(); + requests.push(client_request_node.spawn(async move { + let (respond_to, response_rx) = oneshot::channel(); + request_tx + .unbounded_send(Request { id, input, respond_to }) + .expect("server inbox should be open"); + // Await the server's reply (None if the server dropped this request) + (id, response_rx.await.ok()) + })); + } + // All requests sent, close the channel so the server loop terminates + drop(request_tx); + + // Collect responses in spawn order + let mut responses = Vec::new(); + for request in requests { + responses.push(request.await.expect("client request task 
should complete")); + } + responses + }); + + // Drive both client and server to completion + let responses = client.await.expect("client task should complete"); + server.await.expect("server task should complete"); + (responses, server_events.lock().clone()) + }); + + // --- package the results: client responses, server trace, and total virtual time --- + ClientServerRun { + responses, + server_events, + elapsed: runtime.elapsed(), + } +} + +/// Exercises the executor, node pause/resume, and timer wheel together: +/// paused node work must not run until resumed, and all nodes must observe +/// one shared virtual clock. +#[test] +fn multi_node_runtime_coordinates_pause_resume_and_virtual_time() { + let mut runtime = Runtime::new(101); + let handle = runtime.handle(); + let node_a = runtime.create_node().name("a").build(); + let node_b = runtime.create_node().name("b").build(); + let events = Arc::new(Mutex::new(Vec::new())); + + node_b.pause(); + + runtime.block_on({ + let events = Arc::clone(&events); + async move { + let a_handle = handle.clone(); + let a_events = Arc::clone(&events); + let a = node_a.spawn(async move { + a_events.lock().push(("a_started", a_handle.now())); + a_handle.sleep(Duration::from_millis(3)).await; + a_events.lock().push(("a_finished", a_handle.now())); + }); + + let b_handle = handle.clone(); + let b_events = Arc::clone(&events); + let b = node_b.spawn(async move { + b_events.lock().push(("b_started", b_handle.now())); + b_handle.sleep(Duration::from_millis(2)).await; + b_events.lock().push(("b_finished", b_handle.now())); + }); + + handle.sleep(Duration::from_millis(1)).await; + events.lock().push(("main_resumed_b", handle.now())); + node_b.resume(); + + a.await.expect("node a task should complete"); + b.await.expect("node b task should complete"); + } + }); + + let events = events.lock().clone(); + assert!(events.contains(&("a_started", Duration::ZERO))); + assert!(events.contains(&("main_resumed_b", Duration::from_millis(1)))); + 
assert!(events.contains(&("b_started", Duration::from_millis(1))));
+    assert!(events.contains(&("a_finished", Duration::from_millis(3))));
+    assert!(events.contains(&("b_finished", Duration::from_millis(3))));
+    assert_eq!(runtime.elapsed(), Duration::from_millis(3));
+}
+
+/// Checks that runtime-owned buggify decisions consume the same seeded RNG
+/// sequence as an explicit `Rng`, making injected faults replayable by seed.
+#[test]
+fn runtime_buggify_matches_standalone_rng_sequence() {
+    let seed = 77;
+    let runtime = Runtime::new(seed);
+    let expected = Rng::new(seed);
+
+    buggify::enable(&runtime);
+    expected.enable_buggify();
+
+    let actual = (0..8)
+        .map(|_| buggify::should_inject_fault_with_prob(&runtime, 0.5))
+        .collect::<Vec<_>>();
+    let expected = (0..8).map(|_| expected.buggify_with_prob(0.5)).collect::<Vec<_>>();
+
+    assert_eq!(actual, expected);
+    assert!(buggify::is_enabled(&runtime));
+
+    buggify::disable(&runtime);
+    assert!(!buggify::is_enabled(&runtime));
+    assert!(!buggify::should_inject_fault_with_prob(&runtime, 1.0));
+}
+
+/// Verifies timeout races are driven by virtual time, not wall time: the fast
+/// node completes at 2ms, then the slow node times out at the shared 4ms
+/// deadline.
+#[test] +fn multi_node_timeout_uses_shared_virtual_clock() { + let mut runtime = Runtime::new(303); + let handle = runtime.handle(); + let slow_node = runtime.create_node().name("slow").build(); + let fast_node = runtime.create_node().name("fast").build(); + + let output = runtime.block_on(async move { + let slow_handle = handle.clone(); + let slow = slow_node.spawn(async move { + slow_handle + .timeout(Duration::from_millis(4), async { + slow_handle.sleep(Duration::from_millis(10)).await; + "slow-finished" + }) + .await + }); + + let fast_handle = handle.clone(); + let fast = fast_node.spawn(async move { + fast_handle.sleep(Duration::from_millis(2)).await; + ("fast-finished", fast_handle.now()) + }); + + ( + slow.await.expect("slow node task should complete"), + fast.await.expect("fast node task should complete"), + ) + }); + + let (slow, fast) = output; + assert_eq!(fast, ("fast-finished", Duration::from_millis(2))); + assert_eq!(slow.unwrap_err().duration(), Duration::from_millis(4)); + assert_eq!(runtime.elapsed(), Duration::from_millis(4)); +} diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index f9f767ce18e..aa51c4e3bd8 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -35,6 +35,7 @@ spacetimedb-core = { path = "../core", features = ["test"] } spacetimedb-schema = { path = "../schema" } spacetimedb-datastore = { path = "../datastore", features = ["test"] } spacetimedb-durability = { workspace = true, features = ["test"] } +spacetimedb-runtime = { workspace = true } anyhow.workspace = true env_logger.workspace = true diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 41097b33abd..1c6c51fe8e7 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -227,14 +227,14 @@ impl SourceSnapshot { async fn create_snapshot(repo: Arc) -> anyhow::Result { let start = Instant::now(); - let rt = tokio::runtime::Handle::current(); + let rt = 
spacetimedb_runtime::Handle::tokio_current(); // NOTE: `_db` needs to stay alive until the snapshot is taken, // because the snapshot worker holds only a weak reference. let (mut watch, _db) = spawn_blocking(|| { let persistence = Persistence { durability: Arc::new(NoDurability::default()), disk_size: Arc::new(|| Ok(<_>::default())), - snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)), + snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled, rt.clone())), runtime: rt, }; let db = TestDB::open_db(EmptyHistory::new(), Some(persistence), None, 0)?;