Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 5 additions & 8 deletions bin/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,14 @@ enum ActorMode {
/// based on current chain state and DB queries.
/// - **Transaction Execution**: Executes selected transactions using either local or remote
/// proving.
/// - **Mempool Integration**: Listens for mempool events to stay synchronized with the network
/// state and adjust behavior based on transaction confirmations.
/// - **Chain Integration**: Reacts to committed chain state persisted by the builder.
///
/// ## Lifecycle
///
/// 1. **Initialization**: Waits for committed account state, then checks DB for available notes.
/// 2. **Event Loop**: Continuously processes mempool events and executes transactions.
/// 2. **Event Loop**: Re-evaluates persisted state and executes transactions when notified.
/// 3. **Transaction Processing**: Selects, executes, proves, and submits transactions through RPC.
/// 4. **State Updates**: Event effects are persisted to DB by the coordinator before actors are
/// notified.
/// 4. **State Updates**: Committed block effects are persisted to DB before actors are notified.
/// 5. **Shutdown**: Terminates gracefully on idle timeout, or returns an error on unrecoverable
/// failures.
///
Expand Down Expand Up @@ -383,9 +381,8 @@ impl AccountActor {
/// For accounts that are being created by an inflight transaction, this will idle
/// until the transaction is committed. Returns `true` when the account is ready, or
/// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case
/// the coordinator will respawn a new actor when the account reappears through
/// [`Coordinator::send_targeted`](crate::coordinator::Coordinator::send_targeted) or the
/// account loader.
/// the coordinator will respawn a new actor when the account reappears through the account
/// loader.
async fn wait_for_committed_account(
&self,
account_id: NetworkAccountId,
Expand Down
153 changes: 4 additions & 149 deletions bin/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,13 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use miden_node_db::DatabaseError;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use tokio::sync::{Notify, Semaphore};
use tokio::task::JoinSet;

use crate::actor::{AccountActor, AccountActorContext};
use crate::db::Db;

// WRITE EVENT RESULT
// ================================================================================================

/// Result of writing a mempool event to the database.
pub struct WriteEventResult {
/// Accounts that should be notified of state changes.
pub accounts_to_notify: Vec<NetworkAccountId>,
}

// ACTOR HANDLE
// ================================================================================================

Expand Down Expand Up @@ -75,7 +63,7 @@ impl ActorHandle {
/// - Gracefully handles actor shutdown and cleanup when actors complete or fail.
/// - Monitors actor tasks through a join set to detect completion or errors.
///
/// ## Event Notification
/// ## State Notification
/// - Notifies actors via a shared [`Notify`] when state may have changed.
/// - The DB is the source of truth: actors re-evaluate their state from DB on notification.
/// - Notifications are coalesced: [`Notify`] stores at most one permit, so multiple notifications
Expand All @@ -89,12 +77,11 @@ impl ActorHandle {
/// - Actors that have been idle for longer than the idle timeout deactivate themselves.
/// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor
/// timed out. If so, the actor is respawned immediately.
/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting
/// an account without an active actor.
/// - Deactivated actors may be re-spawned when later committed chain state requires them.
///
/// The coordinator operates in an event-driven manner:
/// 1. Network accounts are registered and actors spawned as needed.
/// 2. Mempool events are written to DB, then actors are notified.
/// 2. Actors are notified when DB state relevant to them may have changed.
/// 3. Actor completion/failure events are monitored and handled.
/// 4. Failed or completed actors are cleaned up from the registry.
pub struct Coordinator {
Expand Down Expand Up @@ -255,101 +242,6 @@ impl Coordinator {
},
}
}

/// Notifies account actors that are affected by a `TransactionAdded` event.
///
/// Only actors that are currently active are notified. Since event effects are already
/// persisted in the DB by `write_event()`, actors that spawn later read their state from the
/// DB and do not need predating events.
///
/// Returns account IDs of note targets that do not have active actors (e.g. previously
/// deactivated due to sterility). The caller can use this to re-activate actors for those
/// accounts.
pub fn send_targeted(&self, event: &MempoolEvent) -> Vec<NetworkAccountId> {
let mut target_account_ids = HashSet::new();
let mut inactive_targets = Vec::new();

if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event {
// We need to inform the account if it was updated. This lets it know that its own
// transaction has been applied, and in the future also resolves race conditions with
// external network transactions (once these are allowed).
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
let account_id = delta.id();
if account_id.is_network() {
let network_account_id =
account_id.try_into().expect("account is network account");
if self.actor_registry.contains_key(&network_account_id) {
target_account_ids.insert(network_account_id);
}
}
}

// Determine target actors for each note.
for note in network_notes {
let account = note.target_account_id();
let account = NetworkAccountId::try_from(account)
.expect("network note target account should be a network account");

if self.actor_registry.contains_key(&account) {
target_account_ids.insert(account);
} else {
inactive_targets.push(account);
}
}
}
// Notify target actors.
for account_id in &target_account_ids {
if let Some(handle) = self.actor_registry.get(account_id) {
handle.notify();
}
}

inactive_targets
}

/// Writes mempool event effects to the database.
///
/// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`]
/// with the accounts to notify and cancel.
pub async fn write_event(
&self,
event: &MempoolEvent,
) -> Result<WriteEventResult, DatabaseError> {
match event {
MempoolEvent::TransactionAdded {
id,
nullifiers,
network_notes,
account_delta,
} => {
self.db
.handle_transaction_added(
*id,
account_delta.clone(),
network_notes.clone(),
nullifiers.clone(),
)
.await?;
Ok(WriteEventResult { accounts_to_notify: Vec::new() })
},
MempoolEvent::BlockCommitted { header, txs } => {
let affected_accounts = self
.db
.handle_block_committed(
txs.clone(),
header.block_num(),
header.as_ref().clone(),
)
.await?;
Ok(WriteEventResult { accounts_to_notify: affected_accounts })
},
MempoolEvent::TransactionsReverted(tx_ids) => {
let affected_accounts =
self.db.handle_transactions_reverted(tx_ids.iter().copied().collect()).await?;
Ok(WriteEventResult { accounts_to_notify: affected_accounts })
},
}
}
}

#[cfg(test)]
Expand All @@ -363,48 +255,11 @@ impl Coordinator {

#[cfg(test)]
mod tests {
use miden_node_proto::domain::mempool::MempoolEvent;

use super::*;
use crate::actor::AccountActorContext;
use crate::db::Db;
use crate::test_utils::*;

/// Registers a dummy actor handle (no real actor task) in the coordinator's registry.
fn register_dummy_actor(coordinator: &mut Coordinator, account_id: NetworkAccountId) {
let notify = Arc::new(Notify::new());
coordinator.actor_registry.insert(account_id, ActorHandle::new(notify));
}

// SEND TARGETED TESTS
// ============================================================================================

#[tokio::test]
async fn send_targeted_returns_inactive_targets() {
let (mut coordinator, _dir) = Coordinator::test().await;

let active_id = mock_network_account_id();
let inactive_id = mock_network_account_id_seeded(42);

// Only register the active account.
register_dummy_actor(&mut coordinator, active_id);

let note_active = mock_single_target_note(active_id, 10);
let note_inactive = mock_single_target_note(inactive_id, 20);

let event = MempoolEvent::TransactionAdded {
id: mock_tx_id(1),
nullifiers: vec![],
network_notes: vec![note_active, note_inactive],
account_delta: None,
};

let inactive_targets = coordinator.send_targeted(&event);

assert_eq!(inactive_targets.len(), 1);
assert_eq!(inactive_targets[0], inactive_id);
}

// DEACTIVATED ACCOUNTS
// ============================================================================================

Expand Down
33 changes: 1 addition & 32 deletions bin/ntx-builder/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Db {
// DEAD-CODE STUBS
// ============================================================================================
//
// These methods exist to keep the dead actor/coordinator modules compiling in PR 1. They are
// These methods exist to keep the dead actor module compiling in PR 1. They are
// never reached because `NetworkTransactionBuilder` does not spawn the actor path. PR 2
// replaces them with their new committed-block-driven equivalents.

Expand All @@ -198,37 +198,6 @@ impl Db {
unimplemented!("transaction_exists is rewired in PR 2 of the ntx-builder refactor")
}

#[expect(clippy::unused_async)]
pub async fn handle_transaction_added(
&self,
_tx_id: miden_protocol::transaction::TransactionId,
_account_delta: Option<miden_protocol::account::delta::AccountUpdateDetails>,
_notes: Vec<AccountTargetNetworkNote>,
_nullifiers: Vec<Nullifier>,
) -> Result<()> {
unimplemented!("handle_transaction_added is rewired in PR 2 of the ntx-builder refactor")
}

#[expect(clippy::unused_async)]
pub async fn handle_block_committed(
&self,
_txs: Vec<miden_protocol::transaction::TransactionId>,
_block_num: BlockNumber,
_header: BlockHeader,
) -> Result<Vec<NetworkAccountId>> {
unimplemented!("handle_block_committed is rewired in PR 2 of the ntx-builder refactor")
}

#[expect(clippy::unused_async)]
pub async fn handle_transactions_reverted(
&self,
_tx_ids: Vec<miden_protocol::transaction::TransactionId>,
) -> Result<Vec<NetworkAccountId>> {
unimplemented!(
"handle_transactions_reverted is rewired in PR 2 of the ntx-builder refactor"
)
}

/// Creates a file-backed SQLite test connection with migrations applied.
#[cfg(test)]
pub fn test_conn() -> (diesel::SqliteConnection, tempfile::TempDir) {
Expand Down
20 changes: 0 additions & 20 deletions bin/ntx-builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use miden_protocol::testing::account_id::{
ACCOUNT_ID_REGULAR_NETWORK_ACCOUNT_IMMUTABLE_CODE,
AccountIdBuilder,
};
use miden_protocol::transaction::TransactionId;
use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint};
use miden_standards::testing::note::NoteBuilder;
use rand_chacha::ChaCha20Rng;
Expand All @@ -27,25 +26,6 @@ pub fn mock_network_account_id() -> NetworkAccountId {
NetworkAccountId::try_from(account_id).unwrap()
}

/// Creates a distinct network account ID using a seeded RNG.
pub fn mock_network_account_id_seeded(seed: u8) -> NetworkAccountId {
let account_id = AccountIdBuilder::new()
.account_type(AccountType::RegularAccountImmutableCode)
.storage_mode(AccountStorageMode::Network)
.build_with_seed([seed; 32]);
NetworkAccountId::try_from(account_id).unwrap()
}

/// Creates a unique `TransactionId` from a seed value.
pub fn mock_tx_id(seed: u64) -> TransactionId {
use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET;

let w = |n: u64| Word::try_from([n, 0, 0, 0]).unwrap();
let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap();
let fee = miden_protocol::asset::FungibleAsset::new(faucet_id, 0).unwrap();
TransactionId::new(w(seed), w(seed + 1), w(seed + 2), w(seed + 3), fee)
}

/// Creates a `AccountTargetNetworkNote` targeting the given network account.
pub fn mock_single_target_note(
network_account_id: NetworkAccountId,
Expand Down
27 changes: 13 additions & 14 deletions bin/stress-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ version.workspace = true
workspace = true

[dependencies]
clap = { features = ["derive"], workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
miden-node-block-producer = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { workspace = true }
miden-standards = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true }
tonic = { default-features = true, workspace = true }
url = { workspace = true }
clap = { features = ["derive"], workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { workspace = true }
miden-standards = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true }
tonic = { default-features = true, workspace = true }
url = { workspace = true }
Loading
Loading