From e9d13e116579e0c198137e95d04d670671c63f30 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Tue, 26 May 2026 09:59:15 +0200 Subject: [PATCH 1/5] yank mempool subscription --- bin/ntx-builder/src/actor/mod.rs | 19 +- bin/ntx-builder/src/chain_state.rs | 8 +- bin/ntx-builder/src/coordinator.rs | 175 +------------- bin/ntx-builder/src/test_utils.rs | 18 -- crates/block-producer/Cargo.toml | 1 - .../block-producer/src/block_builder/mod.rs | 2 +- .../block-producer/src/domain/transaction.rs | 6 +- crates/block-producer/src/mempool/mod.rs | 52 +---- .../src/mempool/subscription.rs | 219 ------------------ crates/block-producer/src/mempool/tests.rs | 12 +- .../src/mempool/tests/add_transaction.rs | 4 +- crates/block-producer/src/server/mod.rs | 40 +--- crates/proto/src/domain/mempool.rs | 144 ------------ crates/proto/src/domain/mod.rs | 1 - crates/proto/src/generated/mod.rs | 1 - docs/external/src/operator/architecture.md | 4 +- docs/internal/src/ntx-builder.md | 8 +- proto/proto/internal/block_producer.proto | 58 ----- 18 files changed, 43 insertions(+), 729 deletions(-) delete mode 100644 crates/block-producer/src/mempool/subscription.rs delete mode 100644 crates/proto/src/domain/mempool.rs diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index 55e6d2dfda..803cdccbee 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -97,7 +97,7 @@ pub struct AccountActorContext { pub clients: GrpcClients, pub state: State, pub config: ActorConfig, - /// Channel for sending requests to the coordinator (via the builder event loop). + /// Channel for sending requests to the coordinator (via the builder loop). pub request_tx: mpsc::Sender, } @@ -176,15 +176,15 @@ enum ActorMode { /// based on current chain state and DB queries. /// - **Transaction Execution**: Executes selected transactions using either local or remote /// proving. -/// - **Mempool Integration**: Listens for mempool events to stay synchronized with the network -/// state and adjust behavior based on transaction confirmations. +/// - **Chain Integration**: Reacts to committed-chain updates persisted by the coordinator to stay +/// synchronized with the network state. /// /// ## Lifecycle /// /// 1. **Initialization**: Waits for committed account state, then checks DB for available notes. -/// 2. **Event Loop**: Continuously processes mempool events and executes transactions. +/// 2. **Event Loop**: Re-evaluates database state on notification and executes transactions. /// 3. **Transaction Processing**: Selects, executes, proves, and submits transactions through RPC. -/// 4. **State Updates**: Event effects are persisted to DB by the coordinator before actors are +/// 4. **State Updates**: Committed-chain updates are persisted to DB before actors are /// notified. /// 5. **Shutdown**: Terminates gracefully on idle timeout, or returns an error on unrecoverable /// failures. @@ -227,7 +227,7 @@ impl AccountActor { } } - /// Runs the account actor, processing events and managing state until shutdown. + /// Runs the account actor, processing notifications and managing state until shutdown. /// /// The return value signals the shutdown category to the coordinator: /// @@ -312,7 +312,7 @@ impl AccountActor { if let Some(tx_candidate) = tx_candidate { mode = self.execute_transactions(account_id, tx_candidate).await; } else { - // No transactions to execute, wait for events. + // No transactions to execute, wait for notifications. mode = ActorMode::NoViableNotes; } } @@ -385,9 +385,8 @@ impl AccountActor { /// For accounts that are being created by an inflight transaction, this will idle /// until the transaction is committed. Returns `true` when the account is ready, or /// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case - /// the coordinator will respawn a new actor when the account reappears through - /// [`Coordinator::send_targeted`](crate::coordinator::Coordinator::send_targeted) or the - /// account loader. + /// the coordinator will respawn a new actor when committed-chain processing or the account + /// loader observes the account again. async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result { // Check if the account is already committed. if self diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index 033695c479..b1105c7245 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -59,15 +59,13 @@ impl ChainState { /// Updates the chain tip and prunes old blocks from the MMR. pub(crate) fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) { - // Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive for - // a block whose state was already loaded from the store during startup: the mempool - // subscription is established first and then the chain tip is fetched, so any block - // committed in that window produces an event for state we have already ingested. + // Skip blocks already reflected in the chain state. The builder may load state during + // startup before receiving the same block from the committed-block subscription. if tip.block_num() <= self.chain_tip_header.block_num() { tracing::debug!( event_block = %tip.block_num(), current_tip = %self.chain_tip_header.block_num(), - "skipping BlockCommitted event for block already in chain state", + "skipping committed block already reflected in chain state", ); return; } diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index 45dbb167e9..acb86ff99f 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -1,24 +1,11 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; -use miden_node_db::DatabaseError; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_protocol::account::AccountId; -use miden_protocol::account::delta::AccountUpdateDetails; use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use crate::actor::{AccountActor, AccountActorContext}; -use crate::db::Db; - -// WRITE EVENT RESULT -// ================================================================================================ - -/// Result of writing a mempool event to the database. -pub struct WriteEventResult { - /// Accounts that should be notified of state changes. - pub accounts_to_notify: Vec, -} // ACTOR HANDLE // ================================================================================================ @@ -75,7 +62,7 @@ impl ActorHandle { /// - Gracefully handles actor shutdown and cleanup when actors complete or fail. /// - Monitors actor tasks through a join set to detect completion or errors. /// -/// ## Event Notification +/// ## Notification /// - Notifies actors via a shared [`Notify`] when state may have changed. /// - The DB is the source of truth: actors re-evaluate their state from DB on notification. /// - Notifications are coalesced: [`Notify`] stores at most one permit, so multiple notifications @@ -89,20 +76,19 @@ impl ActorHandle { /// - Actors that have been idle for longer than the idle timeout deactivate themselves. /// - When an actor deactivates, the coordinator checks if a notification arrived just as the actor /// timed out. If so, the actor is respawned immediately. -/// - Deactivated actors are re-spawned when [`Coordinator::send_targeted`] detects notes targeting -/// an account without an active actor. +/// - Deactivated actors are re-spawned when committed-chain processing detects new work for them. /// -/// The coordinator operates in an event-driven manner: +/// The coordinator operates in a notification-driven manner: /// 1. Network accounts are registered and actors spawned as needed. -/// 2. Mempool events are written to DB, then actors are notified. +/// 2. Committed-chain updates are written to DB, then actors are notified. /// 3. Actor completion/failure events are monitored and handled. /// 4. Failed or completed actors are cleaned up from the registry. pub struct Coordinator { /// Mapping of network account IDs to their notification handles. /// /// This registry serves as the primary directory for notifying active account actors. - /// When actors are spawned, they register their notification handle here. When events need - /// to be broadcast, this registry is used to locate the appropriate actors. The registry is + /// When actors are spawned, they register their notification handle here. When accounts need + /// to be notified, this registry is used to locate the appropriate actors. The registry is /// automatically cleaned up when actors complete their execution. actor_registry: HashMap, @@ -122,9 +108,6 @@ pub struct Coordinator { /// ensuring fair resource allocation and system stability under load. semaphore: Arc, - /// Database for persistent state. - db: Db, - /// Tracks the number of crashes per account actor. /// /// When an actor shuts down due to a DB error, its crash count is incremented. Once @@ -139,12 +122,11 @@ pub struct Coordinator { impl Coordinator { /// Creates a new coordinator with the specified maximum number of inflight transactions and the /// crash threshold for account deactivation. - pub fn new(max_inflight_transactions: usize, max_account_crashes: usize, db: Db) -> Self { + pub fn new(max_inflight_transactions: usize, max_account_crashes: usize) -> Self { Self { actor_registry: HashMap::new(), actor_join_set: JoinSet::new(), semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), - db, crash_counts: HashMap::new(), max_account_crashes, } @@ -251,152 +233,15 @@ impl Coordinator { }, } } - - /// Notifies account actors that are affected by a `TransactionAdded` event. - /// - /// Only actors that are currently active are notified. Since event effects are already - /// persisted in the DB by `write_event()`, actors that spawn later read their state from the - /// DB and do not need predating events. - /// - /// Returns account IDs of note targets that do not have active actors (e.g. previously - /// deactivated due to sterility). The caller can use this to re-activate actors for those - /// accounts. - pub fn send_targeted(&self, event: &MempoolEvent) -> Vec { - let mut target_account_ids = HashSet::new(); - let mut inactive_targets = Vec::new(); - - if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event { - // We need to inform the account if it was updated. This lets it know that its own - // transaction has been applied, and in the future also resolves race conditions with - // external network transactions (once these are allowed). - if let Some(AccountUpdateDetails::Delta(delta)) = account_delta { - // The actor registry only contains accounts the builder has already classified as - // network, so the lookup itself filters out non-network ids. - let account_id = delta.id(); - if self.actor_registry.contains_key(&account_id) { - target_account_ids.insert(account_id); - } - } - - // Determine target actors for each note. - for note in network_notes { - let account = note.target_account_id(); - - if self.actor_registry.contains_key(&account) { - target_account_ids.insert(account); - } else { - inactive_targets.push(account); - } - } - } - // Notify target actors. - for account_id in &target_account_ids { - if let Some(handle) = self.actor_registry.get(account_id) { - handle.notify(); - } - } - - inactive_targets - } - - /// Writes mempool event effects to the database. - /// - /// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`] - /// with the accounts to notify and cancel. - pub async fn write_event( - &self, - event: &MempoolEvent, - ) -> Result { - match event { - MempoolEvent::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - } => { - self.db - .handle_transaction_added( - *id, - account_delta.clone(), - network_notes.clone(), - nullifiers.clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: Vec::new() }) - }, - MempoolEvent::BlockCommitted { header, txs } => { - let affected_accounts = self - .db - .handle_block_committed( - txs.clone(), - header.block_num(), - header.as_ref().clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - MempoolEvent::TransactionsReverted(tx_ids) => { - let affected_accounts = - self.db.handle_transactions_reverted(tx_ids.iter().copied().collect()).await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - } - } -} - -#[cfg(test)] -impl Coordinator { - /// Creates a coordinator with default settings backed by a temp DB. - pub async fn test() -> (Self, tempfile::TempDir) { - let (db, dir) = Db::test_setup().await; - (Self::new(4, 10, db), dir) - } } #[cfg(test)] mod tests { - use miden_node_proto::domain::mempool::MempoolEvent; - use super::*; use crate::actor::AccountActorContext; use crate::db::Db; use crate::test_utils::*; - /// Registers a dummy actor handle (no real actor task) in the coordinator's registry. - fn register_dummy_actor(coordinator: &mut Coordinator, account_id: AccountId) { - let notify = Arc::new(Notify::new()); - coordinator.actor_registry.insert(account_id, ActorHandle::new(notify)); - } - - // SEND TARGETED TESTS - // ============================================================================================ - - #[tokio::test] - async fn send_targeted_returns_inactive_targets() { - let (mut coordinator, _dir) = Coordinator::test().await; - - let active_id = mock_network_account_id(); - let inactive_id = mock_network_account_id_seeded(42); - - // Only register the active account. - register_dummy_actor(&mut coordinator, active_id); - - let note_active = mock_single_target_note(active_id, 10); - let note_inactive = mock_single_target_note(inactive_id, 20); - - let event = MempoolEvent::TransactionAdded { - id: mock_tx_id(1), - nullifiers: vec![], - network_notes: vec![note_active, note_inactive], - account_delta: None, - }; - - let inactive_targets = coordinator.send_targeted(&event); - - assert_eq!(inactive_targets.len(), 1); - assert_eq!(inactive_targets[0], inactive_id); - } - // DEACTIVATED ACCOUNTS // ============================================================================================ @@ -404,7 +249,7 @@ mod tests { async fn spawn_actor_skips_deactivated_account() { let (db, _dir) = Db::test_setup().await; let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); + let mut coordinator = Coordinator::new(4, max_crashes); let actor_context = AccountActorContext::test(&db); let account_id = mock_network_account_id(); @@ -424,7 +269,7 @@ mod tests { async fn spawn_actor_allows_below_threshold() { let (db, _dir) = Db::test_setup().await; let max_crashes = 3; - let mut coordinator = Coordinator::new(4, max_crashes, db.clone()); + let mut coordinator = Coordinator::new(4, max_crashes); let actor_context = AccountActorContext::test(&db); let account_id = mock_network_account_id(); diff --git a/bin/ntx-builder/src/test_utils.rs b/bin/ntx-builder/src/test_utils.rs index 1e0e330746..691bc0fa07 100644 --- a/bin/ntx-builder/src/test_utils.rs +++ b/bin/ntx-builder/src/test_utils.rs @@ -7,7 +7,6 @@ use miden_protocol::testing::account_id::{ ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE, AccountIdBuilder, }; -use miden_protocol::transaction::TransactionId; use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint}; use miden_standards::testing::note::NoteBuilder; use rand_chacha::ChaCha20Rng; @@ -18,23 +17,6 @@ pub fn mock_network_account_id() -> AccountId { ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE.try_into().unwrap() } -/// Creates a distinct network account ID using a seeded RNG. -pub fn mock_network_account_id_seeded(seed: u8) -> AccountId { - AccountIdBuilder::new() - .account_type(AccountType::Public) - .build_with_seed([seed; 32]) -} - -/// Creates a unique `TransactionId` from a seed value. -pub fn mock_tx_id(seed: u64) -> TransactionId { - use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET; - - let w = |n: u64| Word::try_from([n, 0, 0, 0]).unwrap(); - let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); - let fee = miden_protocol::asset::FungibleAsset::new(faucet_id, 0).unwrap(); - TransactionId::new(w(seed), w(seed + 1), w(seed + 2), w(seed + 3), fee) -} - /// Creates a `AccountTargetNetworkNote` targeting the given network account. pub fn mock_single_target_note( network_account_id: AccountId, diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 5509826e74..f783191622 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -30,7 +30,6 @@ miden-node-proto-build = { features = ["internal"], workspace = true } miden-node-utils = { features = ["testing"], workspace = true } miden-protocol = { default-features = true, workspace = true } miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true } -miden-standards = { workspace = true } miden-tx-batch-prover = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index b004315fdf..dd3510b933 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -271,7 +271,7 @@ impl BlockBuilder { .map_err(BuildBlockError::StoreApplyBlockFailed)?; let (header, ..) = signed_block.into_parts(); - mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(header); + mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(&header); Ok(()) } diff --git a/crates/block-producer/src/domain/transaction.rs b/crates/block-producer/src/domain/transaction.rs index bec445625b..d8f06e17e5 100644 --- a/crates/block-producer/src/domain/transaction.rs +++ b/crates/block-producer/src/domain/transaction.rs @@ -5,7 +5,7 @@ use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::note::Nullifier; -use miden_protocol::transaction::{OutputNote, ProvenTransaction, TransactionId, TxAccountUpdate}; +use miden_protocol::transaction::{ProvenTransaction, TransactionId, TxAccountUpdate}; use crate::errors::StateConflict; use crate::store::TransactionInputs; @@ -94,10 +94,6 @@ impl AuthenticatedTransaction { self.inner.output_notes().iter().map(|n| n.id().as_word()) } - pub fn output_notes(&self) -> impl Iterator + '_ { - self.inner.output_notes().iter() - } - pub fn output_note_count(&self) -> usize { self.inner.output_notes().num_notes() } diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 9ec79c417b..330ce242f3 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -54,14 +54,11 @@ use std::collections::{HashSet, VecDeque}; use std::num::NonZeroUsize; use std::sync::{Arc, LockResult, Mutex, MutexGuard}; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_utils::ErrorReport; use miden_protocol::batch::{BatchId, ProvenBatch}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::transaction::{TransactionHeader, TransactionId}; -use subscription::SubscriptionProvider; use thiserror::Error; -use tokio::sync::mpsc; use tracing::instrument; use crate::block_builder::SelectedBlock; @@ -80,7 +77,6 @@ mod budget; pub use budget::{BatchBudget, BlockBudget}; mod graph; -mod subscription; #[cfg(test)] mod tests; @@ -183,7 +179,6 @@ pub struct Mempool { committed_chain_tip: BlockNumber, config: MempoolConfig, - subscription: subscription::SubscriptionProvider, } impl Mempool { @@ -199,7 +194,6 @@ impl Mempool { Self { config, committed_chain_tip: chain_tip, - subscription: SubscriptionProvider::new(chain_tip), transactions: graph::TransactionGraph::default(), batches: graph::BatchGraph::default(), pending_block: None, @@ -221,8 +215,6 @@ impl Mempool { /// Adds a transaction to the mempool. /// - /// Sends a [`MempoolEvent::TransactionAdded`] event to subscribers. - /// /// # Returns /// /// Returns the current block height. @@ -251,7 +243,6 @@ impl Mempool { self.transactions .append(Arc::clone(&tx)) .map_err(MempoolSubmissionError::StateConflict)?; - self.subscription.transaction_added(&tx); self.inject_telemetry(); Ok(self.committed_chain_tip) @@ -286,9 +277,6 @@ impl Mempool { .append_user_batch(txs) .map_err(MempoolSubmissionError::StateConflict)?; - for tx in txs { - self.subscription.transaction_added(tx); - } self.inject_telemetry(); Ok(self.committed_chain_tip) @@ -341,8 +329,7 @@ impl Mempool { // could check this precondition above. if let Some(batch) = reverted_batches.iter().find(|reverted| reverted.id() == batch) { let failed_txs = batch.transactions().iter().map(|tx| tx.id()); - let reverted_txs = self.transactions.increment_failure_count(failed_txs); - self.subscription.txs_reverted(reverted_txs); + self.transactions.increment_failure_count(failed_txs); } self.inject_telemetry(); @@ -385,39 +372,26 @@ impl Mempool { /// The pool will mark the associated batches and transactions as committed, and prune stale /// committed data, and purge transactions that are now considered expired. /// - /// Sends a [`MempoolEvent::BlockCommitted`] event to subscribers, as well as a - /// [`MempoolEvent::TransactionsReverted`] for transactions that are now considered expired. - /// /// On success the internal state is updated in place: the chain tip advances, expired data is - /// pruned, and subscribers are notified about the committed block and any reverted - /// transactions. + /// pruned, and expired transactions are reverted. /// /// # Panics /// /// Panics if there is no matching block in flight. #[instrument(target = COMPONENT, name = "mempool.commit_block", skip_all)] - pub fn commit_block(&mut self, block_header: BlockHeader) { + pub fn commit_block(&mut self, block_header: &BlockHeader) { assert_eq!(self.committed_chain_tip.child(), block_header.block_num()); let block = self .pending_block .take_if(|pending| pending.block_number == block_header.block_num()) .expect("block must be in progress to commit"); - let tx_ids = block - .batches - .iter() - .flat_map(|batch| batch.transactions().as_slice().iter()) - .map(miden_protocol::transaction::TransactionHeader::id) - .collect(); - self.committed_chain_tip = self.committed_chain_tip.child(); - self.subscription.block_committed(block_header, tx_ids); self.committed_blocks.push_back(block); self.prune_oldest_block(); - let reverted_tx_ids = self.revert_expired(); - self.subscription.txs_reverted(reverted_tx_ids); + self.revert_expired(); self.inject_telemetry(); } @@ -427,8 +401,6 @@ impl Mempool { /// Additionally, the transactions from this block have their failure count incremented, /// potentially reverting them if they exceed the failure limit. /// - /// Sends a [`MempoolEvent::TransactionsReverted`] event to subscribers. - /// /// # Panics /// /// Panics if there is no matching block in flight. @@ -456,24 +428,10 @@ impl Mempool { .batches .iter() .flat_map(|batch| batch.transactions().as_slice().iter().map(TransactionHeader::id)); - let reverted_txs = self.transactions.increment_failure_count(failed_txs); - - self.subscription.txs_reverted(reverted_txs); + self.transactions.increment_failure_count(failed_txs); self.inject_telemetry(); } - // EVENTS & SUBSCRIPTIONS - // -------------------------------------------------------------------------------------------- - - /// Creates a subscription to [`MempoolEvent`] which will be emitted in the order they - /// occur. - /// - /// Only emits events which occurred after the current committed block. - #[instrument(target = COMPONENT, name = "mempool.subscribe", skip_all)] - pub fn subscribe(&mut self) -> mpsc::Receiver { - self.subscription.subscribe() - } - // STATS & INSPECTION // -------------------------------------------------------------------------------------------- diff --git a/crates/block-producer/src/mempool/subscription.rs b/crates/block-producer/src/mempool/subscription.rs deleted file mode 100644 index beae70c62b..0000000000 --- a/crates/block-producer/src/mempool/subscription.rs +++ /dev/null @@ -1,219 +0,0 @@ -use std::collections::{BTreeMap, HashSet}; -use std::ops::Mul; - -use miden_node_proto::domain::mempool::MempoolEvent; -use miden_protocol::block::{BlockHeader, BlockNumber}; -use miden_protocol::transaction::{OutputNote, TransactionId}; -use miden_standards::note::NetworkNoteExt; -use tokio::sync::mpsc; - -use crate::domain::transaction::AuthenticatedTransaction; - -/// Coordinates mempool event delivery for a single subscriber. -/// -/// Retains the active subscriber channel (if any) and an in-memory queue of uncommitted -/// transaction events so new subscriptions can immediately replay pending updates. -#[derive(Clone, Debug)] -pub(crate) struct SubscriptionProvider { - /// The latest event subscription, if any. - /// - /// The only current interested party is the network transaction builder, so one subscription - /// is enough. - subscription: Option>, - - /// The latest committed block number. - /// - /// This is used to ensure synchronicity with new subscribers. - chain_tip: BlockNumber, - - /// Tracks all uncommitted transaction events. These events must be resent on start - /// of a new subscription since the subscriber will only have data up to the latest - /// committed block and would otherwise miss these uncommitted transactions. - /// - /// The size is bounded by removing events as they are committed or reverted, and as - /// such this is always bound to the current amount of inflight transactions. - inflight_txs: InflightTransactions, -} - -impl PartialEq for SubscriptionProvider { - fn eq(&self, other: &Self) -> bool { - self.chain_tip == other.chain_tip && self.inflight_txs == other.inflight_txs - } -} - -impl SubscriptionProvider { - pub fn new(chain_tip: BlockNumber) -> Self { - Self { - chain_tip, - subscription: None, - inflight_txs: InflightTransactions::default(), - } - } - - /// Creates a new [`MempoolEvent`] subscription. - /// - /// This replaces any existing subscription. - /// Any previous subscriber is dropped and must resubscribe to continue receiving events. - pub fn subscribe(&mut self) -> mpsc::Receiver { - // We should leave enough space to at least send the uncommitted events (plus some extra). - let capacity = self.inflight_txs.len().mul(2).max(1024); - let (tx, rx) = mpsc::channel(capacity); - self.subscription.replace(tx); - - // Send each uncommitted tx event in chronological order. - // - // The ordering is guaranteed by the tracker. - // - // We don't clear the queue so that they're available for other new subscriptions. - // The queue size is managed by instead removing events once they're committed or reverted. - for tx in self.inflight_txs.iter() { - Self::send_event(&mut self.subscription, tx.clone()); - } - - rx - } - - /// Records a newly added transaction in the inflight queue and forwards the event to the - /// subscriber. - pub(super) fn transaction_added(&mut self, tx: &AuthenticatedTransaction) { - let id = tx.id(); - let nullifiers = tx.nullifiers().collect(); - let network_notes = tx - .output_notes() - .filter_map(|note| match note { - OutputNote::Public(inner) => { - inner.clone().into_note().into_account_target_network_note().ok() - }, - OutputNote::Private(_) => None, - }) - .collect(); - - // Private accounts cannot be managed by the network, so filter them out - let account_delta = match tx.account_update().details() { - miden_protocol::account::delta::AccountUpdateDetails::Private => None, - details @ miden_protocol::account::delta::AccountUpdateDetails::Delta(_) => { - Some(details.clone()) - }, - }; - let event = MempoolEvent::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - }; - - self.inflight_txs.insert(event.clone()); - Self::send_event(&mut self.subscription, event); - } - - /// Records a committed block, prunes replayed transactions, and forwards the event so future - /// subscribers continue from the latest chain tip. - pub(super) fn block_committed(&mut self, header: BlockHeader, txs: Vec) { - self.chain_tip = header.block_num(); - for tx in &txs { - self.inflight_txs.remove(tx); - } - - Self::send_event( - &mut self.subscription, - MempoolEvent::BlockCommitted { header: Box::new(header), txs }, - ); - } - - /// Removes reverted transactions from the inflight queue and notifies the subscriber so they - /// can drop or retry the affected items. - pub(super) fn txs_reverted(&mut self, txs: HashSet) { - for tx in &txs { - self.inflight_txs.remove(tx); - } - Self::send_event(&mut self.subscription, MempoolEvent::TransactionsReverted(txs)); - } - - /// Sends a [`MempoolEvent`] to the subscriber, if any. - /// - /// If the send fails, the subscription is cancelled and the event is dropped, so callers must - /// resubscribe to continue receiving updates. - /// - /// This function does not take `&self` to work-around borrowing issues - /// where both the sender and inflight events need to be borrowed at the same time. - fn send_event(subscription: &mut Option>, event: MempoolEvent) { - let Some(sender) = subscription else { - return; - }; - - // If sending fails, end the subscription to prevent desync. - if let Err(error) = sender.try_send(event) { - tracing::warn!(%error, "mempool subscription failed, cancelling subscription"); - subscription.take(); - } - } -} - -/// Maintains an ordered index of [`MempoolEvent::TransactionAdded`] events which can be efficiently -/// added and removed. -/// -/// This is used to track events which need to be sent on fresh subscriptions. -/// -/// The events can be iterated over in chronological order. -#[derive(Default, Clone, Debug, PartialEq)] -struct InflightTransactions { - /// [`MempoolEvent::TransactionAdded`] events which are still inflight i.e. have not been - /// committed or reverted. - /// - /// These events need to be transmitted when a subscription is started, since the subscriber - /// only has the committed state. - /// - /// A [`BTreeMap`] is used to maintain event ordering while allowing for efficient removals of - /// committed or reverted transactions. - /// - /// The key is auto-incremented on each new insert to support this event ordering. - /// - /// A reverse lookup index is maintained in `index`. - txs: BTreeMap, - - /// A reverse lookup index for `txs` which allows for efficient removal of committed or reverted - /// events. - index: BTreeMap, -} - -impl InflightTransactions { - /// Adds a new transaction event to the tracker. - /// - /// # Panics - /// - /// Panics if: - /// - the event is not a [`MempoolEvent::TransactionAdded`], or - /// - the event already exists - fn insert(&mut self, tx: MempoolEvent) { - let MempoolEvent::TransactionAdded { id, .. } = &tx else { - panic!("Cannot submit a non-tx event to inflight transaction event tracker"); - }; - - let idx = self.txs.last_key_value().map(|(&k, _v)| k + 1).unwrap_or_default(); - assert!( - self.index.insert(*id, idx).is_none(), - "transaction event already exists in tracker" - ); - self.txs.insert(idx, tx); - } - - /// Removes a transaction from the tracker. - /// - /// # Panics - /// - /// Panics if the transaction was not being tracked. - fn remove(&mut self, tx: &TransactionId) { - let idx = self.index.remove(tx).expect("transaction to remove should be tracked"); - self.txs.remove(&idx); - } - - /// An iterator over all transaction events in the order they were added. - fn iter(&self) -> impl Iterator { - self.txs.values() - } - - /// The number of transaction events. - fn len(&self) -> usize { - self.txs.len() - } -} diff --git a/crates/block-producer/src/mempool/tests.rs b/crates/block-producer/src/mempool/tests.rs index 44b717cde7..973fcaafe7 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -167,7 +167,7 @@ fn block_commit_reverts_expired_txns() { // Create and commit the block which should revert the above tx. let block = uut.select_block(); let arb_header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); - uut.commit_block(arb_header.clone()); + uut.commit_block(&arb_header); // A reverted transaction behaves as if it never existed. reference.add_transaction(tx_to_commit.clone()).unwrap(); @@ -176,7 +176,7 @@ fn block_commit_reverts_expired_txns() { tx_to_commit.raw_proven_transaction() ]))); reference.select_block(); - reference.commit_block(arb_header); + reference.commit_block(&arb_header); assert_eq!(uut, reference); } @@ -188,7 +188,7 @@ fn empty_block_commitment() { for _ in 0..3 { let block = uut.select_block(); let arb_header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); - uut.commit_block(arb_header); + uut.commit_block(&arb_header); } } @@ -233,11 +233,11 @@ fn pruned_committed_notes_are_authenticated_for_inflight_descendants() { let block = uut.select_block(); let header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); - uut.commit_block(header); + uut.commit_block(&header); let block = uut.select_block(); let header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); - uut.commit_block(header); + uut.commit_block(&header); let child_batch = uut.select_batch().unwrap(); @@ -251,7 +251,7 @@ fn pruned_committed_notes_are_authenticated_for_inflight_descendants() { #[should_panic] fn block_commitment_is_rejected_if_no_block_is_in_flight() { let arb_header = BlockHeader::mock(0, None, None, &[], Word::empty()); - Mempool::for_tests().0.commit_block(arb_header); + Mempool::for_tests().0.commit_block(&arb_header); } #[test] diff --git a/crates/block-producer/src/mempool/tests/add_transaction.rs b/crates/block-producer/src/mempool/tests/add_transaction.rs index ae70c5f612..dbb82677ad 100644 --- a/crates/block-producer/src/mempool/tests/add_transaction.rs +++ b/crates/block-producer/src/mempool/tests/add_transaction.rs @@ -70,7 +70,7 @@ mod tx_expiration { for _ in 0..slack + 10 { let block = uut.select_block(); let header = BlockHeader::mock(block.block_number, None, None, &[], Word::default()); - uut.commit_block(header); + uut.commit_block(&header); } uut @@ -144,7 +144,7 @@ mod authentication_height { for _ in 0..retention + 10 { let block = uut.select_block(); let header = BlockHeader::mock(block.block_number, None, None, &[], Word::default()); - uut.commit_block(header); + uut.commit_block(&header); } uut diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 0e9876d59d..903fb03fb7 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -5,8 +5,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; -use futures::StreamExt; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_proto::generated::block_producer::api_server; use miden_node_proto::generated::{self as proto}; use miden_node_proto_build::block_producer_api_descriptor; @@ -20,7 +18,7 @@ use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Deserializable; use tokio::net::TcpListener; use tokio::sync::{Mutex, RwLock}; -use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; +use tokio_stream::wrappers::TcpListenerStream; use tonic::Status; use tower_http::trace::TraceLayer; use tracing::{debug, error, info, instrument}; @@ -396,8 +394,6 @@ impl BlockProducerRpcServer { #[tonic::async_trait] impl api_server::Api for BlockProducerRpcServer { - type MempoolSubscriptionStream = MempoolEventSubscription; - async fn submit_proven_tx( &self, request: tonic::Request, @@ -433,40 +429,6 @@ impl api_server::Api for BlockProducerRpcServer { mempool_stats: Some(mempool_stats.into()), })) } - - async fn mempool_subscription( - &self, - _request: tonic::Request<()>, - ) -> Result, tonic::Status> { - let shared_mempool = self.mempool.lock().await; - let subscription = shared_mempool - .lock() - .map_err(|err| tonic::Status::internal(err.to_string()))? - .subscribe(); - let subscription = ReceiverStream::new(subscription); - - Ok(tonic::Response::new(MempoolEventSubscription { inner: subscription })) - } -} - -// MEMPOOL SUBSCRIPTION -// ================================================================================================ - -struct MempoolEventSubscription { - inner: ReceiverStream, -} - -impl tokio_stream::Stream for MempoolEventSubscription { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner - .poll_next_unpin(cx) - .map(|x| x.map(proto::block_producer::MempoolEvent::from).map(Result::Ok)) - } } // MEMPOOL STATISTICS diff --git a/crates/proto/src/domain/mempool.rs b/crates/proto/src/domain/mempool.rs deleted file mode 100644 index 4b278e4796..0000000000 --- a/crates/proto/src/domain/mempool.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::collections::HashSet; - -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::BlockHeader; -use miden_protocol::note::Nullifier; -use miden_protocol::transaction::TransactionId; -use miden_protocol::utils::serde::Serializable; -use miden_standards::note::AccountTargetNetworkNote; - -use crate::decode::{ConversionResultExt, DecodeBytesExt, GrpcDecodeExt}; -use crate::errors::ConversionError; -use crate::{decode, generated as proto}; - -#[derive(Debug, Clone, PartialEq)] -pub enum MempoolEvent { - TransactionAdded { - id: TransactionId, - nullifiers: Vec, - network_notes: Vec, - account_delta: Option, - }, - BlockCommitted { - // Box'd as this struct is quite large and triggers clippy. - header: Box, - txs: Vec, - }, - TransactionsReverted(HashSet), -} - -impl MempoolEvent { - pub fn kind(&self) -> &'static str { - match self { - MempoolEvent::TransactionAdded { .. } => "TransactionAdded", - MempoolEvent::BlockCommitted { .. } => "BlockCommitted", - MempoolEvent::TransactionsReverted(_) => "TransactionsReverted", - } - } -} - -impl From for proto::block_producer::MempoolEvent { - fn from(event: MempoolEvent) -> Self { - let event = match event { - MempoolEvent::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - } => { - let event = proto::block_producer::mempool_event::TransactionAdded { - id: Some(id.into()), - nullifiers: nullifiers.into_iter().map(Into::into).collect(), - network_notes: network_notes.into_iter().map(Into::into).collect(), - network_account_delta: account_delta - .as_ref() - .map(AccountUpdateDetails::to_bytes), - }; - - proto::block_producer::mempool_event::Event::TransactionAdded(event) - }, - MempoolEvent::BlockCommitted { header, txs } => { - proto::block_producer::mempool_event::Event::BlockCommitted( - proto::block_producer::mempool_event::BlockCommitted { - block_header: Some(header.as_ref().into()), - transactions: txs.into_iter().map(Into::into).collect(), - }, - ) - }, - MempoolEvent::TransactionsReverted(txs) => { - proto::block_producer::mempool_event::Event::TransactionsReverted( - proto::block_producer::mempool_event::TransactionsReverted { - reverted: txs.into_iter().map(Into::into).collect(), - }, - ) - }, - } - .into(); - - Self { event } - } -} - -impl TryFrom for MempoolEvent { - type Error = ConversionError; - - fn try_from(event: proto::block_producer::MempoolEvent) -> Result { - let event = event.event.ok_or(ConversionError::missing_field::< - proto::block_producer::MempoolEvent, - >("event"))?; - - match event { - proto::block_producer::mempool_event::Event::TransactionAdded(tx) => { - let decoder = tx.decoder(); - let id = decode!(decoder, tx.id)?; - let nullifiers = tx - .nullifiers - .into_iter() - .map(Nullifier::try_from) - .collect::>() - .context("nullifiers")?; - let network_notes = tx - .network_notes - .into_iter() - .map(AccountTargetNetworkNote::try_from) - .collect::>() - .context("network_notes")?; - let account_delta = tx - .network_account_delta - .as_deref() - .map(|bytes| AccountUpdateDetails::decode_bytes(bytes, "account_delta")) - .transpose()?; - - Ok(Self::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - }) - }, - proto::block_producer::mempool_event::Event::BlockCommitted(block_committed) => { - let decoder = block_committed.decoder(); - let header = decode!(decoder, block_committed.block_header)?; - let header = Box::new(header); - let txs = block_committed - .transactions - .into_iter() - .map(TransactionId::try_from) - .collect::>() - .context("transactions")?; - - Ok(Self::BlockCommitted { header, txs }) - }, - proto::block_producer::mempool_event::Event::TransactionsReverted(txs) => { - let txs = txs - .reverted - .into_iter() - .map(TransactionId::try_from) - .collect::>() - .context("reverted")?; - - Ok(Self::TransactionsReverted(txs)) - }, - } - } -} diff --git a/crates/proto/src/domain/mod.rs b/crates/proto/src/domain/mod.rs index b078655532..e04eff7947 100644 --- a/crates/proto/src/domain/mod.rs +++ b/crates/proto/src/domain/mod.rs @@ -2,7 +2,6 @@ pub mod account; pub mod batch; pub mod block; pub mod digest; -pub mod mempool; pub mod merkle; pub mod note; pub mod nullifier; diff --git a/crates/proto/src/generated/mod.rs b/crates/proto/src/generated/mod.rs index 63dc1dfa2c..e559675415 100644 --- a/crates/proto/src/generated/mod.rs +++ b/crates/proto/src/generated/mod.rs @@ -1,6 +1,5 @@ #![expect( clippy::pedantic, - clippy::large_enum_variant, clippy::allow_attributes, reason = "generated by build.rs and tonic" )] diff --git a/docs/external/src/operator/architecture.md b/docs/external/src/operator/architecture.md index de7b3690a3..2ed5ba9bda 100644 --- a/docs/external/src/operator/architecture.md +++ b/docs/external/src/operator/architecture.md @@ -55,13 +55,11 @@ it is also possible to perform proving in-process. This is useful when running a ## Network transaction builder -The network transaction builder monitors the mempool for network notes, and creates transactions consuming these. +The network transaction builder follows committed blocks for network notes, and creates transactions consuming these. We call these network transactions and at present this is the only entity that is allowed to create such transactions. This restriction will be lifted in the future, but for now this component _must_ be enabled to have support for network transactions. -The mempool is monitored via a gRPC event stream served by the block-producer. - Internally, the builder spawns a dedicated actor for each network account that has pending notes. Actors that remain idle (no notes to consume) for a configurable duration are automatically deactivated to conserve resources, and are re-activated when new notes arrive. The idle timeout can be tuned with the `--ntx-builder.idle-timeout` CLI diff --git a/docs/internal/src/ntx-builder.md b/docs/internal/src/ntx-builder.md index bc9c3cfbe9..a26057921d 100644 --- a/docs/internal/src/ntx-builder.md +++ b/docs/internal/src/ntx-builder.md @@ -36,8 +36,8 @@ definitions of network accounts, notes and transactions mature. The NTB uses an actor-per-account model managed by a central `Coordinator`. On startup the coordinator syncs all known network accounts and their unconsumed notes from the store. It then -monitors the mempool for events (via a gRPC event stream from the block-producer) which would -impact network account state. +follows the committed block stream from the RPC service for updates which would impact network +account state. For each network account, the coordinator spawns a dedicated `AccountActor`. Each actor runs in its own async task and is responsible for creating transactions that consume network notes targeting @@ -50,8 +50,8 @@ Actors that have been idle (no available notes to consume) for longer than the * will be deactivated. The idle timeout is configurable via the `--ntx-builder.idle-timeout` CLI argument (default: 5 minutes). -Deactivated actors are re-spawned when new notes targeting their account are detected by the -coordinator (via the `send_targeted` path). +Deactivated actors are re-spawned when committed-chain processing detects new notes targeting their +account. Each actors crash count is tracked, and once the count reaches a configurable threshold, the account is **deactivated** and no new actor will be spawned for it. This prevents resource exhaustion from a persistently diff --git a/proto/proto/internal/block_producer.proto b/proto/proto/internal/block_producer.proto index 407054cd89..d1fa9f0ce1 100644 --- a/proto/proto/internal/block_producer.proto +++ b/proto/proto/internal/block_producer.proto @@ -5,8 +5,6 @@ package block_producer; import "google/protobuf/empty.proto"; import "rpc.proto"; import "types/blockchain.proto"; -import "types/note.proto"; -import "types/primitives.proto"; import "types/transaction.proto"; // BLOCK PRODUCER SERVICE @@ -25,60 +23,4 @@ service Api { // // Returns the node's current block height. rpc SubmitProvenTxBatch(transaction.TransactionBatch) returns (blockchain.BlockNumber) {} - - // Subscribe to mempool events. - // - // The event stream will contain all events after the current chain tip. This includes all - // currently inflight events that have not yet been committed to the chain. - // - // Currently only a single active subscription is supported. Subscription requests will cancel - // the active subscription, if any. - rpc MempoolSubscription(google.protobuf.Empty) returns (stream MempoolEvent) {} -} - -// MEMPOOL SUBSCRIPTION -// ================================================================================================ - -// Request to subscribe to mempool events. -message MempoolSubscriptionRequest {} - -// Event from the mempool. -message MempoolEvent { - // A block was committed. - // - // This event is sent when a block is committed to the chain. - message BlockCommitted { - blockchain.BlockHeader block_header = 1; - repeated transaction.TransactionId transactions = 2; - } - - // A transaction was added to the mempool. - // - // This event is sent when a transaction is added to the mempool. - message TransactionAdded { - // The ID of the transaction. - transaction.TransactionId id = 1; - // Nullifiers consumed by the transaction. - repeated primitives.Digest nullifiers = 2; - // Network notes created by the transaction. - repeated note.NetworkNote network_notes = 3; - // Changes to a network account, if any. This includes creation of new network accounts. - // - // The account delta is encoded using [miden_serde_utils::Serializable] implementation - // for [miden_protocol::account::delta::AccountDelta]. - optional bytes network_account_delta = 4; - } - - // A set of transactions was reverted and dropped from the mempool. - // - // This event is sent when a set of transactions are reverted and dropped from the mempool. - message TransactionsReverted { - repeated transaction.TransactionId reverted = 1; - } - - oneof event { - TransactionAdded transaction_added = 1; - BlockCommitted block_committed = 2; - TransactionsReverted transactions_reverted = 3; - } } From a52be2380026eb4b2da2feb14d4c1b315efeb813 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Tue, 26 May 2026 10:02:58 +0200 Subject: [PATCH 2/5] clippy From e02d6194e16e01a5f7a8d4e093ce1740debb3420 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Tue, 26 May 2026 10:11:08 +0200 Subject: [PATCH 3/5] More clippies From 745cf149427351525e6d35536788d8a12599fef0 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Tue, 26 May 2026 10:51:35 +0200 Subject: [PATCH 4/5] Forward submissions in full node mode. --- crates/rpc/src/lib.rs | 2 +- crates/rpc/src/server/api.rs | 133 +++++++++++------------------------ crates/rpc/src/server/mod.rs | 56 +++++++++++---- crates/rpc/src/tests.rs | 40 +++++++++-- 4 files changed, 121 insertions(+), 110 deletions(-) diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 368cb63de4..4254d631e3 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -2,7 +2,7 @@ mod server; #[cfg(test)] mod tests; -pub use server::Rpc; +pub use server::{Rpc, RpcMode}; // CONSTANTS // ================================================================================================= diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index ed26ed6be4..6143efa54e 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -3,13 +3,7 @@ use std::sync::LazyLock; use std::time::Duration; use anyhow::Context; -use miden_node_proto::clients::{ - BlockProducerClient, - Builder, - NtxBuilderClient, - StoreRpcClient, - ValidatorClient, -}; +use miden_node_proto::clients::{NtxBuilderClient, StoreRpcClient}; use miden_node_proto::decode::{read_account_id, read_account_ids, read_block_range}; use miden_node_proto::domain::account::{AccountRequest, SlotData}; use miden_node_proto::errors::ConversionError; @@ -42,18 +36,17 @@ use miden_protocol::{MIN_PROOF_SECURITY_LEVEL, Word}; use miden_tx::TransactionVerifier; use miden_tx_batch_prover::LocalBatchProver; use tonic::{IntoRequest, Request, Response, Status}; -use tracing::{Span, debug, info, info_span}; -use url::Url; +use tracing::{Span, debug, info_span}; use crate::COMPONENT; +use crate::server::RpcMode; // RPC SERVICE // ================================================================================================ pub struct RpcService { store: StoreRpcClient, - block_producer: Option, - validator: ValidatorClient, + mode: RpcMode, ntx_builder: Option, genesis_commitment: Option, block_commitment_cache: LruCache, @@ -61,72 +54,14 @@ pub struct RpcService { impl RpcService { pub(super) fn new( - store_url: Url, - block_producer_url: Option, - validator_url: Url, - ntx_builder_url: Option, + store: StoreRpcClient, + mode: RpcMode, + ntx_builder: Option, commitment_cache_capacity: NonZeroUsize, ) -> Self { - let store = { - info!(target: COMPONENT, store_endpoint = %store_url, "Initializing store client"); - Builder::new(store_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }; - - let block_producer = block_producer_url.map(|block_producer_url| { - info!( - target: COMPONENT, - block_producer_endpoint = %block_producer_url, - "Initializing block producer client", - ); - Builder::new(block_producer_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }); - - let validator = { - info!( - target: COMPONENT, - validator_endpoint = %validator_url, - "Initializing validator client", - ); - Builder::new(validator_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }; - - let ntx_builder = ntx_builder_url.map(|ntx_builder_url| { - info!( - target: COMPONENT, - ntx_builder_endpoint = %ntx_builder_url, - "Initializing ntx-builder client", - ); - Builder::new(ntx_builder_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }); - Self { store, - block_producer, - validator, + mode, ntx_builder, genesis_commitment: None, block_commitment_cache: LruCache::new(commitment_cache_capacity), @@ -135,8 +70,8 @@ impl RpcService { /// Sets the genesis commitment, returning an error if it is already set. /// - /// Required since `RpcService::new()` sets up the `store` which is used to fetch the - /// `genesis_commitment`. + /// Required since the store client is used to fetch the `genesis_commitment` after + /// `RpcService` construction. pub fn set_genesis_commitment(&mut self, commitment: Word) -> anyhow::Result<()> { if self.genesis_commitment.is_some() { return Err(anyhow::anyhow!("genesis commitment already set")); @@ -502,12 +437,6 @@ impl api_server::Api for RpcService { ) -> Result, Status> { debug!(target: COMPONENT, request = ?request.get_ref()); - let Some(block_producer) = &self.block_producer else { - return Err(Status::unavailable( - "Transaction submission not available in read-only mode", - )); - }; - let request = request.into_inner(); let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| { @@ -567,10 +496,20 @@ impl api_server::Api for RpcService { )) })?; + // In full node mode we forward the request to the source. + let (block_producer, validator) = match &self.mode { + RpcMode::Sequencer { block_producer, validator } => { + (block_producer.as_ref(), validator.as_ref()) + }, + RpcMode::FullNode { source_rpc } => { + return source_rpc.as_ref().clone().submit_proven_tx(request).await; + }, + }; + // Transaction inputs must be provided in order to allow for transaction re-execution via // the Validator. if request.transaction_inputs.is_some() { - self.validator.clone().submit_proven_transaction(request.clone()).await?; + validator.clone().submit_proven_transaction(request.clone()).await?; } else { return Err(Status::invalid_argument("Transaction inputs must be provided")); } @@ -584,10 +523,6 @@ impl api_server::Api for RpcService { &self, request: tonic::Request, ) -> Result, Status> { - let Some(block_producer) = &self.block_producer else { - return Err(Status::unavailable("Batch submission not available in read-only mode")); - }; - let request = request.into_inner(); let proven_batch = ProvenBatch::read_from_bytes(&request.batch_proof).map_err(|err| { @@ -657,6 +592,16 @@ impl api_server::Api for RpcService { return Err(Status::invalid_argument("batch proof did not match proposed batch")); } + // In full node mode we forward the request to the source. + let (block_producer, validator) = match &self.mode { + RpcMode::Sequencer { block_producer, validator } => { + (block_producer.as_ref(), validator.as_ref()) + }, + RpcMode::FullNode { source_rpc } => { + return source_rpc.as_ref().clone().submit_proven_tx_batch(request).await; + }, + }; + // Submit each transaction to the validator. // // SAFETY: We checked earlier that the two iterators are the same length. @@ -665,7 +610,7 @@ impl api_server::Api for RpcService { transaction: tx.to_bytes(), transaction_inputs: inputs.clone().into(), }; - self.validator.clone().submit_proven_transaction(request).await?; + validator.clone().submit_proven_transaction(request).await?; } block_producer.clone().submit_proven_tx_batch(request).await @@ -704,15 +649,21 @@ impl api_server::Api for RpcService { let store_status = self.store.clone().status(Request::new(())).await.map(Response::into_inner).ok(); - let block_producer_status = if let Some(block_producer) = &self.block_producer { - block_producer + let block_producer_status = match &self.mode { + RpcMode::Sequencer { block_producer, .. } => block_producer + .as_ref() .clone() .status(Request::new(())) .await .map(Response::into_inner) + .ok(), + RpcMode::FullNode { source_rpc } => source_rpc + .as_ref() + .clone() + .status(Request::new(())) + .await .ok() - } else { - None + .and_then(|response| response.into_inner().block_producer), }; Ok(Response::new(proto::rpc::RpcStatus { diff --git a/crates/rpc/src/server/mod.rs b/crates/rpc/src/server/mod.rs index 6a8c3acc98..4eaebc21d1 100644 --- a/crates/rpc/src/server/mod.rs +++ b/crates/rpc/src/server/mod.rs @@ -2,6 +2,13 @@ use std::num::NonZeroUsize; use accept::AcceptHeaderLayer; use anyhow::Context; +use miden_node_proto::clients::{ + BlockProducerClient, + NtxBuilderClient, + RpcClient as SourceRpcClient, + StoreRpcClient, + ValidatorClient, +}; use miden_node_proto::generated::rpc::api_server; use miden_node_proto_build::rpc_api_descriptor; use miden_node_utils::clap::GrpcOptionsExternal; @@ -16,7 +23,6 @@ use tonic_web::GrpcWebLayer; use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier}; use tower_http::trace::TraceLayer; use tracing::info; -use url::Url; use crate::COMPONENT; use crate::server::health::HealthCheckLayer; @@ -28,17 +34,44 @@ mod health; /// The RPC server component. /// /// On startup, binds to the provided listener and starts serving the RPC API. -/// It connects lazily to the store, validator and block producer components as needed. -/// Requests will fail if the components are not available. +/// It uses the supplied clients for store access and mode-specific submission handling. +/// Requests will fail if the supplied clients cannot reach their components. pub struct Rpc { pub listener: TcpListener, - pub store_url: Url, - pub block_producer_url: Option, - pub validator_url: Url, - pub ntx_builder_url: Option, + pub store: StoreRpcClient, + pub mode: RpcMode, + pub ntx_builder: Option, pub grpc_options: GrpcOptionsExternal, } +#[derive(Clone, Debug)] +pub enum RpcMode { + /// Sequencer RPC validates submissions locally, re-executes them through the validator, then + /// forwards them to the block producer. + Sequencer { + block_producer: Box, + validator: Box, + }, + /// Full-node RPC forwards submissions to the source RPC. + /// + /// The caller is responsible for configuring this client with any request metadata the source + /// RPC requires. + FullNode { source_rpc: Box }, +} + +impl RpcMode { + pub fn sequencer(block_producer: BlockProducerClient, validator: ValidatorClient) -> Self { + Self::Sequencer { + block_producer: Box::new(block_producer), + validator: Box::new(validator), + } + } + + pub fn full_node(source_rpc: SourceRpcClient) -> Self { + Self::FullNode { source_rpc: Box::new(source_rpc) } + } +} + impl Rpc { /// Serves the RPC API. /// @@ -46,10 +79,9 @@ impl Rpc { /// a fatal error is encountered. pub async fn serve(self) -> anyhow::Result<()> { let mut api = api::RpcService::new( - self.store_url.clone(), - self.block_producer_url.clone(), - self.validator_url, - self.ntx_builder_url.clone(), + self.store.clone(), + self.mode.clone(), + self.ntx_builder.clone(), NonZeroUsize::new(1_000_000).unwrap(), ); @@ -66,7 +98,7 @@ impl Rpc { .build_v1() .context("failed to build reflection service")?; - info!(target: COMPONENT, endpoint=?self.listener, store=%self.store_url, block_producer=?self.block_producer_url, "Server initialized"); + info!(target: COMPONENT, endpoint=?self.listener, mode=?self.mode, "Server initialized"); let rpc_version = env!("CARGO_PKG_VERSION"); let rpc_version = diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index 75c67500c5..49732384f4 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -4,7 +4,15 @@ use std::time::Duration; use http::header::{ACCEPT, CONTENT_TYPE}; use http::{HeaderMap, HeaderValue}; -use miden_node_proto::clients::{Builder, GrpcClient, Interceptor, RpcClient}; +use miden_node_proto::clients::{ + BlockProducerClient, + Builder, + GrpcClient, + Interceptor, + RpcClient, + StoreRpcClient, + ValidatorClient, +}; use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient; use miden_node_proto::generated::{self as proto}; use miden_node_store::genesis::config::GenesisConfig; @@ -42,7 +50,7 @@ use tokio::task; use tokio::time::sleep; use url::Url; -use crate::Rpc; +use crate::{Rpc, RpcMode}; /// A wrapper around the store runtime and data directory. /// @@ -670,12 +678,32 @@ async fn start_rpc_with_options( let block_producer_url = Url::parse(&format!("http://{block_producer_addr}")).unwrap(); // SAFETY: Using dummy validator URL for test - not actually contacted in this test let validator_url = Url::parse("http://127.0.0.1:0").unwrap(); + let store = Builder::new(store_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); + let block_producer = Builder::new(block_producer_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); + let validator = Builder::new(validator_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); Rpc { listener: rpc_listener, - store_url, - block_producer_url: Some(block_producer_url), - validator_url, - ntx_builder_url: None, + store, + mode: RpcMode::sequencer(block_producer, validator), + ntx_builder: None, grpc_options, } .serve() From eef31f9247905bbf5f8850dd10e39085693c9d67 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Tue, 26 May 2026 10:55:51 +0200 Subject: [PATCH 5/5] Wire it up --- Cargo.lock | 1 + bin/node/Cargo.toml | 1 + bin/node/src/commands/modes.rs | 44 +++++++++++++++++++++++++++++++--- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5db357e3ef..78bec97de5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3328,6 +3328,7 @@ dependencies = [ "fs-err", "humantime", "miden-node-block-producer", + "miden-node-proto", "miden-node-rpc", "miden-node-store", "miden-node-utils", diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml index 85e96e5b9a..fc716f8e55 100644 --- a/bin/node/Cargo.toml +++ b/bin/node/Cargo.toml @@ -23,6 +23,7 @@ clap = { features = ["env", "string"], workspace = true } fs-err = { workspace = true } humantime = { workspace = true } miden-node-block-producer = { workspace = true } +miden-node-proto = { workspace = true } miden-node-rpc = { workspace = true } miden-node-store = { workspace = true } miden-node-utils = { workspace = true } diff --git a/bin/node/src/commands/modes.rs b/bin/node/src/commands/modes.rs index 8eb4500da2..993a8ea63a 100644 --- a/bin/node/src/commands/modes.rs +++ b/bin/node/src/commands/modes.rs @@ -1,3 +1,4 @@ +use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient}; use url::Url; use super::block_producer::BlockProducerOptions; @@ -27,11 +28,13 @@ impl SequencerCommand { pub fn handle(self) -> anyhow::Result<()> { let runtime = self.runtime.runtime_config(&self.store); self.block_producer.validate()?; + let validator = self.external_services.validator_client(); + let ntx_builder = self.external_services.ntx_builder_client(); let _ = ( runtime.rpc_listen, runtime.data_directory, - self.external_services.validator_url, - self.external_services.ntx_builder_url, + validator, + ntx_builder, self.block_producer.block_prover.url, runtime.database_options, runtime.internal_grpc_options, @@ -58,6 +61,28 @@ pub struct SequencerExternalServiceOptions { pub ntx_builder_url: Url, } +impl SequencerExternalServiceOptions { + fn validator_client(&self) -> ValidatorClient { + Builder::new(self.validator_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } + + fn ntx_builder_client(&self) -> NtxBuilderClient { + Builder::new(self.ntx_builder_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } +} + #[derive(clap::Args, Clone, Debug)] pub struct FullNodeCommand { #[command(flatten)] @@ -73,6 +98,7 @@ pub struct FullNodeCommand { impl FullNodeCommand { pub fn handle(self) -> anyhow::Result<()> { let runtime = self.runtime.runtime_config(&self.store); + let source_rpc = self.sync.source_rpc_client(); let _ = ( runtime.rpc_listen, runtime.data_directory, @@ -80,7 +106,7 @@ impl FullNodeCommand { runtime.internal_grpc_options, runtime.external_grpc_options, runtime.storage_options, - self.sync.block_source_url, + source_rpc, ); anyhow::bail!( @@ -89,3 +115,15 @@ impl FullNodeCommand { ) } } + +impl SyncOptions { + fn source_rpc_client(&self) -> RpcClient { + Builder::new(self.block_source_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } +}