diff --git a/Cargo.lock b/Cargo.lock index 06a910230..66cda2353 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3352,7 +3352,6 @@ dependencies = [ "miden-remote-prover-client", "miden-standards", "miden-tx-batch-prover", - "miden-validator", "pretty_assertions", "rand 0.9.4", "rand_chacha", @@ -3519,7 +3518,6 @@ dependencies = [ "clap", "fs-err", "futures", - "miden-node-block-producer", "miden-node-proto", "miden-node-store", "miden-node-utils", diff --git a/Cargo.toml b/Cargo.toml index e86a77609..36f8d2246 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,6 @@ miden-node-store = { path = "crates/store", version = "0.15" } miden-node-test-macro = { path = "crates/test-macro" } miden-node-utils = { path = "crates/utils", version = "0.15" } miden-remote-prover-client = { path = "crates/remote-prover-client", version = "0.15" } -miden-validator = { path = "bin/validator", version = "0.15" } # Temporary workaround until # is part of `rocksdb-rust` release miden-node-rocksdb-cxx-linkage-fix = { path = "crates/rocksdb-cxx-linkage-fix", version = "0.15" } diff --git a/bin/stress-test/Cargo.toml b/bin/stress-test/Cargo.toml index 0743e2b75..2650244db 100644 --- a/bin/stress-test/Cargo.toml +++ b/bin/stress-test/Cargo.toml @@ -17,17 +17,16 @@ version.workspace = true workspace = true [dependencies] -clap = { features = ["derive"], workspace = true } -fs-err = { workspace = true } -futures = { workspace = true } -miden-node-block-producer = { workspace = true } -miden-node-proto = { workspace = true } -miden-node-store = { workspace = true } -miden-node-utils = { workspace = true } -miden-protocol = { workspace = true } -miden-standards = { workspace = true } -rand = { workspace = true } -rayon = { workspace = true } -tokio = { workspace = true } -tonic = { default-features = true, workspace = true } -url = { workspace = true } +clap = { features = ["derive"], workspace = true } +fs-err = { workspace = true } +futures = { workspace = true } +miden-node-proto = { workspace = true } +miden-node-store = { workspace = true } +miden-node-utils = { workspace = true } +miden-protocol = { workspace = true } +miden-standards = { workspace = true } +rand = { workspace = true } +rayon = { workspace = true } +tokio = { workspace = true } +tonic = { default-features = true, workspace = true } +url = { workspace = true } diff --git a/bin/stress-test/src/main.rs b/bin/stress-test/src/main.rs index 79c466c9f..705a67d3a 100644 --- a/bin/stress-test/src/main.rs +++ b/bin/stress-test/src/main.rs @@ -122,14 +122,14 @@ async fn main() { vault_entries, account_update_blocks, } => { - seed_store( + Box::pin(seed_store( data_directory, num_accounts, public_accounts_percentage, storage_map_entries, vault_entries, account_update_blocks, - ) + )) .await; }, Command::BenchmarkStore { diff --git a/bin/stress-test/src/seeding/mod.rs b/bin/stress-test/src/seeding/mod.rs index 15ea556e1..0cba9bdb8 100644 --- a/bin/stress-test/src/seeding/mod.rs +++ b/bin/stress-test/src/seeding/mod.rs @@ -4,9 +4,9 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use metrics::SeedingMetrics; -use miden_node_block_producer::store::StoreClient; use miden_node_proto::domain::batch::BatchInputs; use miden_node_proto::generated::store::rpc_client::RpcClient; +use miden_node_store::state::State; use miden_node_store::{DataDirectory, GenesisState, Store, StoreMode}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::tracing::grpc::OtelInterceptor; @@ -41,7 +41,7 @@ use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SigningKey as EcdsaSecretKey use miden_protocol::crypto::dsa::falcon512_poseidon2::{PublicKey, SecretKey}; use miden_protocol::crypto::rand::RandomCoin; use miden_protocol::errors::AssetError; -use miden_protocol::note::{Note, NoteAssets, NoteHeader, NoteId, NoteInclusionProof}; +use miden_protocol::note::{Note, NoteAssets, NoteId, NoteInclusionProof}; use miden_protocol::transaction::{ InputNote, InputNoteCommitment, @@ -130,9 +130,7 @@ pub async fn seed_store( .expect("genesis block should be created"); Store::bootstrap(genesis_block, &data_directory).expect("store should bootstrap"); - // start the store - let (_, store_url) = start_store(data_directory.clone()).await; - let store_client = StoreClient::new(store_url); + let store_state = load_state(data_directory.clone()).await; // start generating blocks let accounts_filepath = data_directory.join(ACCOUNTS_FILENAME); @@ -144,7 +142,7 @@ pub async fn seed_store( public_accounts_percentage, faucet, genesis_header, - &store_client, + &store_state, data_directory, accounts_filepath, &signer, @@ -170,7 +168,7 @@ async fn generate_blocks( public_accounts_percentage: u8, mut faucet: Account, genesis_block: SignedBlock, - store_client: &StoreClient, + store_state: &Arc, data_directory: DataDirectory, accounts_filepath: PathBuf, signer: &EcdsaSecretKey, @@ -259,11 +257,11 @@ async fn generate_blocks( .collect(); // create the block and send it to the store - let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await; + let block_inputs = get_block_inputs(store_state, &batches, &mut metrics).await; // update blocks prev_block_header = - apply_block(batches, block_inputs, store_client, &mut metrics, signer).await; + apply_block(batches, block_inputs, store_state, &mut metrics, signer).await; account_states .extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account))); if current_anchor_header.block_epoch() != prev_block_header.block_epoch() { @@ -272,7 +270,7 @@ async fn generate_blocks( // create the consume notes txs to be used in the next block let batch_inputs = - get_batch_inputs(store_client, &prev_block_header, ¬es, &mut metrics).await; + get_batch_inputs(store_state, &prev_block_header, ¬es, &mut metrics).await; (pending_consumed_accounts, consume_notes_txs) = create_consume_note_txs( &prev_block_header, accounts, @@ -316,10 +314,10 @@ async fn generate_blocks( .map(|txs| create_batch(txs, &prev_block_header)) .collect(); - let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await; + let block_inputs = get_block_inputs(store_state, &batches, &mut metrics).await; prev_block_header = - apply_block(batches, block_inputs, store_client, &mut metrics, signer).await; + apply_block(batches, block_inputs, store_state, &mut metrics, signer).await; account_states .extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account))); if current_anchor_header.block_epoch() != prev_block_header.block_epoch() { @@ -327,7 +325,7 @@ async fn generate_blocks( } let batch_inputs = - get_batch_inputs(store_client, &prev_block_header, ¬es, &mut metrics).await; + get_batch_inputs(store_state, &prev_block_header, ¬es, &mut metrics).await; let accounts = selected_account_ids .iter() .filter_map(|account_id| account_states.get(account_id).cloned()) @@ -357,30 +355,33 @@ async fn generate_blocks( metrics } -/// Given a list of batches and block inputs, creates a `ProvenBlock` and sends it to the store. +/// Given a list of batches and block inputs, creates a `ProvenBlock` and applies it to the store. /// Tracks the insertion time on the metrics. /// /// Returns the the inserted block. async fn apply_block( batches: Vec, block_inputs: BlockInputs, - store_client: &StoreClient, + store_state: &Arc, metrics: &mut SeedingMetrics, signer: &EcdsaSecretKey, ) -> BlockHeader { - let proposed_block = ProposedBlock::new(block_inputs, batches).unwrap(); + let proposed_block = ProposedBlock::new(block_inputs.clone(), batches).unwrap(); let (header, body) = proposed_block.clone().into_header_and_body().unwrap(); let block_size: usize = header.to_bytes().len() + body.to_bytes().len(); let signature = signer.sign(header.commitment()); // SAFETY: The header, body, and signature are known to correspond to each other. let signed_block = SignedBlock::new_unchecked(header, body, signature); + let header = signed_block.header().clone(); let ordered_batches = proposed_block.batches().clone(); let start = Instant::now(); - store_client.apply_block(&ordered_batches, &signed_block).await.unwrap(); + store_state + .apply_block_with_proving_inputs(ordered_batches, block_inputs, signed_block) + .await + .unwrap(); metrics.track_block_insertion(start.elapsed(), block_size); - let (header, ..) = signed_block.into_parts(); header } @@ -789,7 +790,7 @@ fn create_emit_note_tx( /// Gets the batch inputs from the store and tracks the query time on the metrics. async fn get_batch_inputs( - store_client: &StoreClient, + store_state: &Arc, block_ref: &BlockHeader, notes: &[Note], metrics: &mut SeedingMetrics, @@ -797,10 +798,10 @@ async fn get_batch_inputs( let start = Instant::now(); // Mark every note as unauthenticated, so that the store returns the inclusion proofs for all of // them - let batch_inputs = store_client + let batch_inputs = store_state .get_batch_inputs( - vec![(block_ref.block_num(), block_ref.commitment())].into_iter(), - notes.iter().map(Note::id), + [block_ref.block_num()].into_iter().collect(), + notes.iter().map(|note| note.id().as_word()).collect(), ) .await .unwrap(); @@ -810,22 +811,25 @@ async fn get_batch_inputs( /// Gets the block inputs from the store and tracks the query time on the metrics. async fn get_block_inputs( - store_client: &StoreClient, + store_state: &Arc, batches: &[ProvenBatch], metrics: &mut SeedingMetrics, ) -> BlockInputs { let start = Instant::now(); - let inputs = store_client + let inputs = store_state .get_block_inputs( - batches.iter().flat_map(ProvenBatch::updated_accounts), - batches.iter().flat_map(ProvenBatch::created_nullifiers), - batches.iter().flat_map(|batch| { - batch - .input_notes() - .into_iter() - .filter_map(|note| note.header().map(NoteHeader::id)) - }), - batches.iter().map(ProvenBatch::reference_block_num), + batches.iter().flat_map(ProvenBatch::updated_accounts).collect(), + batches.iter().flat_map(ProvenBatch::created_nullifiers).collect(), + batches + .iter() + .flat_map(|batch| { + batch + .input_notes() + .into_iter() + .filter_map(|note| note.header().map(|header| header.id().as_word())) + }) + .collect(), + batches.iter().map(ProvenBatch::reference_block_num).collect(), ) .await .unwrap(); @@ -845,20 +849,13 @@ pub async fn start_store( let rpc_listener = TcpListener::bind("127.0.0.1:0") .await .expect("Failed to bind store RPC gRPC endpoint"); - let block_producer_listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("Failed to bind store block-producer gRPC endpoint"); let store_addr = rpc_listener.local_addr().expect("Failed to get store RPC address"); - let store_block_producer_addr = block_producer_listener - .local_addr() - .expect("Failed to get store block-producer address"); let dir = data_directory.clone(); task::spawn(async move { Store { rpc_listener, - mode: StoreMode::BlockProducer { - block_producer_listener, + mode: StoreMode::Sequencer { block_prover_url: None, max_concurrent_proofs: miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS, }, @@ -878,7 +875,14 @@ pub async fn start_store( .await .expect("Failed to connect to store"); - // SAFETY: The store_block_producer_addr is always valid as it is created from a `SocketAddr`. - let store_url = Url::parse(&format!("http://{store_block_producer_addr}")).unwrap(); + let store_url = Url::parse(&format!("http://{store_addr}")).unwrap(); (RpcClient::with_interceptor(channel, OtelInterceptor), store_url) } + +async fn load_state(data_directory: PathBuf) -> Arc { + let (termination_ask, _termination_signal) = tokio::sync::mpsc::channel(1); + let (state, _) = State::load(&data_directory, StorageOptions::bench(), termination_ask) + .await + .expect("store state should load"); + Arc::new(state) +} diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 5acce1d69..b0d1f2a5b 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -26,6 +26,7 @@ anyhow = { workspace = true } futures = { workspace = true } itertools = { workspace = true } miden-node-proto = { workspace = true } +miden-node-store = { workspace = true } miden-node-utils = { features = ["testing"], workspace = true } miden-protocol = { default-features = true, workspace = true } miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true } @@ -39,11 +40,9 @@ url = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } -miden-node-store = { workspace = true } miden-node-utils = { features = ["testing"], workspace = true } miden-protocol = { default-features = true, features = ["testing"], workspace = true } miden-standards = { features = ["testing"], workspace = true } -miden-validator = { workspace = true } pretty_assertions = { workspace = true } rand_chacha = { default-features = false, workspace = true } serial_test = { workspace = true } diff --git a/crates/block-producer/README.md b/crates/block-producer/README.md index 9968d4646..938e62c25 100644 --- a/crates/block-producer/README.md +++ b/crates/block-producer/README.md @@ -4,7 +4,7 @@ Contains code defining the [Miden node's block-producer](/README.md#architecture ordering transactions into blocks and submitting these for inclusion in the blockchain. It exposes an in-process API which the node's RPC component uses to submit new transactions. In turn, the -`block-producer` uses the store's gRPC API to submit blocks and query chain state. +`block-producer` uses the store's in-process state to submit blocks and query chain state. For more information on the installation and operation of this component, please see the [node's readme](../../README.md). diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 1294e027b..4082c28b0 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -5,11 +5,11 @@ use std::time::Duration; use futures::TryFutureExt; use miden_node_proto::domain::batch::BatchInputs; +use miden_node_store::state::State; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::batch::{BatchId, ProposedBatch, ProvenBatch}; -use miden_protocol::note::NoteId; use miden_remote_prover_client::RemoteBatchProver; use miden_tx_batch_prover::LocalBatchProver; use rand::Rng; @@ -20,9 +20,8 @@ use url::Url; use crate::domain::batch::SelectedBatch; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::BuildBatchError; +use crate::errors::{BuildBatchError, StoreError}; use crate::mempool::SharedMempool; -use crate::store::StoreClient; use crate::{COMPONENT, TelemetryInjectorExt}; // BATCH BUILDER @@ -50,7 +49,7 @@ pub struct BatchBuilder { /// /// Note: this _must_ be sign positive and less than 1.0. failure_rate: f64, - store: StoreClient, + store: Arc, } impl BatchBuilder { @@ -59,7 +58,7 @@ impl BatchBuilder { /// /// If no batch prover URL is provided, a local batch prover is used instead. pub fn new( - store: StoreClient, + store: Arc, num_workers: NonZeroUsize, batch_prover_url: Option, batch_interval: Duration, @@ -162,7 +161,7 @@ struct BatchJob { /// /// Note: this _must_ be sign positive and less than 1.0. failure_rate: f64, - store: StoreClient, + store: Arc, batch_prover: BatchProver, mempool: SharedMempool, } @@ -222,12 +221,15 @@ impl BatchJob { .transactions() .iter() .map(Deref::deref) - .flat_map(AuthenticatedTransaction::unauthenticated_note_ids) - .map(NoteId::from_raw); + .flat_map(AuthenticatedTransaction::unauthenticated_note_ids); self.store - .get_batch_inputs(block_references, unauthenticated_notes) + .get_batch_inputs( + block_references.map(|(block_num, _)| block_num).collect(), + unauthenticated_notes.collect(), + ) .await + .map_err(StoreError::GetBatchInputsFailed) .map_err(BuildBatchError::FetchBatchInputsFailed) .map(|inputs| (batch, inputs)) } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index dd3510b93..7adb562c7 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -2,18 +2,17 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Context; +use miden_node_store::state::State; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::batch::{OrderedBatches, ProvenBatch}; use miden_protocol::block::{BlockInputs, BlockNumber, ProposedBlock, ProvenBlock, SignedBlock}; -use miden_protocol::note::NoteHeader; use miden_protocol::transaction::TransactionHeader; use tokio::time::Duration; use tracing::{Span, instrument}; -use crate::errors::BuildBlockError; +use crate::errors::{BuildBlockError, StoreError}; use crate::mempool::SharedMempool; -use crate::store::StoreClient; use crate::validator::BlockProducerValidatorClient; use crate::{COMPONENT, TelemetryInjectorExt}; @@ -29,19 +28,19 @@ pub struct BlockBuilder { /// Note: this _must_ be sign positive and less than 1.0. pub failure_rate: f64, - /// The store RPC client for committing blocks. - pub store: StoreClient, + /// The store state for committing blocks. + pub store: Arc, /// The validator RPC client for validating blocks. pub validator: BlockProducerValidatorClient, } impl BlockBuilder { - /// Creates a new [`BlockBuilder`] with the given [`StoreClient`] and optional block prover URL. + /// Creates a new [`BlockBuilder`] with the given store state and optional block prover URL. /// /// If the block prover URL is not set, the block builder will use the local block prover. pub fn new( - store: StoreClient, + store: Arc, validator: BlockProducerValidatorClient, block_interval: Duration, ) -> Self { @@ -118,10 +117,10 @@ impl BlockBuilder { .inspect_ok(BlockBatchesAndInputs::inject_telemetry) .and_then(|inputs| self.propose_block(inputs)) .inspect_ok(|proposed_block| { - ProposedBlock::inject_telemetry(proposed_block); + ProposedBlock::inject_telemetry(&proposed_block.proposed_block); }) .and_then(|proposed_block| self.build_and_validate_block(proposed_block)) - .and_then(|(ordered_batches, signed_block)| self.commit_block(mempool, ordered_batches, signed_block)) + .and_then(|block_commit| self.commit_block(mempool, block_commit)) // Handle errors by propagating the error to the root span and rolling back the block. .inspect_err(|err| Span::current().set_error(err)) .or_else(|err| async { @@ -172,7 +171,7 @@ impl BlockBuilder { .input_notes() .iter() .cloned() - .filter_map(|note| note.header().map(NoteHeader::id)) + .filter_map(|note| note.header().map(|header| header.id().as_word())) }); let block_references_iter = batch_iter.clone().map(Deref::deref).map(ProvenBatch::reference_block_num); @@ -184,19 +183,20 @@ impl BlockBuilder { let inputs = self .store .get_block_inputs( - account_ids_iter, - created_nullifiers_iter, - unauthenticated_notes_iter, - block_references_iter, + account_ids_iter.collect(), + created_nullifiers_iter.collect(), + unauthenticated_notes_iter.collect(), + block_references_iter.collect(), ) .await + .map_err(StoreError::GetBlockInputsFailed) .map_err(BuildBlockError::GetBlockInputsFailed)?; // Check that the latest committed block in the store matches our expectations. // - // Desync can occur since the mempool and store are separate components. One example is if - // the block-producer's apply_block gRPC request times out, rolling back the block locally, - // but the store still committed the block on its end. + // Desync can occur since the mempool and store state are updated separately. For example, + // the store may commit a block while the block builder rolls back its local mempool view + // after a late failure. let store_chain_tip = inputs.prev_block_header().block_num(); if store_chain_tip.child() != block_number { return Err(BuildBlockError::Desync { @@ -214,21 +214,24 @@ impl BlockBuilder { async fn propose_block( &self, batches_inputs: BlockBatchesAndInputs, - ) -> Result { + ) -> Result { let BlockBatchesAndInputs { batches, inputs } = batches_inputs; + let block_inputs = inputs.clone(); let batches = batches.into_iter().map(Arc::unwrap_or_clone).collect(); let proposed_block = ProposedBlock::new(inputs, batches).map_err(BuildBlockError::ProposeBlockFailed)?; - Ok(proposed_block) + Ok(ProposedBlockAndInputs { proposed_block, block_inputs }) } #[instrument(target = COMPONENT, name = "block_builder.validate_block", skip_all, err)] async fn build_and_validate_block( &self, - proposed_block: ProposedBlock, - ) -> Result<(OrderedBatches, SignedBlock), BuildBlockError> { + proposal: ProposedBlockAndInputs, + ) -> Result { + let ProposedBlockAndInputs { proposed_block, block_inputs } = proposal; + // Concurrently build the block and validate it via the validator. let build_result = spawn_blocking_in_current_span({ let proposed_block = proposed_block.clone(); @@ -250,27 +253,37 @@ impl BlockBuilder { return Err(BuildBlockError::InvalidSignature); } - let (ordered_batches, ..) = proposed_block.into_parts(); + let ordered_batches = proposed_block.batches().clone(); // SAFETY: The header, body, and signature are known to correspond to each other because the // header and body are derived from the proposed block and the signature is verified against // the corresponding commitment. let signed_block = SignedBlock::new_unchecked(header, body, signature); - Ok((ordered_batches, signed_block)) + Ok(BlockCommit { + ordered_batches, + block_inputs, + signed_block, + }) } #[instrument(target = COMPONENT, name = "block_builder.commit_block", skip_all, err)] async fn commit_block( &self, mempool: &SharedMempool, - ordered_batches: OrderedBatches, - signed_block: SignedBlock, + block_commit: BlockCommit, ) -> Result<(), BuildBlockError> { + let BlockCommit { + ordered_batches, + block_inputs, + signed_block, + } = block_commit; + let header = signed_block.header().clone(); + self.store - .apply_block(&ordered_batches, &signed_block) + .apply_block_with_proving_inputs(ordered_batches, block_inputs, signed_block) .await + .map_err(StoreError::ApplyBlockFailed) .map_err(BuildBlockError::StoreApplyBlockFailed)?; - let (header, ..) = signed_block.into_parts(); mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(&header); Ok(()) @@ -319,6 +332,19 @@ struct BlockBatchesAndInputs { inputs: BlockInputs, } +/// A proposed block bundled with the exact inputs used to construct it. +struct ProposedBlockAndInputs { + proposed_block: ProposedBlock, + block_inputs: BlockInputs, +} + +/// Data needed to commit a signed block and persist its proving inputs. +struct BlockCommit { + ordered_batches: OrderedBatches, + block_inputs: BlockInputs, + signed_block: SignedBlock, +} + impl TelemetryInjectorExt for BlockBatchesAndInputs { fn inject_telemetry(&self) { let span = Span::current(); diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index b5d880956..651093a6e 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -1,13 +1,17 @@ use core::error::Error as CoreError; -use miden_node_proto::errors::{ConversionError, GrpcError}; +use miden_node_proto::errors::GrpcError; +use miden_node_store::{ + ApplyBlockWithProvingInputsError, + DatabaseError, + GetBatchInputsError, + GetBlockInputsError, +}; use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::errors::{ProposedBatchError, ProposedBlockError, ProvenBatchError}; use miden_protocol::note::Nullifier; -use miden_protocol::transaction::TransactionId; -use miden_protocol::utils::serde::DeserializationError; use miden_remote_prover_client::RemoteProverClientError; use thiserror::Error; use tokio::task::JoinError; @@ -41,12 +45,9 @@ pub enum BlockProducerError { #[derive(Debug, Error, GrpcError)] pub enum MempoolSubmissionError { - #[error("failed to retrieve inputs from the store")] + #[error("failed to read state from the store")] #[grpc(internal)] - StoreConnectionFailed(#[source] StoreError), - - #[error("invalid transaction proof error for transaction: {0}")] - InvalidTransactionProof(TransactionId), + StoreStateReadFailed(#[source] StoreError), #[error( "transaction input data from block {input_block} is rejected as stale because it is older than the limit of {stale_limit}" @@ -57,9 +58,6 @@ pub enum MempoolSubmissionError { stale_limit: BlockNumber, }, - #[error("request deserialization failed")] - DeserializationFailed(#[source] DeserializationError), - #[error( "transaction expired at block height {expired_at} but the block height limit was {limit}" )] @@ -77,9 +75,6 @@ pub enum MempoolSubmissionError { #[error("mempool lock is poisoned")] #[grpc(internal)] MempoolPoisoned(#[source] MempoolPoisonError), - - #[error("missing proposed batch")] - MissingProposedBatch, } // Mempool submission conflicts with current state @@ -184,21 +179,17 @@ impl BuildBlockError { // Store errors // ================================================================================================= -/// Errors returned by the [`StoreClient`](crate::store::StoreClient). +/// Errors returned by the store state. #[derive(Debug, Error)] pub enum StoreError { #[error("account Id prefix already exists: {0}")] DuplicateAccountIdPrefix(AccountId), - #[error("gRPC client error")] - GrpcClientError(#[from] Box), - #[error("malformed response from store: {0}")] - MalformedResponse(String), - #[error("failed to parse response")] - DeserializationError(#[from] ConversionError), -} - -impl From for StoreError { - fn from(value: tonic::Status) -> Self { - StoreError::GrpcClientError(value.into()) - } + #[error("failed to get transaction inputs from store")] + GetTransactionInputsFailed(#[source] DatabaseError), + #[error("failed to get batch inputs from store")] + GetBatchInputsFailed(#[source] GetBatchInputsError), + #[error("failed to get block inputs from store")] + GetBlockInputsFailed(#[source] GetBlockInputsError), + #[error("failed to apply block to store")] + ApplyBlockFailed(#[source] ApplyBlockWithProvingInputsError), } diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index 353a6a71c..a8ced3923 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -19,7 +19,14 @@ mod errors; pub mod server; pub use errors::MempoolSubmissionError; -pub use server::{BlockProducer, BlockProducerApi, BlockProducerApiConfig, BlockProducerRuntime}; +pub use server::{ + BlockProducer, + BlockProducerApi, + BlockProducerApiConfig, + BlockProducerRuntime, + BlockProducerStatus, + MempoolStats, +}; // CONSTANTS // ================================================================================================= diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 5019df460..60f172b54 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -4,23 +4,21 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; -use miden_node_proto::generated::{self as proto}; +use miden_node_store::state::{Finality, State}; use miden_node_utils::formatting::{format_input_notes, format_output_notes}; use miden_protocol::batch::ProposedBatch; use miden_protocol::block::BlockNumber; use miden_protocol::transaction::ProvenTransaction; -use miden_protocol::utils::serde::Deserializable; use tokio::sync::{Mutex, RwLock}; use tokio::task::{Id, JoinSet}; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, info, instrument}; use url::Url; use crate::batch_builder::BatchBuilder; use crate::block_builder::BlockBuilder; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{BlockProducerError, MempoolSubmissionError, StoreError}; +use crate::errors::{BlockProducerError, MempoolSubmissionError}; use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool}; -use crate::store::StoreClient; use crate::validator::BlockProducerValidatorClient; use crate::{CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, COMPONENT, SERVER_NUM_BATCH_BUILDERS}; @@ -64,13 +62,10 @@ impl BlockProducerApiConfig { /// The block producer runtime. /// -/// Specifies how to connect to the store, batch prover, and block prover components. -/// The connection to the store is established at startup and retried with exponential backoff -/// until the store becomes available. Once the connection is established, the block producer -/// starts its batch and block builders. +/// Specifies how to connect to the batch prover and block prover components. pub struct BlockProducer { - /// The address of the store component. - pub store_url: Url, + /// The store state shared with the block producer. + pub store: Arc, /// The address of the validator component. pub validator_url: Url, /// The address of the batch prover component. @@ -97,44 +92,16 @@ impl BlockProducer { /// The returned handle owns the batch and block builder tasks. Dropping the handle stops those /// tasks. pub async fn start(self) -> Result { - info!(target: COMPONENT, store=%self.store_url, "Initializing block producer"); - let store = StoreClient::new(self.store_url.clone()); + info!(target: COMPONENT, "Initializing block producer"); + let store = self.store; let validator = BlockProducerValidatorClient::new(self.validator_url.clone()); - - // Retry fetching the chain tip from the store until it succeeds. - let mut retries_counter = 0; - let chain_tip = loop { - match store.latest_header().await { - Err(StoreError::GrpcClientError(err)) => { - // exponential backoff with base 500ms and max 30s - let backoff = Duration::from_millis(500) - .saturating_mul(1 << retries_counter) - .min(Duration::from_secs(30)); - - error!( - store = %self.store_url, - ?backoff, - %retries_counter, - %err, - "store connection failed while fetching chain tip, retrying" - ); - - retries_counter += 1; - tokio::time::sleep(backoff).await; - }, - Ok(header) => break header.block_num(), - Err(e) => { - error!(target: COMPONENT, %e, "failed to fetch chain tip from store"); - return Err(e.into()); - }, - } - }; + let chain_tip = store.chain_tip(Finality::Committed).await; info!(target: COMPONENT, "Block producer initialized"); - let block_builder = BlockBuilder::new(store.clone(), validator, self.block_interval); + let block_builder = BlockBuilder::new(Arc::clone(&store), validator, self.block_interval); let batch_builder = BatchBuilder::new( - store.clone(), + Arc::clone(&store), SERVER_NUM_BATCH_BUILDERS, self.batch_prover_url, self.batch_interval, @@ -228,7 +195,7 @@ impl BlockProducerRuntime { // ================================================================================================ /// In-process block producer API used by the RPC layer. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct BlockProducerApi { /// The mutex effectively rate limits incoming transactions into the mempool by forcing them /// through a queue. @@ -238,20 +205,39 @@ pub struct BlockProducerApi { /// the block-producers usage of the mempool. mempool: Arc>, - store: StoreClient, + store: Arc, /// Cached mempool statistics that are updated periodically to avoid locking the mempool for /// each status request. cached_mempool_stats: Arc>, } +impl std::fmt::Debug for BlockProducerApi { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockProducerApi").finish_non_exhaustive() + } +} + +/// Current block producer status. +#[derive(Clone, Debug)] +pub struct BlockProducerStatus { + /// The block producer crate version. + pub version: String, + /// Human-readable status string. + pub status: String, + /// The mempool's current view of the chain tip height. + pub chain_tip: BlockNumber, + /// Cached mempool statistics. + pub mempool_stats: MempoolStats, +} + impl BlockProducerApi { /// Creates an API backed by a fresh mempool. - pub fn new(store: StoreClient, chain_tip: BlockNumber, config: BlockProducerApiConfig) -> Self { + pub fn new(store: Arc, chain_tip: BlockNumber, config: BlockProducerApiConfig) -> Self { Self::from_shared_mempool(Mempool::shared(chain_tip, config.mempool_config()), store) } - fn from_shared_mempool(mempool: SharedMempool, store: StoreClient) -> Self { + fn from_shared_mempool(mempool: SharedMempool, store: Arc) -> Self { let cached_mempool_stats = mempool .lock() .map(|mempool| MempoolStats::from_mempool(&mempool)) @@ -302,20 +288,15 @@ impl BlockProducerApi { #[instrument( target = COMPONENT, - name = "block_producer.server.submit_proven_tx", + name = "block_producer.api.submit_proven_tx", skip_all, err )] #[expect(clippy::let_and_return)] pub async fn submit_proven_tx( &self, - request: proto::transaction::ProvenTransaction, - ) -> Result { - debug!(target: COMPONENT, ?request); - - let tx = ProvenTransaction::read_from_bytes(&request.transaction) - .map_err(MempoolSubmissionError::DeserializationFailed)?; - + tx: ProvenTransaction, + ) -> Result { let tx_id = tx.id(); debug!( @@ -327,15 +308,13 @@ impl BlockProducerApi { input_notes = %format_input_notes(tx.input_notes()), output_notes = %format_output_notes(tx.output_notes()), ref_block_commitment = %tx.ref_block_commitment(), - "Deserialized transaction" + "Submitting transaction" ); debug!(target: COMPONENT, proof = ?tx.proof()); - let inputs = self - .store - .get_tx_inputs(&tx) + let inputs = crate::store::get_tx_inputs(&self.store, &tx) .await - .map_err(MempoolSubmissionError::StoreConnectionFailed)?; + .map_err(MempoolSubmissionError::StoreStateReadFailed)?; // SAFETY: we assume that the rpc component has verified the transaction proof already. let tx = AuthenticatedTransaction::new_unchecked(Arc::new(tx), inputs) @@ -347,37 +326,29 @@ impl BlockProducerApi { let result = shared_mempool .lock() .map_err(MempoolSubmissionError::MempoolPoisoned)? - .add_transaction(tx) - .map(Into::into); + .add_transaction(tx); result } #[instrument( target = COMPONENT, - name = "block_producer.server.submit_proven_tx_batch", + name = "block_producer.api.submit_proven_tx_batch", skip_all, err )] #[expect(clippy::let_and_return)] pub async fn submit_proven_tx_batch( &self, - request: proto::transaction::TransactionBatch, - ) -> Result { - let proposed = - request.proposed_batch.ok_or(MempoolSubmissionError::MissingProposedBatch)?; - let batch = ProposedBatch::read_from_bytes(&proposed) - .map_err(MempoolSubmissionError::DeserializationFailed)?; - + batch: ProposedBatch, + ) -> Result { // We assume that the rpc component has verified everything, including the transaction // proofs. let mut txs = Vec::with_capacity(batch.transactions().len()); for tx in batch.transactions() { - let inputs = self - .store - .get_tx_inputs(tx) + let inputs = crate::store::get_tx_inputs(&self.store, tx) .await - .map_err(MempoolSubmissionError::StoreConnectionFailed)?; + .map_err(MempoolSubmissionError::StoreStateReadFailed)?; // SAFETY: We assume that the rpc component has verified the transaction proofs, as well // as the batch integrity itself. @@ -392,19 +363,18 @@ impl BlockProducerApi { let result = shared_mempool .lock() .map_err(MempoolSubmissionError::MempoolPoisoned)? - .add_user_batch(&txs) - .map(Into::into); + .add_user_batch(&txs); result } - pub async fn status(&self) -> proto::rpc::BlockProducerStatus { + pub async fn status(&self) -> BlockProducerStatus { let mempool_stats = *self.cached_mempool_stats.read().await; - proto::rpc::BlockProducerStatus { + BlockProducerStatus { version: env!("CARGO_PKG_VERSION").to_string(), status: "connected".to_string(), - chain_tip: mempool_stats.chain_tip.as_u32(), - mempool_stats: Some(mempool_stats.into()), + chain_tip: mempool_stats.chain_tip, + mempool_stats, } } } @@ -414,15 +384,15 @@ impl BlockProducerApi { /// Mempool statistics that are updated periodically to avoid locking the mempool. #[derive(Clone, Copy, Debug, Default)] -struct MempoolStats { +pub struct MempoolStats { /// The mempool's current view of the chain tip height. - chain_tip: BlockNumber, + pub chain_tip: BlockNumber, /// Number of transactions currently in the mempool waiting to be batched. - unbatched_transactions: u64, + pub unbatched_transactions: u64, /// Number of batches currently being proven. - proposed_batches: u64, + pub proposed_batches: u64, /// Number of proven batches waiting for block inclusion. - proven_batches: u64, + pub proven_batches: u64, } impl MempoolStats { @@ -435,13 +405,3 @@ impl MempoolStats { } } } - -impl From for proto::rpc::MempoolStats { - fn from(stats: MempoolStats) -> Self { - proto::rpc::MempoolStats { - unbatched_transactions: stats.unbatched_transactions, - proposed_batches: stats.proposed_batches, - proven_batches: stats.proven_batches, - } - } -} diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index 5ce9a71d6..1f315f7eb 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -1,155 +1,54 @@ use std::num::NonZeroUsize; +use std::sync::Arc; use std::time::Duration; -use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store, StoreMode}; -use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; +use miden_node_store::state::State; +use miden_node_store::{GenesisState, Store}; +use miden_node_utils::clap::StorageOptions; use miden_node_utils::fee::test_fee_params; +use miden_protocol::block::BlockNumber; use miden_protocol::testing::random_secret_key::random_secret_key; -use miden_validator::{Validator, ValidatorSigner}; -use tokio::net::TcpListener; -use tokio::time::{sleep, timeout}; -use tokio::{runtime, task}; use url::Url; use crate::{BlockProducer, DEFAULT_MAX_BATCHES_PER_BLOCK, DEFAULT_MAX_TXS_PER_BATCH}; -/// A wrapper around the store runtime and data directory. -/// -/// Guarantees that the store runtime is shut down _before_ the data directory is dropped and thus removed. -struct TestStore { - runtime: Option, - _data_directory: tempfile::TempDir, -} - -impl Drop for TestStore { - fn drop(&mut self) { - if let Some(runtime) = self.runtime.take() { - std::thread::spawn(move || { - runtime.shutdown_timeout(Duration::from_millis(500)); - }) - .join() - .expect("store runtime shutdown thread should complete"); - } - } -} - -/// Tests that the block producer starts up correctly even when the store is not initially -/// available. The block producer should retry with exponential backoff until the store becomes -/// available, then start serving requests. #[tokio::test] -async fn block_producer_startup_is_robust_to_network_failures() { - // get the addresses for the store and validator - let store_addr = { - let store_listener = - TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); - store_listener.local_addr().expect("store should get a local address") - }; - - let validator_addr = { - let validator_listener = - TcpListener::bind("127.0.0.1:0").await.expect("failed to bind validator"); - validator_listener.local_addr().expect("failed to get validator address") - }; - - let grpc_options = GrpcOptionsInternal::default(); - - // start the validator - task::spawn(async move { - let temp_dir = tempfile::tempdir().expect("tempdir should be created"); - let data_directory = temp_dir.path().to_path_buf(); - Validator { - address: validator_addr, - grpc_options, - signer: ValidatorSigner::new_local(random_secret_key()), - data_directory, - sqlite_connection_pool_size: NonZeroUsize::new(2).unwrap(), - } - .serve() - .await - .unwrap(); - }); - - // start the block producer BEFORE the store is available this tests the exponential backoff - // behavior - let store_url = Url::parse(&format!("http://{store_addr}")).expect("Failed to parse store URL"); - let validator_url = - Url::parse(&format!("http://{validator_addr}")).expect("Failed to parse validator URL"); - let block_producer = task::spawn(async move { - BlockProducer { - store_url, - validator_url, - batch_prover_url: None, - batch_interval: Duration::from_millis(500), - block_interval: Duration::from_millis(500), - max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH, - max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK, - mempool_tx_capacity: NonZeroUsize::new(100).unwrap(), - } - .start() - .await - .unwrap() - }); - - // test: startup should still be waiting because the store is not yet available. - sleep(Duration::from_millis(100)).await; - assert!( - !block_producer.is_finished(), - "Block producer should wait until the store is started" - ); - - // start the store - let _store = start_store(store_addr).await; - - let block_producer = timeout(Duration::from_secs(10), block_producer) - .await - .expect("block producer should start after store is started") - .expect("block producer task should not panic"); +async fn block_producer_starts_with_store_state() { + let data_directory = tempfile::tempdir().expect("tempdir should be created"); + bootstrap_store(data_directory.path()); + let store = load_state(data_directory.path()).await; + + let block_producer = BlockProducer { + store, + validator_url: Url::parse("http://127.0.0.1:0").unwrap(), + batch_prover_url: None, + batch_interval: Duration::from_secs(3600), + block_interval: Duration::from_secs(3600), + max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH, + max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK, + mempool_tx_capacity: NonZeroUsize::new(100).unwrap(), + } + .start() + .await + .unwrap(); - // verify the in-process API returns expected status data. let status = block_producer.api().status().await; assert_eq!(status.status, "connected"); + assert_eq!(status.chain_tip, BlockNumber::GENESIS); } -/// Starts the store with a fresh genesis state and returns the runtime handle. -async fn start_store(store_addr: std::net::SocketAddr) -> TestStore { - let data_directory = tempfile::tempdir().expect("tempdir should be created"); +fn bootstrap_store(path: &std::path::Path) { let signer = random_secret_key(); let genesis_state = GenesisState::new(vec![], test_fee_params(), 1, 1, signer.public_key()); - let genesis_block = genesis_state - .clone() - .into_block(&signer) - .expect("genesis block should be created"); - Store::bootstrap(genesis_block, data_directory.path()).expect("store should bootstrap"); + let genesis_block = genesis_state.into_block(&signer).expect("genesis block should be created"); - let dir = data_directory.path().to_path_buf(); - let rpc_listener = - TcpListener::bind("127.0.0.1:0").await.expect("store should bind the RPC port"); - let block_producer_listener = TcpListener::bind(store_addr) - .await - .expect("store should bind the block-producer port"); + Store::bootstrap(genesis_block, path).expect("store should bootstrap"); +} - // Use a separate runtime so we can kill all store tasks later - let store_runtime = - runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap(); - store_runtime.spawn(async move { - Store { - rpc_listener, - mode: StoreMode::BlockProducer { - block_producer_listener, - block_prover_url: None, - max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, - }, - data_directory: dir, - database_options: miden_node_store::DatabaseOptions::default(), - grpc_options: GrpcOptionsInternal::bench(), - storage_options: StorageOptions::bench(), - } - .serve() +async fn load_state(path: &std::path::Path) -> Arc { + let (termination_ask, _termination_signal) = tokio::sync::mpsc::channel(1); + let (state, _) = State::load(path, StorageOptions::default(), termination_ask) .await - .expect("store should start serving"); - }); - TestStore { - runtime: Some(store_runtime), - _data_directory: data_directory, - } + .expect("state should load"); + Arc::new(state) } diff --git a/crates/block-producer/src/store/mod.rs b/crates/block-producer/src/store/mod.rs index cf314a10a..dbd896e8e 100644 --- a/crates/block-producer/src/store/mod.rs +++ b/crates/block-producer/src/store/mod.rs @@ -3,21 +3,14 @@ use std::fmt::{Display, Formatter}; use std::num::NonZeroU32; use itertools::Itertools; -use miden_node_proto::clients::{Builder, StoreBlockProducerClient}; -use miden_node_proto::decode::{ConversionResultExt, GrpcDecodeExt}; -use miden_node_proto::domain::batch::BatchInputs; -use miden_node_proto::errors::ConversionError; -use miden_node_proto::{AccountState, decode, generated as proto}; +use miden_node_store::state::{Finality, State, TransactionInputs as StoreTransactionInputs}; use miden_node_utils::formatting::format_opt; use miden_protocol::Word; use miden_protocol::account::AccountId; -use miden_protocol::batch::OrderedBatches; -use miden_protocol::block::{BlockHeader, BlockInputs, BlockNumber, SignedBlock}; -use miden_protocol::note::{NoteId, Nullifier}; +use miden_protocol::block::BlockNumber; +use miden_protocol::note::Nullifier; use miden_protocol::transaction::ProvenTransaction; -use miden_protocol::utils::serde::Serializable; use tracing::{debug, info, instrument}; -use url::Url; use crate::COMPONENT; use crate::errors::StoreError; @@ -44,6 +37,34 @@ pub struct TransactionInputs { pub current_block_height: BlockNumber, } +impl TransactionInputs { + fn from_store_inputs( + account_id: AccountId, + inputs: StoreTransactionInputs, + current_block_height: BlockNumber, + ) -> Self { + let account_commitment = if inputs.account_commitment == Word::empty() { + None + } else { + Some(inputs.account_commitment) + }; + + let nullifiers = inputs + .nullifiers + .into_iter() + .map(|nullifier| (nullifier.nullifier, NonZeroU32::new(nullifier.block_num.as_u32()))) + .collect(); + + Self { + account_id, + account_commitment, + nullifiers, + found_unauthenticated_notes: inputs.found_unauthenticated_notes, + current_block_height, + } + } +} + impl Display for TransactionInputs { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let nullifiers = self @@ -67,185 +88,45 @@ impl Display for TransactionInputs { } } -impl TryFrom for TransactionInputs { - type Error = ConversionError; - - fn try_from(response: proto::store::TransactionInputs) -> Result { - let decoder = response.decoder(); - let AccountState { account_id, account_commitment } = - decode!(decoder, response.account_state)?; - - let mut nullifiers = HashMap::new(); - for nullifier_record in response.nullifiers { - let decoder = nullifier_record.decoder(); - let nullifier = decode!(decoder, nullifier_record.nullifier)?; - - // Note that this intentionally maps 0 to None as this is the definition used in - // protobuf. - nullifiers.insert(nullifier, NonZeroU32::new(nullifier_record.block_num)); - } - - let found_unauthenticated_notes = response - .found_unauthenticated_notes - .into_iter() - .map(Word::try_from) - .collect::>() - .context("found_unauthenticated_notes")?; - - let current_block_height = response.block_height.into(); - - Ok(Self { - account_id, - account_commitment, - nullifiers, - found_unauthenticated_notes, - current_block_height, - }) - } -} - -// STORE CLIENT +// STORE STATE // ================================================================================================ -/// Interface to the store's block-producer gRPC API. -/// -/// Essentially just a thin wrapper around the generated gRPC client which improves type safety. -#[derive(Clone, Debug)] -pub struct StoreClient { - client: StoreBlockProducerClient, -} - -impl StoreClient { - /// Creates a new store client with a lazy connection. - pub fn new(store_url: Url) -> Self { - info!(target: COMPONENT, store_endpoint = %store_url, "Initializing store client"); - - let store = Builder::new(store_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::(); - - Self { client: store } - } - - /// Returns the latest block's header from the store. - #[instrument(target = COMPONENT, name = "store.client.latest_header", skip_all, err)] - pub async fn latest_header(&self) -> Result { - let response = self - .client - .clone() - .get_block_header_by_number(tonic::Request::new( - proto::rpc::BlockHeaderByNumberRequest::default(), - )) - .await? - .into_inner() - .block_header - .ok_or_else(|| { - StoreError::DeserializationError(ConversionError::missing_field::< - miden_node_proto::generated::blockchain::BlockHeader, - >("block_header")) - })?; - - BlockHeader::try_from(response).map_err(StoreError::DeserializationError) - } - - #[instrument(target = COMPONENT, name = "store.client.get_tx_inputs", skip_all, err)] - pub async fn get_tx_inputs( - &self, - proven_tx: &ProvenTransaction, - ) -> Result { - let message = proto::store::TransactionInputsRequest { - account_id: Some(proven_tx.account_id().into()), - nullifiers: proven_tx.nullifiers().map(Into::into).collect(), - unauthenticated_notes: proven_tx - .unauthenticated_notes() - .map(|note| note.id().into()) - .collect(), - }; - - info!(target: COMPONENT, tx_id = %proven_tx.id().to_hex()); - debug!(target: COMPONENT, ?message); - - let request = tonic::Request::new(message); - let response = self.client.clone().get_transaction_inputs(request).await?.into_inner(); - - debug!(target: COMPONENT, ?response); - - if !response.new_account_id_prefix_is_unique.unwrap_or(true) { - debug_assert!( - proven_tx.account_update().initial_state_commitment().is_empty(), - "account id prefix uniqueness should not be validated unless transaction creates a new account" - ); - return Err(StoreError::DuplicateAccountIdPrefix(proven_tx.account_id())); - } - - let tx_inputs: TransactionInputs = response.try_into()?; - - if tx_inputs.account_id != proven_tx.account_id() { - return Err(StoreError::MalformedResponse(format!( - "incorrect account id returned from store. Got: {}, expected: {}", - tx_inputs.account_id, - proven_tx.account_id() - ))); - } - - debug!(target: COMPONENT, %tx_inputs); - - Ok(tx_inputs) - } - - #[instrument(target = COMPONENT, name = "store.client.get_block_inputs", skip_all, err)] - pub async fn get_block_inputs( - &self, - updated_accounts: impl Iterator + Send, - created_nullifiers: impl Iterator + Send, - unauthenticated_notes: impl Iterator + Send, - reference_blocks: impl Iterator + Send, - ) -> Result { - let request = tonic::Request::new(proto::store::BlockInputsRequest { - account_ids: updated_accounts.map(Into::into).collect(), - nullifiers: created_nullifiers.map(proto::primitives::Digest::from).collect(), - unauthenticated_notes: unauthenticated_notes - .map(proto::primitives::Digest::from) - .collect(), - reference_blocks: reference_blocks.map(|block_num| block_num.as_u32()).collect(), - }); - - let store_response = self.client.clone().get_block_inputs(request).await?.into_inner(); - - store_response.try_into().map_err(StoreError::DeserializationError) +#[instrument(target = COMPONENT, name = "store.state.get_tx_inputs", skip_all, err)] +pub async fn get_tx_inputs( + state: &State, + proven_tx: &ProvenTransaction, +) -> Result { + info!(target: COMPONENT, tx_id = %proven_tx.id().to_hex()); + + let nullifiers = proven_tx.nullifiers().collect::>(); + let unauthenticated_note_commitments = + proven_tx.unauthenticated_notes().map(|header| header.id().as_word()).collect(); + + let store_inputs = state + .get_transaction_inputs( + proven_tx.account_id(), + &nullifiers, + unauthenticated_note_commitments, + ) + .await + .map_err(StoreError::GetTransactionInputsFailed)?; + + if !store_inputs.new_account_id_prefix_is_unique.unwrap_or(true) { + debug_assert!( + proven_tx.account_update().initial_state_commitment().is_empty(), + "account id prefix uniqueness should not be validated unless transaction creates a new account" + ); + return Err(StoreError::DuplicateAccountIdPrefix(proven_tx.account_id())); } - #[instrument(target = COMPONENT, name = "store.client.get_batch_inputs", skip_all, err)] - pub async fn get_batch_inputs( - &self, - block_references: impl Iterator + Send, - note_ids: impl Iterator + Send, - ) -> Result { - let request = tonic::Request::new(proto::store::BatchInputsRequest { - reference_blocks: block_references.map(|(block_num, _)| block_num.as_u32()).collect(), - note_commitments: note_ids.map(proto::primitives::Digest::from).collect(), - }); + let current_block_height = state.chain_tip(Finality::Committed).await; + let tx_inputs = TransactionInputs::from_store_inputs( + proven_tx.account_id(), + store_inputs, + current_block_height, + ); - let store_response = self.client.clone().get_batch_inputs(request).await?.into_inner(); + debug!(target: COMPONENT, %tx_inputs); - store_response.try_into().map_err(StoreError::DeserializationError) - } - - #[instrument(target = COMPONENT, name = "store.client.apply_block", skip_all, err)] - pub async fn apply_block( - &self, - ordered_batches: &OrderedBatches, - signed_block: &SignedBlock, - ) -> Result<(), StoreError> { - let request = tonic::Request::new(proto::store::ApplyBlockRequest { - ordered_batches: ordered_batches.to_bytes(), - block: Some(signed_block.into()), - }); - - self.client.clone().apply_block(request).await.map(|_| ()).map_err(Into::into) - } + Ok(tx_inputs) } diff --git a/crates/proto/README.md b/crates/proto/README.md index 41edaae06..a4d665382 100644 --- a/crates/proto/README.md +++ b/crates/proto/README.md @@ -10,10 +10,8 @@ This crate contains protobuf bindings for the APIs exposed by the components of The node's gRPC endpoints return rich error information using gRPC status details. Each component exposes its own set of error codes which are included in the response details. -- Error types are defined in this crate's `src/errors/` directory: - - `src/errors/mod.rs` - Common error traits and conversion utilities - - `src/errors/store.rs` - Store component gRPC error enums - - `src/errors/block_producer.rs` - Block producer component gRPC error enums +Error conversion helpers are defined in `src/errors/mod.rs`. Component-specific gRPC error +details are derived next to the component errors that need to cross a gRPC boundary. ## License This project is [MIT licensed](../../LICENSE). diff --git a/crates/proto/src/clients/mod.rs b/crates/proto/src/clients/mod.rs index 9bd378156..51e96fe69 100644 --- a/crates/proto/src/clients/mod.rs +++ b/crates/proto/src/clients/mod.rs @@ -109,8 +109,6 @@ impl tonic::service::Interceptor for Interceptor { type InterceptedChannel = InterceptedService; type GeneratedRpcClient = generated::rpc::api_client::ApiClient; -type GeneratedStoreClientForBlockProducer = - generated::store::block_producer_client::BlockProducerClient; type GeneratedStoreClientForRpc = generated::store::rpc_client::RpcClient; type GeneratedProxyStatusClient = generated::remote_prover::proxy_status_api_client::ProxyStatusApiClient; @@ -124,8 +122,6 @@ type GeneratedNtxBuilderClient = generated::ntx_builder::api_client::ApiClient &mut Self::Target { - &mut self.0 - } -} - -impl Deref for StoreBlockProducerClient { - type Target = GeneratedStoreClientForBlockProducer; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl DerefMut for StoreRpcClient { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 @@ -248,15 +230,6 @@ impl GrpcClient for RpcClient { } } -impl GrpcClient for StoreBlockProducerClient { - fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { - Self(GeneratedStoreClientForBlockProducer::new(InterceptedService::new( - channel, - interceptor, - ))) - } -} - impl GrpcClient for StoreRpcClient { fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { Self(GeneratedStoreClientForRpc::new(InterceptedService::new(channel, interceptor))) diff --git a/crates/proto/src/domain/account.rs b/crates/proto/src/domain/account.rs index ba9bdb4d8..de3b01957 100644 --- a/crates/proto/src/domain/account.rs +++ b/crates/proto/src/domain/account.rs @@ -1,6 +1,5 @@ use std::fmt::{Debug, Display, Formatter}; -use miden_node_utils::formatting::format_opt; use miden_node_utils::limiter::{QueryParamLimiter, QueryParamStorageMapKeyTotalLimit}; use miden_protocol::Word; use miden_protocol::account::{ @@ -890,60 +889,6 @@ impl From for proto::account::AccountWitness { } } -// ACCOUNT STATE -// ================================================================================================ - -/// Information needed from the store to verify account in transaction. -#[derive(Debug)] -pub struct AccountState { - /// Account ID - pub account_id: AccountId, - /// The account commitment in the store corresponding to tx's account ID - pub account_commitment: Option, -} - -impl Display for AccountState { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "{{ account_id: {}, account_commitment: {} }}", - self.account_id, - format_opt(self.account_commitment.as_ref()), - )) - } -} - -impl TryFrom for AccountState { - type Error = ConversionError; - - fn try_from( - from: proto::store::transaction_inputs::AccountTransactionInputRecord, - ) -> Result { - let decoder = from.decoder(); - let account_id = decode!(decoder, from.account_id)?; - - let account_commitment = decode!(decoder, from.account_commitment)?; - - // If the commitment is equal to `Word::empty()`, it signifies that this is a new account - // which is not yet present in the Store. - let account_commitment = if account_commitment == Word::empty() { - None - } else { - Some(account_commitment) - }; - - Ok(Self { account_id, account_commitment }) - } -} - -impl From for proto::store::transaction_inputs::AccountTransactionInputRecord { - fn from(from: AccountState) -> Self { - Self { - account_id: Some(from.account_id.into()), - account_commitment: from.account_commitment.map(Into::into), - } - } -} - // ASSET // ================================================================================================ diff --git a/crates/proto/src/domain/batch.rs b/crates/proto/src/domain/batch.rs index 590ea026a..d4f1418b6 100644 --- a/crates/proto/src/domain/batch.rs +++ b/crates/proto/src/domain/batch.rs @@ -3,11 +3,6 @@ use std::collections::BTreeMap; use miden_protocol::block::BlockHeader; use miden_protocol::note::{NoteId, NoteInclusionProof}; use miden_protocol::transaction::PartialBlockchain; -use miden_protocol::utils::serde::Serializable; - -use crate::decode::{ConversionResultExt, DecodeBytesExt, GrpcDecodeExt}; -use crate::errors::ConversionError; -use crate::generated as proto; /// Data required for a transaction batch. #[derive(Clone, Debug)] @@ -16,39 +11,3 @@ pub struct BatchInputs { pub note_proofs: BTreeMap, pub partial_block_chain: PartialBlockchain, } - -impl From for proto::store::BatchInputs { - fn from(inputs: BatchInputs) -> Self { - Self { - batch_reference_block_header: Some(inputs.batch_reference_block_header.into()), - note_proofs: inputs.note_proofs.iter().map(Into::into).collect(), - partial_block_chain: inputs.partial_block_chain.to_bytes(), - } - } -} - -impl TryFrom for BatchInputs { - type Error = ConversionError; - - fn try_from(response: proto::store::BatchInputs) -> Result { - let decoder = response.decoder(); - let result = Self { - batch_reference_block_header: crate::decode!( - decoder, - response.batch_reference_block_header - )?, - note_proofs: response - .note_proofs - .iter() - .map(<(NoteId, NoteInclusionProof)>::try_from) - .collect::>() - .context("note_proofs")?, - partial_block_chain: PartialBlockchain::decode_bytes( - &response.partial_block_chain, - "PartialBlockchain", - )?, - }; - - Ok(result) - } -} diff --git a/crates/proto/src/domain/block.rs b/crates/proto/src/domain/block.rs index a6b6e364e..068f52fe0 100644 --- a/crates/proto/src/domain/block.rs +++ b/crates/proto/src/domain/block.rs @@ -1,25 +1,14 @@ -use std::collections::BTreeMap; use std::ops::RangeInclusive; use miden_protocol::account::AccountId; -use miden_protocol::block::nullifier_tree::NullifierWitness; -use miden_protocol::block::{ - BlockBody, - BlockHeader, - BlockInputs, - BlockNumber, - FeeParameters, - SignedBlock, -}; +use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, FeeParameters, SignedBlock}; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, Signature}; -use miden_protocol::note::{NoteId, NoteInclusionProof}; -use miden_protocol::transaction::PartialBlockchain; use miden_protocol::utils::serde::Serializable; use thiserror::Error; use crate::decode::{ConversionResultExt, DecodeBytesExt, GrpcDecodeExt}; use crate::errors::ConversionError; -use crate::{AccountWitnessRecord, NullifierWitnessRecord, decode, generated as proto}; +use crate::{decode, generated as proto}; // BLOCK NUMBER // ================================================================================================ @@ -173,88 +162,6 @@ impl TryFrom for SignedBlock { } } -// BLOCK INPUTS -// ================================================================================================ - -impl From for proto::store::BlockInputs { - fn from(inputs: BlockInputs) -> Self { - let ( - prev_block_header, - partial_block_chain, - account_witnesses, - nullifier_witnesses, - unauthenticated_note_proofs, - ) = inputs.into_parts(); - - proto::store::BlockInputs { - latest_block_header: Some(prev_block_header.into()), - account_witnesses: account_witnesses - .into_iter() - .map(|(id, witness)| AccountWitnessRecord { account_id: id, witness }.into()) - .collect(), - nullifier_witnesses: nullifier_witnesses - .into_iter() - .map(|(nullifier, witness)| { - let proof = witness.into_proof(); - NullifierWitnessRecord { nullifier, proof }.into() - }) - .collect(), - partial_block_chain: partial_block_chain.to_bytes(), - unauthenticated_note_proofs: unauthenticated_note_proofs - .iter() - .map(proto::note::NoteInclusionInBlockProof::from) - .collect(), - } - } -} - -impl TryFrom for BlockInputs { - type Error = ConversionError; - - fn try_from(response: proto::store::BlockInputs) -> Result { - let decoder = response.decoder(); - let latest_block_header: BlockHeader = decode!(decoder, response.latest_block_header)?; - - let account_witnesses = response - .account_witnesses - .into_iter() - .map(|entry| { - let witness_record: AccountWitnessRecord = entry.try_into()?; - Ok((witness_record.account_id, witness_record.witness)) - }) - .collect::, ConversionError>>() - .context("account_witnesses")?; - - let nullifier_witnesses = response - .nullifier_witnesses - .into_iter() - .map(|entry| { - let witness: NullifierWitnessRecord = entry.try_into()?; - Ok((witness.nullifier, NullifierWitness::new(witness.proof))) - }) - .collect::, ConversionError>>() - .context("nullifier_witnesses")?; - - let unauthenticated_note_proofs = response - .unauthenticated_note_proofs - .iter() - .map(<(NoteId, NoteInclusionProof)>::try_from) - .collect::>() - .context("unauthenticated_note_proofs")?; - - let partial_block_chain = - PartialBlockchain::decode_bytes(&response.partial_block_chain, "PartialBlockchain")?; - - Ok(BlockInputs::new( - latest_block_header, - partial_block_chain, - account_witnesses, - nullifier_witnesses, - unauthenticated_note_proofs, - )) - } -} - // PUBLIC KEY // ================================================================================================ diff --git a/crates/proto/src/domain/nullifier.rs b/crates/proto/src/domain/nullifier.rs index 326cc4ebf..e206c165b 100644 --- a/crates/proto/src/domain/nullifier.rs +++ b/crates/proto/src/domain/nullifier.rs @@ -1,10 +1,8 @@ use miden_protocol::Word; -use miden_protocol::crypto::merkle::smt::SmtProof; use miden_protocol::note::Nullifier; -use crate::decode::GrpcDecodeExt; use crate::errors::ConversionError; -use crate::{decode, generated as proto}; +use crate::generated as proto; // FROM NULLIFIER // ================================================================================================ @@ -32,35 +30,3 @@ impl TryFrom for Nullifier { Ok(Nullifier::from_raw(digest)) } } - -// NULLIFIER WITNESS RECORD -// ================================================================================================ - -#[derive(Clone, Debug)] -pub struct NullifierWitnessRecord { - pub nullifier: Nullifier, - pub proof: SmtProof, -} - -impl TryFrom for NullifierWitnessRecord { - type Error = ConversionError; - - fn try_from( - nullifier_witness_record: proto::store::block_inputs::NullifierWitness, - ) -> Result { - let decoder = nullifier_witness_record.decoder(); - Ok(Self { - nullifier: decode!(decoder, nullifier_witness_record.nullifier)?, - proof: decode!(decoder, nullifier_witness_record.opening)?, - }) - } -} - -impl From for proto::store::block_inputs::NullifierWitness { - fn from(value: NullifierWitnessRecord) -> Self { - Self { - nullifier: Some(value.nullifier.into()), - opening: Some(value.proof.into()), - } - } -} diff --git a/crates/proto/src/lib.rs b/crates/proto/src/lib.rs index 1ec05672c..615f516a4 100644 --- a/crates/proto/src/lib.rs +++ b/crates/proto/src/lib.rs @@ -9,8 +9,7 @@ pub mod generated; // RE-EXPORTS // ================================================================================================ -pub use domain::account::{AccountState, AccountWitnessRecord}; -pub use domain::nullifier::NullifierWitnessRecord; +pub use domain::account::AccountWitnessRecord; pub use domain::proof_request::BlockProofRequest; pub use domain::{convert, try_convert}; pub use prost; diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index ccec3843d..f12fa6561 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -3,11 +3,12 @@ use std::sync::LazyLock; use std::time::Duration; use anyhow::Context; +use miden_node_block_producer::{BlockProducerStatus, MempoolStats as BlockProducerMempoolStats}; use miden_node_proto::clients::{NtxBuilderClient, StoreRpcClient}; use miden_node_proto::decode::{read_account_id, read_account_ids, read_block_range}; use miden_node_proto::domain::account::{AccountRequest, SlotData}; use miden_node_proto::errors::ConversionError; -use miden_node_proto::generated::rpc::MempoolStats; +use miden_node_proto::generated::rpc::MempoolStats as ProtoMempoolStats; use miden_node_proto::generated::rpc::api_server::{self, Api}; use miden_node_proto::generated::{self as proto}; use miden_node_proto::try_convert; @@ -515,8 +516,9 @@ impl api_server::Api for RpcService { } block_producer - .submit_proven_tx(request) + .submit_proven_tx(rebuilt_tx) .await + .map(Into::into) .map(Response::new) .map_err(Into::into) } @@ -618,8 +620,9 @@ impl api_server::Api for RpcService { } block_producer - .submit_proven_tx_batch(request) + .submit_proven_tx_batch(proposed_batch) .await + .map(Into::into) .map(Response::new) .map_err(Into::into) } @@ -658,7 +661,9 @@ impl api_server::Api for RpcService { let store_status = self.store.clone().status(Request::new(())).await.map(Response::into_inner).ok(); let block_producer_status = match &self.mode { - RpcMode::Sequencer { block_producer, .. } => Some(block_producer.status().await), + RpcMode::Sequencer { block_producer, .. } => { + Some(block_producer_status_to_proto(block_producer.status().await)) + }, RpcMode::FullNode { source_rpc } => source_rpc .as_ref() .clone() @@ -679,7 +684,7 @@ impl api_server::Api for RpcService { status: "unreachable".to_string(), version: "-".to_string(), chain_tip: 0, - mempool_stats: Some(MempoolStats::default()), + mempool_stats: Some(ProtoMempoolStats::default()), })), genesis_commitment: self.genesis_commitment.map(Into::into), })) @@ -736,6 +741,25 @@ fn strip_output_note_decorators<'a>( }) } +fn block_producer_status_to_proto(status: BlockProducerStatus) -> proto::rpc::BlockProducerStatus { + proto::rpc::BlockProducerStatus { + version: status.version, + status: status.status, + chain_tip: status.chain_tip.as_u32(), + mempool_stats: Some(block_producer_mempool_stats_to_proto(status.mempool_stats)), + } +} + +fn block_producer_mempool_stats_to_proto( + stats: BlockProducerMempoolStats, +) -> proto::rpc::MempoolStats { + proto::rpc::MempoolStats { + unbatched_transactions: stats.unbatched_transactions, + proposed_batches: stats.proposed_batches, + proven_batches: stats.proven_batches, + } +} + // LIMIT HELPERS // ================================================================================================ diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index cf9b541f3..f653927a6 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -1,10 +1,10 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::num::{NonZeroU32, NonZeroU64}; +use std::sync::Arc; use std::time::Duration; use http::header::{ACCEPT, CONTENT_TYPE}; use http::{HeaderMap, HeaderValue}; -use miden_node_block_producer::store::StoreClient as BlockProducerStoreClient; use miden_node_block_producer::{BlockProducerApi, BlockProducerApiConfig}; use miden_node_proto::clients::{ Builder, @@ -17,6 +17,7 @@ use miden_node_proto::clients::{ use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient; use miden_node_proto::generated::{self as proto}; use miden_node_store::genesis::config::GenesisConfig; +use miden_node_store::state::State; use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store, StoreMode}; use miden_node_utils::clap::{GrpcOptionsExternal, GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee; @@ -83,10 +84,10 @@ impl TestStore { self.data_directory.take().expect("data_directory should be set") } - async fn start(store_listener: TcpListener) -> Self { + fn start(store_listener: TcpListener) -> Self { let data_directory = tempfile::tempdir().expect("tempdir should be created"); let genesis_commitment = Self::bootstrap(data_directory.path()); - Self::start_without_bootstrap(data_directory, genesis_commitment, store_listener).await + Self::start_without_bootstrap(data_directory, genesis_commitment, store_listener) } fn bootstrap(path: &std::path::Path) -> Word { @@ -104,7 +105,7 @@ impl TestStore { genesis_commitment } - async fn start_without_bootstrap( + fn start_without_bootstrap( data_directory: TempDir, genesis_commitment: Word, store_listener: TcpListener, @@ -113,8 +114,6 @@ impl TestStore { let store_addr = store_listener.local_addr().expect("store listener should get a local address"); let rpc_listener = store_listener; - let block_producer_listener = - TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); // In order to later kill the store, we need to spawn a new runtime and run the store on it. // That allows us to kill all the tasks spawned by the store when we kill the runtime. @@ -123,8 +122,7 @@ impl TestStore { store_runtime.spawn(async move { Store { rpc_listener, - mode: StoreMode::BlockProducer { - block_producer_listener, + mode: StoreMode::Sequencer { block_prover_url: None, max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, }, @@ -172,6 +170,14 @@ fn shutdown_store_runtime_blocking(store_runtime: Runtime) { .expect("store runtime shutdown thread should complete"); } +async fn load_state(path: &std::path::Path) -> Arc { + let (termination_ask, _termination_signal) = tokio::sync::mpsc::channel(1); + let (state, _) = State::load(path, StorageOptions::default(), termination_ask) + .await + .expect("state should load"); + Arc::new(state) +} + /// Byte offset of the account delta commitment in serialized `ProvenTransaction`. Layout: /// `AccountId` (15) + `initial_commitment` (32) + `final_commitment` (32) = 79 const DELTA_COMMITMENT_BYTE_OFFSET: usize = 15 + 32 + 32; @@ -258,7 +264,7 @@ fn build_test_proven_tx_with_id( async fn rpc_server_accepts_requests_without_accept_header() { // Start the RPC. let (_, rpc_addr, store_listener) = start_rpc().await; - let _store = TestStore::start(store_listener).await; + let _store = TestStore::start(store_listener); // Override the client so that the ACCEPT header is not set. let mut rpc_client = { @@ -286,7 +292,7 @@ async fn rpc_rate_limits_per_ip() { ..GrpcOptionsExternal::test() }; let (_, rpc_addr, store_listener) = start_rpc_with_options(grpc_options).await; - let _store = TestStore::start(store_listener).await; + let _store = TestStore::start(store_listener); let url = rpc_addr.to_string(); let url = Url::parse(format!("http://{}", &url).as_str()).unwrap(); @@ -311,7 +317,9 @@ async fn rpc_rate_limits_per_ip() { #[tokio::test] async fn rpc_server_accepts_requests_with_accept_header() { - let (mut rpc_client, _, _store) = start_rpc_and_store_ready().await; + // Start the RPC. + let (mut rpc_client, _, store_listener) = start_rpc().await; + let _store = TestStore::start(store_listener); // Send any request to the RPC. let response = send_request(&mut rpc_client).await; @@ -323,7 +331,9 @@ async fn rpc_server_accepts_requests_with_accept_header() { #[tokio::test] async fn rpc_server_rejects_requests_with_accept_header_invalid_version() { for version in ["1.9.0", "0.8.1", "0.8.0", "0.999.0", "99.0.0"] { - let (_, rpc_addr, _store) = start_rpc_and_store_ready().await; + // Start the RPC. + let (_, rpc_addr, store_listener) = start_rpc().await; + let _store = TestStore::start(store_listener); // Recreate the RPC client with an invalid version. let url = rpc_addr.to_string(); @@ -362,7 +372,7 @@ async fn rpc_startup_is_robust_to_network_failures() { assert!(response.is_err()); // Start the store. - let store = TestStore::start(store_listener).await; + let store = TestStore::start(store_listener); // Test: send request against RPC api and should succeed let response = send_request_until_success(&mut rpc_client).await; @@ -378,8 +388,7 @@ async fn rpc_startup_is_robust_to_network_failures() { // Test: restart the store and request should succeed let store_listener = TcpListener::bind(store_addr).await.expect("Failed to bind store"); let _store = - TestStore::start_without_bootstrap(data_directory, genesis_commitment, store_listener) - .await; + TestStore::start_without_bootstrap(data_directory, genesis_commitment, store_listener); let response = send_request_until_success(&mut rpc_client).await; assert_eq!(response.unwrap().into_inner().block_header.unwrap().block_num, 0); } @@ -388,7 +397,7 @@ async fn rpc_startup_is_robust_to_network_failures() { async fn rpc_server_has_web_support() { // Start server let (_, rpc_addr, store_listener) = start_rpc().await; - let _store = TestStore::start(store_listener).await; + let _store = TestStore::start(store_listener); // Send a status request let client = reqwest::Client::new(); @@ -428,7 +437,9 @@ async fn rpc_server_has_web_support() { #[tokio::test] async fn rpc_server_rejects_proven_transactions_with_invalid_commitment() { - let (_, rpc_addr, store) = start_rpc_and_store_ready().await; + // Start the RPC. + let (_, rpc_addr, store_listener) = start_rpc().await; + let store = TestStore::start(store_listener); let genesis = store.genesis_commitment(); // Override the client so that the ACCEPT header is not set. @@ -475,7 +486,9 @@ async fn rpc_server_rejects_proven_transactions_with_invalid_commitment() { #[tokio::test] async fn rpc_server_rejects_proven_transactions_with_invalid_reference_block() { - let (_, rpc_addr, store) = start_rpc_and_store_ready().await; + // Start the RPC. + let (_, rpc_addr, store_listener) = start_rpc().await; + let store = TestStore::start(store_listener); let genesis = store.genesis_commitment(); // Override the client so that the ACCEPT header is not set. @@ -561,7 +574,9 @@ async fn rpc_rejects_post_deployment_network_account_tx() { #[tokio::test] async fn rpc_server_rejects_tx_submissions_without_genesis() { - let (_, rpc_addr, store) = start_rpc_and_store_ready().await; + // Start the RPC. + let (_, rpc_addr, store_listener) = start_rpc().await; + let store = TestStore::start(store_listener); let genesis = store.genesis_commitment(); // Override the client so that the ACCEPT header is not set. @@ -649,7 +664,7 @@ async fn start_rpc() -> (RpcClient, std::net::SocketAddr, TcpListener) { /// would otherwise race the RPC component's startup under high test parallelism. async fn start_rpc_and_store_ready() -> (RpcClient, std::net::SocketAddr, TestStore) { let (mut rpc_client, rpc_addr, store_listener) = start_rpc().await; - let store = TestStore::start(store_listener).await; + let store = TestStore::start(store_listener); send_request_until_success(&mut rpc_client) .await .expect("RPC should become ready after store starts"); @@ -661,16 +676,21 @@ async fn start_rpc_with_options( ) -> (RpcClient, std::net::SocketAddr, TcpListener) { let store_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); let store_addr = store_listener.local_addr().expect("store should get a local address"); + let block_producer_data_directory = + tempfile::tempdir().expect("block producer state tempdir should be created"); + TestStore::bootstrap(block_producer_data_directory.path()); + let block_producer_state = load_state(block_producer_data_directory.path()).await; // Start the rpc component. let rpc_listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind rpc"); let rpc_addr = rpc_listener.local_addr().expect("Failed to get rpc address"); task::spawn(async move { + let _block_producer_data_directory = block_producer_data_directory; // SAFETY: The store_addr is always valid as it is created from a `SocketAddr`. let store_url = Url::parse(&format!("http://{store_addr}")).unwrap(); // SAFETY: Using dummy validator URL for test - not actually contacted in this test let validator_url = Url::parse("http://127.0.0.1:0").unwrap(); - let store = Builder::new(store_url.clone()) + let store = Builder::new(store_url) .without_tls() .without_timeout() .without_metadata_version() @@ -678,7 +698,7 @@ async fn start_rpc_with_options( .with_otel_context_injection() .connect_lazy::(); let block_producer = BlockProducerApi::new( - BlockProducerStoreClient::new(store_url), + block_producer_state, 0.into(), BlockProducerApiConfig::default(), ); @@ -710,7 +730,9 @@ async fn start_rpc_with_options( #[tokio::test] async fn get_limits_endpoint() { - let (mut rpc_client, _rpc_addr, _store) = start_rpc_and_store_ready().await; + // Start the RPC and store + let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await; + let _store = TestStore::start(store_listener); // Call the get_limits endpoint let response = rpc_client.get_limits(()).await.expect("get_limits should succeed"); @@ -775,7 +797,7 @@ async fn get_limits_endpoint() { #[tokio::test] async fn sync_chain_mmr_returns_delta() { let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await; - let _store = TestStore::start(store_listener).await; + let _store = TestStore::start(store_listener); let request = proto::rpc::SyncChainMmrRequest { current_client_block_height: 0, diff --git a/crates/store/README.md b/crates/store/README.md index 5331a0df6..1ad99a424 100644 --- a/crates/store/README.md +++ b/crates/store/README.md @@ -38,17 +38,14 @@ Without the environment variables above, `librocksdb-sys` compiles RocksDB from ## API overview -The full gRPC API can be found [here](../../proto/proto/store.proto). +The full gRPC API can be found [here](../../proto/proto/internal/store.proto). Block producer +access to store state is in-process and is not exposed as a store gRPC API. -- [ApplyBlock](#applyblock) - [GetAccount](#getaccount) - [GetBlockByNumber](#getblockbynumber) - [GetBlockHeaderByNumber](#getblockheaderbynumber) -- [GetBlockInputs](#getblockinputs) -- [GetNoteAuthenticationInfo](#getnoteauthenticationinfo) - [GetNotesById](#getnotesbyid) -- [GetTransactionInputs](#gettransactioninputs) - [GetNoteScriptByRoot](#getnotescriptbyroot) - [SyncNullifiers](#syncnullifiers) - [SyncAccountVault](#syncaccountvault) @@ -58,14 +55,6 @@ The full gRPC API can be found [here](../../proto/proto/store.proto). - [SyncTransactions](#synctransactions) ---- - -### ApplyBlock - -Applies changes of a new block to the DB and in-memory data structures. Raw block data is also stored as a flat file. - ---- - ### GetAccount Returns an account witness (Merkle proof of inclusion in the account tree) and optionally account details. @@ -87,22 +76,6 @@ Returns raw block data for the specified block number. Retrieves block header by given block number. Optionally, it also returns the MMR path and current chain length to authenticate the block's inclusion. ---- - -### GetBlockInputs - -Used by the `block-producer` to query state required to prove the next block. - ---- - -### GetNoteAuthenticationInfo - -Returns a list of Note inclusion proofs for the specified Note IDs. - -This is used by the `block-producer` as part of the batch proving process. - ---- - ### GetNotesById Returns a list of notes matching the provided note IDs. @@ -119,14 +92,6 @@ When note retrieval fails, detailed error information is provided through gRPC s | `TOO_MANY_NOTE_IDS` | 3 | `INVALID_ARGUMENT` | Too many note IDs in request | | `NOTE_NOT_PUBLIC` | 4 | `PERMISSION_DENIED`| Note details not publicly accessible | ---- - -### GetTransactionInputs - -Used by the `block-producer` to query state required to verify a submitted transaction. - ---- - ### GetNoteScriptByRoot Returns the script for a note by its root. diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index d16548804..4df7de101 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -225,6 +225,14 @@ pub enum ApplyBlockError { DbUpdateTaskFailed(String), } +#[derive(Error, Debug)] +pub enum ApplyBlockWithProvingInputsError { + #[error("failed to save block proving inputs")] + SaveProvingInputs(#[source] io::Error), + #[error("failed to apply block")] + ApplyBlock(#[source] ApplyBlockError), +} + impl From for Status { fn from(err: ApplyBlockError) -> Self { match err { diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 5ceb43235..6b35699f2 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -13,7 +13,14 @@ pub use accounts::PersistentAccountTree; pub use accounts::{AccountTreeWithHistory, HistoricalError, InMemoryAccountTree}; pub use db::Db; pub use db::models::conv::SqlTypeConvert; -pub use errors::DatabaseError; +pub use errors::{ + ApplyBlockError, + ApplyBlockWithProvingInputsError, + DatabaseError, + GetBatchInputsError, + GetBlockHeaderError, + GetBlockInputsError, +}; pub use genesis::GenesisState; pub use server::block_prover_client::BlockProver; pub use server::proof_scheduler::DEFAULT_MAX_CONCURRENT_PROOFS; diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index 406df7663..4ec4a3abd 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -1,20 +1,12 @@ -use std::collections::BTreeSet; use std::sync::Arc; -use miden_node_proto::decode::ConversionResultExt; -use miden_node_proto::errors::ConversionError; use miden_node_proto::generated as proto; -use miden_node_utils::ErrorReport; -use miden_protocol::Word; -use miden_protocol::batch::OrderedBatches; -use miden_protocol::block::{BlockInputs, BlockNumber}; -use miden_protocol::note::Nullifier; +use miden_protocol::block::BlockNumber; use tokio::sync::{Semaphore, watch}; use tonic::{Request, Response, Status}; -use tracing::{info, instrument}; +use tracing::info; use crate::COMPONENT; -use crate::errors::GetBlockInputsError; use crate::state::{BlockCache, ProofCache, State}; // STORE API @@ -77,40 +69,6 @@ impl StoreApi { mmr_path: mmr_proof.map(|p| Into::into(p.merkle_path())), })) } - - /// Retrieves block inputs from state based on the contents of the supplied ordered batches. - pub(crate) async fn block_inputs_from_ordered_batches( - &self, - batches: &OrderedBatches, - ) -> Result { - // Construct fields required to retrieve block inputs. - let mut account_ids = BTreeSet::new(); - let mut nullifiers = Vec::new(); - let mut unauthenticated_note_commitments = BTreeSet::new(); - let mut reference_blocks = BTreeSet::new(); - - for batch in batches.as_slice() { - account_ids.extend(batch.updated_accounts()); - nullifiers.extend(batch.created_nullifiers()); - reference_blocks.insert(batch.reference_block_num()); - - for note in batch.input_notes().iter() { - if let Some(header) = note.header() { - unauthenticated_note_commitments.insert(header.id().as_word()); - } - } - } - - // Retrieve block inputs from the store. - self.state - .get_block_inputs( - account_ids.into_iter().collect(), - nullifiers, - unauthenticated_note_commitments, - reference_blocks, - ) - .await - } } // UTILITIES @@ -120,58 +78,3 @@ impl StoreApi { pub fn internal_error(err: E) -> Status { Status::internal(err.to_string()) } - -/// Formats an "Invalid argument" error -pub fn invalid_argument(err: E) -> Status { - Status::invalid_argument(err.to_string()) -} - -/// Converts `ConversionError` to Status for nullifier validation -pub fn conversion_error_to_status(value: &ConversionError) -> Status { - invalid_argument(value.as_report_context("Invalid nullifier format")) -} - -#[instrument( - level = "debug", - target = COMPONENT, - skip_all, - fields(nullifiers = nullifiers.len()), - err -)] -pub fn validate_nullifiers(nullifiers: &[proto::primitives::Digest]) -> Result, E> -where - E: From + std::fmt::Display, -{ - nullifiers - .iter() - .copied() - .map(Nullifier::try_from) - .collect::>() - .context("nullifiers") - .map_err(Into::into) -} - -#[instrument( - level = "debug", - target = COMPONENT, - skip_all, - fields(notes = notes.len()), - err -)] -pub fn validate_note_commitments(notes: &[proto::primitives::Digest]) -> Result, Status> { - notes - .iter() - .map(Word::try_from) - .collect::, _>>() - .map_err(|_| invalid_argument("Digest field is not in the modulus range")) -} - -#[instrument( - level = "debug", - target = COMPONENT, - skip_all, - fields(block_numbers = block_numbers.len()) -)] -pub fn read_block_numbers(block_numbers: &[u32]) -> BTreeSet { - BTreeSet::from_iter(block_numbers.iter().map(|raw_number| BlockNumber::from(*raw_number))) -} diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs deleted file mode 100644 index 7f971b8ec..000000000 --- a/crates/store/src/server/block_producer.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::convert::Infallible; - -use miden_crypto::dsa::ecdsa_k256_keccak::Signature; -use miden_node_proto::decode::{GrpcDecodeExt, read_account_id, read_account_ids}; -use miden_node_proto::domain::proof_request::BlockProofRequest; -use miden_node_proto::errors::ConversionError; -use miden_node_proto::generated::store::block_producer_server; -use miden_node_proto::generated::{self as proto}; -use miden_node_proto::{decode, try_convert}; -use miden_node_utils::ErrorReport; -use miden_node_utils::tracing::OpenTelemetrySpanExt; -use miden_protocol::Word; -use miden_protocol::batch::OrderedBatches; -use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock}; -use miden_protocol::utils::serde::Deserializable; -use tokio::sync::watch; -use tonic::{Request, Response, Status}; -use tracing::{Instrument, error}; - -use crate::errors::ApplyBlockError; -use crate::server::api::{ - StoreApi, - conversion_error_to_status, - read_block_numbers, - validate_note_commitments, - validate_nullifiers, -}; -use crate::state::Finality; - -// BLOCK PRODUCER API -// ================================================================================================ - -/// Extends [`StoreApi`] with the proof-scheduler notification channel, which is only required by -/// the `BlockProducer` gRPC service. Not used in replica mode. -#[derive(Clone)] -pub(super) struct BlockProducerApi { - pub(super) inner: StoreApi, - /// Notifies the proof scheduler of the latest committed block number after each `apply_block`. - pub(super) chain_tip_sender: watch::Sender, -} - -// BLOCK PRODUCER ENDPOINTS -// ================================================================================================ - -#[tonic::async_trait] -impl block_producer_server::BlockProducer for BlockProducerApi { - /// Returns block header for the specified block number. - /// - /// If the block number is not provided, block header for the latest block is returned. - async fn get_block_header_by_number( - &self, - request: Request, - ) -> Result, Status> { - self.inner.get_block_header_by_number_inner(request).await - } - - /// Updates the local DB by inserting a new block header and the related data. - async fn apply_block( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - // Read ordered batches. - let ordered_batches = - OrderedBatches::read_from_bytes(&request.ordered_batches).map_err(|err| { - Status::invalid_argument( - err.as_report_context("failed to deserialize ordered batches"), - ) - })?; - // Read block. - let block = request - .block - .ok_or(ConversionError::missing_field::("block"))?; - // Decode block fields. - let decoder = block.decoder(); - let header: BlockHeader = decode!(decoder, block.header)?; - let body: BlockBody = decode!(decoder, block.body)?; - let signature: Signature = decode!(decoder, block.signature)?; - - // Get block inputs from ordered batches. - let block_inputs = - self.inner.block_inputs_from_ordered_batches(&ordered_batches).await.map_err( - |err| { - Status::invalid_argument( - err.as_report_context("failed to get block inputs from ordered batches"), - ) - }, - )?; - - let span = tracing::Span::current(); - span.set_attribute("block.number", header.block_num()); - span.set_attribute("block.commitment", header.commitment()); - span.set_attribute("block.accounts.count", body.updated_accounts().len()); - span.set_attribute("block.output_notes.count", body.output_notes().count()); - span.set_attribute("block.nullifiers.count", body.created_nullifiers().len()); - - // Construct block proof request to be stored alongside the block for deferred block - // proving. - let proving_inputs = BlockProofRequest { - tx_batches: ordered_batches, - block_header: header.clone(), - block_inputs, - }; - let block_num = header.block_num(); - self.inner - .state - .save_proving_inputs(block_num, &proving_inputs) - .await - .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; - - // We perform the apply block work in a separate task. This prevents the caller - // cancelling the request and thereby cancelling the task at an arbitrary point of - // execution. - // - // Normally this shouldn't be a problem, however our apply_block isn't quite ACID compliant - // so things get a bit messy. This is more a temporary hack-around to minimize this risk. - let this = self.clone(); - tokio::spawn( - async move { - let signed_block = SignedBlock::new(header, body, signature) - .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; - // Note: This is an internal endpoint, so its safe to expose the full error report. - this.inner - .state - .apply_block(signed_block) - .await - .inspect(|_| { - if let Err(err) = this.chain_tip_sender.send(block_num) { - error!("Failed to send chain tip: {:?}", err); - } - }) - .map_err(|err| { - span.set_error(&err); - let code = match err { - ApplyBlockError::InvalidBlockError(_) => tonic::Code::InvalidArgument, - _ => tonic::Code::Internal, - }; - Status::new(code, err.as_report()) - }) - } - .in_current_span(), - ) - .await - .map_err(|err| { - tonic::Status::internal(err.as_report_context("joining apply_block task failed")) - }) - .flatten()?; - Ok(Response::new(())) - } - - /// Returns data needed by the block producer to construct and prove the next block. - async fn get_block_inputs( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - - let account_ids = read_account_ids::(request.account_ids)?; - let nullifiers = validate_nullifiers(&request.nullifiers) - .map_err(|err| conversion_error_to_status(&err))?; - let unauthenticated_note_commitments = - validate_note_commitments(&request.unauthenticated_notes)?; - let reference_blocks = read_block_numbers(&request.reference_blocks); - let unauthenticated_note_commitments = - unauthenticated_note_commitments.into_iter().collect(); - - self.inner - .state - .get_block_inputs( - account_ids, - nullifiers, - unauthenticated_note_commitments, - reference_blocks, - ) - .await - .map(proto::store::BlockInputs::from) - .map(Response::new) - .inspect_err(|err| tracing::Span::current().set_error(err)) - .map_err(|err| tonic::Status::internal(err.as_report())) - } - - /// Fetches the inputs for a transaction batch from the database. - /// - /// See [`State::get_batch_inputs`] for details. - async fn get_batch_inputs( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - - let note_commitments: Vec = try_convert(request.note_commitments) - .collect::>() - .map_err(|err| Status::invalid_argument(format!("Invalid note commitment: {err}")))?; - - let reference_blocks: Vec = - try_convert::<_, Infallible, _, _>(request.reference_blocks) - .collect::, _>>() - .expect("operation should be infallible"); - let reference_blocks = reference_blocks.into_iter().map(BlockNumber::from).collect(); - - self.inner - .state - .get_batch_inputs(reference_blocks, note_commitments.into_iter().collect()) - .await - .map(Into::into) - .map(Response::new) - .inspect_err(|err| tracing::Span::current().set_error(err)) - .map_err(|err| tonic::Status::internal(err.as_report())) - } - - async fn get_transaction_inputs( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - - let account_id = - read_account_id::(request.account_id)?; - let nullifiers = validate_nullifiers(&request.nullifiers) - .map_err(|err| conversion_error_to_status(&err))?; - let unauthenticated_note_commitments = - validate_note_commitments(&request.unauthenticated_notes)?; - - let tx_inputs = self - .inner - .state - .get_transaction_inputs(account_id, &nullifiers, unauthenticated_note_commitments) - .await - .inspect_err(|err| tracing::Span::current().set_error(err)) - .map_err(|err| tonic::Status::internal(err.as_report()))?; - - let block_height = self.inner.state.chain_tip(Finality::Committed).await.as_u32(); - - Ok(Response::new(proto::store::TransactionInputs { - account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord { - account_id: Some(account_id.into()), - account_commitment: Some(tx_inputs.account_commitment.into()), - }), - nullifiers: tx_inputs - .nullifiers - .into_iter() - .map(|nullifier| { - proto::store::transaction_inputs::NullifierTransactionInputRecord { - nullifier: Some(nullifier.nullifier.into()), - block_num: nullifier.block_num.as_u32(), - } - }) - .collect(), - found_unauthenticated_notes: tx_inputs - .found_unauthenticated_notes - .into_iter() - .map(Into::into) - .collect(), - new_account_id_prefix_is_unique: tx_inputs.new_account_id_prefix_is_unique, - block_height, - })) - } -} diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 98c21ef84..49a718024 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -13,7 +13,6 @@ use miden_node_utils::spawn::spawn_blocking_in_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_node_utils::tracing::grpc::grpc_trace_fn; use tokio::net::TcpListener; -use tokio::sync::watch; use tokio::task::JoinSet; use tokio_stream::wrappers::TcpListenerStream; use tower_http::trace::TraceLayer; @@ -30,7 +29,6 @@ use crate::state::{ProofCache, State}; use crate::{BlockProver, COMPONENT}; mod api; -mod block_producer; pub mod block_prover_client; mod replica_sync; @@ -41,28 +39,23 @@ mod rpc_api; /// Determines how the store receives new blocks. /// -/// The two modes are mutually exclusive: a store either accepts blocks from a block producer -/// via its `BlockProducer` gRPC service, or it syncs blocks from an upstream store instance. -/// The services exposed on the network differ between modes accordingly. +/// The two modes are mutually exclusive: a store either acts as the primary writer for locally +/// produced blocks, or it syncs blocks from an upstream store instance. pub enum StoreMode { - /// Accepts blocks from a block producer via the `BlockProducer` gRPC service. + /// Store mode for a sequencing node that produces local blocks. /// - /// Exposes the `Rpc` and `BlockProducer` gRPC services and runs the proof scheduler to - /// generate block proofs. - BlockProducer { - /// Listener for the block producer gRPC endpoint. - block_producer_listener: TcpListener, + /// Exposes the `Rpc` gRPC service and runs the proof scheduler to generate block proofs. + Sequencer { /// URL of the remote block prover. Uses a local prover if `None`. block_prover_url: Option, /// Maximum number of blocks proven concurrently by the proof scheduler. max_concurrent_proofs: NonZeroUsize, }, - /// Receives blocks from an upstream store's `Rpc` gRPC service. + /// Store mode for a full node that syncs from an upstream store's `Rpc` gRPC service. /// - /// Only the `Rpc` gRPC service is exposed. The `BlockProducer` service is not started and no - /// proof scheduler runs. - Replica { upstream_url: Url }, + /// Only the `Rpc` gRPC service is exposed and no proof scheduler runs. + Full { upstream_url: Url }, } /// Database options used by the store. @@ -152,24 +145,18 @@ impl Store { let _disk_monitor_task = Self::spawn_disk_monitor(self.data_directory.clone()); let ModeSetup { mut grpc_servers, mode_task } = match self.mode { - StoreMode::BlockProducer { - block_producer_listener, - block_prover_url, - max_concurrent_proofs, - } => { - Self::setup_block_producer_mode( + StoreMode::Sequencer { block_prover_url, max_concurrent_proofs } => { + Self::setup_sequencer_mode( state, - block_producer_listener, block_prover_url, max_concurrent_proofs, tx_proven_tip, self.grpc_options, self.rpc_listener, - ) - .await? + )? }, - StoreMode::Replica { upstream_url } => { - Self::setup_replica_mode(state, upstream_url, self.grpc_options, self.rpc_listener)? + StoreMode::Full { upstream_url } => { + Self::setup_full_mode(state, upstream_url, self.grpc_options, self.rpc_listener)? }, }; @@ -193,43 +180,29 @@ impl Store { } } - async fn setup_block_producer_mode( + fn setup_sequencer_mode( state: State, - block_producer_listener: TcpListener, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, tx_proven_tip: ProvenTipWriter, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ) -> anyhow::Result { - info!(target: COMPONENT, - block_producer_endpoint=?block_producer_listener.local_addr()?, - "Starting in block-producer mode"); + info!(target: COMPONENT, "Starting in sequencer mode"); let proof_cache = state.proof_cache.clone(); - let (proof_scheduler_task, chain_tip_sender) = Self::spawn_proof_scheduler( + let proof_scheduler_task = Self::spawn_proof_scheduler( &state, block_prover_url, max_concurrent_proofs, tx_proven_tip, proof_cache, - ) - .await; + ); let state = Arc::new(state); let store_api = api::StoreApi::new(state); - let block_producer_api = block_producer::BlockProducerApi { - inner: store_api.clone(), - chain_tip_sender, - }; - let join_set = Self::spawn_block_producer_grpc_servers( - store_api, - block_producer_api, - grpc_options, - rpc_listener, - block_producer_listener, - )?; + let join_set = Self::spawn_store_grpc_server(store_api, grpc_options, rpc_listener)?; Ok(ModeSetup { grpc_servers: join_set, @@ -237,13 +210,13 @@ impl Store { }) } - fn setup_replica_mode( + fn setup_full_mode( state: State, upstream_url: Url, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ) -> anyhow::Result { - info!(target: COMPONENT, %upstream_url, "Starting in replica mode"); + info!(target: COMPONENT, %upstream_url, "Starting in full mode"); let state = Arc::new(state); let block_handle = BlockReplicaSync::new(Arc::clone(&state), upstream_url.clone()).spawn(); @@ -266,54 +239,41 @@ impl Store { /// Initializes the block prover client and spawns the proof scheduler as a background task. /// - /// Returns the scheduler task handle and the chain tip sender (needed by the block-producer - /// gRPC service to notify the scheduler of new blocks). - async fn spawn_proof_scheduler( + /// Returns the scheduler task handle. + fn spawn_proof_scheduler( state: &State, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, proven_tip: ProvenTipWriter, proof_cache: ProofCache, - ) -> ( - tokio::task::JoinHandle>, - watch::Sender, - ) { + ) -> tokio::task::JoinHandle> { let block_prover = if let Some(url) = block_prover_url { Arc::new(BlockProver::remote(url)) } else { Arc::new(BlockProver::local()) }; - let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; - let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip); + let chain_tip_rx = state.subscribe_committed_tip(); - let handle = proof_scheduler::spawn( + proof_scheduler::spawn( block_prover, state.block_store(), chain_tip_rx, proven_tip, max_concurrent_proofs, proof_cache, - ); - - (handle, chain_tip_tx) + ) } - /// Spawns the gRPC servers for block-producer mode. - /// - /// Starts two listeners: `Rpc` and `BlockProducer`. - fn spawn_block_producer_grpc_servers( + /// Spawns the store gRPC server. + fn spawn_store_grpc_server( store_api: api::StoreApi, - block_producer_api: block_producer::BlockProducerApi, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, - block_producer_listener: TcpListener, ) -> anyhow::Result>> { let mut join_set = JoinSet::new(); let rpc_service = store::rpc_server::RpcServer::new(store_api); - let block_producer_service = - store::block_producer_server::BlockProducerServer::new(block_producer_api); let reflection_service = tonic_reflection::server::Builder::configure() .register_file_descriptor_set(store_api_descriptor()) @@ -330,49 +290,22 @@ impl Store { join_set.spawn( make_server() .add_service(rpc_service) - .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(rpc_listener)), - ); - - join_set.spawn( - make_server() - .accept_http1(true) - .add_service(block_producer_service) .add_service(reflection_service) - .serve_with_incoming(TcpListenerStream::new(block_producer_listener)), + .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); Ok(join_set) } - /// Spawns the gRPC servers for replica mode. + /// Spawns the gRPC servers for full-node mode. /// - /// Only the `Rpc` service is exposed — no `BlockProducer` or proof scheduler. + /// Only the `Rpc` service is exposed and no proof scheduler runs. fn spawn_replica_grpc_servers( store_api: api::StoreApi, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ) -> anyhow::Result>> { - let mut join_set = JoinSet::new(); - - let rpc_service = store::rpc_server::RpcServer::new(store_api); - - let reflection_service = tonic_reflection::server::Builder::configure() - .register_file_descriptor_set(store_api_descriptor()) - .build_v1() - .context("failed to build reflection service")?; - - join_set.spawn( - tonic::transport::Server::builder() - .timeout(grpc_options.request_timeout) - .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) - .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) - .add_service(rpc_service) - .add_service(reflection_service) - .serve_with_incoming(TcpListenerStream::new(rpc_listener)), - ); - - Ok(join_set) + Self::spawn_store_grpc_server(store_api, grpc_options, rpc_listener) } /// Spawns a background task that periodically records the on-disk size of every store data path /// as `OTel` span attributes. diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index 4c5f2ab73..aa473d1bd 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -4,9 +4,10 @@ use miden_node_proto::domain::proof_request::BlockProofRequest; use miden_node_utils::ErrorReport; use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::batch::OrderedBatches; use miden_protocol::block::account_tree::AccountMutationSet; use miden_protocol::block::nullifier_tree::NullifierMutationSet; -use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock}; +use miden_protocol::block::{BlockBody, BlockHeader, BlockInputs, BlockNumber, SignedBlock}; use miden_protocol::note::{NoteAttachments, NoteDetails, Nullifier}; use miden_protocol::transaction::OutputNote; use miden_protocol::utils::serde::Serializable; @@ -14,11 +15,39 @@ use tokio::sync::oneshot; use tracing::{Instrument, info, info_span, instrument}; use crate::db::NoteRecord; -use crate::errors::{ApplyBlockError, InvalidBlockError}; +use crate::errors::{ApplyBlockError, ApplyBlockWithProvingInputsError, InvalidBlockError}; use crate::state::{BlockNotification, State}; use crate::{COMPONENT, HistoricalError}; impl State { + /// Saves proving inputs for a signed block and applies it to the state. + /// + /// Used by the in-process block producer after it has built and signed a block. + #[instrument(target = COMPONENT, skip_all, err)] + pub async fn apply_block_with_proving_inputs( + &self, + ordered_batches: OrderedBatches, + block_inputs: BlockInputs, + signed_block: SignedBlock, + ) -> Result<(), ApplyBlockWithProvingInputsError> { + let block_header = signed_block.header().clone(); + let block_num = block_header.block_num(); + + let proving_inputs = BlockProofRequest { + tx_batches: ordered_batches, + block_header, + block_inputs, + }; + + self.save_proving_inputs(block_num, &proving_inputs) + .await + .map_err(ApplyBlockWithProvingInputsError::SaveProvingInputs)?; + + self.apply_block(signed_block) + .await + .map_err(ApplyBlockWithProvingInputsError::ApplyBlock) + } + /// Apply changes of a new block to the DB and in-memory data structures. /// /// ## Note on state consistency diff --git a/crates/store/src/state/apply_proof.rs b/crates/store/src/state/apply_proof.rs index 9116fb108..63847512e 100644 --- a/crates/store/src/state/apply_proof.rs +++ b/crates/store/src/state/apply_proof.rs @@ -7,7 +7,7 @@ use crate::state::{ProofNotification, State}; impl State { /// Saves a block proof, advances the proven-in-sequence tip, and notifies replica subscribers. /// - /// Only used when the store is running in replica mode. + /// Only used when the store is running in full-node mode. #[instrument(target = COMPONENT, skip_all, err, fields(block.number = block_num.as_u32()))] pub async fn apply_proof( &self, diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index b5fa213b9..b78efa38b 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -186,7 +186,7 @@ impl State { /// Loads the state from the data directory. /// /// Returns `(Self, ProvenTipWriter)`. The `ProvenTipWriter` is used by the proof scheduler - /// (in block-producer mode) to advance the proven tip; callers can subscribe to tip changes + /// (in sequencer mode) to advance the proven tip; callers can subscribe to tip changes /// via the methods on `Self`. #[instrument(target = COMPONENT, skip_all)] pub async fn load( @@ -206,7 +206,7 @@ impl State { /// Loads the state from the data directory using explicit database options. /// /// Returns `(Self, ProvenTipWriter)`. The `ProvenTipWriter` is used by the proof scheduler - /// (in block-producer mode) to advance the proven tip; callers can subscribe to tip changes + /// (in sequencer mode) to advance the proven tip; callers can subscribe to tip changes /// via the methods on `Self`. #[instrument(target = COMPONENT, skip_all)] pub async fn load_with_database_options( diff --git a/docs/external/src/operator/monitoring.md b/docs/external/src/operator/monitoring.md index 4c2b686ec..64327e443 100644 --- a/docs/external/src/operator/monitoring.md +++ b/docs/external/src/operator/monitoring.md @@ -52,26 +52,18 @@ block_builder.build_block │ ┕━ mempool.select_block ┝━ block_builder.get_block_inputs │ ┝━ block_builder.summarize_batches -│ ┕━ store.client.get_block_inputs -│ ┕━ store.rpc/GetBlockInputs -│ ┕━ store.server.get_block_inputs -│ ┝━ validate_nullifiers -│ ┝━ read_account_ids -│ ┝━ validate_note_commitments -│ ┝━ select_block_header_by_block_num -│ ┝━ select_note_inclusion_proofs -│ ┕━ select_block_headers +│ ┕━ store.state.get_block_inputs +│ ┝━ select_note_inclusion_proofs +│ ┕━ select_block_headers ┝━ block_builder.prove_block │ ┝━ execute_program │ ┕━ block_builder.simulate_proving ┝━ block_builder.inject_failure ┕━ block_builder.commit_block - ┝━ store.client.apply_block - │ ┕━ store.rpc/ApplyBlock - │ ┕━ store.server.apply_block - │ ┕━ apply_block - │ ┝━ select_block_header_by_block_num - │ ┕━ update_in_memory_structs + ┝━ store.state.apply_block_with_proving_inputs + │ ┕━ apply_block + │ ┝━ select_block_header_by_block_num + │ ┕━ update_in_memory_structs ┝━ mempool.lock ┕━ mempool.commit_block ┕━ mempool.revert_expired_transactions @@ -94,7 +86,7 @@ batch_builder.build_batch │ ┝━ mempool.lock │ ┕━ mempool.select_batch ┝━ batch_builder.get_batch_inputs -│ ┕━ store.client.get_batch_inputs +│ ┕━ store.state.get_batch_inputs ┝━ batch_builder.propose_batch ┝━ batch_builder.prove_batch ┝━ batch_builder.inject_failure diff --git a/docs/internal/src/store.md b/docs/internal/src/store.md index 90bef64e6..ac18de763 100644 --- a/docs/internal/src/store.md +++ b/docs/internal/src/store.md @@ -25,5 +25,6 @@ full fixed configuration. Runtime-tuneable parameters are documented in the ## Architecture -The store consists mainly of a gRPC server which answers requests from the RPC and block-producer components, as well as -new block submissions from the block-producer. +The store consists mainly of state management plus a gRPC server which answers requests from the +RPC component. In sequencer mode, the block-producer uses the store state in-process for block +inputs and block application. diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 40e4541a2..c7cc50300 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -12,8 +12,7 @@ rust-version.workspace = true version.workspace = true [features] -# Enables the gRPC file descriptors for the internal component APIs, -# i.e. the `store` and `block-producer` gRPC services. +# Enables the gRPC file descriptors for the internal component APIs. internal = [] [lints] diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index c608a8b26..33ece7b37 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -5,9 +5,7 @@ package store; import "google/protobuf/empty.proto"; import "types/account.proto"; import "types/blockchain.proto"; -import "types/transaction.proto"; import "types/note.proto"; -import "types/primitives.proto"; import "rpc.proto"; // RPC STORE API @@ -78,169 +76,3 @@ service Rpc { // On lag, the stream is closed with a DATA_LOSS error. rpc ProofSubscription(rpc.ProofSubscriptionRequest) returns (stream rpc.ProofSubscriptionResponse) {} } - -// BLOCK PRODUCER STORE API -// ================================================================================================ - -// Store API for the BlockProducer component -service BlockProducer { - // Applies changes of a new block to the DB and in-memory data structures. - rpc ApplyBlock(ApplyBlockRequest) returns (google.protobuf.Empty) {} - - // Retrieves block header by given block number. Optionally, it also returns the MMR path - // and current chain length to authenticate the block's inclusion. - rpc GetBlockHeaderByNumber(rpc.BlockHeaderByNumberRequest) returns (rpc.BlockHeaderByNumberResponse) {} - - // Returns data required to prove the next block. - rpc GetBlockInputs(BlockInputsRequest) returns (BlockInputs) {} - - // Returns the inputs for a transaction batch. - rpc GetBatchInputs(BatchInputsRequest) returns (BatchInputs) {} - - // Returns data required to validate a new transaction. - rpc GetTransactionInputs(TransactionInputsRequest) returns (TransactionInputs) {} -} - -// APPLY BLOCK REQUEST -// ================================================================================================ - -// Applies a block to the state. -message ApplyBlockRequest { - // Ordered batches encoded using [miden_serde_utils::Serializable] implementation for - // [miden_objects::batch::OrderedBatches]. - bytes ordered_batches = 1; - // Block signed by the Validator. - blockchain.SignedBlock block = 2; -} - -// GET BLOCK INPUTS -// ================================================================================================ - -// Returns data required to prove the next block. -message BlockInputsRequest { - // IDs of all accounts updated in the proposed block for which to retrieve account witnesses. - repeated account.AccountId account_ids = 1; - - // Nullifiers of all notes consumed by the block for which to retrieve witnesses. - // - // Due to note erasure it will generally not be possible to know the exact set of nullifiers - // a block will create, unless we pre-execute note erasure. So in practice, this set of - // nullifiers will be the set of nullifiers of all proven batches in the block, which is a - // superset of the nullifiers the block may create. - // - // However, if it is known that a certain note will be erased, it would not be necessary to - // provide a nullifier witness for it. - repeated primitives.Digest nullifiers = 2; - - // Array of note IDs for which to retrieve note inclusion proofs, **if they exist in the store**. - repeated primitives.Digest unauthenticated_notes = 3; - - // Array of block numbers referenced by all batches in the block. - repeated fixed32 reference_blocks = 4; -} - -// Represents the result of getting block inputs. -message BlockInputs { - // A nullifier returned as a response to the `GetBlockInputs`. - message NullifierWitness { - // The nullifier. - primitives.Digest nullifier = 1; - - // The SMT proof to verify the nullifier's inclusion in the nullifier tree. - primitives.SmtOpening opening = 2; - } - // The latest block header. - blockchain.BlockHeader latest_block_header = 1; - - // Proof of each requested unauthenticated note's inclusion in a block, **if it existed in - // the store**. - repeated note.NoteInclusionInBlockProof unauthenticated_note_proofs = 2; - - // The serialized chain MMR which includes proofs for all blocks referenced by the - // above note inclusion proofs as well as proofs for inclusion of the requested blocks - // referenced by the batches in the block. - bytes partial_block_chain = 3; - - // The state commitments of the requested accounts and their authentication paths. - repeated account.AccountWitness account_witnesses = 4; - - // The requested nullifiers and their authentication paths. - repeated NullifierWitness nullifier_witnesses = 5; -} - -// GET BATCH INPUTS -// ================================================================================================ - -// Returns the inputs for a transaction batch. -message BatchInputsRequest { - // List of unauthenticated note commitments to be queried from the database. - repeated primitives.Digest note_commitments = 1; - // Set of block numbers referenced by transactions. - repeated fixed32 reference_blocks = 2; -} - -// Represents the result of getting batch inputs. -message BatchInputs { - // The block header that the transaction batch should reference. - blockchain.BlockHeader batch_reference_block_header = 1; - - // Proof of each _found_ unauthenticated note's inclusion in a block. - repeated note.NoteInclusionInBlockProof note_proofs = 2; - - // The serialized chain MMR which includes proofs for all blocks referenced by the - // above note inclusion proofs as well as proofs for inclusion of the blocks referenced - // by the transactions in the batch. - bytes partial_block_chain = 3; -} - -// GET TRANSACTION INPUTS -// ================================================================================================ - -// Returns data required to validate a new transaction. -message TransactionInputsRequest { - // ID of the account against which a transaction is executed. - account.AccountId account_id = 1; - // Set of nullifiers consumed by this transaction. - repeated primitives.Digest nullifiers = 2; - // Set of unauthenticated note commitments to check for existence on-chain. - // - // These are notes which were not on-chain at the state the transaction was proven, - // but could by now be present. - repeated primitives.Digest unauthenticated_notes = 3; -} - -// Represents the result of getting transaction inputs. -message TransactionInputs { - // An account returned as a response to the `GetTransactionInputs`. - message AccountTransactionInputRecord { - // The account ID. - account.AccountId account_id = 1; - - // The latest account commitment, zero commitment if the account doesn't exist. - primitives.Digest account_commitment = 2; - } - - // A nullifier returned as a response to the `GetTransactionInputs`. - message NullifierTransactionInputRecord { - // The nullifier ID. - primitives.Digest nullifier = 1; - - // The block at which the nullifier has been consumed, zero if not consumed. - fixed32 block_num = 2; - } - - // Account state proof. - AccountTransactionInputRecord account_state = 1; - - // List of nullifiers that have been consumed. - repeated NullifierTransactionInputRecord nullifiers = 2; - - // List of unauthenticated notes that were not found in the database. - repeated primitives.Digest found_unauthenticated_notes = 3; - - // The node's current block height. - fixed32 block_height = 4; - - // Whether the account ID prefix is unique. Only relevant for account creation requests. - optional bool new_account_id_prefix_is_unique = 5; // TODO: Replace this with an error. When a general error message exists. -} diff --git a/scripts/run-node.sh b/scripts/run-node.sh index 0968087bc..0dd92bd44 100755 --- a/scripts/run-node.sh +++ b/scripts/run-node.sh @@ -21,7 +21,7 @@ VALIDATOR_DIR="/tmp/validator" NTX_BUILDER_DIR="/tmp/ntx-builder" ACCOUNTS_DIR="/tmp/accounts" -# Primary store (block-producer mode): 2 APIs. +# Sequencer store. STORE_RPC_PORT=50001 STORE_BLOCK_PRODUCER_PORT=50003 @@ -107,7 +107,7 @@ fi echo "=== Starting components ===" -echo "Starting store (block-producer mode)..." +echo "Starting sequencer store..." OTEL_SERVICE_NAME=miden-store-primary $BINARY store start \ --rpc.listen "0.0.0.0:$STORE_RPC_PORT" \ --block-producer.listen "0.0.0.0:$STORE_BLOCK_PRODUCER_PORT" \