Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
739397a
dst
Shubham8287 Apr 3, 2026
1ae1d51
dst
Shubham8287 Apr 3, 2026
f26c11d
Merge branch 'master' into shub/dst
Shubham8287 Apr 11, 2026
927db4a
fix seed logic
Shubham8287 Apr 13, 2026
5555da3
remove dst levels
Shubham8287 Apr 14, 2026
90c0365
turbo influenced
Shubham8287 Apr 21, 2026
70b16a0
code organisation
Shubham8287 Apr 21, 2026
f52ef9e
local mocks
Shubham8287 Apr 22, 2026
63753df
remove deadcode
Shubham8287 Apr 22, 2026
749b118
target owned properties
Shubham8287 Apr 22, 2026
d8e3fd6
traits
Shubham8287 Apr 23, 2026
1372b3a
delete code
Shubham8287 Apr 23, 2026
dd33707
pluggable property
Shubham8287 Apr 23, 2026
3d96cf2
open close durability
Shubham8287 Apr 24, 2026
3cac9ef
typo
Shubham8287 Apr 24, 2026
0983c62
better properties
Shubham8287 Apr 24, 2026
bc93e1a
Merge remote-tracking branch 'origin/master' into shub/dst
Shubham8287 Apr 24, 2026
17a40c6
tmp
Shubham8287 Apr 27, 2026
7289133
broken standlone target
Shubham8287 Apr 28, 2026
19badab
cleanups
Shubham8287 Apr 29, 2026
12cf035
inmemory commitlog
Shubham8287 Apr 29, 2026
8b5f5bc
larger surface
Shubham8287 Apr 29, 2026
9d0e692
Merge branch 'master' into shub/dst
Shubham8287 Apr 29, 2026
858e09a
properties better
Shubham8287 Apr 29, 2026
c6fa137
improvements
Shubham8287 Apr 30, 2026
0e850dc
readability
Shubham8287 May 3, 2026
87f97ea
crash property
Shubham8287 May 4, 2026
22702a4
Merge branch 'master' into shub/dst
Shubham8287 May 4, 2026
b0490af
split properties
Shubham8287 May 4, 2026
831e9f2
improve relatioandb_commit:
Shubham8287 May 4, 2026
7282b9b
slim down to datastore focused
Shubham8287 May 5, 2026
f5197c4
improved simulator
Shubham8287 May 5, 2026
051f8ac
runtime crate
Shubham8287 May 5, 2026
37aa55b
snapshots sim
Shubham8287 May 6, 2026
d2b5eed
make dst snapshot in-memory
Shubham8287 May 6, 2026
8c6814d
Merge remote-tracking branch 'origin/master' into shub/dst
Shubham8287 May 7, 2026
27cb858
snapshot abstraction at worker
Shubham8287 May 7, 2026
692dfe2
cleanup
Shubham8287 May 7, 2026
76db165
Merge branch 'master' into shub/dst
Shubham8287 May 7, 2026
0b9875f
cleanup
Shubham8287 May 7, 2026
494b927
Merge branch 'shub/sim' into shub/dst
Shubham8287 May 12, 2026
13d53a5
fix
Shubham8287 May 12, 2026
d92ac0a
README
Shubham8287 May 12, 2026
a051857
Merge shub/sim into shub/dst
Shubham8287 May 13, 2026
5266bae
Merge branch 'shub/sim' into shub/dst
Shubham8287 May 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"crates/commitlog",
"crates/core",
"crates/data-structures",
"crates/dst",
"crates/datastore",
"crates/durability",
"crates/execution",
Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Options {
/// The canonical commitlog API over a repository backend `R`.
///
/// The default backend is the on-disk filesystem repository
/// [`repo::Fs`], but tests may supply another [`Repo`]
/// [`repo::Fs`], but tests and simulators may supply another [`Repo`]
/// implementation.
///
/// Records in the log are of type `T`, which canonically is instantiated to
Expand Down Expand Up @@ -203,7 +203,7 @@ where
{
/// Open the log in `repo` with [`Options`].
///
/// This is useful for tests which provide a repository
/// This is useful for tests and simulators which provide a repository
/// implementation other than [`repo::Fs`].
pub fn open_with_repo(repo: R, opts: Options) -> io::Result<Self> {
let inner = commitlog::Generic::open(repo, opts)?;
Expand Down
2 changes: 2 additions & 0 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ pub trait RepoWithoutLockFile: Repo {}

impl<T: RepoWithoutLockFile> RepoWithoutLockFile for &T {}

impl RepoWithoutLockFile for Fs {}

#[cfg(any(test, feature = "test"))]
impl RepoWithoutLockFile for Memory {}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, BufReader};
use tokio::io::{AsyncRead, BufReader, ReadBuf};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
Expand Down Expand Up @@ -592,7 +592,7 @@ fn seek_to(file: &mut File, buf: &mut [u8], num_lines: u32) -> io::Result<()> {
Ok(())
}

fn read_exact_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> io::Result<()> {
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
Expand Down Expand Up @@ -641,7 +641,7 @@ impl MaybeFile {
}

impl AsyncRead for MaybeFile {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>) -> Poll<io::Result<()>> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
match self.project() {
MaybeFileProj::File { inner } => inner.poll_read(cx, buf),
MaybeFileProj::Empty => Poll::Ready(Ok(())),
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &Handle, databas
info!("{label} durability shut down at tx offset: {offset:?}");
}
}
log::info!("closing spawn close");
});
}

Expand Down
17 changes: 15 additions & 2 deletions crates/core/src/db/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use async_trait::async_trait;
use spacetimedb_commitlog::SizeOnDisk;
use spacetimedb_durability::{DurabilityExited, TxOffset};
use spacetimedb_paths::server::ServerDataDir;
use spacetimedb_snapshot::DynSnapshotRepo;
use spacetimedb_runtime::Handle;
use spacetimedb_snapshot::{DynSnapshotRepo, SnapshotStore};

use crate::{messages::control_db::Database, util::asyncify};
use spacetimedb_runtime::Handle;

use super::{
relational_db::{self, Txdata},
Expand Down Expand Up @@ -36,6 +36,8 @@ pub struct Persistence {
/// Currently the expectation is that the reported size is the commitlog
/// size only.
pub disk_size: DiskSizeFn,
/// Optional snapshot store used during database restore.
pub snapshot_store: Option<Arc<dyn SnapshotStore>>,
/// An optional [SnapshotWorker].
///
/// The current expectation is that snapshots are only enabled for
Expand Down Expand Up @@ -63,9 +65,11 @@ impl Persistence {
snapshots: Option<SnapshotWorker>,
runtime: Handle,
) -> Self {
let snapshot_store = snapshots.as_ref().map(SnapshotWorker::snapshot_store);
Self {
durability: Arc::new(durability),
disk_size: Arc::new(disk_size),
snapshot_store,
snapshots,
runtime,
}
Expand All @@ -76,6 +80,13 @@ impl Persistence {
self.snapshots.as_ref().map(|worker| worker.snapshot_repo())
}

/// If snapshot restore is enabled, get the [SnapshotStore] to read from.
pub fn snapshot_store(&self) -> Option<Arc<dyn SnapshotStore>> {
self.snapshot_store
.clone()
.or_else(|| self.snapshots.as_ref().map(SnapshotWorker::snapshot_store))
}

/// Get the [TxOffset] reported as durable by the [Durability] impl.
///
/// Returns `Ok(None)` if no offset is durable yet, and `Err(DurabilityExited)`
Expand Down Expand Up @@ -107,6 +118,7 @@ impl Persistence {
|Self {
durability,
disk_size,
snapshot_store: _,
snapshots,
runtime,
}| (Some(durability), Some(disk_size), snapshots, Some(runtime)),
Expand Down Expand Up @@ -173,6 +185,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
Ok(Persistence {
durability,
disk_size,
snapshot_store: Some(snapshot_worker.snapshot_store()),
snapshots: Some(snapshot_worker),
runtime,
})
Expand Down
24 changes: 13 additions & 11 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository};
use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository, SnapshotStore};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::{RowRef, TableScanIter};
Expand Down Expand Up @@ -279,10 +279,10 @@ impl RelationalDB {

let start_time = std::time::Instant::now();

let snapshot_repo = persistence.as_ref().and_then(|p| p.snapshot_repo());
let snapshot_store = persistence.as_ref().and_then(|p| p.snapshot_store());
let inner = Self::restore_from_snapshot_or_bootstrap(
database_identity,
snapshot_repo.as_deref(),
snapshot_store.as_deref(),
durable_tx_offset,
min_commitlog_offset,
page_pool,
Expand Down Expand Up @@ -473,22 +473,22 @@ impl RelationalDB {

fn restore_from_snapshot_or_bootstrap(
database_identity: Identity,
snapshot_repo: Option<&DynSnapshotRepo>,
snapshot_store: Option<&dyn SnapshotStore>,
durable_tx_offset: Option<TxOffset>,
min_commitlog_offset: TxOffset,
page_pool: PagePool,
) -> Result<Locking, RestoreSnapshotError> {
// Try to load the `ReconstructedSnapshot` at `snapshot_offset`.
fn try_load_snapshot(
database_identity: &Identity,
snapshot_repo: &DynSnapshotRepo,
snapshot_store: &dyn SnapshotStore,
snapshot_offset: TxOffset,
page_pool: &PagePool,
) -> Result<ReconstructedSnapshot, Box<SnapshotError>> {
log::info!("[{database_identity}] DATABASE: restoring snapshot of tx_offset {snapshot_offset}");
let start = std::time::Instant::now();

let snapshot = snapshot_repo
let snapshot = snapshot_store
.read_snapshot(snapshot_offset, page_pool)
.map_err(Box::new)?;

Expand Down Expand Up @@ -554,11 +554,11 @@ impl RelationalDB {
}
}

if let Some((snapshot_repo, durable_tx_offset)) = snapshot_repo.zip(durable_tx_offset) {
if let Some((snapshot_store, durable_tx_offset)) = snapshot_store.zip(durable_tx_offset) {
// Mark any newer snapshots as invalid, as the history past
// `durable_tx_offset` may have been reset and thus diverge from
// any snapshots taken earlier.
snapshot_repo
snapshot_store
.invalidate_newer_snapshots(durable_tx_offset)
.map_err(|e| RestoreSnapshotError::Invalidate {
offset: durable_tx_offset,
Expand All @@ -569,7 +569,7 @@ impl RelationalDB {
// range `(min_commitlog_offset + 1)..=durable_tx_offset`.
let mut upper_bound = durable_tx_offset;
loop {
let Some(snapshot_offset) = snapshot_repo
let Some(snapshot_offset) = snapshot_store
.latest_snapshot_older_than(upper_bound)
.map_err(Box::new)?
else {
Expand All @@ -579,7 +579,7 @@ impl RelationalDB {
log::debug!("snapshot_offset={snapshot_offset} min_commitlog_offset={min_commitlog_offset}");
break;
}
match try_load_snapshot(&database_identity, snapshot_repo, snapshot_offset, &page_pool) {
match try_load_snapshot(&database_identity, snapshot_store, snapshot_offset, &page_pool) {
Ok(snapshot) if snapshot.database_identity != database_identity => {
return Err(RestoreSnapshotError::IdentityMismatch {
expected: database_identity,
Expand All @@ -595,7 +595,7 @@ impl RelationalDB {
// Newly created snapshots should not depend on it.
if !is_transient_error(&e) {
log::info!("invalidating bad snapshot at {snapshot_offset}");
snapshot_repo.invalidate_snapshot(snapshot_offset).map_err(|e| {
snapshot_store.invalidate_snapshot(snapshot_offset).map_err(|e| {
RestoreSnapshotError::Invalidate {
offset: snapshot_offset,
source: Box::new(e),
Expand Down Expand Up @@ -1964,6 +1964,7 @@ pub mod tests_utils {
let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshot_store: snapshots.as_ref().map(SnapshotWorker::snapshot_store),
snapshots,
runtime,
};
Expand Down Expand Up @@ -2090,6 +2091,7 @@ pub mod tests_utils {
let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshot_store: snapshots.as_ref().map(SnapshotWorker::snapshot_store),
snapshots,
runtime,
};
Expand Down
26 changes: 19 additions & 7 deletions crates/core/src/db/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prometheus::{Histogram, IntGauge};
use spacetimedb_datastore::locking_tx_datastore::{committed_state::CommittedState, datastore::Locking};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::Identity;
use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo};
use spacetimedb_snapshot::{BoxedPendingSnapshot, CompressionStats, DynSnapshotRepo, SnapshotRepo, SnapshotStore};
use tokio::sync::watch;

use crate::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -62,6 +62,7 @@ pub struct SnapshotWorker {
snapshot_created: watch::Sender<TxOffset>,
request_snapshot: mpsc::UnboundedSender<Request>,
snapshot_repository: Arc<DynSnapshotRepo>,
snapshot_store: Arc<dyn SnapshotStore>,
}

impl SnapshotWorker {
Expand All @@ -70,20 +71,25 @@ impl SnapshotWorker {
/// The handle is only partially initialized, as it is lacking the
/// [SnapshotDatabaseState]. This allows control code to [Self::subscribe]
/// to future snapshots before handing off the worker to the database.
pub fn new(snapshot_repository: Arc<DynSnapshotRepo>, compression: Compression, rt: Handle) -> Self {
let database = snapshot_repository.database_identity();
let latest_snapshot = snapshot_repository.latest_snapshot().ok().flatten().unwrap_or(0);
pub fn new<R>(snapshot_repo: Arc<R>, compression: Compression, rt: Handle) -> Self
where
R: SnapshotRepo<Pending = BoxedPendingSnapshot> + 'static,
{
let snapshot_store: Arc<dyn SnapshotStore> = snapshot_repo.clone();
let snapshot_repo: Arc<DynSnapshotRepo> = snapshot_repo;
let database = snapshot_repo.database_identity();
let latest_snapshot = snapshot_repo.latest_snapshot().ok().flatten().unwrap_or(0);
let (snapshot_created, _) = watch::channel(latest_snapshot);
let (request_tx, request_rx) = mpsc::unbounded();

let actor = SnapshotWorkerActor {
snapshot_requests: request_rx,
snapshot_repo: snapshot_repository.clone(),
snapshot_repo: snapshot_repo.clone(),
snapshot_created: snapshot_created.clone(),
metrics: SnapshotMetrics::new(database),
rt: rt.clone(),
compression: compression.is_enabled().then(|| Compressor {
snapshot_repo: snapshot_repository.clone(),
snapshot_repo: snapshot_repo.clone(),
metrics: CompressionMetrics::new(database),
stats: <_>::default(),
rt: rt.clone(),
Expand All @@ -94,7 +100,8 @@ impl SnapshotWorker {
Self {
snapshot_created,
request_snapshot: request_tx,
snapshot_repository,
snapshot_repository: snapshot_repo,
snapshot_store,
}
}

Expand All @@ -113,6 +120,11 @@ impl SnapshotWorker {
self.snapshot_repository.clone()
}

/// Get the snapshot store this worker is operating on.
pub fn snapshot_store(&self) -> Arc<dyn SnapshotStore> {
self.snapshot_store.clone()
}

/// Request a snapshot to be taken.
///
/// The snapshot will be taken at some point in the future.
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,7 @@ mod tests {
Some(Persistence {
durability: durability.clone(),
disk_size: Arc::new(|| Ok(<_>::default())),
snapshot_store: None,
snapshots: None,
runtime: spacetimedb_runtime::Handle::tokio(rt),
}),
Expand Down
Loading
Loading