Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
42e55dc
snapshot abstraction
Shubham8287 May 8, 2026
f508a04
lint
Shubham8287 May 8, 2026
5356b81
Add runtime crate and RuntimeDispatch integration
Shubham8287 May 8, 2026
c83ed2e
LockedFsRepo
Shubham8287 May 8, 2026
813e418
comments
Shubham8287 May 8, 2026
5946261
cleanup
Shubham8287 May 8, 2026
1f6bdcb
Merge remote-tracking branch 'origin/master' into shub/persistence-ab…
Shubham8287 May 11, 2026
2104ced
lint
Shubham8287 May 11, 2026
fc2e146
make sim module mostly non_std
Shubham8287 May 11, 2026
e4de2bd
drop durability in reopen test helper
Shubham8287 May 11, 2026
4050da2
Merge branch 'shub/persistence-abstraction' into shub/sim
Shubham8287 May 11, 2026
795a704
drop durability in test
Shubham8287 May 11, 2026
e072845
Merge branch 'shub/persistence-abstraction' into shub/sim
Shubham8287 May 11, 2026
425e728
fix snapshot compressor
Shubham8287 May 11, 2026
466481c
minor fixes
Shubham8287 May 11, 2026
7d1e21d
minor fix
Shubham8287 May 11, 2026
a521298
fixes
Shubham8287 May 12, 2026
e59ac12
fix unneccessary diff
Shubham8287 May 12, 2026
d074cf0
polishing
Shubham8287 May 13, 2026
9789d70
more polishing
Shubham8287 May 13, 2026
8cd609c
update readme
Shubham8287 May 13, 2026
c62e8b2
Merge remote-tracking branch 'origin/master' into shub/sim
Shubham8287 May 13, 2026
730028f
Runtime -> Handle
Shubham8287 May 13, 2026
35cbea9
Apply suggestions from code review
Shubham8287 May 13, 2026
5af7fd9
Update crates/commitlog/src/lib.rs
Shubham8287 May 13, 2026
52783ce
compile fix
Shubham8287 May 13, 2026
30012db
lint
Shubham8287 May 13, 2026
d9f009b
fix Cargo.toml
Shubham8287 May 13, 2026
3b76725
endlines on README
Shubham8287 May 13, 2026
9996a16
comments
Shubham8287 May 13, 2026
d5992a2
add extern alloc
Shubham8287 May 14, 2026
6079ef0
remove futures dependency
Shubham8287 May 14, 2026
8601d78
coverage matrics
Shubham8287 May 13, 2026
b27b021
update determinism coverage
Shubham8287 May 14, 2026
4f6ca23
put extern alloc behing gate
Shubham8287 May 14, 2026
651f8c6
join handle cleanup
Shubham8287 May 14, 2026
74b283c
update readme
Shubham8287 May 14, 2026
15b98a0
README about blcoking code
Shubham8287 May 14, 2026
3c525f2
comment
Shubham8287 May 14, 2026
77ebb41
executor split
Shubham8287 May 14, 2026
76a8228
lint
Shubham8287 May 14, 2026
7876599
lint
Shubham8287 May 14, 2026
0b2c53c
unused import lint
Shubham8287 May 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"crates/physical-plan",
"crates/primitives",
"crates/query",
"crates/runtime",
"crates/sats",
"crates/schema",
"crates/smoketests",
Expand Down Expand Up @@ -139,6 +140,7 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.2.0" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.2.0" }
spacetimedb-primitives = { path = "crates/primitives", version = "=2.2.0" }
spacetimedb-query = { path = "crates/query", version = "=2.2.0" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.2.0" }
spacetimedb-sats = { path = "crates/sats", version = "=2.2.0" }
spacetimedb-schema = { path = "crates/schema", version = "=2.2.0" }
spacetimedb-standalone = { path = "crates/standalone", version = "=2.2.0" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-query.workspace = true
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/db/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
use spacetimedb_durability::Transaction;
use spacetimedb_lib::Identity;
use spacetimedb_sats::ProductValue;
use tokio::{runtime, time::timeout};

use crate::db::persistence::Durability;
use spacetimedb_runtime::Handle;

pub(super) fn request_durability(
durability: &Durability,
Expand All @@ -32,11 +32,11 @@ pub(super) fn request_durability(
}));
}

pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &runtime::Handle, database_identity: Identity) {
let rt = runtime.clone();
rt.spawn(async move {
let label = format!("[{database_identity}]");
match timeout(Duration::from_secs(10), durability.close()).await {
pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &Handle, database_identity: Identity) {
let label = format!("[{database_identity}]");
let runtime = runtime.clone();
runtime.clone().spawn(async move {
match runtime.timeout(Duration::from_secs(10), durability.close()).await {
Err(_elapsed) => {
error!("{label} timeout waiting for durability shutdown");
}
Expand Down
24 changes: 18 additions & 6 deletions crates/core/src/db/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use spacetimedb_paths::server::ServerDataDir;
use spacetimedb_snapshot::DynSnapshotRepo;

use crate::{messages::control_db::Database, util::asyncify};
use spacetimedb_runtime::Handle;

use super::{
relational_db::{self, Txdata},
Expand Down Expand Up @@ -41,8 +42,8 @@ pub struct Persistence {
/// persistent (as opposed to in-memory) databases. This is enforced by
/// this type.
pub snapshots: Option<SnapshotWorker>,
/// The tokio runtime onto which durability-related tasks shall be spawned.
pub runtime: tokio::runtime::Handle,
/// Runtime onto which durability-related tasks shall be spawned.
pub runtime: Handle,
}

impl Persistence {
Expand All @@ -52,6 +53,15 @@ impl Persistence {
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
snapshots: Option<SnapshotWorker>,
runtime: tokio::runtime::Handle,
) -> Self {
Self::new_with_runtime(durability, disk_size, snapshots, Handle::tokio(runtime))
}

pub fn new_with_runtime(
durability: impl spacetimedb_durability::Durability<TxData = Txdata> + 'static,
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
snapshots: Option<SnapshotWorker>,
runtime: Handle,
) -> Self {
Self {
durability: Arc::new(durability),
Expand Down Expand Up @@ -91,7 +101,7 @@ impl Persistence {
Option<Arc<Durability>>,
Option<DiskSizeFn>,
Option<SnapshotWorker>,
Option<tokio::runtime::Handle>,
Option<Handle>,
) {
this.map(
|Self {
Expand Down Expand Up @@ -143,13 +153,15 @@ impl PersistenceProvider for LocalPersistenceProvider {
async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence> {
let replica_dir = self.data_dir.replica(replica_id);
let snapshot_dir = replica_dir.snapshots();
let runtime = Handle::tokio_current();

let database_identity = database.database_identity;
let snapshot_worker =
asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
.await
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?;
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?;
let (durability, disk_size) =
relational_db::local_durability(replica_dir, runtime.clone(), Some(&snapshot_worker)).await?;

tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
snapshot_worker.subscribe(),
Expand All @@ -162,7 +174,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
durability,
disk_size,
snapshots: Some(snapshot_worker),
runtime: tokio::runtime::Handle::current(),
runtime,
})
}
}
29 changes: 18 additions & 11 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_runtime::Handle;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
Expand Down Expand Up @@ -99,7 +100,7 @@ pub struct RelationalDB {

inner: Locking,
durability: Option<Arc<Durability>>,
durability_runtime: Option<tokio::runtime::Handle>,
durability_runtime: Option<Handle>,
snapshot_worker: Option<SnapshotWorker>,

row_count_fn: RowCountFn,
Expand Down Expand Up @@ -1669,9 +1670,9 @@ pub type LocalDurability = Arc<durability::Local<ProductValue>>;
/// of the commitlog.
pub async fn local_durability(
replica_dir: ReplicaDir,
runtime: Handle,
snapshot_worker: Option<&SnapshotWorker>,
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
let rt = tokio::runtime::Handle::current();
let on_new_segment = snapshot_worker.map(|snapshot_worker| {
let snapshot_worker = snapshot_worker.clone();
Arc::new(move || {
Expand All @@ -1683,7 +1684,7 @@ pub async fn local_durability(
let local = asyncify(move || {
durability::Local::open(
replica_dir.clone(),
rt,
runtime,
<_>::default(),
// Give the durability a handle to request a new snapshot run,
// which it will send down whenever we rotate commitlog segments.
Expand Down Expand Up @@ -1949,19 +1950,22 @@ pub mod tests_utils {
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
let snapshots = want_snapshot_repo
.then(|| {
open_snapshot_repo(root.snapshots(), db_identity, replica_id)
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
open_snapshot_repo(root.snapshots(), db_identity, replica_id).map(|repo| {
SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone()))
})
})
.transpose()?;

let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
let runtime = Handle::tokio(rt.clone());
let (local, disk_size_fn) =
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
let history = local.as_history();

let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshots,
runtime: rt,
runtime,
};

let (db, _) = RelationalDB::open(
Expand Down Expand Up @@ -2074,17 +2078,20 @@ pub mod tests_utils {
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
let snapshots = want_snapshot_repo
.then(|| {
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0).map(|repo| {
SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone()))
})
})
.transpose()?;
let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
let runtime = Handle::tokio(rt.clone());
let (local, disk_size_fn) =
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
let history = local.as_history();
let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshots,
runtime: rt,
runtime,
};
let db = Self::open_db(history, Some(persistence), None, 0)?;

Expand Down
Loading
Loading