Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions bin/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct AccountActorContext {
pub clients: GrpcClients,
pub state: State,
pub config: ActorConfig,
/// Channel for sending requests to the coordinator (via the builder event loop).
/// Channel for sending requests to the coordinator (via the builder loop).
pub request_tx: mpsc::Sender<ActorRequest>,
}

Expand Down Expand Up @@ -176,15 +176,15 @@ enum ActorMode {
/// based on current chain state and DB queries.
/// - **Transaction Execution**: Executes selected transactions using either local or remote
/// proving.
/// - **Mempool Integration**: Listens for mempool events to stay synchronized with the network
/// state and adjust behavior based on transaction confirmations.
/// - **Chain Integration**: Reacts to committed-chain updates persisted by the coordinator to stay
/// synchronized with the network state.
///
/// ## Lifecycle
///
/// 1. **Initialization**: Waits for committed account state, then checks DB for available notes.
/// 2. **Event Loop**: Continuously processes mempool events and executes transactions.
/// 2. **Event Loop**: Re-evaluates database state on notification and executes transactions.
/// 3. **Transaction Processing**: Selects, executes, proves, and submits transactions through RPC.
/// 4. **State Updates**: Event effects are persisted to DB by the coordinator before actors are
/// 4. **State Updates**: Committed-chain updates are persisted to DB before actors are
/// notified.
/// 5. **Shutdown**: Terminates gracefully on idle timeout, or returns an error on unrecoverable
/// failures.
Expand Down Expand Up @@ -227,7 +227,7 @@ impl AccountActor {
}
}

/// Runs the account actor, processing events and managing state until shutdown.
/// Runs the account actor, processing notifications and managing state until shutdown.
///
/// The return value signals the shutdown category to the coordinator:
///
Expand Down Expand Up @@ -312,7 +312,7 @@ impl AccountActor {
if let Some(tx_candidate) = tx_candidate {
mode = self.execute_transactions(account_id, tx_candidate).await;
} else {
// No transactions to execute, wait for events.
// No transactions to execute, wait for notifications.
mode = ActorMode::NoViableNotes;
}
}
Expand Down Expand Up @@ -385,9 +385,8 @@ impl AccountActor {
/// For accounts that are being created by an inflight transaction, this will idle
/// until the transaction is committed. Returns `true` when the account is ready, or
/// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case
/// the coordinator will respawn a new actor when the account reappears through
/// [`Coordinator::send_targeted`](crate::coordinator::Coordinator::send_targeted) or the
/// account loader.
/// the coordinator will respawn a new actor when committed-chain processing or the account
/// loader observes the account again.
async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result<bool> {
// Check if the account is already committed.
if self
Expand Down
8 changes: 3 additions & 5 deletions bin/ntx-builder/src/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,13 @@ impl ChainState {

/// Updates the chain tip and prunes old blocks from the MMR.
pub(crate) fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) {
// Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive for
// a block whose state was already loaded from the store during startup: the mempool
// subscription is established first and then the chain tip is fetched, so any block
// committed in that window produces an event for state we have already ingested.
// Skip blocks already reflected in the chain state. The builder may load state during
// startup before receiving the same block from the committed-block subscription.
if tip.block_num() <= self.chain_tip_header.block_num() {
tracing::debug!(
event_block = %tip.block_num(),
current_tip = %self.chain_tip_header.block_num(),
"skipping BlockCommitted event for block already in chain state",
"skipping committed block already reflected in chain state",
);
return;
}
Expand Down
175 changes: 10 additions & 165 deletions bin/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use miden_node_db::DatabaseError;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::AccountId;
use miden_protocol::account::delta::AccountUpdateDetails;
use tokio::sync::{Notify, Semaphore};
use tokio::task::JoinSet;

use crate::actor::{AccountActor, AccountActorContext};
use crate::db::Db;

// WRITE EVENT RESULT
// ================================================================================================

/// Result of writing a mempool event to the database.
pub struct WriteEventResult {
/// Accounts that should be notified of state changes.
pub accounts_to_notify: Vec<AccountId>,
}

// ACTOR HANDLE
// ================================================================================================
Expand Down Expand Up @@ -75,7 +62,7 @@ impl ActorHandle {
/// - Gracefully handles actor shutdown and cleanup when actors complete or fail.
/// - Monitors actor tasks through a join set to detect completion or errors.
///
/// ## Event Notification
/// ## Notification
/// - Notifies actors via a shared [`Notify`] when state may have changed.
/// - The DB is the source of truth: actors re-evaluate their state from DB on notification.
/// - Notifications are coalesced: [`Notify`] stores at most one permit, so multiple notifications
Expand All @@ -89,20 +76,19 @@ impl ActorHandle {
/// - Actors that have been idle for longer than the idle timeout deactivate themselves.
/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor
/// timed out. If so, the actor is respawned immediately.
/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting
/// an account without an active actor.
/// - Deactivated actors are re-spawned when committed-chain processing detects new work for them.
///
/// The coordinator operates in an event-driven manner:
/// The coordinator operates in a notification-driven manner:
/// 1. Network accounts are registered and actors spawned as needed.
/// 2. Mempool events are written to DB, then actors are notified.
/// 2. Committed-chain updates are written to DB, then actors are notified.
/// 3. Actor completion/failure events are monitored and handled.
/// 4. Failed or completed actors are cleaned up from the registry.
pub struct Coordinator {
/// Mapping of network account IDs to their notification handles.
///
/// This registry serves as the primary directory for notifying active account actors.
/// When actors are spawned, they register their notification handle here. When events need
/// to be broadcast, this registry is used to locate the appropriate actors. The registry is
/// When actors are spawned, they register their notification handle here. When accounts need
/// to be notified, this registry is used to locate the appropriate actors. The registry is
/// automatically cleaned up when actors complete their execution.
actor_registry: HashMap<AccountId, ActorHandle>,

Expand All @@ -122,9 +108,6 @@ pub struct Coordinator {
/// ensuring fair resource allocation and system stability under load.
semaphore: Arc<Semaphore>,

/// Database for persistent state.
db: Db,

/// Tracks the number of crashes per account actor.
///
/// When an actor shuts down due to a DB error, its crash count is incremented. Once
Expand All @@ -139,12 +122,11 @@ pub struct Coordinator {
impl Coordinator {
/// Creates a new coordinator with the specified maximum number of inflight transactions and the
/// crash threshold for account deactivation.
pub fn new(max_inflight_transactions: usize, max_account_crashes: usize, db: Db) -> Self {
pub fn new(max_inflight_transactions: usize, max_account_crashes: usize) -> Self {
Self {
actor_registry: HashMap::new(),
actor_join_set: JoinSet::new(),
semaphore: Arc::new(Semaphore::new(max_inflight_transactions)),
db,
crash_counts: HashMap::new(),
max_account_crashes,
}
Expand Down Expand Up @@ -251,160 +233,23 @@ impl Coordinator {
},
}
}

/// Notifies account actors that are affected by a `TransactionAdded` event.
///
/// Only actors that are currently active are notified. Since event effects are already
/// persisted in the DB by `write_event()`, actors that spawn later read their state from the
/// DB and do not need predating events.
///
/// Returns account IDs of note targets that do not have active actors (e.g. previously
/// deactivated due to sterility). The caller can use this to re-activate actors for those
/// accounts.
pub fn send_targeted(&self, event: &MempoolEvent) -> Vec<AccountId> {
let mut target_account_ids = HashSet::new();
let mut inactive_targets = Vec::new();

if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event {
// We need to inform the account if it was updated. This lets it know that its own
// transaction has been applied, and in the future also resolves race conditions with
// external network transactions (once these are allowed).
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
// The actor registry only contains accounts the builder has already classified as
// network, so the lookup itself filters out non-network ids.
let account_id = delta.id();
if self.actor_registry.contains_key(&account_id) {
target_account_ids.insert(account_id);
}
}

// Determine target actors for each note.
for note in network_notes {
let account = note.target_account_id();

if self.actor_registry.contains_key(&account) {
target_account_ids.insert(account);
} else {
inactive_targets.push(account);
}
}
}
// Notify target actors.
for account_id in &target_account_ids {
if let Some(handle) = self.actor_registry.get(account_id) {
handle.notify();
}
}

inactive_targets
}

/// Writes mempool event effects to the database.
///
/// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`]
/// with the accounts to notify and cancel.
pub async fn write_event(
&self,
event: &MempoolEvent,
) -> Result<WriteEventResult, DatabaseError> {
match event {
MempoolEvent::TransactionAdded {
id,
nullifiers,
network_notes,
account_delta,
} => {
self.db
.handle_transaction_added(
*id,
account_delta.clone(),
network_notes.clone(),
nullifiers.clone(),
)
.await?;
Ok(WriteEventResult { accounts_to_notify: Vec::new() })
},
MempoolEvent::BlockCommitted { header, txs } => {
let affected_accounts = self
.db
.handle_block_committed(
txs.clone(),
header.block_num(),
header.as_ref().clone(),
)
.await?;
Ok(WriteEventResult { accounts_to_notify: affected_accounts })
},
MempoolEvent::TransactionsReverted(tx_ids) => {
let affected_accounts =
self.db.handle_transactions_reverted(tx_ids.iter().copied().collect()).await?;
Ok(WriteEventResult { accounts_to_notify: affected_accounts })
},
}
}
}

#[cfg(test)]
impl Coordinator {
/// Creates a coordinator with default settings backed by a temp DB.
pub async fn test() -> (Self, tempfile::TempDir) {
let (db, dir) = Db::test_setup().await;
(Self::new(4, 10, db), dir)
}
}

#[cfg(test)]
mod tests {
use miden_node_proto::domain::mempool::MempoolEvent;

use super::*;
use crate::actor::AccountActorContext;
use crate::db::Db;
use crate::test_utils::*;

/// Registers a dummy actor handle (no real actor task) in the coordinator's registry.
fn register_dummy_actor(coordinator: &mut Coordinator, account_id: AccountId) {
let notify = Arc::new(Notify::new());
coordinator.actor_registry.insert(account_id, ActorHandle::new(notify));
}

// SEND TARGETED TESTS
// ============================================================================================

#[tokio::test]
async fn send_targeted_returns_inactive_targets() {
let (mut coordinator, _dir) = Coordinator::test().await;

let active_id = mock_network_account_id();
let inactive_id = mock_network_account_id_seeded(42);

// Only register the active account.
register_dummy_actor(&mut coordinator, active_id);

let note_active = mock_single_target_note(active_id, 10);
let note_inactive = mock_single_target_note(inactive_id, 20);

let event = MempoolEvent::TransactionAdded {
id: mock_tx_id(1),
nullifiers: vec![],
network_notes: vec![note_active, note_inactive],
account_delta: None,
};

let inactive_targets = coordinator.send_targeted(&event);

assert_eq!(inactive_targets.len(), 1);
assert_eq!(inactive_targets[0], inactive_id);
}

// DEACTIVATED ACCOUNTS
// ============================================================================================

#[tokio::test]
async fn spawn_actor_skips_deactivated_account() {
let (db, _dir) = Db::test_setup().await;
let max_crashes = 3;
let mut coordinator = Coordinator::new(4, max_crashes, db.clone());
let mut coordinator = Coordinator::new(4, max_crashes);
let actor_context = AccountActorContext::test(&db);

let account_id = mock_network_account_id();
Expand All @@ -424,7 +269,7 @@ mod tests {
async fn spawn_actor_allows_below_threshold() {
let (db, _dir) = Db::test_setup().await;
let max_crashes = 3;
let mut coordinator = Coordinator::new(4, max_crashes, db.clone());
let mut coordinator = Coordinator::new(4, max_crashes);
let actor_context = AccountActorContext::test(&db);

let account_id = mock_network_account_id();
Expand Down
18 changes: 0 additions & 18 deletions bin/ntx-builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use miden_protocol::testing::account_id::{
ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE,
AccountIdBuilder,
};
use miden_protocol::transaction::TransactionId;
use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint};
use miden_standards::testing::note::NoteBuilder;
use rand_chacha::ChaCha20Rng;
Expand All @@ -18,23 +17,6 @@ pub fn mock_network_account_id() -> AccountId {
ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE.try_into().unwrap()
}

/// Creates a distinct network account ID using a seeded RNG.
pub fn mock_network_account_id_seeded(seed: u8) -> AccountId {
AccountIdBuilder::new()
.account_type(AccountType::Public)
.build_with_seed([seed; 32])
}

/// Creates a unique `TransactionId` from a seed value.
pub fn mock_tx_id(seed: u64) -> TransactionId {
use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET;

let w = |n: u64| Word::try_from([n, 0, 0, 0]).unwrap();
let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap();
let fee = miden_protocol::asset::FungibleAsset::new(faucet_id, 0).unwrap();
TransactionId::new(w(seed), w(seed + 1), w(seed + 2), w(seed + 3), fee)
}

/// Creates a `AccountTargetNetworkNote` targeting the given network account.
pub fn mock_single_target_note(
network_account_id: AccountId,
Expand Down
1 change: 0 additions & 1 deletion crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ miden-node-proto-build = { features = ["internal"], workspace = true }
miden-node-utils = { features = ["testing"], workspace = true }
miden-protocol = { default-features = true, workspace = true }
miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true }
miden-standards = { workspace = true }
miden-tx-batch-prover = { workspace = true }
rand = { workspace = true }
thiserror = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl BlockBuilder {
.map_err(BuildBlockError::StoreApplyBlockFailed)?;

let (header, ..) = signed_block.into_parts();
mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(header);
mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(&header);

Ok(())
}
Expand Down
Loading
Loading