Skip to content

Commit 4dfda9f

Browse files
prestwichclaude
andcommitted
fix(host-rpc): address review feedback
- O(1) view_hash using front number + index math with debug_assert - Extract convert_rpc_block helper to deduplicate block conversion - Concurrent safe/finalized tag fetches via tokio::try_join! - Add inspect_err for metrics on stale-hint fallback walk - Make slot_seconds configurable via builder (default 12) - Error on missing cached_finalized during walk exhaustion recovery Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2069ca2 commit 4dfda9f

3 files changed

Lines changed: 70 additions & 55 deletions

File tree

crates/host-rpc/src/builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct RpcHostNotifierBuilder<P> {
1919
provider: P,
2020
buffer_capacity: usize,
2121
backfill_batch_size: u64,
22+
slot_seconds: u64,
2223
genesis_timestamp: u64,
2324
}
2425

@@ -32,6 +33,7 @@ where
3233
provider,
3334
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
3435
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
36+
slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS,
3537
genesis_timestamp: 0,
3638
}
3739
}
@@ -48,6 +50,12 @@ where
4850
self
4951
}
5052

53+
/// Set the slot duration in seconds (default: 12).
54+
pub const fn with_slot_seconds(mut self, slot_seconds: u64) -> Self {
55+
self.slot_seconds = slot_seconds;
56+
self
57+
}
58+
5159
/// Set the genesis timestamp for epoch calculation.
5260
pub const fn with_genesis_timestamp(mut self, timestamp: u64) -> Self {
5361
self.genesis_timestamp = timestamp;
@@ -67,6 +75,7 @@ where
6775
header_sub,
6876
self.buffer_capacity,
6977
self.backfill_batch_size,
78+
self.slot_seconds,
7079
self.genesis_timestamp,
7180
))
7281
}

crates/host-rpc/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@ pub enum RpcHostError {
1818
/// The RPC node returned no block for the requested number or tag.
1919
#[error("missing block {0}")]
2020
MissingBlock(BlockNumberOrTag),
21+
22+
/// Walk exhaustion recovery requires a cached finalized block number,
23+
/// but none has been fetched yet.
24+
#[error("no cached finalized block number for exhaustion recovery")]
25+
NoFinalizedBlock,
2126
}

crates/host-rpc/src/notifier.rs

Lines changed: 56 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned};
1414
use std::{collections::VecDeque, sync::Arc, time::Instant};
1515
use tracing::{debug, info, warn};
1616

17-
/// Seconds per Ethereum slot.
18-
const SLOT_SECONDS: u64 = 12;
17+
/// Default seconds per slot (Ethereum mainnet).
18+
pub(crate) const DEFAULT_SLOT_SECONDS: u64 = 12;
1919
/// Slots per Ethereum epoch.
2020
const SLOTS_PER_EPOCH: u64 = 32;
2121

@@ -80,6 +80,9 @@ pub struct RpcHostNotifier<P> {
8080
/// Max blocks per backfill batch.
8181
backfill_batch_size: u64,
8282

83+
/// Seconds per slot, used for epoch calculation.
84+
slot_seconds: u64,
85+
8386
/// Genesis timestamp, used for epoch calculation.
8487
genesis_timestamp: u64,
8588
}
@@ -104,6 +107,7 @@ where
104107
header_sub: SubscriptionStream<RpcHeader>,
105108
buffer_capacity: usize,
106109
backfill_batch_size: u64,
110+
slot_seconds: u64,
107111
genesis_timestamp: u64,
108112
) -> Self {
109113
Self {
@@ -116,6 +120,7 @@ where
116120
last_tag_epoch: None,
117121
backfill_from: None,
118122
backfill_batch_size,
123+
slot_seconds,
119124
genesis_timestamp,
120125
}
121126
}
@@ -127,9 +132,17 @@ where
127132
self.chain_view.back().copied()
128133
}
129134

130-
/// Look up a hash in the chain view by block number.
135+
/// Look up a hash in the chain view by block number (O(1)).
131136
fn view_hash(&self, number: u64) -> Option<B256> {
132-
self.chain_view.iter().rev().find(|(n, _)| *n == number).map(|(_, h)| *h)
137+
let &(front_number, _) = self.chain_view.front()?;
138+
let index = number.checked_sub(front_number)? as usize;
139+
let &(found_number, hash) = self.chain_view.get(index)?;
140+
debug_assert_eq!(
141+
found_number, number,
142+
"chain_view contiguity invariant violated: expected block {number} at index {index}, \
143+
found {found_number}"
144+
);
145+
Some(hash)
133146
}
134147

135148
/// Append entries to the chain view, evicting oldest if over capacity.
@@ -153,6 +166,30 @@ where
153166

154167
// ── Block fetching ─────────────────────────────────────────────
155168

169+
/// Convert an RPC block response and its receipts into an [`RpcBlock`].
170+
fn convert_rpc_block(
171+
rpc_block: alloy::rpc::types::Block,
172+
rpc_receipts: Option<Vec<alloy::rpc::types::TransactionReceipt>>,
173+
) -> RpcBlock {
174+
let hash = rpc_block.header.hash;
175+
let block = rpc_block
176+
.map_transactions(|tx| {
177+
let recovered = tx.inner;
178+
let signer = recovered.signer();
179+
let tx: TransactionSigned = recovered.into_inner().into();
180+
Recovered::new_unchecked(tx, signer)
181+
})
182+
.into_consensus();
183+
let sealed_header = Sealed::new_unchecked(block.header, hash);
184+
let block: RecoveredBlock = SealedBlock::new(sealed_header, block.body.transactions);
185+
let receipts = rpc_receipts
186+
.unwrap_or_default()
187+
.into_iter()
188+
.map(|r| r.inner.into_primitives_receipt())
189+
.collect();
190+
RpcBlock { block, receipts }
191+
}
192+
156193
/// Fetch a single block with receipts, anchored by hash.
157194
///
158195
/// The block and receipt fetches are concurrent via [`tokio::try_join!`].
@@ -169,25 +206,8 @@ where
169206
)?;
170207

171208
let rpc_block = rpc_block.ok_or(RpcHostError::MissingBlockByHash(hash))?;
172-
let rpc_receipts = rpc_receipts.unwrap_or_default();
173-
174-
let block_hash = rpc_block.header.hash;
175-
let block = rpc_block
176-
.map_transactions(|tx| {
177-
let recovered = tx.inner;
178-
let signer = recovered.signer();
179-
let tx: TransactionSigned = recovered.into_inner().into();
180-
Recovered::new_unchecked(tx, signer)
181-
})
182-
.into_consensus();
183-
let sealed_header = Sealed::new_unchecked(block.header, block_hash);
184-
let block: RecoveredBlock = SealedBlock::new(sealed_header, block.body.transactions);
185-
186-
let receipts =
187-
rpc_receipts.into_iter().map(|r| r.inner.into_primitives_receipt()).collect();
188-
189209
crate::metrics::record_fetch_block_duration(start.elapsed());
190-
Ok(RpcBlock { block, receipts })
210+
Ok(Self::convert_rpc_block(rpc_block, rpc_receipts))
191211
}
192212

193213
/// Fetch full blocks+receipts for a list of hashes, concurrently.
@@ -224,25 +244,8 @@ where
224244
)?;
225245

226246
let rpc_block = rpc_block.ok_or(RpcHostError::MissingBlock(tag))?;
227-
let rpc_receipts = rpc_receipts.unwrap_or_default();
228-
229-
let hash = rpc_block.header.hash;
230-
let block = rpc_block
231-
.map_transactions(|tx| {
232-
let recovered = tx.inner;
233-
let signer = recovered.signer();
234-
let tx: TransactionSigned = recovered.into_inner().into();
235-
Recovered::new_unchecked(tx, signer)
236-
})
237-
.into_consensus();
238-
let sealed_header = Sealed::new_unchecked(block.header, hash);
239-
let block: RecoveredBlock = SealedBlock::new(sealed_header, block.body.transactions);
240-
241-
let receipts =
242-
rpc_receipts.into_iter().map(|r| r.inner.into_primitives_receipt()).collect();
243-
244247
crate::metrics::record_fetch_block_duration(start.elapsed());
245-
Ok(RpcBlock { block, receipts })
248+
Ok(Self::convert_rpc_block(rpc_block, rpc_receipts))
246249
}
247250

248251
/// Fetch a range of blocks by number concurrently (used for backfill only).
@@ -269,7 +272,7 @@ where
269272

270273
/// Derive the epoch number from a block timestamp.
271274
const fn epoch_of(&self, timestamp: u64) -> u64 {
272-
timestamp.saturating_sub(self.genesis_timestamp) / (SLOT_SECONDS * SLOTS_PER_EPOCH)
275+
timestamp.saturating_sub(self.genesis_timestamp) / (self.slot_seconds * SLOTS_PER_EPOCH)
273276
}
274277

275278
/// Refresh safe/finalized block numbers if an epoch boundary was crossed.
@@ -281,16 +284,12 @@ where
281284
return Ok(());
282285
}
283286

284-
let safe = self
285-
.provider
286-
.get_block_by_number(BlockNumberOrTag::Safe)
287-
.await?
288-
.map(|b| b.header().number());
289-
let finalized = self
290-
.provider
291-
.get_block_by_number(BlockNumberOrTag::Finalized)
292-
.await?
293-
.map(|b| b.header().number());
287+
let (safe, finalized) = tokio::try_join!(
288+
self.provider.get_block_by_number(BlockNumberOrTag::Safe),
289+
self.provider.get_block_by_number(BlockNumberOrTag::Finalized),
290+
)?;
291+
let safe = safe.map(|b| b.header().number());
292+
let finalized = finalized.map(|b| b.header().number());
294293

295294
self.cached_safe = safe;
296295
self.cached_finalized = finalized;
@@ -421,8 +420,9 @@ where
421420
.get_block_by_number(BlockNumberOrTag::Latest)
422421
.await?
423422
.ok_or(RpcHostError::MissingBlock(BlockNumberOrTag::Latest))?;
424-
let latest_hash = latest.header.hash;
425-
self.walk_chain(latest_hash).await?
423+
self.walk_chain(latest.header.hash).await.inspect_err(|_| {
424+
crate::metrics::inc_rpc_errors();
425+
})?
426426
}
427427
Err(e) => {
428428
crate::metrics::inc_rpc_errors();
@@ -437,12 +437,13 @@ where
437437
return Ok(None);
438438
}
439439
WalkResult::Exhausted => {
440+
let finalized = self.cached_finalized.ok_or(RpcHostError::NoFinalizedBlock)?;
440441
warn!(
441442
buffer_capacity = self.buffer_capacity,
442-
"walk exhausted buffer, resetting to backfill"
443+
finalized, "walk exhausted buffer, resetting to backfill from finalized"
443444
);
444445
self.chain_view.clear();
445-
self.backfill_from = Some(self.cached_finalized.unwrap_or(0));
446+
self.backfill_from = Some(finalized);
446447
self.last_tag_epoch = None;
447448
crate::metrics::record_handle_new_head_duration(start.elapsed());
448449
return Ok(None);

0 commit comments

Comments
 (0)