From b446bbd83b368d1a89fb6f28b2e1abc6eef194a8 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 27 May 2026 13:38:30 -0300 Subject: [PATCH 1/4] chore: re-add execution of network transactions --- CHANGELOG.md | 1 + bin/ntx-builder/src/actor/execute.rs | 17 +- bin/ntx-builder/src/actor/mod.rs | 300 ++++++++++-------- bin/ntx-builder/src/chain_state.rs | 4 - bin/ntx-builder/src/commands/mod.rs | 17 + bin/ntx-builder/src/db/mod.rs | 14 + .../src/db/models/queries/notes.rs | 26 ++ .../src/db/models/queries/tests.rs | 32 ++ bin/ntx-builder/src/lib.rs | 70 ++-- 9 files changed, 327 insertions(+), 154 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7897fab72..ff3e3b242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - [BREAKING] Removed `--wallet-filepath` / `--counter-filepath` flags and the `MIDEN_MONITOR_WALLET_FILEPATH` / `MIDEN_MONITOR_COUNTER_FILEPATH` env vars from the network monitor. The monitor now keeps wallet and counter accounts fully in memory and regenerates them on every startup; the dashboard's counter value resets to zero on restart. - Added `--counter-pending-unhealthy-threshold` (env `MIDEN_MONITOR_COUNTER_PENDING_UNHEALTHY_THRESHOLD`, default `5`) to the network monitor: the Network Transactions card now flips unhealthy when the gap between expected and observed counter values stays above the threshold for three consecutive polls. - Allowed network transaction submission conditionally via the gRPC `SubmitProvenTx` and `SubmitProvenTxBatch` endpoints: the NTX builder can now send a key in the `x-miden-network-tx-auth` header that enables submitting network transactions ([#2131](https://github.com/0xMiden/node/issues/2131)). +- Added `--tx-expiration-delta` (env `MIDEN_NODE_NTX_BUILDER_TX_EXPIRATION_DELTA`, default `30`) to the network transaction builder: submitted network transactions now expire on-chain after this many blocks, and the builder reuses the same delta as the local window before resubmitting a transaction that has not landed ([#2148](https://github.com/0xMiden/node/pull/2148)). ## v0.14.11 (TBD) diff --git a/bin/ntx-builder/src/actor/execute.rs b/bin/ntx-builder/src/actor/execute.rs index 063cd19f5..f4b858908 100644 --- a/bin/ntx-builder/src/actor/execute.rs +++ b/bin/ntx-builder/src/actor/execute.rs @@ -32,6 +32,7 @@ use miden_protocol::transaction::{ TransactionArgs, TransactionId, TransactionInputs, + TransactionScript, }; use miden_protocol::vm::FutureMaybeSend; use miden_remote_prover_client::RemoteTransactionProver; @@ -153,18 +154,27 @@ pub struct NtxContext { /// Maximum number of VM execution cycles for network transactions. max_cycles: u32, + /// Pre-compiled transaction script that sets the network tx's on-chain expiration delta. Cloned + /// into the [`TransactionArgs`] of the executed transaction. + expiration_script: TransactionScript, + /// [`ExponentialBuilder`] used to back off retries on transient request failures. request_backoff: ExponentialBuilder, } impl NtxContext { /// Creates a new [`NtxContext`] instance. + #[expect( + clippy::too_many_arguments, + reason = "execution context aggregates actor resources" + )] pub fn new( prover: Option, rpc: RpcClient, script_cache: LruCache, db: Db, max_cycles: u32, + expiration_script: TransactionScript, request_backoff_initial: Duration, request_backoff_max: Duration, ) -> Self { @@ -175,6 +185,7 @@ impl NtxContext { script_cache, db, max_cycles, + expiration_script, request_backoff, } } @@ -369,11 +380,15 @@ impl NtxContext { ) -> NtxResult { let executor = self.create_executor(data_store); + // Attach the pre-compiled expiration script so the submitted tx is rejected on-chain if it + // does not land within the configured block delta. + let tx_args = TransactionArgs::default().with_tx_script(self.expiration_script.clone()); + Box::pin(executor.execute_transaction( data_store.account.id(), data_store.reference_block.block_num(), notes, - TransactionArgs::default(), + tx_args, )) .await .map_err(NtxError::Execution) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index affc33214..98a03bca3 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -9,14 +9,16 @@ use std::time::Duration; use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist}; use anyhow::Context; use candidate::TransactionCandidate; +use futures::FutureExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; +use miden_protocol::transaction::TransactionScript; use miden_remote_prover_client::RemoteTransactionProver; +use miden_standards::code_builder::CodeBuilder; use miden_tx::FailedNote; use tokio::sync::{Notify, Semaphore, mpsc}; @@ -25,6 +27,24 @@ use crate::chain_state::{ChainState, SharedChainState}; use crate::clients::RpcClient; use crate::db::Db; +/// Compiles the standalone transaction script that sets the on-chain expiration of a network +/// transaction to `delta` blocks. The script is account-independent, so the builder compiles it +/// once at startup and shares the resulting [`TransactionScript`] across all actors. +/// +/// ```masm +/// begin +/// push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta +/// end +/// ``` +pub(crate) fn expiration_tx_script(delta: u16) -> anyhow::Result { + let source = format!( + "begin\n push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta\nend" + ); + CodeBuilder::new() + .compile_tx_script(source) + .context("failed to compile network-tx expiration script") +} + // ACTOR REQUESTS // ================================================================================================ @@ -66,6 +86,9 @@ pub struct State { pub chain: Arc, /// Shared LRU cache for storing retrieved note scripts to avoid repeated RPC calls. pub script_cache: LruCache, + /// Pre-compiled transaction script that sets each network tx's on-chain expiration delta. + /// Shared into every executed transaction. + pub expiration_script: TransactionScript, } /// Per-actor configuration knobs. @@ -79,6 +102,9 @@ pub struct ActorConfig { pub idle_timeout: Duration, /// Maximum number of VM execution cycles for network transactions. pub max_cycles: u32, + /// Number of blocks after which a submitted transaction expires. Set as the on-chain expiration + /// delta and reused as the `WaitForBlock` retry timeout. + pub tx_expiration_delta: u16, /// Initial sleep applied between per-request retries on transient infrastructure failures /// (prover unreachable, RPC transport error, RPC gRPC hiccup). Doubles each retry up to /// [`Self::request_backoff_max`]. @@ -135,12 +161,15 @@ impl AccountActorContext { db: db.clone(), chain: chain_state, script_cache: LruCache::new(NonZeroUsize::new(1).unwrap()), + expiration_script: expiration_tx_script(30) + .expect("expiration script should compile"), }, config: ActorConfig { max_notes_per_tx: NonZeroUsize::new(1).unwrap(), max_note_attempts: 1, idle_timeout: Duration::from_secs(60), max_cycles: 1 << 18, + tx_expiration_delta: 30, request_backoff_initial: Duration::from_millis(1), request_backoff_max: Duration::from_millis(10), }, @@ -155,9 +184,22 @@ impl AccountActorContext { /// The mode of operation that the account actor is currently performing. #[derive(Debug)] enum ActorMode { + /// No notes targeting this account are currently available. The actor sleeps on the idle + /// timeout and awaits a coordinator notification to re-evaluate. NoViableNotes, + /// Notes are available for consumption. The actor acquires a transaction permit and submits a + /// candidate. NotesAvailable, - TransactionInflight(TransactionId), + /// A network transaction has been submitted; the actor waits for it to land in a committed + /// block. Landing is detected from the local DB: `apply_committed_block` marks each consumed + /// nullifier with `committed_at` so no RPC roundtrip is needed. + WaitForBlock { + /// Nullifiers of the network notes consumed by the submitted transaction. + submitted_nullifiers: Vec, + /// Chain tip block number at submission. With [`ActorConfig::tx_expiration_delta`] this + /// bounds how long the actor waits before retrying. + submitted_at: BlockNumber, + }, } // ACCOUNT ACTOR @@ -231,28 +273,63 @@ impl AccountActor { /// The return value signals the shutdown category to the coordinator: /// /// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time). - /// - `Err(_)`: crash (database error or any other bug). - pub async fn run(self, _semaphore: Arc) -> anyhow::Result<()> { + /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). + pub async fn run(self, semaphore: Arc) -> anyhow::Result<()> { let account_id = self.account_id; // Wait for the account to be committed to the DB. For newly created accounts, the creation - // transaction must be committed before the actor becomes active. + // transaction must be committed before we start processing notes. if !self.wait_for_committed_account(account_id).await? { return Ok(()); } + // Determine initial mode by checking the DB for available notes. + let block_num = self.state.chain.chain_tip_block_number(); + let has_notes = self + .state + .db + .has_available_notes(account_id, block_num, self.config.max_note_attempts) + .await + .context("failed to check for available notes")?; + let mut mode = if has_notes { + ActorMode::NotesAvailable + } else { + ActorMode::NoViableNotes + }; + loop { + // Acquire an execution permit only when there are notes to process. + let tx_permit_acquisition = match mode { + ActorMode::NoViableNotes | ActorMode::WaitForBlock { .. } => { + std::future::pending().boxed() + }, + ActorMode::NotesAvailable => semaphore.acquire().boxed(), + }; + + // The idle timer only ticks while there is nothing to do. + let idle_timeout_sleep = match mode { + ActorMode::NoViableNotes => tokio::time::sleep(self.config.idle_timeout).boxed(), + _ => std::future::pending().boxed(), + }; + tokio::select! { - // A committed block touched this account (or the coordinator woke everyone). PR 3 - // reconnects transaction execution here. + // A committed block touched this account (or the coordinator woke everyone). _ = self.notify.notified() => { - tracing::debug!( - %account_id, - "actor notified; transaction execution reconnects in PR 3", - ); + mode = self.reevaluate_mode(account_id, mode).await?; + }, + // Execute a transaction once a permit is available. + permit = tx_permit_acquisition => { + let _permit = permit.context("semaphore closed")?; + let chain_state = self.state.chain.get_cloned(); + let tx_candidate = + self.select_candidate_from_db(account_id, chain_state).await?; + mode = match tx_candidate { + Some(candidate) => self.execute_transactions(account_id, candidate).await, + None => ActorMode::NoViableNotes, + }; } // Idle timeout: actor has been idle too long, deactivate. - () = tokio::time::sleep(self.config.idle_timeout) => { + () = idle_timeout_sleep => { tracing::info!(%account_id, "Account actor deactivated due to idle timeout"); return Ok(()); } @@ -260,6 +337,51 @@ impl AccountActor { } } + /// Decides the actor's next mode after a coordinator notification. + /// + /// - In `NoViableNotes`/`NotesAvailable`, a wake means the DB may now have new work; advance to + /// `NotesAvailable` and let the next `select_candidate` decide whether a real candidate + /// exists. + /// - In `WaitForBlock`, query whether the submitted transaction's nullifiers have all been + /// consumed by a committed block (the tx landed). If so, return to `NotesAvailable`. Else, if + /// `tx_expiration_delta` blocks have passed since submission, give up waiting and resume + /// candidate selection; otherwise stay in `WaitForBlock`. + async fn reevaluate_mode( + &self, + account_id: AccountId, + mode: ActorMode, + ) -> anyhow::Result { + match mode { + ActorMode::WaitForBlock { submitted_nullifiers, submitted_at } => { + let landed = self + .state + .db + .submitted_tx_landed(account_id, submitted_nullifiers.clone()) + .await + .context("failed to check submitted tx landing")?; + if landed { + return Ok(ActorMode::NotesAvailable); + } + + let chain_tip = self.state.chain.chain_tip_block_number(); + let elapsed = chain_tip.checked_sub(submitted_at.as_u32()).unwrap_or_default(); + if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta) { + tracing::info!( + %account_id, + %submitted_at, + current_tip = %chain_tip, + delta = self.config.tx_expiration_delta, + "submitted tx not landed within expiration delta; retrying", + ); + return Ok(ActorMode::NotesAvailable); + } + + Ok(ActorMode::WaitForBlock { submitted_nullifiers, submitted_at }) + }, + _ => Ok(ActorMode::NotesAvailable), + } + } + /// Selects a transaction candidate by querying the DB. async fn select_candidate_from_db( &self, @@ -382,6 +504,7 @@ impl AccountActor { self.state.script_cache.clone(), self.state.db.clone(), self.config.max_cycles, + self.state.expiration_script.clone(), self.config.request_backoff_initial, self.config.request_backoff_max, ); @@ -406,11 +529,31 @@ impl AccountActor { "network transaction executed with some failed notes", ); self.cache_note_scripts(scripts_to_cache).await; + + // The nullifiers that actually went into the submitted tx are the candidate notes + // minus those rejected during consumability filtering. + let failed_nullifiers: std::collections::HashSet = + failed.iter().map(|f| f.note().nullifier()).collect(); + let submitted_nullifiers: Vec = notes + .iter() + .map(|n| n.as_note().nullifier()) + .filter(|nullifier| !failed_nullifiers.contains(nullifier)) + .collect(); + if !failed.is_empty() { let failed_notes = log_failed_notes(failed); self.mark_notes_failed(&failed_notes, block_num).await; } - ActorMode::TransactionInflight(tx_id) + + if submitted_nullifiers.is_empty() { + // Every input note was filtered out before submission. + ActorMode::NoViableNotes + } else { + ActorMode::WaitForBlock { + submitted_nullifiers, + submitted_at: block_num, + } + } }, // Transaction execution failed. Err(err) => { @@ -508,123 +651,18 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { #[cfg(test)] mod tests { - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_keeps_allowlisted_notes() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let note = mock_single_target_note(account_id, 10); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store(account_id, account, vec![note.clone()]) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed") - // .expect("allowed note should produce a candidate"); - - // assert_eq!(candidate.notes.len(), 1); - // assert_eq!(candidate.notes[0].as_note().nullifier(), note.as_note().nullifier()); - } - - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_marks_non_allowlisted_notes_failed() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let allowed_note = mock_single_target_note(account_id, 10); - // let rejected_note = - // mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT)); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store(account_id, account, vec![rejected_note.clone()]) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed"); - - // assert!(candidate.is_none()); - - // let status = db - // .get_note_status(rejected_note.as_note().id()) - // .await - // .expect("status query should succeed") - // .expect("note should exist"); - // assert_eq!(status.attempt_count, 1); - // assert!( - // status - // .last_error - // .as_deref() - // .expect("rejected note should record an error") - // .contains("not allowlisted") - // ); - } - - #[tokio::test] - #[ignore = "wip refactor"] - async fn select_candidate_executes_allowed_notes_and_marks_rejected_notes_failed() { - // let (db, _dir) = Db::test_setup().await; - // let account_id = mock_network_account_id(); - // let allowed_note = mock_single_target_note(account_id, 10); - // let rejected_note = - // mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT)); - // let account = mock_account_with_auth_component( - // AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note - // .as_note() - // .script() - // .root()])) - // .expect("non-empty allowlist should construct"), - // ); - - // db.sync_account_from_store( - // account_id, - // account, - // vec![allowed_note.clone(), rejected_note.clone()], - // ) - // .await - // .expect("fixtures should sync"); - - // let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state = - // context.state.chain.get_cloned(); - - // let candidate = actor - // .select_candidate_from_db(account_id, chain_state) - // .await - // .expect("selection should succeed") - // .expect("allowed note should remain"); - - // assert_eq!(candidate.notes.len(), 1); - // assert_eq!(candidate.notes[0].as_note().nullifier(), allowed_note.as_note().nullifier()); - - // let rejected_status = db - // .get_note_status(rejected_note.as_note().id()) - // .await - // .expect("status query should succeed") - // .expect("rejected note should exist"); - // assert_eq!(rejected_status.attempt_count, 1); + use super::expiration_tx_script; + + /// The expiration script must compile for the full valid delta range, and the delta must be + /// baked into the script (distinct deltas → distinct script roots), proving the on-chain + /// expiration value is actually carried rather than ignored. + #[test] + fn expiration_script_compiles_and_encodes_delta() { + let one = expiration_tx_script(1).expect("delta 1 should compile"); + let thirty = expiration_tx_script(30).expect("delta 30 should compile"); + let max = expiration_tx_script(u16::MAX).expect("delta u16::MAX should compile"); + + assert_ne!(one.root(), thirty.root(), "distinct deltas must yield distinct scripts"); + assert_ne!(thirty.root(), max.root(), "distinct deltas must yield distinct scripts"); } } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index 49025c0a2..68608164e 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -89,8 +89,6 @@ impl SharedChainState { Self(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))) } - // Read by the actor execution path, which is unwired until PR 3. - #[expect(dead_code)] pub(crate) fn chain_tip_block_number(&self) -> BlockNumber { self.0.read().expect("chain state lock poisoned").chain_tip_header.block_num() } @@ -108,8 +106,6 @@ impl SharedChainState { .update_chain_tip(tip, max_block_count); } - // Read by the actor execution path (candidate selection), which is unwired until PR 3. - #[expect(dead_code)] pub(crate) fn get_cloned(&self) -> ChainState { self.0.read().expect("chain state lock poisoned").clone() } diff --git a/bin/ntx-builder/src/commands/mod.rs b/bin/ntx-builder/src/commands/mod.rs index a6ff76d58..e808efec1 100644 --- a/bin/ntx-builder/src/commands/mod.rs +++ b/bin/ntx-builder/src/commands/mod.rs @@ -18,11 +18,13 @@ const ENV_RPC_AUTH_HEADER_VALUE: &str = "MIDEN_NODE_NTX_BUILDER_RPC_AUTH_HEADER_ const ENV_TX_PROVER_URL: &str = "MIDEN_NODE_NTX_BUILDER_NTX_PROVER_URL"; const ENV_SCRIPT_CACHE_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SCRIPT_CACHE_SIZE"; const ENV_MAX_CYCLES: &str = "MIDEN_NODE_NTX_BUILDER_MAX_CYCLES"; +const ENV_TX_EXPIRATION_DELTA: &str = "MIDEN_NODE_NTX_BUILDER_TX_EXPIRATION_DELTA"; const ENV_SQLITE_CONNECTION_POOL_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SQLITE_CONNECTION_POOL_SIZE"; const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60); const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); const DEFAULT_MAX_CYCLES: u32 = 1 << 18; +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -91,6 +93,19 @@ pub enum NtxBuilderCommand { )] max_tx_cycles: u32, + /// Number of blocks after which a submitted network transaction expires. + /// + /// Set as the on-chain transaction expiration delta and reused as the actor's local retry + /// timeout. Must be between 1 and 65535. + #[arg( + long = "tx-expiration-delta", + env = ENV_TX_EXPIRATION_DELTA, + default_value_t = DEFAULT_TX_EXPIRATION_DELTA, + value_parser = clap::value_parser!(u16).range(1..), + value_name = "NUM", + )] + tx_expiration_delta: u16, + /// Maximum number of SQLite connections in the ntx-builder database connection pool. #[arg( long = "sqlite.connection_pool_size", @@ -124,6 +139,7 @@ impl NtxBuilderCommand { idle_timeout, max_account_crashes, max_tx_cycles, + tx_expiration_delta, sqlite_connection_pool_size, data_directory, enable_otel: _, @@ -141,6 +157,7 @@ impl NtxBuilderCommand { .with_idle_timeout(idle_timeout) .with_max_account_crashes(max_account_crashes) .with_max_cycles(max_tx_cycles) + .with_tx_expiration_delta(tx_expiration_delta) .with_sqlite_connection_pool_size(sqlite_connection_pool_size); let config = match rpc_auth_header_value { Some(value) => config.with_rpc_auth_header(value), diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index cac8fb2e4..00c8f7de4 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -149,6 +149,20 @@ impl Db { .await } + /// Returns `true` when all the supplied nullifiers for `account_id` have been marked consumed + /// in a committed block. + pub async fn submitted_tx_landed( + &self, + account_id: AccountId, + nullifiers: Vec, + ) -> Result { + self.inner + .query("submitted_tx_landed", move |conn| { + queries::submitted_tx_landed(conn, account_id, &nullifiers) + }) + .await + } + /// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing /// the latest error message. pub async fn notes_failed( diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index a344e25a0..3394d3e24 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -193,6 +193,32 @@ pub fn accounts_with_pending_notes( .collect() } +/// Returns `true` if every one of `nullifiers` for `account_id` has been marked consumed +/// (`committed_at IS NOT NULL`), i.e. the actor's submitted transaction has landed in a committed +/// block. An empty `nullifiers` slice trivially returns `true`. +pub fn submitted_tx_landed( + conn: &mut SqliteConnection, + account_id: AccountId, + nullifiers: &[Nullifier], +) -> Result { + if nullifiers.is_empty() { + return Ok(true); + } + + let account_id_bytes = conversions::account_id_to_bytes(account_id); + let nullifier_bytes: Vec> = + nullifiers.iter().map(conversions::nullifier_to_bytes).collect(); + + let still_pending: i64 = schema::notes::table + .filter(schema::notes::account_id.eq(&account_id_bytes)) + .filter(schema::notes::nullifier.eq_any(&nullifier_bytes)) + .filter(schema::notes::committed_at.is_null()) + .count() + .get_result(conn)?; + + Ok(still_pending == 0) +} + // HELPERS // ================================================================================================ diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index fcc2c07d5..2e9a05541 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -261,6 +261,38 @@ fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { assert_eq!(pending[0], alice); } +// SUBMITTED-TX LANDING +// ================================================================================================ + +#[test] +fn submitted_tx_landed_detects_full_landing() { + let (conn, _dir) = &mut test_conn(); + let account_id = mock_network_account_id(); + let note_a = mock_single_target_note(account_id, 7); + let note_b = mock_single_target_note(account_id, 8); + insert_network_notes(conn, &[note_a.clone(), note_b.clone()]).unwrap(); + + let nullifiers = vec![note_a.as_note().nullifier(), note_b.as_note().nullifier()]; + + // Nothing consumed yet. + assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + + // Only one consumed. + mark_notes_consumed(conn, &[note_a.as_note().nullifier()], BlockNumber::from(3)).unwrap(); + assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + + // Both consumed. + mark_notes_consumed(conn, &[note_b.as_note().nullifier()], BlockNumber::from(4)).unwrap(); + assert!(submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); +} + +#[test] +fn submitted_tx_landed_empty_nullifiers_is_trivially_true() { + let (conn, _dir) = &mut test_conn(); + let account_id = mock_network_account_id(); + assert!(submitted_tx_landed(conn, account_id, &[]).unwrap()); +} + #[test] fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index d20f0c6dd..422eaee51 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -24,9 +24,6 @@ use crate::coordinator::Coordinator; pub(crate) type NoteError = Arc; -// PR 2 spawns actors and runs their lifecycle (wait-for-account + notify/idle), but the transaction -// execution path (candidate selection, proving, submission) stays unwired until PR 3 reconnects it. -#[expect(dead_code)] mod actor; mod builder; mod chain_state; @@ -89,6 +86,12 @@ const DEFAULT_REQUEST_BACKOFF_MAX: Duration = Duration::from_secs(30); /// `1 << 29` but network transactions should be much cheaper. const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19; +/// Default number of blocks after which a submitted network transaction expires. +/// +/// Used both as the on-chain transaction expiration delta and as the local retry timeout an actor +/// waits in `WaitForBlock` before resubmitting. Must be within the kernel's `1..=u16::MAX` range. +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; + // CONFIGURATION // ================================================================================================= @@ -144,6 +147,11 @@ pub struct NtxBuilderConfig { /// Defaults to 2^18 cycles. pub max_cycles: u32, + /// Number of blocks after which a submitted network transaction expires. Set as the on-chain + /// transaction expiration delta and reused as the local `WaitForBlock` retry timeout. Must be + /// within `1..=u16::MAX` (enforced by the transaction kernel). + pub tx_expiration_delta: u16, + /// Initial sleep applied between per-request retries on transient infrastructure failures (e.g. /// prover unreachable, RPC crash, transport error, RPC gRPC hiccup). Doubles on each retry up /// to [`Self::request_backoff_max`]. Per-note `attempt_count` is *not* advanced while retries @@ -175,6 +183,7 @@ impl NtxBuilderConfig { idle_timeout: DEFAULT_IDLE_TIMEOUT, max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES, max_cycles: DEFAULT_MAX_TX_CYCLES, + tx_expiration_delta: DEFAULT_TX_EXPIRATION_DELTA, request_backoff_initial: DEFAULT_REQUEST_BACKOFF_INITIAL, request_backoff_max: DEFAULT_REQUEST_BACKOFF_MAX, database_filepath, @@ -273,6 +282,14 @@ impl NtxBuilderConfig { self } + /// Sets the transaction expiration delta (in blocks). Also bounds the actor's `WaitForBlock` + /// retry timeout. + #[must_use] + pub fn with_tx_expiration_delta(mut self, delta: u16) -> Self { + self.tx_expiration_delta = delta; + self + } + /// Sets the per-request retry backoff bounds (initial sleep and cap) used when retrying /// transient infrastructure failures inside a single transaction attempt. #[must_use] @@ -384,28 +401,53 @@ impl NtxBuilderConfig { }; let chain = Arc::new(chain); - // Wire the actor context + coordinator. The actor request channel is owned by the builder - // (receiver) and cloned into every spawned actor (sender) so all DB writes from actors - // serialize through the builder's event loop. + let (coordinator, actor_request_rx) = + self.build_coordinator(rpc, db.clone(), chain.clone())?; + + Ok(NetworkTransactionBuilder::new( + self, + db, + block_stream, + last_applied_block, + chain, + coordinator, + actor_request_rx, + )) + } + + /// Builds the actor [`Coordinator`] and the channel over which spawned actors send their DB + /// writes back to the builder's event loop. + /// + /// The receiver is owned by the builder loop; the sender is cloned into every spawned actor so + /// all actor-side DB writes serialize through the loop. + fn build_coordinator( + &self, + rpc: RpcClient, + db: Db, + chain: Arc, + ) -> anyhow::Result<(Coordinator, mpsc::Receiver)> { let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity); let actor_context = AccountActorContext { clients: GrpcClients { - rpc: rpc.clone(), + rpc, prover: self .tx_prover_url .clone() .map(|url| RemoteTransactionProver::new(url.as_str())), }, state: State { - db: db.clone(), - chain: chain.clone(), + db, + chain, script_cache: LruCache::new(self.script_cache_size), + expiration_script: actor::expiration_tx_script(self.tx_expiration_delta) + .context("failed to compile network-tx expiration script")?, }, config: ActorConfig { max_notes_per_tx: self.max_notes_per_tx, max_note_attempts: self.max_note_attempts, idle_timeout: self.idle_timeout, max_cycles: self.max_cycles, + tx_expiration_delta: self.tx_expiration_delta, request_backoff_initial: self.request_backoff_initial, request_backoff_max: self.request_backoff_max, }, @@ -414,15 +456,7 @@ impl NtxBuilderConfig { let coordinator = Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context); - Ok(NetworkTransactionBuilder::new( - self, - db, - block_stream, - last_applied_block, - chain, - coordinator, - actor_request_rx, - )) + Ok((coordinator, actor_request_rx)) } } From 87434f1a6bf1069cf8ae0f5d2a6ff742130234ad Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Thu, 28 May 2026 17:43:55 -0300 Subject: [PATCH 2/4] review: use NonZero & coordinator tracked tx id --- bin/ntx-builder/src/actor/mod.rs | 64 +++++++++---------- bin/ntx-builder/src/commands/mod.rs | 8 +-- bin/ntx-builder/src/committed_block.rs | 14 +++- bin/ntx-builder/src/coordinator.rs | 3 + bin/ntx-builder/src/db/migrations.rs | 2 +- .../src/db/migrations/001_initial.sql | 6 +- bin/ntx-builder/src/db/mod.rs | 16 ++--- bin/ntx-builder/src/db/models/conv.rs | 10 +++ .../src/db/models/queries/accounts.rs | 59 ++++++++++++++++- bin/ntx-builder/src/db/models/queries/mod.rs | 6 ++ .../src/db/models/queries/notes.rs | 26 -------- .../src/db/models/queries/tests.rs | 49 +++++++++----- bin/ntx-builder/src/db/schema.rs | 1 + bin/ntx-builder/src/lib.rs | 8 +-- bin/ntx-builder/src/test_utils.rs | 6 ++ 15 files changed, 179 insertions(+), 99 deletions(-) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index 98a03bca3..ba2212731 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -2,7 +2,7 @@ mod allowlist; pub mod candidate; mod execute; -use std::num::NonZeroUsize; +use std::num::{NonZeroU16, NonZeroUsize}; use std::sync::Arc; use std::time::Duration; @@ -16,7 +16,7 @@ use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionScript; +use miden_protocol::transaction::{TransactionId, TransactionScript}; use miden_remote_prover_client::RemoteTransactionProver; use miden_standards::code_builder::CodeBuilder; use miden_tx::FailedNote; @@ -36,7 +36,8 @@ use crate::db::Db; /// push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta /// end /// ``` -pub(crate) fn expiration_tx_script(delta: u16) -> anyhow::Result { +pub(crate) fn expiration_tx_script(delta: NonZeroU16) -> anyhow::Result { + let delta = delta.get(); let source = format!( "begin\n push.{delta} exec.::miden::protocol::tx::update_expiration_block_delta\nend" ); @@ -104,7 +105,7 @@ pub struct ActorConfig { pub max_cycles: u32, /// Number of blocks after which a submitted transaction expires. Set as the on-chain expiration /// delta and reused as the `WaitForBlock` retry timeout. - pub tx_expiration_delta: u16, + pub tx_expiration_delta: NonZeroU16, /// Initial sleep applied between per-request retries on transient infrastructure failures /// (prover unreachable, RPC transport error, RPC gRPC hiccup). Doubles each retry up to /// [`Self::request_backoff_max`]. @@ -161,7 +162,7 @@ impl AccountActorContext { db: db.clone(), chain: chain_state, script_cache: LruCache::new(NonZeroUsize::new(1).unwrap()), - expiration_script: expiration_tx_script(30) + expiration_script: expiration_tx_script(NonZeroU16::new(30).unwrap()) .expect("expiration script should compile"), }, config: ActorConfig { @@ -169,7 +170,7 @@ impl AccountActorContext { max_note_attempts: 1, idle_timeout: Duration::from_secs(60), max_cycles: 1 << 18, - tx_expiration_delta: 30, + tx_expiration_delta: NonZeroU16::new(30).unwrap(), request_backoff_initial: Duration::from_millis(1), request_backoff_max: Duration::from_millis(10), }, @@ -191,11 +192,12 @@ enum ActorMode { /// candidate. NotesAvailable, /// A network transaction has been submitted; the actor waits for it to land in a committed - /// block. Landing is detected from the local DB: `apply_committed_block` marks each consumed - /// nullifier with `committed_at` so no RPC roundtrip is needed. + /// block. Landing is detected from the local DB: `apply_committed_block` records the + /// transaction id that updated each network account as `accounts.last_tx_id`, so the actor only + /// has to check whether its own submitted id is the account's latest. WaitForBlock { - /// Nullifiers of the network notes consumed by the submitted transaction. - submitted_nullifiers: Vec, + /// Id of the network transaction the actor submitted. + submitted_tx_id: TransactionId, /// Chain tip block number at submission. With [`ActorConfig::tx_expiration_delta`] this /// bounds how long the actor waits before retrying. submitted_at: BlockNumber, @@ -342,8 +344,8 @@ impl AccountActor { /// - In `NoViableNotes`/`NotesAvailable`, a wake means the DB may now have new work; advance to /// `NotesAvailable` and let the next `select_candidate` decide whether a real candidate /// exists. - /// - In `WaitForBlock`, query whether the submitted transaction's nullifiers have all been - /// consumed by a committed block (the tx landed). If so, return to `NotesAvailable`. Else, if + /// - In `WaitForBlock`, query the latest transaction recorded against the account. If it equals + /// the actor's submitted transaction id, the tx landed; return to `NotesAvailable`. Else, if /// `tx_expiration_delta` blocks have passed since submission, give up waiting and resume /// candidate selection; otherwise stay in `WaitForBlock`. async fn reevaluate_mode( @@ -352,20 +354,21 @@ impl AccountActor { mode: ActorMode, ) -> anyhow::Result { match mode { - ActorMode::WaitForBlock { submitted_nullifiers, submitted_at } => { + ActorMode::WaitForBlock { submitted_tx_id, submitted_at } => { let landed = self .state .db - .submitted_tx_landed(account_id, submitted_nullifiers.clone()) + .account_last_tx(account_id) .await - .context("failed to check submitted tx landing")?; + .context("failed to check submitted tx landing")? + == Some(submitted_tx_id); if landed { return Ok(ActorMode::NotesAvailable); } let chain_tip = self.state.chain.chain_tip_block_number(); let elapsed = chain_tip.checked_sub(submitted_at.as_u32()).unwrap_or_default(); - if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta) { + if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta.get()) { tracing::info!( %account_id, %submitted_at, @@ -376,7 +379,7 @@ impl AccountActor { return Ok(ActorMode::NotesAvailable); } - Ok(ActorMode::WaitForBlock { submitted_nullifiers, submitted_at }) + Ok(ActorMode::WaitForBlock { submitted_tx_id, submitted_at }) }, _ => Ok(ActorMode::NotesAvailable), } @@ -530,27 +533,20 @@ impl AccountActor { ); self.cache_note_scripts(scripts_to_cache).await; - // The nullifiers that actually went into the submitted tx are the candidate notes - // minus those rejected during consumability filtering. - let failed_nullifiers: std::collections::HashSet = - failed.iter().map(|f| f.note().nullifier()).collect(); - let submitted_nullifiers: Vec = notes - .iter() - .map(|n| n.as_note().nullifier()) - .filter(|nullifier| !failed_nullifiers.contains(nullifier)) - .collect(); + // A tx carries work only if at least one candidate note survived consumability + // filtering; if every note failed there is nothing on-chain to wait for. + let all_notes_failed = failed.len() == notes.len(); if !failed.is_empty() { let failed_notes = log_failed_notes(failed); self.mark_notes_failed(&failed_notes, block_num).await; } - if submitted_nullifiers.is_empty() { - // Every input note was filtered out before submission. + if all_notes_failed { ActorMode::NoViableNotes } else { ActorMode::WaitForBlock { - submitted_nullifiers, + submitted_tx_id: tx_id, submitted_at: block_num, } } @@ -651,6 +647,8 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { #[cfg(test)] mod tests { + use std::num::NonZeroU16; + use super::expiration_tx_script; /// The expiration script must compile for the full valid delta range, and the delta must be @@ -658,9 +656,11 @@ mod tests { /// expiration value is actually carried rather than ignored. #[test] fn expiration_script_compiles_and_encodes_delta() { - let one = expiration_tx_script(1).expect("delta 1 should compile"); - let thirty = expiration_tx_script(30).expect("delta 30 should compile"); - let max = expiration_tx_script(u16::MAX).expect("delta u16::MAX should compile"); + let one = + expiration_tx_script(NonZeroU16::new(1).unwrap()).expect("delta 1 should compile"); + let thirty = + expiration_tx_script(NonZeroU16::new(30).unwrap()).expect("delta 30 should compile"); + let max = expiration_tx_script(NonZeroU16::MAX).expect("delta u16::MAX should compile"); assert_ne!(one.root(), thirty.root(), "distinct deltas must yield distinct scripts"); assert_ne!(thirty.root(), max.root(), "distinct deltas must yield distinct scripts"); diff --git a/bin/ntx-builder/src/commands/mod.rs b/bin/ntx-builder/src/commands/mod.rs index e808efec1..04dcb4403 100644 --- a/bin/ntx-builder/src/commands/mod.rs +++ b/bin/ntx-builder/src/commands/mod.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use std::num::NonZeroUsize; +use std::num::{NonZeroU16, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; @@ -24,7 +24,7 @@ const ENV_SQLITE_CONNECTION_POOL_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SQLITE_CON const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60); const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); const DEFAULT_MAX_CYCLES: u32 = 1 << 18; -const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; +const DEFAULT_TX_EXPIRATION_DELTA: NonZeroU16 = NonZeroU16::new(30).unwrap(); #[derive(Parser)] #[command(version, about, long_about = None)] @@ -101,10 +101,10 @@ pub enum NtxBuilderCommand { long = "tx-expiration-delta", env = ENV_TX_EXPIRATION_DELTA, default_value_t = DEFAULT_TX_EXPIRATION_DELTA, - value_parser = clap::value_parser!(u16).range(1..), + value_parser = clap::value_parser!(NonZeroU16), value_name = "NUM", )] - tx_expiration_delta: u16, + tx_expiration_delta: NonZeroU16, /// Maximum number of SQLite connections in the ntx-builder database connection pool. #[arg( diff --git a/bin/ntx-builder/src/committed_block.rs b/bin/ntx-builder/src/committed_block.rs index 4a0c38634..9bd0481dc 100644 --- a/bin/ntx-builder/src/committed_block.rs +++ b/bin/ntx-builder/src/committed_block.rs @@ -2,7 +2,7 @@ use miden_protocol::account::AccountId; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::{BlockHeader, SignedBlock}; use miden_protocol::note::Nullifier; -use miden_protocol::transaction::OutputNote; +use miden_protocol::transaction::{OutputNote, TransactionId}; use miden_standards::note::AccountTargetNetworkNote; /// Network-relevant state extracted from a committed [`SignedBlock`]. @@ -15,6 +15,10 @@ pub struct CommittedBlockEffects { pub network_notes: Vec, pub nullifiers: Vec, pub network_account_updates: Vec<(AccountId, AccountUpdateDetails)>, + /// Transaction id paired with the account it updated, for every transaction in the block. + /// `apply_committed_block` uses this to record the latest landed transaction per network + /// account so actors can confirm their own submitted transaction landed. + pub account_transactions: Vec<(AccountId, TransactionId)>, } impl CommittedBlockEffects { @@ -56,11 +60,19 @@ impl CommittedBlockEffects { }) .collect(); + let account_transactions = body + .transactions() + .as_slice() + .iter() + .map(|tx| (tx.account_id(), tx.id())) + .collect(); + Self { header, network_notes, nullifiers, network_account_updates, + account_transactions, } } } diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index 948c9fcba..ae4273e76 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -247,6 +247,7 @@ mod tests { network_notes: vec![note], nullifiers: vec![], network_account_updates: vec![], + account_transactions: vec![], }; coordinator.handle_committed_block(&effects); @@ -270,6 +271,7 @@ mod tests { updated_id, miden_protocol::account::delta::AccountUpdateDetails::Private, )], + account_transactions: vec![], }; coordinator.handle_committed_block(&effects); @@ -327,6 +329,7 @@ mod tests { network_notes: vec![note], nullifiers: vec![], network_account_updates: vec![], + account_transactions: vec![], }; coordinator.handle_committed_block(&effects); diff --git a/bin/ntx-builder/src/db/migrations.rs b/bin/ntx-builder/src/db/migrations.rs index 195301353..5e94e3bab 100644 --- a/bin/ntx-builder/src/db/migrations.rs +++ b/bin/ntx-builder/src/db/migrations.rs @@ -30,7 +30,7 @@ mod tests { use super::*; const EXPECTED_SCHEMA_HASHES: [SchemaHash; 1] = [SchemaHash::from_hex( - "892f3fb597808a97bdb55762a6ebd4b7941c855d22eb5e0d9b210901720e1125", + "f1197b13da306e5590e5d9fd06acdabcb2fd02277c2efc8eef4395a55700cfd7", )]; #[test] diff --git a/bin/ntx-builder/src/db/migrations/001_initial.sql b/bin/ntx-builder/src/db/migrations/001_initial.sql index 06b6a80d4..297188457 100644 --- a/bin/ntx-builder/src/db/migrations/001_initial.sql +++ b/bin/ntx-builder/src/db/migrations/001_initial.sql @@ -23,7 +23,11 @@ CREATE TABLE accounts ( -- AccountId serialized bytes (8 bytes). account_id BLOB NOT NULL PRIMARY KEY, -- Serialized Account state. - account_data BLOB NOT NULL + account_data BLOB NOT NULL, + -- TransactionId (32 bytes) of the latest transaction that updated this account in a committed + -- block. NULL until the first transaction lands. Actors compare their own submitted tx id + -- against this to confirm landing without an RPC roundtrip. + last_tx_id BLOB ) WITHOUT ROWID; -- Network notes targeting network accounts, plus backoff metadata used by the actor execution diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index 00c8f7de4..351a9ae14 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -8,6 +8,7 @@ use miden_protocol::account::AccountId; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::crypto::merkle::mmr::PartialMmr; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; +use miden_protocol::transaction::TransactionId; use miden_standards::note::AccountTargetNetworkNote; use tracing::{info, instrument}; @@ -149,17 +150,12 @@ impl Db { .await } - /// Returns `true` when all the supplied nullifiers for `account_id` have been marked consumed - /// in a committed block. - pub async fn submitted_tx_landed( - &self, - account_id: AccountId, - nullifiers: Vec, - ) -> Result { + /// Returns the latest transaction recorded against `account_id` in a committed block, if any. + /// An actor waiting on its submission compares this against its own transaction id to confirm + /// landing. + pub async fn account_last_tx(&self, account_id: AccountId) -> Result> { self.inner - .query("submitted_tx_landed", move |conn| { - queries::submitted_tx_landed(conn, account_id, &nullifiers) - }) + .query("account_last_tx", move |conn| queries::account_last_tx(conn, account_id)) .await } diff --git a/bin/ntx-builder/src/db/models/conv.rs b/bin/ntx-builder/src/db/models/conv.rs index dfb8e9a31..c967c1360 100644 --- a/bin/ntx-builder/src/db/models/conv.rs +++ b/bin/ntx-builder/src/db/models/conv.rs @@ -5,6 +5,7 @@ use miden_protocol::Word; use miden_protocol::account::{Account, AccountId}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; +use miden_protocol::transaction::TransactionId; use miden_protocol::utils::serde::{Deserializable, Serializable}; // SERIALIZATION (domain → DB) @@ -30,6 +31,10 @@ pub fn note_id_to_bytes(note_id: &NoteId) -> Vec { note_id.to_bytes() } +pub fn transaction_id_to_bytes(tx_id: &TransactionId) -> Vec { + tx_id.to_bytes() +} + pub fn block_num_to_i64(block_num: BlockNumber) -> i64 { i64::from(block_num.as_u32()) } @@ -50,6 +55,11 @@ pub fn account_id_from_bytes(bytes: &[u8]) -> Result { AccountId::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("account id", e)) } +pub fn transaction_id_from_bytes(bytes: &[u8]) -> Result { + TransactionId::read_from_bytes(bytes) + .map_err(|e| DatabaseError::deserialization("transaction id", e)) +} + pub fn word_to_bytes(word: &Word) -> Vec { word.to_bytes() } diff --git a/bin/ntx-builder/src/db/models/queries/accounts.rs b/bin/ntx-builder/src/db/models/queries/accounts.rs index b732c65d5..79c7da4ee 100644 --- a/bin/ntx-builder/src/db/models/queries/accounts.rs +++ b/bin/ntx-builder/src/db/models/queries/accounts.rs @@ -3,6 +3,7 @@ use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_protocol::account::{Account, AccountId}; +use miden_protocol::transaction::TransactionId; use crate::db::models::conv as conversions; use crate::db::schema; @@ -28,13 +29,14 @@ pub struct AccountRow { // QUERIES // ================================================================================================ -/// Inserts or replaces the committed account state. +/// Inserts the committed account state, or updates `account_data` if the account already exists. /// /// # Raw SQL /// /// ```sql -/// INSERT OR REPLACE INTO accounts (account_id, account_data) +/// INSERT INTO accounts (account_id, account_data) /// VALUES (?1, ?2) +/// ON CONFLICT(account_id) DO UPDATE SET account_data = excluded.account_data /// ``` pub fn upsert_account( conn: &mut SqliteConnection, @@ -45,10 +47,61 @@ pub fn upsert_account( account_id: conversions::account_id_to_bytes(account_id), account_data: conversions::account_to_bytes(account), }; - diesel::replace_into(schema::accounts::table).values(&row).execute(conn)?; + diesel::insert_into(schema::accounts::table) + .values(&row) + .on_conflict(schema::accounts::account_id) + .do_update() + .set(schema::accounts::account_data.eq(&row.account_data)) + .execute(conn)?; Ok(()) } +/// Records `tx_id` as the latest transaction that updated `account_id`. No-ops for accounts not +/// tracked locally (i.e. non-network accounts), which never have a row in this table. +/// +/// # Raw SQL +/// +/// ```sql +/// UPDATE accounts SET last_tx_id = ?2 WHERE account_id = ?1 +/// ``` +pub fn set_account_last_tx( + conn: &mut SqliteConnection, + account_id: AccountId, + tx_id: TransactionId, +) -> Result<(), DatabaseError> { + let account_id_bytes = conversions::account_id_to_bytes(account_id); + let tx_id_bytes = conversions::transaction_id_to_bytes(&tx_id); + diesel::update(schema::accounts::table.find(&account_id_bytes)) + .set(schema::accounts::last_tx_id.eq(Some(tx_id_bytes))) + .execute(conn)?; + Ok(()) +} + +/// Returns the latest transaction recorded against `account_id`, if any. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT last_tx_id FROM accounts WHERE account_id = ?1 +/// ``` +pub fn account_last_tx( + conn: &mut SqliteConnection, + account_id: AccountId, +) -> Result, DatabaseError> { + let account_id_bytes = conversions::account_id_to_bytes(account_id); + + let last_tx_id: Option>> = schema::accounts::table + .find(&account_id_bytes) + .select(schema::accounts::last_tx_id) + .first(conn) + .optional()?; + + last_tx_id + .flatten() + .map(|bytes| conversions::transaction_id_from_bytes(&bytes)) + .transpose() +} + /// Returns the committed account state for the given network account. /// /// # Raw SQL diff --git a/bin/ntx-builder/src/db/models/queries/mod.rs b/bin/ntx-builder/src/db/models/queries/mod.rs index 01d8c21a1..da6152e0f 100644 --- a/bin/ntx-builder/src/db/models/queries/mod.rs +++ b/bin/ntx-builder/src/db/models/queries/mod.rs @@ -77,6 +77,12 @@ pub fn apply_committed_block( mark_notes_consumed(conn, &effects.nullifiers, effects.header.block_num())?; + // Record the latest landed transaction per account so waiting actors can confirm their own + // submission landed. Only updates rows already present. + for (account_id, tx_id) in &effects.account_transactions { + set_account_last_tx(conn, *account_id, *tx_id)?; + } + upsert_chain_state(conn, effects.header.block_num(), &effects.header, chain_mmr)?; Ok(affected_accounts.into_iter().collect()) diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index 3394d3e24..a344e25a0 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -193,32 +193,6 @@ pub fn accounts_with_pending_notes( .collect() } -/// Returns `true` if every one of `nullifiers` for `account_id` has been marked consumed -/// (`committed_at IS NOT NULL`), i.e. the actor's submitted transaction has landed in a committed -/// block. An empty `nullifiers` slice trivially returns `true`. -pub fn submitted_tx_landed( - conn: &mut SqliteConnection, - account_id: AccountId, - nullifiers: &[Nullifier], -) -> Result { - if nullifiers.is_empty() { - return Ok(true); - } - - let account_id_bytes = conversions::account_id_to_bytes(account_id); - let nullifier_bytes: Vec> = - nullifiers.iter().map(conversions::nullifier_to_bytes).collect(); - - let still_pending: i64 = schema::notes::table - .filter(schema::notes::account_id.eq(&account_id_bytes)) - .filter(schema::notes::nullifier.eq_any(&nullifier_bytes)) - .filter(schema::notes::committed_at.is_null()) - .count() - .get_result(conn)?; - - Ok(still_pending == 0) -} - // HELPERS // ================================================================================================ diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index 2e9a05541..37a2a3269 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -265,32 +265,47 @@ fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { // ================================================================================================ #[test] -fn submitted_tx_landed_detects_full_landing() { +fn account_last_tx_roundtrips_and_updates() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - let note_a = mock_single_target_note(account_id, 7); - let note_b = mock_single_target_note(account_id, 8); - insert_network_notes(conn, &[note_a.clone(), note_b.clone()]).unwrap(); - - let nullifiers = vec![note_a.as_note().nullifier(), note_b.as_note().nullifier()]; - - // Nothing consumed yet. - assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + upsert_account(conn, account_id, &mock_account(account_id)).unwrap(); + + // No transaction recorded yet. + assert_eq!(account_last_tx(conn, account_id).unwrap(), None); + + // Record one, then overwrite it with a later one. + let first = mock_transaction_id(1); + let second = mock_transaction_id(2); + set_account_last_tx(conn, account_id, first).unwrap(); + assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(first)); + set_account_last_tx(conn, account_id, second).unwrap(); + assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(second)); +} - // Only one consumed. - mark_notes_consumed(conn, &[note_a.as_note().nullifier()], BlockNumber::from(3)).unwrap(); - assert!(!submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); +#[test] +fn set_account_last_tx_noops_for_untracked_account() { + let (conn, _dir) = &mut test_conn(); + let account_id = mock_network_account_id(); - // Both consumed. - mark_notes_consumed(conn, &[note_b.as_note().nullifier()], BlockNumber::from(4)).unwrap(); - assert!(submitted_tx_landed(conn, account_id, &nullifiers).unwrap()); + // No row exists for this account; the update must affect nothing and not insert. + set_account_last_tx(conn, account_id, mock_transaction_id(1)).unwrap(); + assert_eq!(count_accounts(conn), 0); + assert_eq!(account_last_tx(conn, account_id).unwrap(), None); } #[test] -fn submitted_tx_landed_empty_nullifiers_is_trivially_true() { +fn upsert_account_preserves_last_tx_id() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - assert!(submitted_tx_landed(conn, account_id, &[]).unwrap()); + let account = mock_account(account_id); + + upsert_account(conn, account_id, &account).unwrap(); + let tx_id = mock_transaction_id(7); + set_account_last_tx(conn, account_id, tx_id).unwrap(); + + // A subsequent account-state upsert must not clobber the recorded transaction id. + upsert_account(conn, account_id, &account).unwrap(); + assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(tx_id)); } #[test] diff --git a/bin/ntx-builder/src/db/schema.rs b/bin/ntx-builder/src/db/schema.rs index dc41d6176..a1c8b5d12 100644 --- a/bin/ntx-builder/src/db/schema.rs +++ b/bin/ntx-builder/src/db/schema.rs @@ -4,6 +4,7 @@ diesel::table! { accounts (account_id) { account_id -> Binary, account_data -> Binary, + last_tx_id -> Nullable, } } diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index 422eaee51..8d4504fcf 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroUsize; +use std::num::{NonZeroU16, NonZeroUsize}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -90,7 +90,7 @@ const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19; /// /// Used both as the on-chain transaction expiration delta and as the local retry timeout an actor /// waits in `WaitForBlock` before resubmitting. Must be within the kernel's `1..=u16::MAX` range. -const DEFAULT_TX_EXPIRATION_DELTA: u16 = 30; +const DEFAULT_TX_EXPIRATION_DELTA: NonZeroU16 = NonZeroU16::new(30).unwrap(); // CONFIGURATION // ================================================================================================= @@ -150,7 +150,7 @@ pub struct NtxBuilderConfig { /// Number of blocks after which a submitted network transaction expires. Set as the on-chain /// transaction expiration delta and reused as the local `WaitForBlock` retry timeout. Must be /// within `1..=u16::MAX` (enforced by the transaction kernel). - pub tx_expiration_delta: u16, + pub tx_expiration_delta: NonZeroU16, /// Initial sleep applied between per-request retries on transient infrastructure failures (e.g. /// prover unreachable, RPC crash, transport error, RPC gRPC hiccup). Doubles on each retry up @@ -285,7 +285,7 @@ impl NtxBuilderConfig { /// Sets the transaction expiration delta (in blocks). Also bounds the actor's `WaitForBlock` /// retry timeout. #[must_use] - pub fn with_tx_expiration_delta(mut self, delta: u16) -> Self { + pub fn with_tx_expiration_delta(mut self, delta: NonZeroU16) -> Self { self.tx_expiration_delta = delta; self } diff --git a/bin/ntx-builder/src/test_utils.rs b/bin/ntx-builder/src/test_utils.rs index 14956b793..709168b0f 100644 --- a/bin/ntx-builder/src/test_utils.rs +++ b/bin/ntx-builder/src/test_utils.rs @@ -7,6 +7,7 @@ use miden_protocol::testing::account_id::{ ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE, AccountIdBuilder, }; +use miden_protocol::transaction::TransactionId; use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint}; use miden_standards::testing::note::NoteBuilder; use rand_chacha::ChaCha20Rng; @@ -17,6 +18,11 @@ pub fn mock_network_account_id() -> AccountId { ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE.try_into().unwrap() } +/// Creates a distinct [`TransactionId`] from a seed, for landing-detection tests. +pub fn mock_transaction_id(seed: u32) -> TransactionId { + TransactionId::from_raw(Word::from([seed, 0, 0, 0])) +} + /// Creates a distinct network account ID using a seeded RNG. pub fn mock_network_account_id_seeded(seed: u8) -> AccountId { AccountIdBuilder::new() From 93c0d74ea22c991b8d8e6b9fc27319f542de1a68 Mon Sep 17 00:00:00 2001 From: Santiago Pittella <87827390+SantiagoPittella@users.noreply.github.com> Date: Fri, 29 May 2026 10:05:08 -0300 Subject: [PATCH 3/4] review: improve trace message Co-authored-by: Mirko <48352201+Mirko-von-Leipzig@users.noreply.github.com> --- bin/ntx-builder/src/actor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index ba2212731..8c9078f09 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -374,7 +374,7 @@ impl AccountActor { %submitted_at, current_tip = %chain_tip, delta = self.config.tx_expiration_delta, - "submitted tx not landed within expiration delta; retrying", + "submitted transaction expired", ); return Ok(ActorMode::NotesAvailable); } From 3db92c15ab1e28d78364dadd20efdfce7dffd797 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Fri, 29 May 2026 14:06:46 -0300 Subject: [PATCH 4/4] review: always store tx id --- bin/ntx-builder/src/db/migrations.rs | 2 +- .../src/db/migrations/001_initial.sql | 5 +- .../src/db/models/queries/accounts.rs | 46 +++++++------------ bin/ntx-builder/src/db/models/queries/mod.rs | 20 ++++---- .../src/db/models/queries/tests.rs | 36 ++++----------- bin/ntx-builder/src/db/schema.rs | 2 +- 6 files changed, 41 insertions(+), 70 deletions(-) diff --git a/bin/ntx-builder/src/db/migrations.rs b/bin/ntx-builder/src/db/migrations.rs index 5e94e3bab..8b2d25bb0 100644 --- a/bin/ntx-builder/src/db/migrations.rs +++ b/bin/ntx-builder/src/db/migrations.rs @@ -30,7 +30,7 @@ mod tests { use super::*; const EXPECTED_SCHEMA_HASHES: [SchemaHash; 1] = [SchemaHash::from_hex( - "f1197b13da306e5590e5d9fd06acdabcb2fd02277c2efc8eef4395a55700cfd7", + "e7383731af6f594a2f84ea8c3863325f0219899cff13e1396630c4ea8fed8157", )]; #[test] diff --git a/bin/ntx-builder/src/db/migrations/001_initial.sql b/bin/ntx-builder/src/db/migrations/001_initial.sql index 297188457..ad952d3d2 100644 --- a/bin/ntx-builder/src/db/migrations/001_initial.sql +++ b/bin/ntx-builder/src/db/migrations/001_initial.sql @@ -25,9 +25,10 @@ CREATE TABLE accounts ( -- Serialized Account state. account_data BLOB NOT NULL, -- TransactionId (32 bytes) of the latest transaction that updated this account in a committed - -- block. NULL until the first transaction lands. Actors compare their own submitted tx id + -- block. Always set: an account row is created from the block that created the account, whose + -- creation transaction is the first value here. Actors compare their own submitted tx id -- against this to confirm landing without an RPC roundtrip. - last_tx_id BLOB + last_tx_id BLOB NOT NULL ) WITHOUT ROWID; -- Network notes targeting network accounts, plus backoff metadata used by the actor execution diff --git a/bin/ntx-builder/src/db/models/queries/accounts.rs b/bin/ntx-builder/src/db/models/queries/accounts.rs index 79c7da4ee..8cf9b6d7c 100644 --- a/bin/ntx-builder/src/db/models/queries/accounts.rs +++ b/bin/ntx-builder/src/db/models/queries/accounts.rs @@ -17,6 +17,7 @@ use crate::db::schema; pub struct AccountInsert { pub account_id: Vec, pub account_data: Vec, + pub last_tx_id: Vec, } #[derive(Debug, Clone, Queryable, Selectable)] @@ -29,55 +30,43 @@ pub struct AccountRow { // QUERIES // ================================================================================================ -/// Inserts the committed account state, or updates `account_data` if the account already exists. +/// Inserts the committed account state, or updates an existing account's state. In both cases +/// `last_tx_id` is set to the transaction that produced this update. /// /// # Raw SQL /// /// ```sql -/// INSERT INTO accounts (account_id, account_data) -/// VALUES (?1, ?2) -/// ON CONFLICT(account_id) DO UPDATE SET account_data = excluded.account_data +/// INSERT INTO accounts (account_id, account_data, last_tx_id) +/// VALUES (?1, ?2, ?3) +/// ON CONFLICT(account_id) DO UPDATE SET +/// account_data = excluded.account_data, +/// last_tx_id = excluded.last_tx_id /// ``` pub fn upsert_account( conn: &mut SqliteConnection, account_id: AccountId, account: &Account, + last_tx_id: TransactionId, ) -> Result<(), DatabaseError> { let row = AccountInsert { account_id: conversions::account_id_to_bytes(account_id), account_data: conversions::account_to_bytes(account), + last_tx_id: conversions::transaction_id_to_bytes(&last_tx_id), }; diesel::insert_into(schema::accounts::table) .values(&row) .on_conflict(schema::accounts::account_id) .do_update() - .set(schema::accounts::account_data.eq(&row.account_data)) + .set(( + schema::accounts::account_data.eq(&row.account_data), + schema::accounts::last_tx_id.eq(&row.last_tx_id), + )) .execute(conn)?; Ok(()) } -/// Records `tx_id` as the latest transaction that updated `account_id`. No-ops for accounts not -/// tracked locally (i.e. non-network accounts), which never have a row in this table. -/// -/// # Raw SQL -/// -/// ```sql -/// UPDATE accounts SET last_tx_id = ?2 WHERE account_id = ?1 -/// ``` -pub fn set_account_last_tx( - conn: &mut SqliteConnection, - account_id: AccountId, - tx_id: TransactionId, -) -> Result<(), DatabaseError> { - let account_id_bytes = conversions::account_id_to_bytes(account_id); - let tx_id_bytes = conversions::transaction_id_to_bytes(&tx_id); - diesel::update(schema::accounts::table.find(&account_id_bytes)) - .set(schema::accounts::last_tx_id.eq(Some(tx_id_bytes))) - .execute(conn)?; - Ok(()) -} - -/// Returns the latest transaction recorded against `account_id`, if any. +/// Returns the latest transaction recorded against `account_id`, or `None` if the account is not +/// tracked locally. /// /// # Raw SQL /// @@ -90,14 +79,13 @@ pub fn account_last_tx( ) -> Result, DatabaseError> { let account_id_bytes = conversions::account_id_to_bytes(account_id); - let last_tx_id: Option>> = schema::accounts::table + let last_tx_id: Option> = schema::accounts::table .find(&account_id_bytes) .select(schema::accounts::last_tx_id) .first(conn) .optional()?; last_tx_id - .flatten() .map(|bytes| conversions::transaction_id_from_bytes(&bytes)) .transpose() } diff --git a/bin/ntx-builder/src/db/models/queries/mod.rs b/bin/ntx-builder/src/db/models/queries/mod.rs index da6152e0f..39be6f879 100644 --- a/bin/ntx-builder/src/db/models/queries/mod.rs +++ b/bin/ntx-builder/src/db/models/queries/mod.rs @@ -1,6 +1,6 @@ //! Database query functions for the NTX builder. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use diesel::prelude::*; use miden_node_db::DatabaseError; @@ -48,13 +48,21 @@ pub fn apply_committed_block( ) -> Result, DatabaseError> { let mut affected_accounts: HashSet = HashSet::new(); + // The latest transaction in this block per account. Every committed account update originates + // from a transaction in the same block, so each upserted account has an entry here. Collecting + // into a map keeps the last transaction per account (block order is preserved). + let last_tx: HashMap = effects.account_transactions.iter().copied().collect(); + for (account_id, details) in &effects.network_account_updates { let Some(effect) = NetworkAccountEffect::from_protocol(details) else { continue; }; + let last_tx_id = *last_tx + .get(account_id) + .expect("a committed account update must originate from a transaction in the block"); match effect { NetworkAccountEffect::Created(account) => { - upsert_account(conn, *account_id, &account)?; + upsert_account(conn, *account_id, &account, last_tx_id)?; }, NetworkAccountEffect::Updated(delta) => { // If the account is not already tracked locally, skip it. @@ -64,7 +72,7 @@ pub fn apply_committed_block( current .apply_delta(&delta) .expect("network account delta should apply since the block was committed"); - upsert_account(conn, *account_id, ¤t)?; + upsert_account(conn, *account_id, ¤t, last_tx_id)?; }, } affected_accounts.insert(*account_id); @@ -77,12 +85,6 @@ pub fn apply_committed_block( mark_notes_consumed(conn, &effects.nullifiers, effects.header.block_num())?; - // Record the latest landed transaction per account so waiting actors can confirm their own - // submission landed. Only updates rows already present. - for (account_id, tx_id) in &effects.account_transactions { - set_account_last_tx(conn, *account_id, *tx_id)?; - } - upsert_chain_state(conn, effects.header.block_num(), &effects.header, chain_mmr)?; Ok(affected_accounts.into_iter().collect()) diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index 37a2a3269..b67f9debb 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -41,8 +41,8 @@ fn upsert_account_replaces_existing_row() { let account_id = mock_network_account_id(); let account = mock_account(account_id); - upsert_account(conn, account_id, &account).unwrap(); - upsert_account(conn, account_id, &account).unwrap(); + upsert_account(conn, account_id, &account, mock_transaction_id(1)).unwrap(); + upsert_account(conn, account_id, &account, mock_transaction_id(2)).unwrap(); assert_eq!(count_accounts(conn), 1, "second upsert must overwrite, not insert"); assert!(get_account(conn, account_id).unwrap().is_some()); @@ -268,46 +268,26 @@ fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { fn account_last_tx_roundtrips_and_updates() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - upsert_account(conn, account_id, &mock_account(account_id)).unwrap(); - - // No transaction recorded yet. - assert_eq!(account_last_tx(conn, account_id).unwrap(), None); + let account = mock_account(account_id); - // Record one, then overwrite it with a later one. + // The first upsert records its transaction id; a later upsert overwrites it. let first = mock_transaction_id(1); let second = mock_transaction_id(2); - set_account_last_tx(conn, account_id, first).unwrap(); + upsert_account(conn, account_id, &account, first).unwrap(); assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(first)); - set_account_last_tx(conn, account_id, second).unwrap(); + upsert_account(conn, account_id, &account, second).unwrap(); assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(second)); } #[test] -fn set_account_last_tx_noops_for_untracked_account() { +fn account_last_tx_returns_none_for_untracked_account() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - // No row exists for this account; the update must affect nothing and not insert. - set_account_last_tx(conn, account_id, mock_transaction_id(1)).unwrap(); - assert_eq!(count_accounts(conn), 0); + // No row exists for this account. assert_eq!(account_last_tx(conn, account_id).unwrap(), None); } -#[test] -fn upsert_account_preserves_last_tx_id() { - let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let account = mock_account(account_id); - - upsert_account(conn, account_id, &account).unwrap(); - let tx_id = mock_transaction_id(7); - set_account_last_tx(conn, account_id, tx_id).unwrap(); - - // A subsequent account-state upsert must not clobber the recorded transaction id. - upsert_account(conn, account_id, &account).unwrap(); - assert_eq!(account_last_tx(conn, account_id).unwrap(), Some(tx_id)); -} - #[test] fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); diff --git a/bin/ntx-builder/src/db/schema.rs b/bin/ntx-builder/src/db/schema.rs index a1c8b5d12..6c5151fdd 100644 --- a/bin/ntx-builder/src/db/schema.rs +++ b/bin/ntx-builder/src/db/schema.rs @@ -4,7 +4,7 @@ diesel::table! { accounts (account_id) { account_id -> Binary, account_data -> Binary, - last_tx_id -> Nullable, + last_tx_id -> Binary, } }