diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index 803cdccbe..affc33214 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -9,7 +9,6 @@ use std::time::Duration; use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist}; use anyhow::Context; use candidate::TransactionCandidate; -use futures::FutureExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; @@ -231,93 +230,29 @@ impl AccountActor { /// /// The return value signals the shutdown category to the coordinator: /// - /// - `Ok(())`: intentional shutdown (idle timeout or account removal). - /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). - pub async fn run(self, semaphore: Arc) -> anyhow::Result<()> { + /// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time). + /// - `Err(_)`: crash (database error or any other bug). + pub async fn run(self, _semaphore: Arc) -> anyhow::Result<()> { let account_id = self.account_id; // Wait for the account to be committed to the DB. For newly created accounts, the creation - // transaction must be committed before we start processing notes. + // transaction must be committed before the actor becomes active. if !self.wait_for_committed_account(account_id).await? { return Ok(()); } - // Determine initial mode by checking DB for available notes. - let block_num = self.state.chain.chain_tip_block_number(); - let has_notes = self - .state - .db - .has_available_notes(account_id, block_num, self.config.max_note_attempts) - .await - .context("failed to check for available notes")?; - - let mut mode = if has_notes { - ActorMode::NotesAvailable - } else { - ActorMode::NoViableNotes - }; - loop { - // Enable or disable transaction execution based on actor mode. - let tx_permit_acquisition = match mode { - // Disable transaction execution. - ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => { - std::future::pending().boxed() - }, - // Enable transaction execution. - ActorMode::NotesAvailable => semaphore.acquire().boxed(), - }; - - // Idle timeout timer: only ticks when in NoViableNotes mode. Mode changes cause the - // next loop iteration to create a fresh sleep or pending. - let idle_timeout_sleep = match mode { - ActorMode::NoViableNotes => tokio::time::sleep(self.config.idle_timeout).boxed(), - _ => std::future::pending().boxed(), - }; - tokio::select! { - // Handle coordinator notifications. On notification, re-evaluate state from DB. + // A committed block touched this account (or the coordinator woke everyone). PR 3 + // reconnects transaction execution here. _ = self.notify.notified() => { - match mode { - ActorMode::TransactionInflight(awaited_id) => { - // Check DB: is the inflight tx still pending? - let exists = self - .state - .db - .transaction_exists(awaited_id) - .await - .context("failed to check transaction status")?; - if exists { - mode = ActorMode::NotesAvailable; - } - }, - _ => { - mode = ActorMode::NotesAvailable; - } - } - }, - // Execute transactions. - permit = tx_permit_acquisition => { - let _permit = permit.context("semaphore closed")?; - - // Read the chain state. - let chain_state = self.state.chain.get_cloned(); - - // Query DB for latest account and available notes. - let tx_candidate = self.select_candidate_from_db( - account_id, - chain_state, - ).await?; - - if let Some(tx_candidate) = tx_candidate { - mode = self.execute_transactions(account_id, tx_candidate).await; - } else { - // No transactions to execute, wait for notifications. - mode = ActorMode::NoViableNotes; - } + tracing::debug!( + %account_id, + "actor notified; transaction execution reconnects in PR 3", + ); } - // Idle timeout: actor has been idle too long, deactivate account. - _ = idle_timeout_sleep => { + // Idle timeout: actor has been idle too long, deactivate. + () = tokio::time::sleep(self.config.idle_timeout) => { tracing::info!(%account_id, "Account actor deactivated due to idle timeout"); return Ok(()); } @@ -385,8 +320,8 @@ impl AccountActor { /// For accounts that are being created by an inflight transaction, this will idle /// until the transaction is committed. Returns `true` when the account is ready, or /// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case - /// the coordinator will respawn a new actor when committed-chain processing or the account - /// loader observes the account again. + /// the coordinator will respawn a new actor when a later committed block targets the + /// account again. async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result { // Check if the account is already committed. if self @@ -573,47 +508,6 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { #[cfg(test)] mod tests { - - use std::sync::Arc; - - use tokio::sync::{Notify, mpsc}; - - use super::*; - - const OTHER_NOTE_SCRIPT: &str = "\ -@note_script -pub proc main - push.1 drop -end"; - - async fn ack_actor_requests(mut rx: mpsc::Receiver, db: Db) { - while let Some(request) = rx.recv().await { - match request { - ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { - db.notes_failed(failed_notes, block_num) - .await - .expect("test DB write should succeed"); - let _ = ack_tx.send(()); - }, - ActorRequest::CacheNoteScript { .. } => {}, - } - } - } - - fn actor_with_request_handler( - db: &Db, - account_id: AccountId, - ) -> (AccountActor, AccountActorContext) { - let (request_tx, request_rx) = mpsc::channel(8); - let mut context = AccountActorContext::test(db); - context.request_tx = request_tx; - tokio::spawn(ack_actor_requests(request_rx, db.clone())); - - let actor = AccountActor::new(account_id, &context, Arc::new(Notify::new())); - - (actor, context) - } - #[tokio::test] #[ignore = "wip refactor"] async fn select_candidate_keeps_allowlisted_notes() { diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index 7781e891e..1d024176f 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -1,19 +1,32 @@ use std::pin::Pin; +use std::sync::Arc; use anyhow::Context; use futures::Stream; use miden_protocol::block::{BlockNumber, SignedBlock}; use tokio::net::TcpListener; +use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; use crate::NtxBuilderConfig; -use crate::chain_state::ChainState; +use crate::actor::ActorRequest; +use crate::chain_state::SharedChainState; use crate::clients::RpcError; use crate::committed_block::CommittedBlockEffects; -use crate::db::Db; +use crate::coordinator::Coordinator; +use crate::db::{Db, LoopDb}; use crate::server::NtxBuilderRpcServer; +/// Discriminator returned by the steady-state `select!` so the dispatch can run on a fully-owned +/// `&mut self` instead of three concurrent borrows. The `Block` variant is boxed since a +/// `SignedBlock` dwarfs the other two payloads. +enum SteadyStateAction { + Block(Box>>), + Request(Option), + Respawn(Option), +} + // NETWORK TRANSACTION BUILDER // ================================================================================================ @@ -27,10 +40,16 @@ pub(crate) type BlockStream = /// Network transaction builder component. /// -/// The builder consumes the RPC committed-block subscription and applies each block's -/// network-relevant effects to its local database. The actor execution path is currently unwired; -/// the builder keeps the local DB caught up to the live chain tip without scheduling any network -/// transactions. +/// Runs in three phases: +/// 1. **Catch-up**: drain the committed-block subscription, applying each block to the local DB +/// and in-memory chain, until the local tip matches the node-reported `committed_chain_tip` +/// (signaled by `is_synced` flipping to `true`). No actors run. +/// 2. **Boundary**: query the DB for accounts with carry-over pending notes (e.g. from a previous +/// process) and spawn an actor for each. +/// 3. **Steady-state**: on every subsequent committed block, apply the effects, advance the chain, +/// and have the coordinator spawn-if-missing for newly-targeted accounts then wake every active +/// actor. Concurrently drain actor requests (`NotesFailed`, `CacheNoteScript`) so the actors' +/// DB writes happen serialized through the builder. pub struct NetworkTransactionBuilder { /// Configuration for the builder. config: NtxBuilderConfig, @@ -40,13 +59,15 @@ pub struct NetworkTransactionBuilder { block_stream: BlockStream, /// Highest block number applied to the DB so far. last_applied_block: BlockNumber, - /// In-memory partial chain (tip header + chain MMR + tracked recent headers). Persisted - /// alongside each block in the DB so the builder can resume without replaying genesis on - /// restart. - chain: ChainState, + /// In-memory partial chain shared with every spawned actor through the coordinator. + chain: Arc, + /// Lifecycle owner for `AccountActor` instances. + coordinator: Coordinator, + /// Channel receiving DB-side requests (note-failed bookkeeping, script-cache persistence) from + /// spawned actors. Drained in the steady-state loop so writes happen through the builder. + actor_request_rx: mpsc::Receiver, /// `false` until the first applied block whose `committed_chain_tip` matches the just-applied - /// block number. Stays `true` afterwards. Exposed so the gRPC status surface and future actor - /// spawn gating can read it. + /// block number. Stays `true` afterwards. is_synced: bool, } @@ -56,7 +77,9 @@ impl NetworkTransactionBuilder { db: Db, block_stream: BlockStream, last_applied_block: BlockNumber, - chain: ChainState, + chain: Arc, + coordinator: Coordinator, + actor_request_rx: mpsc::Receiver, ) -> Self { Self { config, @@ -64,6 +87,8 @@ impl NetworkTransactionBuilder { block_stream, last_applied_block, chain, + coordinator, + actor_request_rx, is_synced: false, } } @@ -75,10 +100,6 @@ impl NetworkTransactionBuilder { } /// Runs the network transaction builder event loop until a fatal error occurs. - /// - /// 1. Starts the gRPC server for note status queries. - /// 2. Continuously drains the committed-block subscription, applying each block's effects to - /// the local DB. pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> { let mut join_set = JoinSet::new(); @@ -100,16 +121,19 @@ impl NetworkTransactionBuilder { } async fn run_event_loop(mut self) -> anyhow::Result<()> { - // First sync up to the chain tip. + // Pin a dedicated connection for the loop's DB writes so block application is never starved + // by the account actors competing for the shared pool. + let loop_db = self + .db + .pin_loop_connection() + .await + .context("failed to pin a database connection for the ntx-builder event loop")?; + + // Phase 1: catch-up. loop { - let (block, committed_tip) = self - .block_stream - .next() - .await - .context("block stream ended")? - .context("block stream failed")?; + let (block, committed_tip) = self.next_block().await?; let local_tip = block.header().block_num(); - self.apply_committed_block(block, committed_tip).await?; + self.apply_committed_block(&loop_db, block, committed_tip).await?; if local_tip == committed_tip { self.is_synced = true; @@ -118,31 +142,100 @@ impl NetworkTransactionBuilder { } } - // Spawn and handle network account actors, and apply new blocks. + // Phase 2: spawn an actor for every account with carry-over pending notes. + let pending_accounts = loop_db + .accounts_with_pending_notes(self.config.max_note_attempts) + .await + .context("failed to load accounts with pending notes at catch-up")?; + tracing::info!( + num_accounts = pending_accounts.len(), + "spawning actors for accounts with carry-over pending notes", + ); + for account_id in pending_accounts { + self.coordinator.spawn_actor(account_id); + } + + // Phase 3: drive actors per committed block, plus serialize their DB writes. loop { - let (block, committed_tip) = self - .block_stream - .next() - .await - .context("block stream ended")? - .context("block stream failed")?; - self.apply_committed_block(block, committed_tip).await?; + // Split `&mut self` into disjoint borrows so each `select!` arm holds only the one + // field it polls. The action is materialised and self is released before the body + // dispatches the work via the regular `&mut self` methods. + let action = { + let block_stream = &mut self.block_stream; + let actor_request_rx = &mut self.actor_request_rx; + let coordinator = &mut self.coordinator; + + tokio::select! { + block = block_stream.next() => SteadyStateAction::Block(Box::new(block)), + request = actor_request_rx.recv() => SteadyStateAction::Request(request), + respawn = coordinator.next() => SteadyStateAction::Respawn(respawn?), + } + }; + + match action { + SteadyStateAction::Block(block) => { + let (block, committed_tip) = + (*block).context("block stream ended")?.context("block stream failed")?; + let effects = self + .apply_committed_block_with_effects(&loop_db, block, committed_tip) + .await?; + self.coordinator.handle_committed_block(&effects); + }, + SteadyStateAction::Request(request) => { + let Some(request) = request else { + anyhow::bail!("actor request channel closed unexpectedly"); + }; + handle_actor_request(&loop_db, request).await?; + }, + SteadyStateAction::Respawn(respawn) => { + if let Some(account_id) = respawn { + tracing::info!( + account.id = %account_id, + "respawning actor that shut down with a pending notification", + ); + self.coordinator.spawn_actor(account_id); + } + }, + } } } - /// Applies a single committed block's effects to the DB, advances the in-memory partial chain, - /// persists the updated chain MMR atomically with the effects, and flips `is_synced` the first - /// time the applied block matches the node-reported committed tip. + /// Pulls the next `(block, committed_tip)` pair from the subscription, surfacing both the + /// "stream ended" and per-item RPC errors as `anyhow::Error`. + async fn next_block(&mut self) -> anyhow::Result<(SignedBlock, BlockNumber)> { + self.block_stream + .next() + .await + .context("block stream ended")? + .context("block stream failed") + } + + /// Applies a committed block without surfacing the computed effects. + async fn apply_committed_block( + &mut self, + loop_db: &LoopDb, + block: SignedBlock, + committed_tip: BlockNumber, + ) -> anyhow::Result<()> { + self.apply_committed_block_with_effects(loop_db, block, committed_tip) + .await + .map(drop) + } + + /// Applies a committed block and returns the computed `CommittedBlockEffects` so the + /// steady-state loop can hand them to the coordinator without re-deriving from the signed + /// block. #[tracing::instrument( name = "ntx.builder.apply_committed_block", - skip(self, block), + skip(self, loop_db, block), fields(block_num = %block.header().block_num(), %committed_tip), )] - async fn apply_committed_block( + async fn apply_committed_block_with_effects( &mut self, + loop_db: &LoopDb, block: SignedBlock, committed_tip: BlockNumber, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let header = block.header().clone(); let block_num = header.block_num(); @@ -153,13 +246,34 @@ impl NetworkTransactionBuilder { self.chain.update_chain_tip(header, self.config.max_block_count); let next_mmr = self.chain.current_mmr(); - self.db - .apply_committed_block(effects, next_mmr) + loop_db + .apply_committed_block(effects.clone(), next_mmr) .await .context("failed to apply committed block to DB")?; self.last_applied_block = block_num; - Ok(()) + Ok(effects) + } +} + +/// Handles a single actor request then acknowledges the actor. Runs on the pinned loop connection +/// so the actors' shared pool cannot starve these writes. +async fn handle_actor_request(loop_db: &LoopDb, request: ActorRequest) -> anyhow::Result<()> { + match request { + ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { + loop_db + .notes_failed(failed_notes, block_num) + .await + .context("failed to persist note failure")?; + let _ = ack_tx.send(()); + }, + ActorRequest::CacheNoteScript { script_root, script } => { + loop_db + .insert_note_script(script_root, &script) + .await + .context("failed to cache note script")?; + }, } + Ok(()) } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index b1105c724..49025c0a2 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -47,11 +47,6 @@ impl ChainState { (self.chain_tip_header, self.chain_mmr) } - /// Returns the current chain tip header. - pub(crate) fn chain_tip_header(&self) -> &BlockHeader { - &self.chain_tip_header - } - /// Returns a clone of the current partial chain MMR. pub(crate) fn current_mmr(&self) -> PartialMmr { self.chain_mmr.mmr().clone() @@ -94,10 +89,18 @@ impl SharedChainState { Self(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))) } + // Read by the actor execution path, which is unwired until PR 3. + #[expect(dead_code)] pub(crate) fn chain_tip_block_number(&self) -> BlockNumber { self.0.read().expect("chain state lock poisoned").chain_tip_header.block_num() } + /// Returns a clone of the current partial chain MMR. Cheap enough for per-block persistence + /// since the MMR is bounded by `max_block_count` headers. + pub(crate) fn current_mmr(&self) -> PartialMmr { + self.0.read().expect("chain state lock poisoned").current_mmr() + } + pub(crate) fn update_chain_tip(&self, tip: BlockHeader, max_block_count: usize) { self.0 .write() @@ -105,6 +108,8 @@ impl SharedChainState { .update_chain_tip(tip, max_block_count); } + // Read by the actor execution path (candidate selection), which is unwired until PR 3. + #[expect(dead_code)] pub(crate) fn get_cloned(&self) -> ChainState { self.0.read().expect("chain state lock poisoned").clone() } diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index acb86ff99..948c9fcba 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use miden_protocol::account::AccountId; @@ -6,6 +6,7 @@ use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use crate::actor::{AccountActor, AccountActorContext}; +use crate::committed_block::CommittedBlockEffects; // ACTOR HANDLE // ================================================================================================ @@ -49,65 +50,35 @@ impl ActorHandle { // COORDINATOR // ================================================================================================ -/// Coordinator for managing [`AccountActor`] instances, tasks, and notifications. +/// Lifecycle owner for [`AccountActor`] instances driven by committed blocks. /// -/// The `Coordinator` is the central orchestrator of the network transaction builder system. -/// It manages the lifecycle of account actors. Each actor is responsible for handling transactions -/// for a specific network account. The coordinator provides the following core -/// functionality: +/// The coordinator owns the actor-side context (gRPC clients, shared chain state, script cache, +/// per-actor config), the actor task join set, and a registry mapping each network account to a +/// notify handle. The builder calls into the coordinator at two moments: /// -/// ## Actor Management -/// - Spawns new [`AccountActor`] instances for network accounts as needed. -/// - Maintains a registry of active actors with their notification handles. -/// - Gracefully handles actor shutdown and cleanup when actors complete or fail. -/// - Monitors actor tasks through a join set to detect completion or errors. +/// 1. At the catch-up boundary, to spawn one actor per account returned by +/// `Db::accounts_with_pending_notes()`. +/// 2. On every committed block in steady state, via [`Coordinator::handle_committed_block`], which +/// spawns missing actors for accounts that just received new network notes and wakes every +/// active actor so it can re-evaluate its state from the DB. /// -/// ## Notification -/// - Notifies actors via a shared [`Notify`] when state may have changed. -/// - The DB is the source of truth: actors re-evaluate their state from DB on notification. -/// - Notifications are coalesced: [`Notify`] stores at most one permit, so multiple notifications -/// while an actor is busy result in a single wake-up. -/// -/// ## Resource Management -/// - Controls transaction concurrency across all network accounts using a semaphore. -/// - Prevents resource exhaustion by limiting simultaneous transaction processing. -/// -/// ## Actor Lifecycle -/// - Actors that have been idle for longer than the idle timeout deactivate themselves. -/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor -/// timed out. If so, the actor is respawned immediately. -/// - Deactivated actors are re-spawned when committed-chain processing detects new work for them. -/// -/// The coordinator operates in a notification-driven manner: -/// 1. Network accounts are registered and actors spawned as needed. -/// 2. Committed-chain updates are written to DB, then actors are notified. -/// 3. Actor completion/failure events are monitored and handled. -/// 4. Failed or completed actors are cleaned up from the registry. +/// Notifications are coalesced through [`Notify`]: multiple wakes while an actor is busy +/// collapse into one. Actors that crash repeatedly are deactivated after `max_account_crashes` +/// failures. pub struct Coordinator { /// Mapping of network account IDs to their notification handles. - /// - /// This registry serves as the primary directory for notifying active account actors. - /// When actors are spawned, they register their notification handle here. When accounts need - /// to be notified, this registry is used to locate the appropriate actors. The registry is - /// automatically cleaned up when actors complete their execution. actor_registry: HashMap, - /// Join set for managing actor tasks and monitoring their completion status. - /// - /// This join set allows the coordinator to wait for actor task completion and handle - /// different shutdown scenarios. When an actor task completes (either successfully or - /// due to an error), the corresponding entry is removed from the actor registry. + /// Join set tracking each spawned actor task; used to detect intentional shutdowns vs. crashes. actor_join_set: JoinSet<(AccountId, anyhow::Result<()>)>, - /// Semaphore for controlling the maximum number of concurrent transactions across all network - /// accounts. - /// - /// This shared semaphore prevents the system from becoming overwhelmed by limiting the total - /// number of transactions that can be processed simultaneously across all account actors. - /// Each actor must acquire a permit from this semaphore before processing a transaction, - /// ensuring fair resource allocation and system stability under load. + /// Shared transaction-execution semaphore handed to each spawned actor. semaphore: Arc, + /// Shared resources needed to spawn an actor. Stored on the coordinator so spawns at runtime + /// don't need the builder to plumb context through every call site. + actor_context: AccountActorContext, + /// Tracks the number of crashes per account actor. /// /// When an actor shuts down due to a DB error, its crash count is incremented. Once @@ -120,13 +91,18 @@ pub struct Coordinator { } impl Coordinator { - /// Creates a new coordinator with the specified maximum number of inflight transactions and the - /// crash threshold for account deactivation. - pub fn new(max_inflight_transactions: usize, max_account_crashes: usize) -> Self { + /// Creates a new coordinator with the specified transaction concurrency limit and the per- + /// account crash threshold. + pub fn new( + max_inflight_transactions: usize, + max_account_crashes: usize, + actor_context: AccountActorContext, + ) -> Self { Self { actor_registry: HashMap::new(), actor_join_set: JoinSet::new(), semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), + actor_context, crash_counts: HashMap::new(), max_account_crashes, } @@ -137,67 +113,63 @@ impl Coordinator { /// This method creates a new [`AccountActor`] instance for the specified account origin /// and adds it to the coordinator's management system. The actor will be responsible for /// processing transactions and managing state for the network account. - #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, actor_context))] - pub fn spawn_actor(&mut self, account_id: AccountId, actor_context: &AccountActorContext) { - // Skip spawning if the account has been deactivated due to repeated crashes. - if let Some(&count) = self.crash_counts.get(&account_id) { - if count >= self.max_account_crashes { - tracing::warn!( - account.id = %account_id, - crash_count = count, - "Account deactivated due to repeated crashes, skipping actor spawn" - ); - return; - } + #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self))] + pub fn spawn_actor(&mut self, account_id: AccountId) { + if let Some(&count) = self.crash_counts.get(&account_id) + && count >= self.max_account_crashes + { + tracing::warn!( + account.id = %account_id, + crash_count = count, + "Account deactivated due to repeated crashes, skipping actor spawn" + ); + return; } - // If an actor already exists for this account ID, something has gone wrong. Reject the - // spawn rather than replacing. if self.actor_registry.contains_key(&account_id) { tracing::error!( - account_id = %account_id, - "Account actor already exists" + account.id = %account_id, + "Account actor already exists", ); return; } let notify = Arc::new(Notify::new()); - let actor = AccountActor::new(account_id, actor_context, notify.clone()); + let actor = AccountActor::new(account_id, &self.actor_context, notify.clone()); let handle = ActorHandle::new(notify); - // Run the actor. Actor reads state from DB on startup. let semaphore = self.semaphore.clone(); self.actor_join_set .spawn(Box::pin(async move { (account_id, actor.run(semaphore).await) })); self.actor_registry.insert(account_id, handle); - tracing::info!(account_id = %account_id, "Created actor for account prefix"); + tracing::info!(account.id = %account_id, "Created actor for account"); } - /// Notifies specific account actors that state may have changed. - /// - /// Only actors that are currently active are notified. Each actor will re-evaluate its state - /// from the DB on the next iteration of its run loop. Notifications are coalesced: multiple - /// notifications while an actor is busy result in a single wake-up. - pub fn notify_accounts(&self, account_ids: &[AccountId]) { - for account_id in account_ids { - if let Some(handle) = self.actor_registry.get(account_id) { - handle.notify(); + /// Reacts to a committed block: spawns actors for any newly-targeted network accounts and wakes + /// every active actor so it can re-evaluate its state. + pub fn handle_committed_block(&mut self, effects: &CommittedBlockEffects) { + let mut targeted: HashSet = HashSet::new(); + for note in &effects.network_notes { + targeted.insert(note.target_account_id()); + } + + for account_id in &targeted { + if !self.actor_registry.contains_key(account_id) { + self.spawn_actor(*account_id); } } + + for handle in self.actor_registry.values() { + handle.notify(); + } } /// Waits for the next actor to complete and handles the outcome. /// - /// This method monitors the join set for actor task completion and handles - /// different shutdown scenarios appropriately. It's designed to be called - /// in a loop to continuously monitor and manage actor lifecycles. - /// - /// If no actors are currently running, this method will wait indefinitely until - /// new actors are spawned. This prevents busy-waiting when the coordinator is idle. - /// - /// Returns `Some(account_id)` if an actor should be respawned (because a - /// notification arrived just as it shut down), or `None` otherwise. + /// Returns `Some(account_id)` if an actor should be respawned (because a notification arrived + /// just as it shut down on idle timeout), or `None` otherwise. If no actors are currently + /// running, this method waits indefinitely until new actors are spawned. pub async fn next(&mut self) -> anyhow::Result> { let actor_result = self.actor_join_set.join_next().await; match actor_result { @@ -213,7 +185,6 @@ impl Coordinator { Ok(should_respawn.then_some(account_id)) }, Some(Ok((account_id, Err(err)))) => { - // Actor crashed. Increment crash counter. let count = self.crash_counts.entry(account_id).or_insert(0); *count += 1; tracing::error!( @@ -235,53 +206,139 @@ impl Coordinator { } } +#[cfg(test)] +impl Coordinator { + /// Creates a coordinator with default settings backed by a temp DB. Returns the coordinator, + /// the temp dir holding the DB file, and the actor request receiver (drop it to discard, or + /// drive it from the test to inspect actor requests). + pub async fn test() + -> (Self, tempfile::TempDir, tokio::sync::mpsc::Receiver) { + use crate::db::Db; + + let (db, dir) = Db::test_setup().await; + let (tx, rx) = tokio::sync::mpsc::channel(8); + let mut actor_context = AccountActorContext::test(&db); + actor_context.request_tx = tx; + (Self::new(4, 10, actor_context), dir, rx) + } +} + #[cfg(test)] mod tests { + use futures::FutureExt; + use super::*; - use crate::actor::AccountActorContext; - use crate::db::Db; use crate::test_utils::*; - // DEACTIVATED ACCOUNTS - // ============================================================================================ + /// Registers a dummy actor handle (no real actor task) in the coordinator's registry. + fn register_dummy_actor(coordinator: &mut Coordinator, account_id: AccountId) { + let notify = Arc::new(Notify::new()); + coordinator.actor_registry.insert(account_id, ActorHandle::new(notify)); + } + + #[tokio::test] + async fn handle_committed_block_spawns_for_unknown_note_target() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let unknown_id = mock_network_account_id(); + let note = mock_single_target_note(unknown_id, 10); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![note], + nullifiers: vec![], + network_account_updates: vec![], + }; + + coordinator.handle_committed_block(&effects); + + assert!( + coordinator.actor_registry.contains_key(&unknown_id), + "previously-untouched account targeted by a note should get a fresh actor", + ); + } + + #[tokio::test] + async fn handle_committed_block_does_not_spawn_for_account_update_only() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let updated_id = mock_network_account_id(); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![], + nullifiers: vec![], + network_account_updates: vec![( + updated_id, + miden_protocol::account::delta::AccountUpdateDetails::Private, + )], + }; + + coordinator.handle_committed_block(&effects); + + assert!( + !coordinator.actor_registry.contains_key(&updated_id), + "an account update without a new note should not trigger an actor spawn", + ); + } #[tokio::test] async fn spawn_actor_skips_deactivated_account() { - let (db, _dir) = Db::test_setup().await; - let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes); - let actor_context = AccountActorContext::test(&db); + let (mut coordinator, _dir, _rx) = Coordinator::test().await; let account_id = mock_network_account_id(); + coordinator.crash_counts.insert(account_id, coordinator.max_account_crashes); - // Simulate the account having reached the crash threshold. - coordinator.crash_counts.insert(account_id, max_crashes); - - coordinator.spawn_actor(account_id, &actor_context); + coordinator.spawn_actor(account_id); assert!( !coordinator.actor_registry.contains_key(&account_id), - "Deactivated account should not have an actor in the registry" + "deactivated account should not have an actor in the registry", ); } #[tokio::test] async fn spawn_actor_allows_below_threshold() { - let (db, _dir) = Db::test_setup().await; - let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes); - let actor_context = AccountActorContext::test(&db); + let (mut coordinator, _dir, _rx) = Coordinator::test().await; let account_id = mock_network_account_id(); + coordinator + .crash_counts + .insert(account_id, coordinator.max_account_crashes.saturating_sub(1)); - // Set crash count below the threshold. - coordinator.crash_counts.insert(account_id, max_crashes - 1); - - coordinator.spawn_actor(account_id, &actor_context); + coordinator.spawn_actor(account_id); assert!( coordinator.actor_registry.contains_key(&account_id), - "Account below crash threshold should have an actor in the registry" + "account below crash threshold should have an actor in the registry", + ); + } + + #[tokio::test] + async fn handle_committed_block_notifies_existing_actors() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let bystander = mock_network_account_id(); + register_dummy_actor(&mut coordinator, bystander); + let bystander_notify = coordinator.actor_registry.get(&bystander).unwrap().notify.clone(); + + let target = mock_network_account_id_seeded(42); + let note = mock_single_target_note(target, 10); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![note], + nullifiers: vec![], + network_account_updates: vec![], + }; + + coordinator.handle_committed_block(&effects); + + assert!( + bystander_notify.notified().now_or_never().is_some(), + "every registered actor should be notified on a committed block", + ); + + assert!( + coordinator.actor_registry.contains_key(&target), + "freshly-targeted account should get an actor", ); } } diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index b30229650..cac8fb2e4 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -139,6 +139,16 @@ impl Db { .await } + /// Returns the distinct set of network accounts that currently have at least one pending + /// (unconsumed, within attempt budget) note. + pub async fn accounts_with_pending_notes(&self, max_attempts: usize) -> Result> { + self.inner + .query("accounts_with_pending_notes", move |conn| { + queries::accounts_with_pending_notes(conn, max_attempts) + }) + .await + } + /// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing /// the latest error message. pub async fn notes_failed( @@ -183,50 +193,14 @@ impl Db { .await } - // DEAD-CODE STUBS - // ============================================================================================ - // - // These methods exist to keep the dead actor/coordinator modules compiling in PR 1. They are - // never reached because `NetworkTransactionBuilder` does not spawn the actor path. PR 2 - // replaces them with their new committed-block-driven equivalents. - - #[expect(clippy::unused_async)] - pub async fn transaction_exists( - &self, - _tx_id: miden_protocol::transaction::TransactionId, - ) -> Result { - unimplemented!("transaction_exists is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_transaction_added( - &self, - _tx_id: miden_protocol::transaction::TransactionId, - _account_delta: Option, - _notes: Vec, - _nullifiers: Vec, - ) -> Result<()> { - unimplemented!("handle_transaction_added is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_block_committed( - &self, - _txs: Vec, - _block_num: BlockNumber, - _header: BlockHeader, - ) -> Result> { - unimplemented!("handle_block_committed is rewired in PR 2 of the ntx-builder refactor") - } - - #[expect(clippy::unused_async)] - pub async fn handle_transactions_reverted( - &self, - _tx_ids: Vec, - ) -> Result> { - unimplemented!( - "handle_transactions_reverted is rewired in PR 2 of the ntx-builder refactor" - ) + /// Pins a dedicated connection for the builder's event loop, returning a [`LoopDb`]. + /// + /// The loop performs its writes through the pinned connection so it never competes with the + /// account actors for the shared pool. + pub async fn pin_loop_connection(&self) -> Result { + Ok(LoopDb { + conn: self.inner.pinned_connection().await?, + }) } /// Creates a file-backed SQLite test connection with migrations applied. @@ -255,3 +229,60 @@ impl Db { (db, dir) } } + +/// The subset of write operations the builder's event loop performs, bound to a connection pinned +/// out of [`Db`]'s pool. Routing the loop's writes here keeps block application off the shared pool +/// that the account actors hammer, so the loop is never starved of a connection. +pub struct LoopDb { + conn: miden_node_db::PinnedConnection, +} + +impl LoopDb { + /// Applies a committed block's effects (see [`Db::apply_committed_block`]) on the pinned + /// connection. + pub async fn apply_committed_block( + &self, + effects: CommittedBlockEffects, + chain_mmr: PartialMmr, + ) -> Result> { + self.conn + .transact("apply_committed_block", move |conn| { + queries::apply_committed_block(conn, &effects, &chain_mmr) + }) + .await + } + + /// Returns the network accounts with carry-over pending notes (see + /// [`Db::accounts_with_pending_notes`]) on the pinned connection. + pub async fn accounts_with_pending_notes(&self, max_attempts: usize) -> Result> { + self.conn + .query("accounts_with_pending_notes", move |conn| { + queries::accounts_with_pending_notes(conn, max_attempts) + }) + .await + } + + /// Marks notes as failed (see [`Db::notes_failed`]) on the pinned connection. + pub async fn notes_failed( + &self, + failed_notes: Vec<(Nullifier, NoteError)>, + block_num: BlockNumber, + ) -> Result<()> { + self.conn + .transact("notes_failed", move |conn| { + queries::notes_failed(conn, &failed_notes, block_num) + }) + .await + } + + /// Persists a note script to the local cache (see [`Db::insert_note_script`]) on the pinned + /// connection. + pub async fn insert_note_script(&self, script_root: Word, script: &NoteScript) -> Result<()> { + let script = script.clone(); + self.conn + .transact("insert_note_script", move |conn| { + queries::insert_note_script(conn, &script_root, &script) + }) + .await + } +} diff --git a/bin/ntx-builder/src/db/models/conv.rs b/bin/ntx-builder/src/db/models/conv.rs index 5a95cc4ec..dfb8e9a31 100644 --- a/bin/ntx-builder/src/db/models/conv.rs +++ b/bin/ntx-builder/src/db/models/conv.rs @@ -5,7 +5,6 @@ use miden_protocol::Word; use miden_protocol::account::{Account, AccountId}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; use miden_protocol::utils::serde::{Deserializable, Serializable}; // SERIALIZATION (domain → DB) @@ -23,11 +22,6 @@ pub fn account_id_to_bytes(id: AccountId) -> Vec { id.to_bytes() } -#[expect(dead_code)] -pub fn transaction_id_to_bytes(id: &TransactionId) -> Vec { - id.to_bytes() -} - pub fn nullifier_to_bytes(nullifier: &Nullifier) -> Vec { nullifier.to_bytes() } @@ -52,7 +46,6 @@ pub fn account_from_bytes(bytes: &[u8]) -> Result { Account::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("account", e)) } -#[expect(dead_code)] pub fn account_id_from_bytes(bytes: &[u8]) -> Result { AccountId::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("account id", e)) } diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index 137de6bcd..a344e25a0 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -173,6 +173,26 @@ pub fn get_note_status( .map_err(Into::into) } +/// Returns the distinct set of network accounts that currently have at least one pending note +/// (unconsumed and within the per-note attempt budget). +#[expect(clippy::cast_possible_wrap)] +pub fn accounts_with_pending_notes( + conn: &mut SqliteConnection, + max_attempts: usize, +) -> Result, DatabaseError> { + let account_id_blobs: Vec> = schema::notes::table + .filter(schema::notes::committed_at.is_null()) + .filter(schema::notes::attempt_count.lt(max_attempts as i32)) + .select(schema::notes::account_id) + .distinct() + .load(conn)?; + + account_id_blobs + .iter() + .map(|bytes| conversions::account_id_from_bytes(bytes)) + .collect() +} + // HELPERS // ================================================================================================ diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index 7768d5e02..fcc2c07d5 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -222,6 +222,45 @@ fn note_script_cache_roundtrip() { // NOTE STATUS // ================================================================================================ +// ACCOUNTS WITH PENDING NOTES +// ================================================================================================ + +#[test] +fn accounts_with_pending_notes_distinct_and_filters_consumed_and_capped() { + let (conn, _dir) = &mut test_conn(); + let alice = mock_network_account_id(); + let bob = mock_network_account_id_seeded(42); + let carol = mock_network_account_id_seeded(99); + + let alice_note_1 = mock_single_target_note(alice, 1); + let alice_note_2 = mock_single_target_note(alice, 2); + let bob_note = mock_single_target_note(bob, 3); + let carol_note = mock_single_target_note(carol, 4); + + insert_network_notes( + conn, + &[alice_note_1.clone(), alice_note_2, bob_note.clone(), carol_note.clone()], + ) + .unwrap(); + + // Alice has two notes — must still appear exactly once (DISTINCT). Bob's only note is already + // consumed — exclude. + mark_notes_consumed(conn, &[bob_note.as_note().nullifier()], BlockNumber::from(7)).unwrap(); + // Carol's note has hit the attempt cap — exclude. + for _ in 0..30 { + notes_failed( + conn, + &[(carol_note.as_note().nullifier(), test_note_error("boom"))], + BlockNumber::from(5), + ) + .unwrap(); + } + + let pending = accounts_with_pending_notes(conn, 30).unwrap(); + assert_eq!(pending.len(), 1, "only alice should remain pending"); + assert_eq!(pending[0], alice); +} + #[test] fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index 907a46774..d20f0c6dd 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -5,30 +5,33 @@ use std::time::Duration; use anyhow::Context; use builder::BlockStream; -use chain_state::ChainState; +use chain_state::SharedChainState; use clients::RpcClient; use db::Db; use futures::StreamExt; use miden_node_utils::ErrorReport; +use miden_node_utils::lru_cache::LruCache; use miden_protocol::block::BlockNumber; use miden_protocol::crypto::merkle::mmr::PartialMmr; +use miden_remote_prover_client::RemoteTransactionProver; +use tokio::sync::mpsc; use tonic::metadata::AsciiMetadataValue; use url::Url; +use crate::actor::{AccountActorContext, ActorConfig, GrpcClients, State}; use crate::committed_block::CommittedBlockEffects; +use crate::coordinator::Coordinator; pub(crate) type NoteError = Arc; -// PR 1 of the block-subscription refactor leaves the actor execution path in tree but unwired. It -// is restored by PR 2 +// PR 2 spawns actors and runs their lifecycle (wait-for-account + notify/idle), but the transaction +// execution path (candidate selection, proving, submission) stays unwired until PR 3 reconnects it. #[expect(dead_code)] mod actor; mod builder; -#[expect(dead_code)] mod chain_state; mod clients; mod committed_block; -#[expect(dead_code)] mod coordinator; pub(crate) mod db; pub mod server; @@ -300,6 +303,14 @@ impl NtxBuilderConfig { /// - The RPC connection fails (after retries) /// - The genesis block cannot be read from the subscription on a fresh start pub async fn build(self) -> anyhow::Result { + // The event loop pins one connection for itself (so block application is never starved by + // the account actors), leaving the rest of the pool for actors and the gRPC server. That + // requires at least two connections. + anyhow::ensure!( + self.sqlite_connection_pool_size.get() >= 2, + "sqlite connection pool size must be at least 2 (the event loop pins one connection)", + ); + let rpc = match self.rpc_auth_header.clone() { Some(rpc_auth_header_value) => RpcClient::new_with_auth( self.rpc_url.clone(), @@ -345,7 +356,7 @@ impl NtxBuilderConfig { let (chain, last_applied_block) = if let Some((block_num, header, mmr)) = stored_chain_state { - (ChainState::new(header, mmr), block_num) + (SharedChainState::new(header, mmr), block_num) } else { // Fresh DB: consume the genesis block inline so the in-memory chain state is non- empty // before the steady-state loop runs. @@ -366,8 +377,42 @@ impl NtxBuilderConfig { .await .context("failed to apply genesis block during bootstrap")?; - (ChainState::new(genesis_header, PartialMmr::default()), BlockNumber::GENESIS) + ( + SharedChainState::new(genesis_header, PartialMmr::default()), + BlockNumber::GENESIS, + ) + }; + let chain = Arc::new(chain); + + // Wire the actor context + coordinator. The actor request channel is owned by the builder + // (receiver) and cloned into every spawned actor (sender) so all DB writes from actors + // serialize through the builder's event loop. + let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity); + let actor_context = AccountActorContext { + clients: GrpcClients { + rpc: rpc.clone(), + prover: self + .tx_prover_url + .clone() + .map(|url| RemoteTransactionProver::new(url.as_str())), + }, + state: State { + db: db.clone(), + chain: chain.clone(), + script_cache: LruCache::new(self.script_cache_size), + }, + config: ActorConfig { + max_notes_per_tx: self.max_notes_per_tx, + max_note_attempts: self.max_note_attempts, + idle_timeout: self.idle_timeout, + max_cycles: self.max_cycles, + request_backoff_initial: self.request_backoff_initial, + request_backoff_max: self.request_backoff_max, + }, + request_tx, }; + let coordinator = + Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context); Ok(NetworkTransactionBuilder::new( self, @@ -375,6 +420,8 @@ impl NtxBuilderConfig { block_stream, last_applied_block, chain, + coordinator, + actor_request_rx, )) } } diff --git a/bin/ntx-builder/src/test_utils.rs b/bin/ntx-builder/src/test_utils.rs index 691bc0fa0..14956b793 100644 --- a/bin/ntx-builder/src/test_utils.rs +++ b/bin/ntx-builder/src/test_utils.rs @@ -17,6 +17,13 @@ pub fn mock_network_account_id() -> AccountId { ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE.try_into().unwrap() } +/// Creates a distinct network account ID using a seeded RNG. +pub fn mock_network_account_id_seeded(seed: u8) -> AccountId { + AccountIdBuilder::new() + .account_type(AccountType::Public) + .build_with_seed([seed; 32]) +} + /// Creates a `AccountTargetNetworkNote` targeting the given network account. pub fn mock_single_target_note( network_account_id: AccountId, diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index d1a3b81b3..2da3b5eb6 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -49,6 +49,21 @@ impl Db { Ok(Self { pool }) } + /// Checks out a connection from the pool and pins it for the caller's exclusive, long-lived + /// use. See [`PinnedConnection`]. + /// + /// This removes one connection from the shared pool for the lifetime of the returned handle, + /// so the pool must be sized to leave at least one connection for other users. + pub async fn pinned_connection(&self) -> Result { + let conn = self + .pool + .get() + .in_current_span() + .await + .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; + Ok(PinnedConnection { conn }) + } + /// Create and commit a transaction with the queries added in the provided closure pub async fn transact(&self, msg: M, query: Q) -> std::result::Result where @@ -61,20 +76,7 @@ impl Db { E: From, E: std::error::Error + Send + Sync + 'static, { - let conn = self - .pool - .get() - .in_current_span() - .await - .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; - - let span = tracing::Span::current(); - conn.interact(move |conn| { - let _guard = span.enter(); - <_ as diesel::Connection>::transaction::(conn, query) - }) - .await - .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + self.pinned_connection().await.map_err(E::from)?.transact(msg, query).await } /// Run the query _without_ a transaction @@ -86,19 +88,60 @@ impl Db { E: From, E: std::error::Error + Send + Sync + 'static, { - let conn = self - .pool - .get() + self.pinned_connection().await.map_err(E::from)?.query(msg, query).await + } +} + +/// A connection checked out of [`Db`]'s pool and held for the caller's exclusive, long-lived use. +/// +/// A hot event loop can pin a connection so its queries never wait on the shared pool even when +/// many concurrent tasks are saturating it. `transact`/`query` mirror [`Db`]'s, but run on the +/// pinned connection rather than acquiring one per call. The connection is returned to the pool +/// when the `PinnedConnection` is dropped. +pub struct PinnedConnection { + conn: deadpool::managed::Object, +} + +impl PinnedConnection { + /// Create and commit a transaction with the queries added in the provided closure, running on + /// the pinned connection. + pub async fn transact(&self, msg: M, query: Q) -> std::result::Result + where + Q: Send + + for<'a, 't> FnOnce(&'a mut SqliteConnection) -> std::result::Result + + 'static, + R: Send + 'static, + M: Send + ToString, + E: From, + E: From, + E: std::error::Error + Send + Sync + 'static, + { + let span = tracing::Span::current(); + self.conn + .interact(move |conn| { + let _guard = span.enter(); + <_ as diesel::Connection>::transaction::(conn, query) + }) .await - .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; + .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + } + /// Run the query _without_ a transaction on the pinned connection. + pub async fn query(&self, msg: M, query: Q) -> std::result::Result + where + Q: Send + FnOnce(&mut SqliteConnection) -> std::result::Result + 'static, + R: Send + 'static, + M: Send + ToString, + E: From, + E: std::error::Error + Send + Sync + 'static, + { let span = tracing::Span::current(); - conn.interact(move |conn| { - let _guard = span.enter(); - let r = query(conn)?; - Ok(r) - }) - .await - .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + self.conn + .interact(move |conn| { + let _guard = span.enter(); + query(conn) + }) + .await + .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? } }