diff --git a/Cargo.lock b/Cargo.lock index 5db357e3e..78bec97de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3328,6 +3328,7 @@ dependencies = [ "fs-err", "humantime", "miden-node-block-producer", + "miden-node-proto", "miden-node-rpc", "miden-node-store", "miden-node-utils", diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml index 85e96e5b9..fc716f8e5 100644 --- a/bin/node/Cargo.toml +++ b/bin/node/Cargo.toml @@ -23,6 +23,7 @@ clap = { features = ["env", "string"], workspace = true } fs-err = { workspace = true } humantime = { workspace = true } miden-node-block-producer = { workspace = true } +miden-node-proto = { workspace = true } miden-node-rpc = { workspace = true } miden-node-store = { workspace = true } miden-node-utils = { workspace = true } diff --git a/bin/node/src/commands/modes.rs b/bin/node/src/commands/modes.rs index 8eb4500da..993a8ea63 100644 --- a/bin/node/src/commands/modes.rs +++ b/bin/node/src/commands/modes.rs @@ -1,3 +1,4 @@ +use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient}; use url::Url; use super::block_producer::BlockProducerOptions; @@ -27,11 +28,13 @@ impl SequencerCommand { pub fn handle(self) -> anyhow::Result<()> { let runtime = self.runtime.runtime_config(&self.store); self.block_producer.validate()?; + let validator = self.external_services.validator_client(); + let ntx_builder = self.external_services.ntx_builder_client(); let _ = ( runtime.rpc_listen, runtime.data_directory, - self.external_services.validator_url, - self.external_services.ntx_builder_url, + validator, + ntx_builder, self.block_producer.block_prover.url, runtime.database_options, runtime.internal_grpc_options, @@ -58,6 +61,28 @@ pub struct SequencerExternalServiceOptions { pub ntx_builder_url: Url, } +impl SequencerExternalServiceOptions { + fn validator_client(&self) -> ValidatorClient { + Builder::new(self.validator_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } + + fn ntx_builder_client(&self) -> NtxBuilderClient { + Builder::new(self.ntx_builder_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } +} + #[derive(clap::Args, Clone, Debug)] pub struct FullNodeCommand { #[command(flatten)] @@ -73,6 +98,7 @@ pub struct FullNodeCommand { impl FullNodeCommand { pub fn handle(self) -> anyhow::Result<()> { let runtime = self.runtime.runtime_config(&self.store); + let source_rpc = self.sync.source_rpc_client(); let _ = ( runtime.rpc_listen, runtime.data_directory, @@ -80,7 +106,7 @@ impl FullNodeCommand { runtime.internal_grpc_options, runtime.external_grpc_options, runtime.storage_options, - self.sync.block_source_url, + source_rpc, ); anyhow::bail!( @@ -89,3 +115,15 @@ impl FullNodeCommand { ) } } + +impl SyncOptions { + fn source_rpc_client(&self) -> RpcClient { + Builder::new(self.block_source_url.clone()) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + } +} diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 368cb63de..4254d631e 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -2,7 +2,7 @@ mod server; #[cfg(test)] mod tests; -pub use server::Rpc; +pub use server::{Rpc, RpcMode}; // CONSTANTS // ================================================================================================= diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index ed26ed6be..6143efa54 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -3,13 +3,7 @@ use std::sync::LazyLock; use std::time::Duration; use anyhow::Context; -use miden_node_proto::clients::{ - BlockProducerClient, - Builder, - NtxBuilderClient, - StoreRpcClient, - ValidatorClient, -}; +use miden_node_proto::clients::{NtxBuilderClient, StoreRpcClient}; use miden_node_proto::decode::{read_account_id, read_account_ids, read_block_range}; use miden_node_proto::domain::account::{AccountRequest, SlotData}; use miden_node_proto::errors::ConversionError; @@ -42,18 +36,17 @@ use miden_protocol::{MIN_PROOF_SECURITY_LEVEL, Word}; use miden_tx::TransactionVerifier; use miden_tx_batch_prover::LocalBatchProver; use tonic::{IntoRequest, Request, Response, Status}; -use tracing::{Span, debug, info, info_span}; -use url::Url; +use tracing::{Span, debug, info_span}; use crate::COMPONENT; +use crate::server::RpcMode; // RPC SERVICE // ================================================================================================ pub struct RpcService { store: StoreRpcClient, - block_producer: Option, - validator: ValidatorClient, + mode: RpcMode, ntx_builder: Option, genesis_commitment: Option, block_commitment_cache: LruCache, @@ -61,72 +54,14 @@ pub struct RpcService { impl RpcService { pub(super) fn new( - store_url: Url, - block_producer_url: Option, - validator_url: Url, - ntx_builder_url: Option, + store: StoreRpcClient, + mode: RpcMode, + ntx_builder: Option, commitment_cache_capacity: NonZeroUsize, ) -> Self { - let store = { - info!(target: COMPONENT, store_endpoint = %store_url, "Initializing store client"); - Builder::new(store_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }; - - let block_producer = block_producer_url.map(|block_producer_url| { - info!( - target: COMPONENT, - block_producer_endpoint = %block_producer_url, - "Initializing block producer client", - ); - Builder::new(block_producer_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }); - - let validator = { - info!( - target: COMPONENT, - validator_endpoint = %validator_url, - "Initializing validator client", - ); - Builder::new(validator_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }; - - let ntx_builder = ntx_builder_url.map(|ntx_builder_url| { - info!( - target: COMPONENT, - ntx_builder_endpoint = %ntx_builder_url, - "Initializing ntx-builder client", - ); - Builder::new(ntx_builder_url) - .without_tls() - .without_timeout() - .without_metadata_version() - .without_metadata_genesis() - .with_otel_context_injection() - .connect_lazy::() - }); - Self { store, - block_producer, - validator, + mode, ntx_builder, genesis_commitment: None, block_commitment_cache: LruCache::new(commitment_cache_capacity), @@ -135,8 +70,8 @@ impl RpcService { /// Sets the genesis commitment, returning an error if it is already set. /// - /// Required since `RpcService::new()` sets up the `store` which is used to fetch the - /// `genesis_commitment`. + /// Required since the store client is used to fetch the `genesis_commitment` after + /// `RpcService` construction. pub fn set_genesis_commitment(&mut self, commitment: Word) -> anyhow::Result<()> { if self.genesis_commitment.is_some() { return Err(anyhow::anyhow!("genesis commitment already set")); @@ -502,12 +437,6 @@ impl api_server::Api for RpcService { ) -> Result, Status> { debug!(target: COMPONENT, request = ?request.get_ref()); - let Some(block_producer) = &self.block_producer else { - return Err(Status::unavailable( - "Transaction submission not available in read-only mode", - )); - }; - let request = request.into_inner(); let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| { @@ -567,10 +496,20 @@ impl api_server::Api for RpcService { )) })?; + // In full node mode we forward the request to the source. + let (block_producer, validator) = match &self.mode { + RpcMode::Sequencer { block_producer, validator } => { + (block_producer.as_ref(), validator.as_ref()) + }, + RpcMode::FullNode { source_rpc } => { + return source_rpc.as_ref().clone().submit_proven_tx(request).await; + }, + }; + // Transaction inputs must be provided in order to allow for transaction re-execution via // the Validator. if request.transaction_inputs.is_some() { - self.validator.clone().submit_proven_transaction(request.clone()).await?; + validator.clone().submit_proven_transaction(request.clone()).await?; } else { return Err(Status::invalid_argument("Transaction inputs must be provided")); } @@ -584,10 +523,6 @@ impl api_server::Api for RpcService { &self, request: tonic::Request, ) -> Result, Status> { - let Some(block_producer) = &self.block_producer else { - return Err(Status::unavailable("Batch submission not available in read-only mode")); - }; - let request = request.into_inner(); let proven_batch = ProvenBatch::read_from_bytes(&request.batch_proof).map_err(|err| { @@ -657,6 +592,16 @@ impl api_server::Api for RpcService { return Err(Status::invalid_argument("batch proof did not match proposed batch")); } + // In full node mode we forward the request to the source. + let (block_producer, validator) = match &self.mode { + RpcMode::Sequencer { block_producer, validator } => { + (block_producer.as_ref(), validator.as_ref()) + }, + RpcMode::FullNode { source_rpc } => { + return source_rpc.as_ref().clone().submit_proven_tx_batch(request).await; + }, + }; + // Submit each transaction to the validator. // // SAFETY: We checked earlier that the two iterators are the same length. @@ -665,7 +610,7 @@ impl api_server::Api for RpcService { transaction: tx.to_bytes(), transaction_inputs: inputs.clone().into(), }; - self.validator.clone().submit_proven_transaction(request).await?; + validator.clone().submit_proven_transaction(request).await?; } block_producer.clone().submit_proven_tx_batch(request).await @@ -704,15 +649,21 @@ impl api_server::Api for RpcService { let store_status = self.store.clone().status(Request::new(())).await.map(Response::into_inner).ok(); - let block_producer_status = if let Some(block_producer) = &self.block_producer { - block_producer + let block_producer_status = match &self.mode { + RpcMode::Sequencer { block_producer, .. } => block_producer + .as_ref() .clone() .status(Request::new(())) .await .map(Response::into_inner) + .ok(), + RpcMode::FullNode { source_rpc } => source_rpc + .as_ref() + .clone() + .status(Request::new(())) + .await .ok() - } else { - None + .and_then(|response| response.into_inner().block_producer), }; Ok(Response::new(proto::rpc::RpcStatus { diff --git a/crates/rpc/src/server/mod.rs b/crates/rpc/src/server/mod.rs index 6a8c3acc9..4eaebc21d 100644 --- a/crates/rpc/src/server/mod.rs +++ b/crates/rpc/src/server/mod.rs @@ -2,6 +2,13 @@ use std::num::NonZeroUsize; use accept::AcceptHeaderLayer; use anyhow::Context; +use miden_node_proto::clients::{ + BlockProducerClient, + NtxBuilderClient, + RpcClient as SourceRpcClient, + StoreRpcClient, + ValidatorClient, +}; use miden_node_proto::generated::rpc::api_server; use miden_node_proto_build::rpc_api_descriptor; use miden_node_utils::clap::GrpcOptionsExternal; @@ -16,7 +23,6 @@ use tonic_web::GrpcWebLayer; use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier}; use tower_http::trace::TraceLayer; use tracing::info; -use url::Url; use crate::COMPONENT; use crate::server::health::HealthCheckLayer; @@ -28,17 +34,44 @@ mod health; /// The RPC server component. /// /// On startup, binds to the provided listener and starts serving the RPC API. -/// It connects lazily to the store, validator and block producer components as needed. -/// Requests will fail if the components are not available. +/// It uses the supplied clients for store access and mode-specific submission handling. +/// Requests will fail if the supplied clients cannot reach their components. pub struct Rpc { pub listener: TcpListener, - pub store_url: Url, - pub block_producer_url: Option, - pub validator_url: Url, - pub ntx_builder_url: Option, + pub store: StoreRpcClient, + pub mode: RpcMode, + pub ntx_builder: Option, pub grpc_options: GrpcOptionsExternal, } +#[derive(Clone, Debug)] +pub enum RpcMode { + /// Sequencer RPC validates submissions locally, re-executes them through the validator, then + /// forwards them to the block producer. + Sequencer { + block_producer: Box, + validator: Box, + }, + /// Full-node RPC forwards submissions to the source RPC. + /// + /// The caller is responsible for configuring this client with any request metadata the source + /// RPC requires. + FullNode { source_rpc: Box }, +} + +impl RpcMode { + pub fn sequencer(block_producer: BlockProducerClient, validator: ValidatorClient) -> Self { + Self::Sequencer { + block_producer: Box::new(block_producer), + validator: Box::new(validator), + } + } + + pub fn full_node(source_rpc: SourceRpcClient) -> Self { + Self::FullNode { source_rpc: Box::new(source_rpc) } + } +} + impl Rpc { /// Serves the RPC API. /// @@ -46,10 +79,9 @@ impl Rpc { /// a fatal error is encountered. pub async fn serve(self) -> anyhow::Result<()> { let mut api = api::RpcService::new( - self.store_url.clone(), - self.block_producer_url.clone(), - self.validator_url, - self.ntx_builder_url.clone(), + self.store.clone(), + self.mode.clone(), + self.ntx_builder.clone(), NonZeroUsize::new(1_000_000).unwrap(), ); @@ -66,7 +98,7 @@ impl Rpc { .build_v1() .context("failed to build reflection service")?; - info!(target: COMPONENT, endpoint=?self.listener, store=%self.store_url, block_producer=?self.block_producer_url, "Server initialized"); + info!(target: COMPONENT, endpoint=?self.listener, mode=?self.mode, "Server initialized"); let rpc_version = env!("CARGO_PKG_VERSION"); let rpc_version = diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index 75c67500c..49732384f 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -4,7 +4,15 @@ use std::time::Duration; use http::header::{ACCEPT, CONTENT_TYPE}; use http::{HeaderMap, HeaderValue}; -use miden_node_proto::clients::{Builder, GrpcClient, Interceptor, RpcClient}; +use miden_node_proto::clients::{ + BlockProducerClient, + Builder, + GrpcClient, + Interceptor, + RpcClient, + StoreRpcClient, + ValidatorClient, +}; use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient; use miden_node_proto::generated::{self as proto}; use miden_node_store::genesis::config::GenesisConfig; @@ -42,7 +50,7 @@ use tokio::task; use tokio::time::sleep; use url::Url; -use crate::Rpc; +use crate::{Rpc, RpcMode}; /// A wrapper around the store runtime and data directory. /// @@ -670,12 +678,32 @@ async fn start_rpc_with_options( let block_producer_url = Url::parse(&format!("http://{block_producer_addr}")).unwrap(); // SAFETY: Using dummy validator URL for test - not actually contacted in this test let validator_url = Url::parse("http://127.0.0.1:0").unwrap(); + let store = Builder::new(store_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); + let block_producer = Builder::new(block_producer_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); + let validator = Builder::new(validator_url) + .without_tls() + .without_timeout() + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::(); Rpc { listener: rpc_listener, - store_url, - block_producer_url: Some(block_producer_url), - validator_url, - ntx_builder_url: None, + store, + mode: RpcMode::sequencer(block_producer, validator), + ntx_builder: None, grpc_options, } .serve()