From 8a8860a81f1b52134bcb3c976bcb461769b5e8f2 Mon Sep 17 00:00:00 2001 From: Grzegorz Uriasz Date: Tue, 31 Mar 2026 15:18:53 +0200 Subject: [PATCH] Fetch latest db version on cache miss + test --- core/rs/core/src/db_version.rs | 32 ++++++- core/rs/core/src/lib.rs | 43 +++++++++ .../src/t/test_db_version.rs | 87 +++++++++++++++++++ 3 files changed, 158 insertions(+), 4 deletions(-) diff --git a/core/rs/core/src/db_version.rs b/core/rs/core/src/db_version.rs index 72e86c3bd..815fc1c41 100644 --- a/core/rs/core/src/db_version.rs +++ b/core/rs/core/src/db_version.rs @@ -3,6 +3,7 @@ use core::mem; use crate::alloc::string::ToString; use crate::alloc::{boxed::Box, vec::Vec}; +use alloc::collections::btree_map::Entry; use alloc::collections::BTreeMap; use alloc::format; use alloc::string::String; @@ -223,11 +224,34 @@ pub fn insert_db_version( (*ext_data).lastDbVersions as *mut BTreeMap, i64>, )); - if let Some(db_v) = last_db_versions.get(insert_site_id) { - if *db_v >= insert_db_vrsn { - // already inserted a greater or equal db version! - return Ok(()); + let cached_db_v = match last_db_versions.entry(insert_site_id.to_vec()) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + let db_version_stmt = (*ext_data).pDbVersionStmt; + let bind_result = + db_version_stmt.bind_blob(1, insert_site_id, sqlite::Destructor::STATIC); + if let Err(rc) = bind_result { + reset_cached_stmt(db_version_stmt)?; + return Err(rc); + } + let fetched = match db_version_stmt.step() { + Ok(ResultCode::ROW) => db_version_stmt.column_int64(0), + // No version found for this site_id + // use -1 as it will never be greater than the inserted db version + Ok(_) => -1, + Err(rc) => { + reset_cached_stmt(db_version_stmt)?; + return Err(rc); + } + }; + reset_cached_stmt(db_version_stmt)?; + *entry.insert(fetched) } + }; + + if cached_db_v >= insert_db_vrsn { + // already inserted a greater or equal db version! + return Ok(()); } // ensure the site_id exists in the crsql_site_id table diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index 368ce6b54..8ad852a24 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -484,6 +484,21 @@ pub extern "C" fn sqlite3_crsqlcore_init( return null_mut(); } + #[cfg(feature = "test")] + if let Err(_) = db.create_function_v2( + "crsql_cache_db_version", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_db_version), + None, + None, + None, + ) { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + let rc = db .create_function_v2( "crsql_set_db_version", @@ -1023,6 +1038,34 @@ unsafe extern "C" fn x_crsql_cache_pk_cl( } } +/** + * Get the db_version cached in the ext data for the current transaction for a given site_id. + * only used for test to inspect the lastDbVersions map. + */ +#[cfg(feature = "test")] +unsafe extern "C" fn x_crsql_cache_db_version( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + if argc == 0 { + ctx.result_error( + "Wrong number of args provided to crsql_cache_db_version. Provide the site id.", + ); + return; + } + + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let args = sqlite::args!(argc, argv); + let site_id = args[0].blob(); + + let db_versions_map = mem::ManuallyDrop::new(Box::from_raw( + (*ext_data).lastDbVersions as *mut BTreeMap, i64>, + )); + let res = db_versions_map.get(site_id).cloned().unwrap_or(-1); + sqlite::result_int64(ctx, res); +} + /** * Return the timestamp for the current transaction. */ diff --git a/core/rs/integration_check/src/t/test_db_version.rs b/core/rs/integration_check/src/t/test_db_version.rs index 6eee9230d..2dfe7b4e9 100644 --- a/core/rs/integration_check/src/t/test_db_version.rs +++ b/core/rs/integration_check/src/t/test_db_version.rs @@ -362,12 +362,99 @@ fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result Result { + let stmt = db.prepare_v2("SELECT crsql_cache_db_version(?);")?; + stmt.bind_blob(1, site_id, sqlite::Destructor::STATIC)?; + stmt.step()?; + Ok(stmt.column_int64(0)) +} + +/// Test that insert_db_version always populates the lastDbVersions cache. +/// This is a regression test for https://github.com/superfly/cr-sqlite/pull/21 +/// Previously, the cache was only populated when the SET statement returned a ROW, +/// making it mostly useless since it's cleared on commit/rollback. +fn test_insert_db_version_cache() -> Result<(), ResultCode> { + let c = crate::opendb().expect("db opened"); + let db = &c.db; + db.db + .exec_safe("CREATE TABLE foo (a primary key not null, b);")?; + db.db.exec_safe("SELECT crsql_as_crr('foo');")?; + + let remote_site = "remote_site".as_bytes(); + + db.db.exec_safe("BEGIN TRANSACTION;")?; + + // cache should be empty before any remote inserts + assert_eq!(-1, get_cache_db_version(db.db, remote_site)?); + + // insert a change from a remote site with db_version=1 + let pk1: [u8; 3] = [1, 9, 1]; + let stmt = db.db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 1, 1, 1, ?, 1, 0, 0);", + )?; + stmt.bind_blob(1, &pk1, sqlite::Destructor::STATIC)?; + stmt.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?; + stmt.step()?; + + // cache should now contain db_version=1 for the remote site + assert_eq!(1, get_cache_db_version(db.db, remote_site)?); + + // insert another change from the same remote site with db_version=3 (higher) + let pk2: [u8; 3] = [1, 9, 2]; + let stmt2 = db.db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 2, 1, 3, ?, 1, 0, 0);", + )?; + stmt2.bind_blob(1, &pk2, sqlite::Destructor::STATIC)?; + stmt2.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?; + stmt2.step()?; + + // cache should now reflect the higher db_version=3 + assert_eq!(3, get_cache_db_version(db.db, remote_site)?); + + // insert a change with a lower db_version=2 from the same site + // this should be short-circuited by the cache (no DB write needed) + let pk3: [u8; 3] = [1, 9, 3]; + let stmt3 = db.db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 3, 1, 2, ?, 1, 0, 0);", + )?; + stmt3.bind_blob(1, &pk3, sqlite::Destructor::STATIC)?; + stmt3.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?; + stmt3.step()?; + + // cache should still show db_version=3 (not downgraded) + assert_eq!(3, get_cache_db_version(db.db, remote_site)?); + + // commit clears the cache + db.db.exec_safe("COMMIT;")?; + assert_eq!(-1, get_cache_db_version(db.db, remote_site)?); + + // new transaction: inserting a lower db_version (2) should be short-circuited + // because we fetch the latest db_version from DB on cache miss + db.db.exec_safe("BEGIN TRANSACTION;")?; + let pk4: [u8; 3] = [1, 9, 4]; + let stmt4 = db.db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 4, 1, 2, ?, 1, 0, 0);", + )?; + stmt4.bind_blob(1, &pk4, sqlite::Destructor::STATIC)?; + stmt4.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?; + stmt4.step()?; + + // The cache contains the latest db_version from the DB + assert_eq!(3, get_cache_db_version(db.db, remote_site)?); + + db.db.exec_safe("COMMIT;")?; + + Ok(()) +} + pub fn run_suite() -> Result<(), String> { test_fetch_db_version_from_storage()?; test_next_db_version()?; test_get_or_set_site_ordinal() .map_err(|e| format!("test_get_or_set_site_ordinal failed: {:?}", e))?; test_get_or_set_pk_cl().map_err(|e| format!("test_get_or_set_pk_cl failed: {:?}", e))?; + test_insert_db_version_cache() + .map_err(|e| format!("test_insert_db_version_cache failed: {:?}", e))?; Ok(()) }