diff --git a/Cargo.lock b/Cargo.lock index 78bec97de..06a910230 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3346,7 +3346,6 @@ dependencies = [ "futures", "itertools 0.14.0", "miden-node-proto", - "miden-node-proto-build", "miden-node-store", "miden-node-utils", "miden-protocol", @@ -3361,10 +3360,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tokio", - "tokio-stream", "tonic", - "tonic-reflection", - "tower-http", "tracing", "url", ] @@ -3446,6 +3442,7 @@ dependencies = [ "futures", "http 1.4.0", "mediatype", + "miden-node-block-producer", "miden-node-proto", "miden-node-proto-build", "miden-node-store", diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index f78319162..5acce1d69 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -26,7 +26,6 @@ anyhow = { workspace = true } futures = { workspace = true } itertools = { workspace = true } miden-node-proto = { workspace = true } -miden-node-proto-build = { features = ["internal"], workspace = true } miden-node-utils = { features = ["testing"], workspace = true } miden-protocol = { default-features = true, workspace = true } miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true } @@ -34,10 +33,7 @@ miden-tx-batch-prover = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true } -tokio-stream = { features = ["net"], workspace = true } tonic = { default-features = true, features = ["transport"], workspace = true } -tonic-reflection = { workspace = true } -tower-http = { features = ["util"], workspace = true } tracing = { workspace = true } url = { workspace = true } diff --git a/crates/block-producer/README.md b/crates/block-producer/README.md index eb35acc27..9968d4646 100644 --- a/crates/block-producer/README.md +++ b/crates/block-producer/README.md @@ -3,14 +3,14 @@ Contains code defining the [Miden node's block-producer](/README.md#architecture) component. It is responsible for ordering transactions into blocks and submitting these for inclusion in the blockchain. -It serves a small [gRPC](https://grpc.io) API which the node's RPC component uses to submit new transactions. In turn, -the `block-producer` uses the store's gRPC API to submit blocks and query chain state. +It exposes an in-process API which the node's RPC component uses to submit new transactions. In turn, the +`block-producer` uses the store's gRPC API to submit blocks and query chain state. For more information on the installation and operation of this component, please see the [node's readme](../../README.md). ## API -The full gRPC API can be found [here](../../proto/proto/block_producer.proto). +The API is exposed by `BlockProducerApi`. --- diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index 862a1a2ce..b5d880956 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -77,6 +77,9 @@ pub enum MempoolSubmissionError { #[error("mempool lock is poisoned")] #[grpc(internal)] MempoolPoisoned(#[source] MempoolPoisonError), + + #[error("missing proposed batch")] + MissingProposedBatch, } // Mempool submission conflicts with current state diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index 955aa2356..353a6a71c 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -18,7 +18,8 @@ pub mod errors; mod errors; pub mod server; -pub use server::BlockProducer; +pub use errors::MempoolSubmissionError; +pub use server::{BlockProducer, BlockProducerApi, BlockProducerApiConfig, BlockProducerRuntime}; // CONSTANTS // ================================================================================================= diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 330ce242f..d49609f70 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -84,7 +84,7 @@ mod tests; // MEMPOOL CONFIGURATION // ================================================================================================ -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SharedMempool(Arc>); #[derive(Debug, Error, Clone, Copy, PartialEq, Eq)] diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 903fb03fb..5019df460 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -1,26 +1,17 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; -use anyhow::{Context, Result}; -use miden_node_proto::generated::block_producer::api_server; +use anyhow::Result; use miden_node_proto::generated::{self as proto}; -use miden_node_proto_build::block_producer_api_descriptor; -use miden_node_utils::clap::GrpcOptionsInternal; use miden_node_utils::formatting::{format_input_notes, format_output_notes}; -use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; -use miden_node_utils::tracing::grpc::grpc_trace_fn; use miden_protocol::batch::ProposedBatch; use miden_protocol::block::BlockNumber; use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Deserializable; -use tokio::net::TcpListener; use tokio::sync::{Mutex, RwLock}; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::Status; -use tower_http::trace::TraceLayer; +use tokio::task::{Id, JoinSet}; use tracing::{debug, error, info, instrument}; use url::Url; @@ -36,15 +27,48 @@ use crate::{CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, COMPONENT, SERVER_NUM_BATCH_BU #[cfg(test)] mod tests; -/// The block producer server. +/// Configuration for the in-process block producer API. +#[derive(Clone, Copy, Debug)] +pub struct BlockProducerApiConfig { + /// The maximum number of transactions per batch. + pub max_txs_per_batch: usize, + /// The maximum number of batches per block. + pub max_batches_per_block: usize, + /// The maximum number of inflight transactions allowed in the mempool at once. + pub mempool_tx_capacity: NonZeroUsize, +} + +impl Default for BlockProducerApiConfig { + fn default() -> Self { + Self { + max_txs_per_batch: crate::DEFAULT_MAX_TXS_PER_BATCH, + max_batches_per_block: crate::DEFAULT_MAX_BATCHES_PER_BLOCK, + mempool_tx_capacity: crate::DEFAULT_MEMPOOL_TX_CAPACITY, + } + } +} + +impl BlockProducerApiConfig { + fn mempool_config(self) -> MempoolConfig { + MempoolConfig { + batch_budget: BatchBudget { + transactions: self.max_txs_per_batch, + ..BatchBudget::default() + }, + block_budget: BlockBudget { batches: self.max_batches_per_block }, + tx_capacity: self.mempool_tx_capacity, + ..Default::default() + } + } +} + +/// The block producer runtime. /// /// Specifies how to connect to the store, batch prover, and block prover components. /// The connection to the store is established at startup and retried with exponential backoff /// until the store becomes available. Once the connection is established, the block producer -/// will start serving requests. +/// starts its batch and block builders. pub struct BlockProducer { - /// The address of the block producer component. - pub block_producer_address: SocketAddr, /// The address of the store component. pub store_url: Url, /// The address of the validator component. @@ -59,8 +83,6 @@ pub struct BlockProducer { pub max_txs_per_batch: usize, /// The maximum number of batches per block. pub max_batches_per_block: usize, - /// Server-side gRPC options. - pub grpc_options: GrpcOptionsInternal, /// The maximum number of inflight transactions allowed in the mempool at once. pub mempool_tx_capacity: NonZeroUsize, @@ -70,12 +92,12 @@ pub struct BlockProducer { // ================================================================================================ impl BlockProducer { - /// Serves the block-producer RPC API, the batch-builder and the block-builder. + /// Starts the block producer and returns its in-process API. /// - /// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is - /// encountered. - pub async fn serve(self) -> anyhow::Result<()> { - info!(target: COMPONENT, endpoint=?self.block_producer_address, store=%self.store_url, "Initializing server"); + /// The returned handle owns the batch and block builder tasks. Dropping the handle stops those + /// tasks. + pub async fn start(self) -> Result { + info!(target: COMPONENT, store=%self.store_url, "Initializing block producer"); let store = StoreClient::new(self.store_url.clone()); let validator = BlockProducerValidatorClient::new(self.validator_url.clone()); @@ -108,11 +130,7 @@ impl BlockProducer { } }; - let listener = TcpListener::bind(self.block_producer_address) - .await - .context("failed to bind to block producer address")?; - - info!(target: COMPONENT, "Server initialized"); + info!(target: COMPONENT, "Block producer initialized"); let block_builder = BlockBuilder::new(store.clone(), validator, self.block_interval); let batch_builder = BatchBuilder::new( @@ -121,36 +139,19 @@ impl BlockProducer { self.batch_prover_url, self.batch_interval, ); - let mempool = MempoolConfig { - batch_budget: BatchBudget { - transactions: self.max_txs_per_batch, - ..BatchBudget::default() - }, - block_budget: BlockBudget { batches: self.max_batches_per_block }, - tx_capacity: self.mempool_tx_capacity, - ..Default::default() + let api_config = BlockProducerApiConfig { + max_txs_per_batch: self.max_txs_per_batch, + max_batches_per_block: self.max_batches_per_block, + mempool_tx_capacity: self.mempool_tx_capacity, }; - let mempool = Mempool::shared(chain_tip, mempool); + let mempool = Mempool::shared(chain_tip, api_config.mempool_config()); + let api = BlockProducerApi::from_shared_mempool(mempool.clone(), store); - // Spawn rpc server and batch and block provers. - // - // These communicate indirectly via a shared mempool. + // Spawn batch and block builders. These communicate indirectly via a shared mempool. // // These should run forever, so we combine them into a joinset so that if // any complete or fail, we can shutdown the rest (somewhat) gracefully. - let mut tasks = tokio::task::JoinSet::new(); - - // Launch the gRPC server. - let rpc_id = tasks - .spawn({ - let mempool = mempool.clone(); - async move { - BlockProducerRpcServer::new(mempool, store) - .serve(listener, self.grpc_options) - .await - } - }) - .id(); + let mut tasks = JoinSet::new(); let batch_builder_id = tasks .spawn({ @@ -168,19 +169,48 @@ impl BlockProducer { let task_ids = HashMap::from([ (batch_builder_id, "batch-builder"), (block_builder_id, "block-builder"), - (rpc_id, "rpc"), ]); + Ok(BlockProducerRuntime { api, tasks, task_ids }) + } + + /// Serves the block producer's batch-builder and block-builder tasks. + /// + /// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is + /// encountered. + pub async fn serve(self) -> anyhow::Result<()> { + self.start().await?.wait().await + } +} + +/// Running block producer tasks plus the API used to submit work to them. +pub struct BlockProducerRuntime { + api: BlockProducerApi, + tasks: JoinSet>, + task_ids: HashMap, +} + +impl BlockProducerRuntime { + /// Returns a cloneable handle to the block producer API. + pub fn api(&self) -> BlockProducerApi { + self.api.clone() + } + + /// Waits for the block producer runtime to end. + /// + /// The batch and block builder tasks should run indefinitely, so this returns an error when any + /// task completes. + pub async fn wait(mut self) -> anyhow::Result<()> { // Wait for any task to end. They should run indefinitely, so this is an unexpected result. // // SAFETY: The JoinSet is definitely not empty. - let task_result = tasks.join_next_with_id().await.unwrap(); + let task_result = self.tasks.join_next_with_id().await.unwrap(); let task_id = match &task_result { Ok((id, _)) => *id, Err(err) => err.id(), }; - let task = task_ids.get(&task_id).unwrap_or(&"unknown"); + let task = self.task_ids.get(&task_id).copied().unwrap_or("unknown"); // We could abort the other tasks here, but not much point as we're probably crashing the // node. @@ -194,18 +224,19 @@ impl BlockProducer { } } -// BLOCK PRODUCER RPC SERVER +// BLOCK PRODUCER API // ================================================================================================ -/// Serves the block producer's RPC [api](api_server::Api). -struct BlockProducerRpcServer { +/// In-process block producer API used by the RPC layer. +#[derive(Clone, Debug)] +pub struct BlockProducerApi { /// The mutex effectively rate limits incoming transactions into the mempool by forcing them /// through a queue. /// /// This gives mempool users such as the batch and block builders equal footing with __all__ /// incoming transactions combined. Without this incoming transactions would greatly restrict /// the block-producers usage of the mempool. - mempool: Mutex, + mempool: Arc>, store: StoreClient, @@ -214,83 +245,59 @@ struct BlockProducerRpcServer { cached_mempool_stats: Arc>, } -impl BlockProducerRpcServer { - pub fn new(mempool: SharedMempool, store: StoreClient) -> Self { - Self { - mempool: Mutex::new(mempool), - store, - cached_mempool_stats: Arc::new(RwLock::new(MempoolStats::default())), - } +impl BlockProducerApi { + /// Creates an API backed by a fresh mempool. + pub fn new(store: StoreClient, chain_tip: BlockNumber, config: BlockProducerApiConfig) -> Self { + Self::from_shared_mempool(Mempool::shared(chain_tip, config.mempool_config()), store) } - // SERVER STARTUP - // -------------------------------------------------------------------------------------------- - - async fn serve( - self, - listener: TcpListener, - grpc_options: GrpcOptionsInternal, - ) -> anyhow::Result<()> { - // Start background task to periodically update cached mempool stats - self.spawn_mempool_stats_updater().await; - - let reflection_service = tonic_reflection::server::Builder::configure() - .register_file_descriptor_set(block_producer_api_descriptor()) - .build_v1() - .context("failed to build reflection service")?; - - // Build the gRPC server with the API service and trace layer. - - tonic::transport::Server::builder() - .accept_http1(true) - .timeout(grpc_options.request_timeout) - .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) - .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) - .add_service(api_server::ApiServer::new(self)) - .add_service(reflection_service) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .context("failed to serve block producer API") + fn from_shared_mempool(mempool: SharedMempool, store: StoreClient) -> Self { + let cached_mempool_stats = mempool + .lock() + .map(|mempool| MempoolStats::from_mempool(&mempool)) + .unwrap_or_default(); + let api = Self { + mempool: Arc::new(Mutex::new(mempool)), + store, + cached_mempool_stats: Arc::new(RwLock::new(cached_mempool_stats)), + }; + api.spawn_mempool_stats_updater(); + api } /// Starts a background task that periodically updates the cached mempool statistics. /// /// This prevents the need to lock the mempool for each status request. - async fn spawn_mempool_stats_updater(&self) { + fn spawn_mempool_stats_updater(&self) { let cached_mempool_stats = Arc::clone(&self.cached_mempool_stats); - let mempool = self.mempool.lock().await.clone(); + let mempool = Arc::clone(&self.mempool); + + let Ok(handle) = tokio::runtime::Handle::try_current() else { + return; + }; - tokio::spawn(async move { + handle.spawn(async move { + let mempool = mempool.lock().await.clone(); let mut interval = tokio::time::interval(CACHED_MEMPOOL_STATS_UPDATE_INTERVAL); loop { interval.tick().await; - let (chain_tip, unbatched_transactions, proposed_batches, proven_batches) = { + let stats = { let Ok(mempool) = mempool.lock() else { tracing::error!("mempool lock poisoned, stopping mempool stats updater"); return; }; - ( - mempool.chain_tip(), - mempool.unbatched_transactions_count() as u64, - mempool.proposed_batches_count() as u64, - mempool.proven_batches_count() as u64, - ) + MempoolStats::from_mempool(&mempool) }; let mut cache = cached_mempool_stats.write().await; - *cache = MempoolStats { - chain_tip, - unbatched_transactions, - proposed_batches, - proven_batches, - }; + *cache = stats; } }); } - // RPC ENDPOINTS + // ENDPOINTS // -------------------------------------------------------------------------------------------- #[instrument( @@ -300,7 +307,7 @@ impl BlockProducerRpcServer { err )] #[expect(clippy::let_and_return)] - async fn submit_proven_tx( + pub async fn submit_proven_tx( &self, request: proto::transaction::ProvenTransaction, ) -> Result { @@ -352,13 +359,12 @@ impl BlockProducerRpcServer { err )] #[expect(clippy::let_and_return)] - async fn submit_proven_tx_batch( + pub async fn submit_proven_tx_batch( &self, request: proto::transaction::TransactionBatch, ) -> Result { - let proposed = request - .proposed_batch - .expect("proposed batch existence is enforced by RPC component"); + let proposed = + request.proposed_batch.ok_or(MempoolSubmissionError::MissingProposedBatch)?; let batch = ProposedBatch::read_from_bytes(&proposed) .map_err(MempoolSubmissionError::DeserializationFailed)?; @@ -390,44 +396,16 @@ impl BlockProducerRpcServer { .map(Into::into); result } -} - -#[tonic::async_trait] -impl api_server::Api for BlockProducerRpcServer { - async fn submit_proven_tx( - &self, - request: tonic::Request, - ) -> Result, Status> { - self.submit_proven_tx(request.into_inner()) - .await - .map(tonic::Response::new) - // This Status::from mapping takes care of hiding internal errors. - .map_err(Into::into) - } - async fn submit_proven_tx_batch( - &self, - request: tonic::Request, - ) -> Result, Status> { - self.submit_proven_tx_batch(request.into_inner()) - .await - .map(tonic::Response::new) - // This Status::from mapping takes care of hiding internal errors. - .map_err(Into::into) - } - - async fn status( - &self, - _request: tonic::Request<()>, - ) -> Result, Status> { + pub async fn status(&self) -> proto::rpc::BlockProducerStatus { let mempool_stats = *self.cached_mempool_stats.read().await; - Ok(tonic::Response::new(proto::rpc::BlockProducerStatus { + proto::rpc::BlockProducerStatus { version: env!("CARGO_PKG_VERSION").to_string(), status: "connected".to_string(), chain_tip: mempool_stats.chain_tip.as_u32(), mempool_stats: Some(mempool_stats.into()), - })) + } } } @@ -435,7 +413,7 @@ impl api_server::Api for BlockProducerRpcServer { // ================================================================================================ /// Mempool statistics that are updated periodically to avoid locking the mempool. -#[derive(Clone, Copy, Default)] +#[derive(Clone, Copy, Debug, Default)] struct MempoolStats { /// The mempool's current view of the chain tip height. chain_tip: BlockNumber, @@ -447,6 +425,17 @@ struct MempoolStats { proven_batches: u64, } +impl MempoolStats { + fn from_mempool(mempool: &Mempool) -> Self { + Self { + chain_tip: mempool.chain_tip(), + unbatched_transactions: mempool.unbatched_transactions_count() as u64, + proposed_batches: mempool.proposed_batches_count() as u64, + proven_batches: mempool.proven_batches_count() as u64, + } + } +} + impl From for proto::rpc::MempoolStats { fn from(stats: MempoolStats) -> Self { proto::rpc::MempoolStats { diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index e5a7a9129..5ce9a71d6 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -1,16 +1,14 @@ use std::num::NonZeroUsize; use std::time::Duration; -use miden_node_proto::generated::block_producer::api_client as block_producer_client; use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store, StoreMode}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee_params; use miden_protocol::testing::random_secret_key::random_secret_key; use miden_validator::{Validator, ValidatorSigner}; use tokio::net::TcpListener; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; use tokio::{runtime, task}; -use tonic::transport::{Channel, Endpoint}; use url::Url; use crate::{BlockProducer, DEFAULT_MAX_BATCHES_PER_BLOCK, DEFAULT_MAX_TXS_PER_BATCH}; @@ -40,19 +38,12 @@ impl Drop for TestStore { /// available, then start serving requests. #[tokio::test] async fn block_producer_startup_is_robust_to_network_failures() { - // get the addresses for the store and block producer + // get the addresses for the store and validator let store_addr = { let store_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); store_listener.local_addr().expect("store should get a local address") }; - let block_producer_addr = { - let block_producer_listener = - TcpListener::bind("127.0.0.1:0").await.expect("failed to bind block-producer"); - block_producer_listener - .local_addr() - .expect("Failed to get block-producer address") - }; let validator_addr = { let validator_listener = @@ -83,9 +74,8 @@ async fn block_producer_startup_is_robust_to_network_failures() { let store_url = Url::parse(&format!("http://{store_addr}")).expect("Failed to parse store URL"); let validator_url = Url::parse(&format!("http://{validator_addr}")).expect("Failed to parse validator URL"); - task::spawn(async move { + let block_producer = task::spawn(async move { BlockProducer { - block_producer_address: block_producer_addr, store_url, validator_url, batch_prover_url: None, @@ -93,52 +83,30 @@ async fn block_producer_startup_is_robust_to_network_failures() { block_interval: Duration::from_millis(500), max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH, max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK, - grpc_options, mempool_tx_capacity: NonZeroUsize::new(100).unwrap(), } - .serve() + .start() .await - .unwrap(); + .unwrap() }); - // test: connecting to the block producer should fail because the store is not yet started (and - // therefore the block producer is not yet listening) - let block_producer_endpoint = - Endpoint::try_from(format!("http://{block_producer_addr}")).expect("valid url"); - let block_producer_client = - block_producer_client::ApiClient::connect(block_producer_endpoint.clone()).await; + // test: startup should still be waiting because the store is not yet available. + sleep(Duration::from_millis(100)).await; assert!( - block_producer_client.is_err(), - "Block producer should not be available before store is started" + !block_producer.is_finished(), + "Block producer should wait until the store is started" ); // start the store let _store = start_store(store_addr).await; - // wait for the block producer's exponential backoff to connect to the store use a retry loop - // since CI environments may be slower - let block_producer_client = { - let mut attempts = 0; - loop { - attempts += 1; - match block_producer_client::ApiClient::connect(block_producer_endpoint.clone()).await { - Ok(client) => break client, - Err(_) if attempts < 30 => { - sleep(Duration::from_millis(200)).await; - }, - Err(e) => panic!( - "block producer client should connect after store is started (after {attempts} attempts): {e}" - ), - } - } - }; - - // test: status request against block-producer should succeed - let response = send_status_request(block_producer_client).await; - assert!(response.is_ok(), "Status request should succeed, got: {:?}", response.err()); + let block_producer = timeout(Duration::from_secs(10), block_producer) + .await + .expect("block producer should start after store is started") + .expect("block producer task should not panic"); - // verify the response contains expected data - let status = response.unwrap().into_inner(); + // verify the in-process API returns expected status data. + let status = block_producer.api().status().await; assert_eq!(status.status, "connected"); } @@ -185,10 +153,3 @@ async fn start_store(store_addr: std::net::SocketAddr) -> TestStore { _data_directory: data_directory, } } - -/// Sends a status request to the block producer to verify connectivity. -async fn send_status_request( - mut client: block_producer_client::ApiClient, -) -> Result, tonic::Status> { - client.status(()).await -} diff --git a/crates/proto/build.rs b/crates/proto/build.rs index 2cadf011c..78e0047fa 100644 --- a/crates/proto/build.rs +++ b/crates/proto/build.rs @@ -5,7 +5,6 @@ use std::process::Command; use codegen::{Function, Impl, Module, Trait, Type}; use fs_err as fs; use miden_node_proto_build::{ - block_producer_api_descriptor, ntx_builder_api_descriptor, remote_prover_api_descriptor, rpc_api_descriptor, @@ -29,7 +28,6 @@ fn main() -> miette::Result<()> { let descriptor_sets = [ rpc_api_descriptor(), store_api_descriptor(), - block_producer_api_descriptor(), remote_prover_api_descriptor(), validator_api_descriptor(), ntx_builder_api_descriptor(), diff --git a/crates/proto/src/clients/mod.rs b/crates/proto/src/clients/mod.rs index c1d7c0035..9bd378156 100644 --- a/crates/proto/src/clients/mod.rs +++ b/crates/proto/src/clients/mod.rs @@ -109,8 +109,6 @@ impl tonic::service::Interceptor for Interceptor { type InterceptedChannel = InterceptedService; type GeneratedRpcClient = generated::rpc::api_client::ApiClient; -type GeneratedBlockProducerClient = - generated::block_producer::api_client::ApiClient; type GeneratedStoreClientForBlockProducer = generated::store::block_producer_client::BlockProducerClient; type GeneratedStoreClientForRpc = generated::store::rpc_client::RpcClient; @@ -126,8 +124,6 @@ type GeneratedNtxBuilderClient = generated::ntx_builder::api_client::ApiClient &mut Self::Target { - &mut self.0 - } -} - -impl Deref for BlockProducerClient { - type Target = GeneratedBlockProducerClient; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl DerefMut for StoreBlockProducerClient { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 @@ -266,12 +248,6 @@ impl GrpcClient for RpcClient { } } -impl GrpcClient for BlockProducerClient { - fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { - Self(GeneratedBlockProducerClient::new(InterceptedService::new(channel, interceptor))) - } -} - impl GrpcClient for StoreBlockProducerClient { fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { Self(GeneratedStoreClientForBlockProducer::new(InterceptedService::new( diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index ed3d8ff90..67d833969 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -18,27 +18,28 @@ workspace = true doctest = false [dependencies] -anyhow = { workspace = true } -futures = { workspace = true } -http = { workspace = true } -mediatype = { version = "0.21" } -miden-node-proto = { workspace = true } -miden-node-proto-build = { workspace = true } -miden-node-utils = { workspace = true } -miden-protocol = { default-features = true, workspace = true } -miden-tx = { features = ["concurrent"], workspace = true } -miden-tx-batch-prover = { workspace = true } -semver = { version = "1.0" } -thiserror = { workspace = true } -tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true } -tokio-stream = { features = ["net"], workspace = true } -tonic = { default-features = true, features = ["tls-native-roots", "tls-ring"], workspace = true } -tonic-reflection = { workspace = true } -tonic-web = { workspace = true } -tower = { workspace = true } -tower-http = { features = ["trace"], workspace = true } -tracing = { workspace = true } -url = { workspace = true } +anyhow = { workspace = true } +futures = { workspace = true } +http = { workspace = true } +mediatype = { version = "0.21" } +miden-node-block-producer = { workspace = true } +miden-node-proto = { workspace = true } +miden-node-proto-build = { workspace = true } +miden-node-utils = { workspace = true } +miden-protocol = { default-features = true, workspace = true } +miden-tx = { features = ["concurrent"], workspace = true } +miden-tx-batch-prover = { workspace = true } +semver = { version = "1.0" } +thiserror = { workspace = true } +tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true } +tokio-stream = { features = ["net"], workspace = true } +tonic = { default-features = true, features = ["tls-native-roots", "tls-ring"], workspace = true } +tonic-reflection = { workspace = true } +tonic-web = { workspace = true } +tower = { workspace = true } +tower-http = { features = ["trace"], workspace = true } +tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] miden-node-store = { features = ["rocksdb"], workspace = true } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 6143efa54..ccec3843d 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -514,7 +514,11 @@ impl api_server::Api for RpcService { return Err(Status::invalid_argument("Transaction inputs must be provided")); } - block_producer.clone().submit_proven_tx(request).await + block_producer + .submit_proven_tx(request) + .await + .map(Response::new) + .map_err(Into::into) } /// Deserializes the batch, strips MAST decorators from full output note scripts, rebuilds the @@ -613,7 +617,11 @@ impl api_server::Api for RpcService { validator.clone().submit_proven_transaction(request).await?; } - block_producer.clone().submit_proven_tx_batch(request).await + block_producer + .submit_proven_tx_batch(request) + .await + .map(Response::new) + .map_err(Into::into) } // -- Status & utility endpoints ---------------------------------------------------------- @@ -650,13 +658,7 @@ impl api_server::Api for RpcService { let store_status = self.store.clone().status(Request::new(())).await.map(Response::into_inner).ok(); let block_producer_status = match &self.mode { - RpcMode::Sequencer { block_producer, .. } => block_producer - .as_ref() - .clone() - .status(Request::new(())) - .await - .map(Response::into_inner) - .ok(), + RpcMode::Sequencer { block_producer, .. } => Some(block_producer.status().await), RpcMode::FullNode { source_rpc } => source_rpc .as_ref() .clone() diff --git a/crates/rpc/src/server/mod.rs b/crates/rpc/src/server/mod.rs index 4eaebc21d..0d20f711b 100644 --- a/crates/rpc/src/server/mod.rs +++ b/crates/rpc/src/server/mod.rs @@ -2,8 +2,8 @@ use std::num::NonZeroUsize; use accept::AcceptHeaderLayer; use anyhow::Context; +use miden_node_block_producer::BlockProducerApi; use miden_node_proto::clients::{ - BlockProducerClient, NtxBuilderClient, RpcClient as SourceRpcClient, StoreRpcClient, @@ -49,7 +49,7 @@ pub enum RpcMode { /// Sequencer RPC validates submissions locally, re-executes them through the validator, then /// forwards them to the block producer. Sequencer { - block_producer: Box, + block_producer: Box, validator: Box, }, /// Full-node RPC forwards submissions to the source RPC. @@ -60,7 +60,7 @@ pub enum RpcMode { } impl RpcMode { - pub fn sequencer(block_producer: BlockProducerClient, validator: ValidatorClient) -> Self { + pub fn sequencer(block_producer: BlockProducerApi, validator: ValidatorClient) -> Self { Self::Sequencer { block_producer: Box::new(block_producer), validator: Box::new(validator), diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index 49732384f..cf9b541f3 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -4,8 +4,9 @@ use std::time::Duration; use http::header::{ACCEPT, CONTENT_TYPE}; use http::{HeaderMap, HeaderValue}; +use miden_node_block_producer::store::StoreClient as BlockProducerStoreClient; +use miden_node_block_producer::{BlockProducerApi, BlockProducerApiConfig}; use miden_node_proto::clients::{ - BlockProducerClient, Builder, GrpcClient, Interceptor, @@ -660,13 +661,6 @@ async fn start_rpc_with_options( ) -> (RpcClient, std::net::SocketAddr, TcpListener) { let store_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); let store_addr = store_listener.local_addr().expect("store should get a local address"); - let block_producer_addr = { - let block_producer_listener = - TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind block-producer"); - block_producer_listener - .local_addr() - .expect("Failed to get block-producer address") - }; // Start the rpc component. let rpc_listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind rpc"); @@ -674,24 +668,20 @@ async fn start_rpc_with_options( task::spawn(async move { // SAFETY: The store_addr is always valid as it is created from a `SocketAddr`. let store_url = Url::parse(&format!("http://{store_addr}")).unwrap(); - // SAFETY: The block_producer_addr is always valid as it is created from a `SocketAddr`. - let block_producer_url = Url::parse(&format!("http://{block_producer_addr}")).unwrap(); // SAFETY: Using dummy validator URL for test - not actually contacted in this test let validator_url = Url::parse("http://127.0.0.1:0").unwrap(); - let store = Builder::new(store_url) + let store = Builder::new(store_url.clone()) .without_tls() .without_timeout() .without_metadata_version() .without_metadata_genesis() .with_otel_context_injection() .connect_lazy::(); - let block_producer = Builder::new(block_producer_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::(); + let block_producer = BlockProducerApi::new( + BlockProducerStoreClient::new(store_url), + 0.into(), + BlockProducerApiConfig::default(), + ); let validator = Builder::new(validator_url) .without_tls() .without_timeout() diff --git a/proto/proto/README.md b/proto/proto/README.md index 5a3a9e321..f4f6554f8 100644 --- a/proto/proto/README.md +++ b/proto/proto/README.md @@ -1,6 +1,6 @@ # Proto Files Organization -The files are organized by a visibility hierarchy, where the root directory contains the public-facing RPC and remote prover protocols, while the `types` directory contains the data types used by these protocols. The `internal` directory contains the internal protocols used by the node, such as the store, non-transactional data, and block producer protocols. +The files are organized by a visibility hierarchy, where the root directory contains the public-facing RPC and remote prover protocols, while the `types` directory contains the data types used by these protocols. The `internal` directory contains the internal protocols used by the node, such as the store, non-transactional data, and validator protocols. The organization of the files is as follows: @@ -12,8 +12,8 @@ types/ └── xxx.proto internal/ ├── store.proto -├── ntx.proto -└── block_producer.proto +├── ntx_builder.proto +└── validator.proto ``` The public-facing files should only allow the usage of the `types` directory, to avoid service reflection to internal protocols. diff --git a/proto/proto/internal/block_producer.proto b/proto/proto/internal/block_producer.proto deleted file mode 100644 index d1fa9f0ce..000000000 --- a/proto/proto/internal/block_producer.proto +++ /dev/null @@ -1,26 +0,0 @@ -// Specification of the user facing gRPC API. -syntax = "proto3"; -package block_producer; - -import "google/protobuf/empty.proto"; -import "rpc.proto"; -import "types/blockchain.proto"; -import "types/transaction.proto"; - -// BLOCK PRODUCER SERVICE -// ================================================================================================ - -service Api { - // Returns the status info. - rpc Status(google.protobuf.Empty) returns (rpc.BlockProducerStatus) {} - - // Submits proven transaction to the Miden network. Returns the node's current block height. - rpc SubmitProvenTx(transaction.ProvenTransaction) returns (blockchain.BlockNumber) {} - - // Submits a batch of transactions to the Miden network. - // - // All transactions in this batch will be considered atomic, and be committed together or not all. - // - // Returns the node's current block height. - rpc SubmitProvenTxBatch(transaction.TransactionBatch) returns (blockchain.BlockNumber) {} -}