Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions core/rs/core/src/db_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::mem;

use crate::alloc::string::ToString;
use crate::alloc::{boxed::Box, vec::Vec};
use alloc::collections::btree_map::Entry;
use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::String;
Expand Down Expand Up @@ -223,11 +224,34 @@ pub fn insert_db_version(
(*ext_data).lastDbVersions as *mut BTreeMap<Vec<u8>, i64>,
));

if let Some(db_v) = last_db_versions.get(insert_site_id) {
if *db_v >= insert_db_vrsn {
// already inserted a greater or equal db version!
return Ok(());
let cached_db_v = match last_db_versions.entry(insert_site_id.to_vec()) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
let db_version_stmt = (*ext_data).pDbVersionStmt;
let bind_result =
db_version_stmt.bind_blob(1, insert_site_id, sqlite::Destructor::STATIC);
if let Err(rc) = bind_result {
reset_cached_stmt(db_version_stmt)?;
return Err(rc);
}
let fetched = match db_version_stmt.step() {
Ok(ResultCode::ROW) => db_version_stmt.column_int64(0),
// No version found for this site_id
// use -1 as it will never be greater than the inserted db version
Ok(_) => -1,
Err(rc) => {
reset_cached_stmt(db_version_stmt)?;
return Err(rc);
}
};
reset_cached_stmt(db_version_stmt)?;
*entry.insert(fetched)
}
};

if cached_db_v >= insert_db_vrsn {
// already inserted a greater or equal db version!
return Ok(());
}

// ensure the site_id exists in the crsql_site_id table
Expand Down
43 changes: 43 additions & 0 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,21 @@ pub extern "C" fn sqlite3_crsqlcore_init(
return null_mut();
}

#[cfg(feature = "test")]
if let Err(_) = db.create_function_v2(
"crsql_cache_db_version",
1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_cache_db_version),
None,
None,
None,
) {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

let rc = db
.create_function_v2(
"crsql_set_db_version",
Expand Down Expand Up @@ -1023,6 +1038,34 @@ unsafe extern "C" fn x_crsql_cache_pk_cl(
}
}

/**
* Get the db_version cached in the ext data for the current transaction for a given site_id.
* only used for test to inspect the lastDbVersions map.
*/
#[cfg(feature = "test")]
unsafe extern "C" fn x_crsql_cache_db_version(
ctx: *mut sqlite::context,
argc: i32,
argv: *mut *mut sqlite::value,
) {
if argc == 0 {
ctx.result_error(
"Wrong number of args provided to crsql_cache_db_version. Provide the site id.",
);
return;
}

let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
let args = sqlite::args!(argc, argv);
let site_id = args[0].blob();

let db_versions_map = mem::ManuallyDrop::new(Box::from_raw(
(*ext_data).lastDbVersions as *mut BTreeMap<Vec<u8>, i64>,
));
let res = db_versions_map.get(site_id).cloned().unwrap_or(-1);
sqlite::result_int64(ctx, res);
}

/**
* Return the timestamp for the current transaction.
*/
Expand Down
87 changes: 87 additions & 0 deletions core/rs/integration_check/src/t/test_db_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,99 @@ fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result<i64, Re
Ok(stmt.column_int64(0))
}

fn get_cache_db_version(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result<i64, ResultCode> {
let stmt = db.prepare_v2("SELECT crsql_cache_db_version(?);")?;
stmt.bind_blob(1, site_id, sqlite::Destructor::STATIC)?;
stmt.step()?;
Ok(stmt.column_int64(0))
}

/// Test that insert_db_version always populates the lastDbVersions cache.
/// This is a regression test for https://github.com/superfly/cr-sqlite/pull/21
/// Previously, the cache was only populated when the SET statement returned a ROW,
/// making it mostly useless since it's cleared on commit/rollback.
fn test_insert_db_version_cache() -> Result<(), ResultCode> {
let c = crate::opendb().expect("db opened");
let db = &c.db;
db.db
.exec_safe("CREATE TABLE foo (a primary key not null, b);")?;
db.db.exec_safe("SELECT crsql_as_crr('foo');")?;

let remote_site = "remote_site".as_bytes();

db.db.exec_safe("BEGIN TRANSACTION;")?;

// cache should be empty before any remote inserts
assert_eq!(-1, get_cache_db_version(db.db, remote_site)?);

// insert a change from a remote site with db_version=1
let pk1: [u8; 3] = [1, 9, 1];
let stmt = db.db.prepare_v2(
"INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 1, 1, 1, ?, 1, 0, 0);",
)?;
stmt.bind_blob(1, &pk1, sqlite::Destructor::STATIC)?;
stmt.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?;
stmt.step()?;

// cache should now contain db_version=1 for the remote site
assert_eq!(1, get_cache_db_version(db.db, remote_site)?);

// insert another change from the same remote site with db_version=3 (higher)
let pk2: [u8; 3] = [1, 9, 2];
let stmt2 = db.db.prepare_v2(
"INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 2, 1, 3, ?, 1, 0, 0);",
)?;
stmt2.bind_blob(1, &pk2, sqlite::Destructor::STATIC)?;
stmt2.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?;
stmt2.step()?;

// cache should now reflect the higher db_version=3
assert_eq!(3, get_cache_db_version(db.db, remote_site)?);

// insert a change with a lower db_version=2 from the same site
// this should be short-circuited by the cache (no DB write needed)
let pk3: [u8; 3] = [1, 9, 3];
let stmt3 = db.db.prepare_v2(
"INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 3, 1, 2, ?, 1, 0, 0);",
)?;
stmt3.bind_blob(1, &pk3, sqlite::Destructor::STATIC)?;
stmt3.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?;
stmt3.step()?;

// cache should still show db_version=3 (not downgraded)
assert_eq!(3, get_cache_db_version(db.db, remote_site)?);

// commit clears the cache
db.db.exec_safe("COMMIT;")?;
assert_eq!(-1, get_cache_db_version(db.db, remote_site)?);

// new transaction: inserting a lower db_version (2) should be short-circuited
// because we fetch the latest db_version from DB on cache miss
db.db.exec_safe("BEGIN TRANSACTION;")?;
let pk4: [u8; 3] = [1, 9, 4];
let stmt4 = db.db.prepare_v2(
"INSERT INTO crsql_changes VALUES ('foo', ?, 'b', 4, 1, 2, ?, 1, 0, 0);",
)?;
stmt4.bind_blob(1, &pk4, sqlite::Destructor::STATIC)?;
stmt4.bind_blob(2, remote_site, sqlite::Destructor::STATIC)?;
stmt4.step()?;

// The cache contains the latest db_version from the DB
assert_eq!(3, get_cache_db_version(db.db, remote_site)?);

db.db.exec_safe("COMMIT;")?;

Ok(())
}

pub fn run_suite() -> Result<(), String> {
test_fetch_db_version_from_storage()?;
test_next_db_version()?;
test_get_or_set_site_ordinal()
.map_err(|e| format!("test_get_or_set_site_ordinal failed: {:?}", e))?;
test_get_or_set_pk_cl().map_err(|e| format!("test_get_or_set_pk_cl failed: {:?}", e))?;
test_insert_db_version_cache()
.map_err(|e| format!("test_insert_db_version_cache failed: {:?}", e))?;
Ok(())
}

Expand Down
Loading