From b34ce46d0afafde1495fd09897e17783c48cdc32 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Tue, 26 May 2026 23:05:46 +0200 Subject: [PATCH 1/5] fix(eth-rpc): MempoolFilter returns pending mempool txs, not chain events --- src/daemon/mod.rs | 8 +- src/rpc/methods/eth.rs | 75 +---- src/rpc/methods/eth/filter/mempool.rs | 288 ++++++++++++++---- src/rpc/methods/eth/filter/mod.rs | 16 +- src/tool/offline_server/server.rs | 9 +- .../subcommands/api_cmd/stateful_tests.rs | 101 +++++- 6 files changed, 356 insertions(+), 141 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 82342132afc..c7ca8b5122a 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -571,7 +571,13 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path).map(Arc::new)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events)); + let eth_event_handler = { + let mp = mpool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&config.events, subscriber)) + }; if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 349a16e3b02..d4ef62d4e40 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2719,15 +2719,7 @@ impl EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - let hash = if smsg.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - smsg.message().cid().into() - }; - return Ok(Some(hash)); + return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?)); } let msg_result = crate::chain::get_chain_message(db, &cid); @@ -3296,29 +3288,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result> { - events - .iter() - .filter_map(|event| { - match eth_tx_hash_from_message_cid( - ctx.db(), - &event.msg_cid, - ctx.state_manager.chain_config().eth_chain_id, - ) { - Ok(Some(hash)) => Some(Ok(hash)), - Ok(None) => { - tracing::warn!("Ignoring event"); - None - } - Err(err) => Some(Err(err)), - } - }) - .collect() -} - fn eth_filter_logs_from_events( ctx: &Ctx, events: &[CollectedEvent], @@ -3399,15 +3368,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result { - Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages( - ctx, events, - )?)) -} - pub enum EthGetLogs {} impl RpcMethod<1> for EthGetLogs { const NAME: &'static str = "Filecoin.EthGetLogs"; @@ -3582,36 +3542,9 @@ impl RpcMethod<1> for EthGetFilterChanges { return Ok(eth_filter_result_from_tipsets(&events)?); } if let Some(mempool_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( - // heaviest tipset doesn't have events because its messages haven't been executed yet - RangeInclusive::new( - mempool_filter - .collected - .unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1), - // Use -1 to indicate that the range extends until the latest available tipset. - -1, - ), - ))), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let new_collected = events - .iter() - .max_by_key(|event| event.height) - .map(|e| e.height); - if let Some(height) = new_collected { - let filter = Arc::new(MempoolFilter { - id: mempool_filter.id.clone(), - max_results: mempool_filter.max_results, - collected: Some(height), - }); - store.update(filter); - } - return Ok(eth_filter_result_from_messages(&ctx, &events)?); + let chain_id = ctx.chain_config().eth_chain_id; + let hashes = mempool_filter.drain(chain_id); + return Ok(EthFilterResult::Hashes(hashes)); } } Err(anyhow::anyhow!("method not supported").into()) diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 88597981443..13923d2a65f 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -2,35 +2,128 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::prelude::*; +use crate::eth::EthChainId as EthChainIdType; +use crate::message::SignedMessage; +use crate::message_pool::MpoolUpdate; +use crate::rpc::Arc; +use crate::rpc::eth::eth_tx_hash_from_signed_message; +use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; use crate::shim::fvm_shared_latest::clock::ChainEpoch; -use ahash::HashMap; -use anyhow::Result; -use parking_lot::RwLock; +use ahash::AHashMap as HashMap; +use anyhow::{Context, Result}; +use indexmap::IndexSet; +use parking_lot::{Mutex, RwLock}; use std::any::Any; +use tokio::sync::broadcast; -/// Data structure for filtering and collecting pending transactions -/// from the mempool before they are confirmed in a block. -#[allow(dead_code)] -#[derive(Debug, PartialEq)] +/// Factory that yields a fresh independent `broadcast::Receiver` +/// on each call. Wraps the `MessagePool` so the filter layer never sees the +/// pool's broadcast `Sender` directly — preserves the send-only encapsulation +/// owned by the message pool module. +#[derive(Clone)] +pub struct MpoolSubscriber { + inner: Arc broadcast::Receiver + Send + Sync>, +} + +impl MpoolSubscriber { + /// Build a subscriber from a factory closure that yields a fresh + /// receiver on each call (typically `move || mp.subscribe_to_updates()`). + pub fn new(factory: F) -> Self + where + F: Fn() -> broadcast::Receiver + Send + Sync + 'static, + { + Self { + inner: Arc::new(factory), + } + } + + /// Subscriber whose receivers never receive any events. Used by + /// standalone contexts (tests, snapshot tools, offline server when no + /// real mempool is attached). + pub fn dummy() -> Self { + let (tx, _) = broadcast::channel::(1); + Self::new(move || tx.subscribe()) + } + + fn subscribe(&self) -> broadcast::Receiver { + (self.inner)() + } +} + +impl std::fmt::Debug for MpoolSubscriber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MpoolSubscriber").finish_non_exhaustive() + } +} + +/// Filter backing `eth_newPendingTransactionFilter`. Each instance owns its +/// own `broadcast::Receiver`. +#[derive(Debug)] pub struct MempoolFilter { // Unique id used to identify the filter pub id: FilterID, // Maximum number of results to collect pub max_results: usize, - // Epoch at which the results were collected - pub collected: Option, + // Receiver for mempool updates + rx: Mutex>, } impl MempoolFilter { - pub fn new(max_results: usize) -> Result, uuid::Error> { - let id = FilterID::new()?; + pub fn new( + max_results: usize, + rx: broadcast::Receiver, + ) -> Result, uuid::Error> { Ok(Arc::new(Self { - id, + id: FilterID::new()?, max_results, - collected: None, + rx: Mutex::new(rx), })) } + + /// Drain queued mempool updates and return the resulting set of pending + /// tx hashes, capped at `max_results`. + /// + /// Semantics within a single drain window: + /// - `Add` inserts the tx hash. + /// - `Remove` cancels a prior `Add` from the *same* window. A `Remove` + /// for a hash that was already returned by an earlier `drain` call is + /// a no-op on the set — that hash was already reported as pending, + /// so the client has seen it and the cancellation does not need to + /// propagate. + /// + /// Why process `Remove` at all: a tx can leave the mempool between two + /// client polls (mined into a tipset, replaced via RBF, or evicted). If + /// we ignored `Remove` we would surface a hash whose tx is no longer + /// pending, which is misleading for `eth_newPendingTransactionFilter` + /// consumers. + pub fn drain(&self, chain_id: EthChainIdType) -> Vec { + use broadcast::error::TryRecvError; + + let mut rx = self.rx.lock(); + let mut pending: IndexSet = IndexSet::new(); + loop { + match rx.try_recv() { + Ok(MpoolUpdate::Add(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + pending.insert(h); + } + } + Ok(MpoolUpdate::Remove(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + // Cancels a matching Add buffered earlier in the + // same window. No-op if the hash is not in the set. + pending.shift_remove(&h); + } + } + Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("mempool filter lagged, dropped {n} events"); + } + } + } + pending.into_iter().take(self.max_results).collect() + } } impl Filter for MempoolFilter { @@ -43,76 +136,165 @@ impl Filter for MempoolFilter { } } -/// `MempoolFilterManager` uses a `RwLock` to handle concurrent access to a collection of `MempoolFilter` -/// instances, each identified by a `FilterID`. The number of results returned by the filters is capped by `max_filter_results`. -#[derive(Debug)] +fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option { + match eth_tx_hash_from_signed_message(msg, chain_id) { + Ok(h) => Some(h), + Err(e) => { + tracing::debug!("mempool filter: dropping message, hash error: {e}"); + None + } + } +} + +/// Manages installed `MempoolFilter`s. Each `install` calls the configured +/// [`MpoolSubscriber`] to obtain a fresh independent +/// `broadcast::Receiver`. Contexts without a real `MessagePool` +/// (tests, snapshot tools, offline server) pass a subscriber whose receivers +/// always yield `Empty`. pub struct MempoolFilterManager { filters: RwLock>>, max_filter_results: usize, + subscriber: MpoolSubscriber, +} + +impl std::fmt::Debug for MempoolFilterManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MempoolFilterManager") + .field("max_filter_results", &self.max_filter_results) + .finish_non_exhaustive() + } } impl MempoolFilterManager { - pub fn new(max_filter_results: usize) -> Arc { + pub fn new(max_filter_results: usize, subscriber: MpoolSubscriber) -> Arc { Arc::new(Self { filters: RwLock::new(HashMap::new()), max_filter_results, + subscriber, }) } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let filter = MempoolFilter::new(self.max_filter_results) + let filter = MempoolFilter::new(self.max_filter_results, self.subscriber.subscribe()) .context("Failed to create a new mempool filter")?; - let id = filter.id().clone(); - - self.filters.write().insert(id, filter.clone()); - + self.filters + .write() + .insert(filter.id().clone(), filter.clone()); Ok(filter) } fn remove(&self, id: &FilterID) -> Option> { - let mut filters = self.filters.write(); - filters.remove(id) + self.filters.write().remove(id) } } #[cfg(test)] mod tests { use super::*; + use crate::shim::address::Address; + use crate::shim::econ::TokenAmount; + use crate::shim::message::Message as ShimMessage; + + const TEST_CHAIN_ID: EthChainIdType = 314; + + fn make_smsg(seq: u64) -> SignedMessage { + SignedMessage::mock_bls_signed_message(ShimMessage { + from: Address::new_id(1), + to: Address::new_id(2), + sequence: seq, + gas_premium: TokenAmount::from_atto(100u64), + gas_limit: 1_000_000, + ..ShimMessage::default() + }) + } + + fn hash_of(seq: u64) -> EthHash { + eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() + } + + /// Build a subscriber backed by `tx` so tests can drive + /// `MpoolUpdate` events through the manager. + fn subscriber_from(tx: broadcast::Sender) -> MpoolSubscriber { + MpoolSubscriber::new(move || tx.subscribe()) + } + + #[test] + fn drain_returns_empty_when_no_events() { + let (tx, _) = broadcast::channel::(1); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + assert!(filter.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn drain_add_remove_cancel_within_window() { + let (tx, _) = broadcast::channel::(16); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + tx.send(MpoolUpdate::Remove(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); + + let hashes = filter.drain(TEST_CHAIN_ID); + assert!(!hashes.contains(&hash_of(0)), "Add+Remove should cancel"); + assert!(hashes.contains(&hash_of(1))); + assert!(hashes.contains(&hash_of(2))); + assert!(filter.drain(TEST_CHAIN_ID).is_empty(), "second drain empty"); + } #[test] - fn test_mempool_filter() { - // Test case 1: Create a mempool filter - let max_results = 10; - let filter = MempoolFilter::new(max_results).expect("Failed to create mempool filter"); - assert_eq!(filter.max_results, max_results); - - // Test case 2: Create a mempool filter manager and install the mempool filter - let mempool_manager = MempoolFilterManager::new(max_results); - let installed_filter = mempool_manager - .install() - .expect("Failed to install mempool filter"); - - // Verify that the filter has been added to the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(filters.contains_key(installed_filter.id())); + fn drain_truncates_to_max_results() { + let (tx, _) = broadcast::channel::(64); + let filter = MempoolFilter::new(2, tx.subscribe()).unwrap(); + for seq in 0..5u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + assert_eq!(filter.drain(TEST_CHAIN_ID).len(), 2); + } - // Test case 3: Remove the installed mempool filter - let filter_id = installed_filter.id().clone(); - let removed = mempool_manager.remove(&filter_id); - assert_eq!( - removed.map(|f| f.id().clone()), - Some(installed_filter.id().clone()), - "Filter should be successfully removed" - ); - - // Verify that the filter is no longer in the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(!filters.contains_key(&filter_id)); + #[test] + fn drain_handles_lag_and_returns_remaining() { + let (tx, _) = broadcast::channel::(4); + let filter = MempoolFilter::new(100, tx.subscribe()).unwrap(); + for seq in 0..10u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + // Buffer was 4; receiver lagged. Drain returns the remaining buffered + // events without panicking. + assert!(!filter.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn manager_subscribes_each_filter_to_independent_receiver() { + let (tx, _) = broadcast::channel::(16); + let manager = MempoolFilterManager::new(100, subscriber_from(tx.clone())); + + let f1 = manager.install().expect("install f1"); + let f2 = manager.install().expect("install f2"); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + + let f1 = f1.as_any().downcast_ref::().unwrap(); + let f2 = f2.as_any().downcast_ref::().unwrap(); + + // Each receiver sees the full broadcast, independently. + let h1 = f1.drain(TEST_CHAIN_ID); + let h2 = f2.drain(TEST_CHAIN_ID); + assert_eq!(h1.len(), 2); + assert_eq!(h2.len(), 2); + + // Draining once empties only that receiver. + assert!(f1.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn manager_with_dummy_subscriber_yields_empty() { + let manager = MempoolFilterManager::new(100, MpoolSubscriber::dummy()); + let f = manager.install().expect("install"); + let f = f.as_any().downcast_ref::().unwrap(); + assert!(f.drain(TEST_CHAIN_ID).is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index da2dd8e23f1..a6cd909182e 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -137,11 +137,16 @@ pub enum EventRevertStatus { impl EthEventHandler { pub fn new() -> Self { - let config = EventsConfig::default(); - Self::from_config(&config) + // Standalone handler with no live mempool: subscribers see an empty + // stream forever. Used in tests, snapshot tools, and other contexts + // where no `MessagePool` is available. + Self::from_config(&EventsConfig::default(), MpoolSubscriber::dummy()) } - pub fn from_config(config: &EventsConfig) -> Self { + /// Build a handler from `config`. Each `MempoolFilter` installed via the + /// returned handler invokes `mpool_subscriber` to obtain its own + /// independent broadcast receiver for pending-tx updates. + pub fn from_config(config: &EventsConfig, mpool_subscriber: MpoolSubscriber) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -167,7 +172,10 @@ impl EthEventHandler { Some(MemFilterStore::new(max_filters) as Arc); let event_filter_manager = Some(EventFilterManager::new(max_filter_results)); let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); - let mempool_filter_manager = Some(MempoolFilterManager::new(max_filter_results)); + let mempool_filter_manager = Some(MempoolFilterManager::new( + max_filter_results, + mpool_subscriber, + )); Self { filter_store, diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 2bacc33897e..57043f18c0b 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -98,6 +98,13 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = { + let mp = message_pool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&events_config, subscriber)) + }; Ok(( RPCState { state_manager, @@ -105,7 +112,7 @@ pub async fn offline_rpc_state( mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 431a8ce8bed..17405761ec6 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -310,26 +310,19 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { - let tipset = client.call(ChainHead::request(())?).await?; +/// Poll `MpoolPending` until `message_cid` is visible. Does not wait for +/// execution. +async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; - if pending.0.iter().any(|msg| msg.cid() == message_cid) { - client - .call( - StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? - .with_timeout(Duration::from_secs(300)), - ) - .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; - tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -1009,7 +1002,8 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - wait_pending_message(&client, cid).await?; + // Observe mempool state *before* the message is mined. + wait_in_mempool(&client, cid).await?; let filter_result = client .call(EthGetFilterChanges::request((filter_id.clone(),))?) @@ -1044,6 +1038,83 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { }) } +/// Verify that successive `eth_getFilterChanges` polls return only the +/// pending transactions added since the previous poll. +/// +/// 1. Install a pending-tx filter. +/// 2. Drain any baseline state with an initial poll. +/// 3. Submit tx A, wait for it in the mempool, poll — assert hash A present. +/// 4. Submit tx B, wait for it in the mempool, poll — assert hash B present +/// and hash A absent (it was already consumed by the previous poll). +fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let filter_id = client + .call(EthNewPendingTransactionFilter::request(())?) + .await?; + + let result = async { + // Baseline: clear any pre-existing pending state. + let _ = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + + // First tx. + let cid_a = invoke_contract(&client, &tx).await?; + let hash_a = client + .call(EthGetTransactionHashByCid::request((cid_a,))?) + .await? + .context("no Eth transaction hash for cid_a")?; + wait_in_mempool(&client, cid_a).await?; + let poll_a = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_a) = poll_a else { + anyhow::bail!("expected hashes, got {poll_a:?}"); + }; + anyhow::ensure!( + hashes_a.contains(&hash_a), + "first poll missing tx_a: hash_a={hash_a:?} hashes={hashes_a:?}" + ); + + // Second tx. + let cid_b = invoke_contract(&client, &tx).await?; + let hash_b = client + .call(EthGetTransactionHashByCid::request((cid_b,))?) + .await? + .context("no Eth transaction hash for cid_b")?; + wait_in_mempool(&client, cid_b).await?; + let poll_b = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_b) = poll_b else { + anyhow::bail!("expected hashes, got {poll_b:?}"); + }; + anyhow::ensure!( + hashes_b.contains(&hash_b), + "second poll missing tx_b: hash_b={hash_b:?} hashes={hashes_b:?}" + ); + anyhow::ensure!( + !hashes_b.contains(&hash_a), + "second poll should not return previously-consumed tx_a: \ + hash_a={hash_a:?} hashes={hashes_b:?}" + ); + + anyhow::Ok(()) + } + .await; + + let removed = client + .call(EthUninstallFilter::request((filter_id,))?) + .await?; + anyhow::ensure!(removed); + + result + } + }) +} + fn as_logs(input: EthFilterResult) -> EthFilterResult { match input { EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()), @@ -1153,6 +1224,14 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid, EthUninstallFilter ), + with_methods!( + eth_new_pending_transaction_filter_multi_poll(tx.clone()) + .name("eth_getFilterChanges returns only new pending txs per poll"), + EthNewPendingTransactionFilter, + EthGetFilterChanges, + EthGetTransactionHashByCid, + EthUninstallFilter + ), with_methods!( eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"), EthNewFilter, From 44937fde7ba256b4b9efade5843d48996a67e451 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 25 Jun 2026 20:30:45 +0530 Subject: [PATCH 2/5] restrucutre the changes and improve readability --- CHANGELOG.md | 2 + src/daemon/mod.rs | 12 +- src/message_pool/msgpool/events.rs | 29 ++ src/message_pool/msgpool/mod.rs | 4 +- src/message_pool/msgpool/msg_pool.rs | 15 +- src/message_pool/msgpool/pending_store.rs | 10 +- src/rpc/methods/eth.rs | 4 +- src/rpc/methods/eth/filter/mempool.rs | 371 ++++++++++-------- src/rpc/methods/eth/filter/mod.rs | 34 +- src/rpc/methods/eth/pubsub.rs | 25 +- src/tool/offline_server/server.rs | 12 +- .../subcommands/api_cmd/stateful_tests.rs | 111 +++--- src/utils/task/mod.rs | 2 +- 13 files changed, 358 insertions(+), 273 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dda0b272156..ac994cb557e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,6 +123,8 @@ Non-mandatory release for all node operators. It includes a few bug fixes as wel ### Fixed +- [#7109](https://github.com/ChainSafe/forest/pull/7109): `eth_newPendingTransactionFilter` now returns genuinely-pending mempool transaction hashes instead of executed on-chain events. Each filter collects pending transactions continuously from the mempool (up to `max_filter_results`), so `eth_getFilterChanges` reports the transactions added since the previous poll. + - [#7018](https://github.com/ChainSafe/forest/issues/7018): Fixed `forest-wallet set-default` failing when the keystore has no `default` entry. - [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset. diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index c7ca8b5122a..3e35f82ac9a 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -571,13 +571,11 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path).map(Arc::new)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = { - let mp = mpool.shallow_clone(); - let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { - mp.subscribe_to_updates() - }); - Arc::new(EthEventHandler::from_config(&config.events, subscriber)) - }; + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &config.events, + ctx.chain_config().eth_chain_id, + mpool.subscriber(), + )); if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 8c44d61d2a3..16666f83621 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -4,6 +4,7 @@ //! Event types published by the pending pool. use crate::message::SignedMessage; +use tokio::sync::broadcast; pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256; @@ -14,3 +15,31 @@ pub enum MpoolUpdate { #[allow(dead_code)] Remove(SignedMessage), } + +/// Subscribe-only handle to the pending pool's [`MpoolUpdate`] broadcast bus. +/// +/// Hands out independent receivers via [`subscribe`](Self::subscribe), each with +/// its own cursor. The inner `Sender` is private and never leaves the message +/// pool, so holders can listen but cannot publish events — the send capability +/// stays with [`PendingStore`](super::pending_store::PendingStore). +#[derive(Clone, Debug)] +pub struct MpoolSubscriber(broadcast::Sender); + +impl MpoolSubscriber { + pub(in crate::message_pool) fn new(events: broadcast::Sender) -> Self { + Self(events) + } + + /// A detached handle with no producer behind it: its receivers never observe + /// any event. Used by standalone contexts (tests, snapshot tools, the + /// offline server) that have no live mempool attached. + pub fn dummy() -> Self { + Self(broadcast::channel(1).0) + } + + /// Open a fresh receiver that observes every [`MpoolUpdate`] published from + /// this point on. + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.0.subscribe() + } +} diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 9202dd34e21..9aa6f75f3a2 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -14,9 +14,7 @@ pub mod selection; pub mod test_provider; pub(in crate::message_pool) mod utils; -// TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941 -#[allow(unused_imports)] -pub use events::MpoolUpdate; +pub use events::{MpoolSubscriber, MpoolUpdate}; pub(in crate::message_pool) use utils::recover_sig; diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index c5c3ab636cb..2d170e49bf4 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -15,8 +15,11 @@ use crate::message_pool::{ config::MpoolConfig, errors::Error, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, - recover_sig, republish::RepublishState, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, + events::{MpoolSubscriber, MpoolUpdate}, + pending_store::PendingStore, + recover_sig, + republish::RepublishState, }, provider::Provider, utils::get_base_fee_lower_bound, @@ -434,6 +437,14 @@ where self.pending.subscribe() } + /// A subscribe-only handle to the [`MpoolUpdate`] bus that mints independent + /// receivers on demand. Unlike [`subscribe_to_updates`](Self::subscribe_to_updates), + /// the handle can be stored and re-used to open new receivers later (e.g. + /// one per installed pending-transaction filter). + pub fn subscriber(&self) -> MpoolSubscriber { + self.pending.subscriber() + } + /// Return Vector of signed messages given a block header for self. pub fn messages_for_blocks<'a>( &self, diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index b04e2f393c9..a3f26739c79 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -9,7 +9,9 @@ use tokio::sync::broadcast; use crate::message::SignedMessage; use crate::message_pool::errors::Error; -use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate}; +use crate::message_pool::msgpool::events::{ + MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolSubscriber, MpoolUpdate, +}; use crate::message_pool::msgpool::msg_pool::TrustPolicy; use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy}; use crate::prelude::*; @@ -129,6 +131,12 @@ impl PendingStore { pub fn subscribe(&self) -> broadcast::Receiver { self.inner.events.subscribe() } + + /// A subscribe-only handle to the [`MpoolUpdate`] bus that can mint + /// independent receivers on demand without exposing the send half. + pub(in crate::message_pool) fn subscriber(&self) -> MpoolSubscriber { + MpoolSubscriber::new(self.inner.events.clone()) + } } #[derive(derive_more::Debug, derive_more::Deref)] diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d4ef62d4e40..274ec6d3d46 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3542,9 +3542,7 @@ impl RpcMethod<1> for EthGetFilterChanges { return Ok(eth_filter_result_from_tipsets(&events)?); } if let Some(mempool_filter) = filter.as_any().downcast_ref::() { - let chain_id = ctx.chain_config().eth_chain_id; - let hashes = mempool_filter.drain(chain_id); - return Ok(EthFilterResult::Hashes(hashes)); + return Ok(EthFilterResult::Hashes(mempool_filter.drain())); } } Err(anyhow::anyhow!("method not supported").into()) diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 13923d2a65f..23f6dbf88b2 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -1,128 +1,138 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::prelude::*; use crate::eth::EthChainId as EthChainIdType; -use crate::message::SignedMessage; -use crate::message_pool::MpoolUpdate; +use crate::message_pool::{MpoolSubscriber, MpoolUpdate}; +use crate::prelude::*; use crate::rpc::Arc; use crate::rpc::eth::eth_tx_hash_from_signed_message; use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; -use crate::shim::fvm_shared_latest::clock::ChainEpoch; -use ahash::AHashMap as HashMap; +use crate::utils::broadcast::subscription_stream; +use crate::utils::task::AbortHandles; +use ahash::HashMap; use anyhow::{Context, Result}; +use futures::{Stream, StreamExt as _}; use indexmap::IndexSet; use parking_lot::{Mutex, RwLock}; use std::any::Any; +use std::pin::Pin; use tokio::sync::broadcast; -/// Factory that yields a fresh independent `broadcast::Receiver` -/// on each call. Wraps the `MessagePool` so the filter layer never sees the -/// pool's broadcast `Sender` directly — preserves the send-only encapsulation -/// owned by the message pool module. -#[derive(Clone)] -pub struct MpoolSubscriber { - inner: Arc broadcast::Receiver + Send + Sync>, +/// Stream of the eth tx hash for every [`MpoolUpdate::Add`] published on the +/// mempool bus. +/// +/// Shared by the two pending-transaction surfaces — `eth_subscribe`'s +/// `newPendingTransactions` (see [`super::super::pubsub`]) and +/// `eth_newPendingTransactionFilter` — so both derive identical hashes by +/// construction and treat the feed as purely additive. +/// +/// [`MpoolUpdate::Remove`] is ignored: a tx leaves the pool only once it is +/// mined on-chain, and — like Lotus and Forest's own pending-tx subscription — +/// neither surface retracts an already-reported pending hash. Lagged and closed +/// receivers are handled by [`subscription_stream`]. +pub(crate) fn pending_tx_added_hashes( + rx: broadcast::Receiver, + eth_chain_id: EthChainIdType, +) -> Pin + Send>> { + subscription_stream(rx) + .filter_map(move |update| async move { + let MpoolUpdate::Add(msg) = update else { + return None; + }; + eth_tx_hash_from_signed_message(&msg, eth_chain_id) + .inspect_err(|e| { + tracing::error!("Failed to compute eth tx hash from mpool message: {e:#}") + }) + .ok() + }) + .boxed() } -impl MpoolSubscriber { - /// Build a subscriber from a factory closure that yields a fresh - /// receiver on each call (typically `move || mp.subscribe_to_updates()`). - pub fn new(factory: F) -> Self - where - F: Fn() -> broadcast::Receiver + Send + Sync + 'static, - { +/// Pending-tx hashes a [`MempoolFilter`] has accumulated since the last poll. +/// +/// Insertion-ordered and de-duplicated. Bounded at `cap` hashes (`0` = no limit, +/// the subsystem-wide convention — see [`super::ensure_filter_cap`]); on overflow +/// the oldest hash is evicted, matching Lotus's mempool filter. +#[derive(Debug)] +struct Collected { + hashes: IndexSet, + cap: usize, +} + +impl Collected { + fn new(cap: usize) -> Self { Self { - inner: Arc::new(factory), + hashes: IndexSet::new(), + cap, } } - /// Subscriber whose receivers never receive any events. Used by - /// standalone contexts (tests, snapshot tools, offline server when no - /// real mempool is attached). - pub fn dummy() -> Self { - let (tx, _) = broadcast::channel::(1); - Self::new(move || tx.subscribe()) - } - - fn subscribe(&self) -> broadcast::Receiver { - (self.inner)() + /// Record a newly-seen pending tx hash. Duplicates are ignored; on overflow + /// the oldest hash is dropped to stay within `cap`. + fn push(&mut self, hash: EthHash) { + self.hashes.insert(hash); + if self.cap != 0 && self.hashes.len() > self.cap { + self.hashes.shift_remove_index(0); + } } -} -impl std::fmt::Debug for MpoolSubscriber { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MpoolSubscriber").finish_non_exhaustive() + /// Take everything collected since the previous call, leaving the set empty. + fn take(&mut self) -> Vec { + std::mem::take(&mut self.hashes).into_iter().collect() } } -/// Filter backing `eth_newPendingTransactionFilter`. Each instance owns its -/// own `broadcast::Receiver`. +/// Filter backing `eth_newPendingTransactionFilter`. +/// +/// A background task drains the mempool [`MpoolUpdate`] bus continuously into +/// `collected`, mirroring Lotus's `WaitForMpoolUpdates`/`CollectMessage`. Filling +/// the buffer *between* polls (rather than reading the bounded broadcast ring at +/// poll time) is what lets a poll return up to `max_filter_results` hashes +/// instead of just what happens to still be in the ring. +/// [`drain`](Self::drain) then takes and clears whatever accumulated since the +/// previous `eth_getFilterChanges`. The feed is additive — see +/// [`pending_tx_added_hashes`]. #[derive(Debug)] pub struct MempoolFilter { - // Unique id used to identify the filter - pub id: FilterID, - // Maximum number of results to collect - pub max_results: usize, - // Receiver for mempool updates - rx: Mutex>, + id: FilterID, + collected: Arc>, + /// Aborts the background drain task when the filter is dropped on uninstall. + _drain_task: AbortHandles, } impl MempoolFilter { - pub fn new( - max_results: usize, + fn new( rx: broadcast::Receiver, + eth_chain_id: EthChainIdType, + max_results: usize, ) -> Result, uuid::Error> { + let collected = Arc::new(Mutex::new(Collected::new(max_results))); + + // Drain the bus into `collected` continuously, so polls are bounded by + // `max_results` rather than the broadcast ring's capacity. + let mut hashes = pending_tx_added_hashes(rx, eth_chain_id); + let task = { + let collected = collected.clone(); + tokio::spawn(async move { + while let Some(hash) = hashes.next().await { + collected.lock().push(hash); + } + }) + }; + let mut drain_task = AbortHandles::default(); + drain_task.push(task.abort_handle()); + Ok(Arc::new(Self { id: FilterID::new()?, - max_results, - rx: Mutex::new(rx), + collected, + _drain_task: drain_task, })) } - /// Drain queued mempool updates and return the resulting set of pending - /// tx hashes, capped at `max_results`. - /// - /// Semantics within a single drain window: - /// - `Add` inserts the tx hash. - /// - `Remove` cancels a prior `Add` from the *same* window. A `Remove` - /// for a hash that was already returned by an earlier `drain` call is - /// a no-op on the set — that hash was already reported as pending, - /// so the client has seen it and the cancellation does not need to - /// propagate. - /// - /// Why process `Remove` at all: a tx can leave the mempool between two - /// client polls (mined into a tipset, replaced via RBF, or evicted). If - /// we ignored `Remove` we would surface a hash whose tx is no longer - /// pending, which is misleading for `eth_newPendingTransactionFilter` - /// consumers. - pub fn drain(&self, chain_id: EthChainIdType) -> Vec { - use broadcast::error::TryRecvError; - - let mut rx = self.rx.lock(); - let mut pending: IndexSet = IndexSet::new(); - loop { - match rx.try_recv() { - Ok(MpoolUpdate::Add(m)) => { - if let Some(h) = hash_or_log(&m, chain_id) { - pending.insert(h); - } - } - Ok(MpoolUpdate::Remove(m)) => { - if let Some(h) = hash_or_log(&m, chain_id) { - // Cancels a matching Add buffered earlier in the - // same window. No-op if the hash is not in the set. - pending.shift_remove(&h); - } - } - Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, - Err(TryRecvError::Lagged(n)) => { - tracing::warn!("mempool filter lagged, dropped {n} events"); - } - } - } - pending.into_iter().take(self.max_results).collect() + /// Take the pending-tx hashes collected since the previous poll. + pub fn drain(&self) -> Vec { + self.collected.lock().take() } } @@ -136,40 +146,28 @@ impl Filter for MempoolFilter { } } -fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option { - match eth_tx_hash_from_signed_message(msg, chain_id) { - Ok(h) => Some(h), - Err(e) => { - tracing::debug!("mempool filter: dropping message, hash error: {e}"); - None - } - } -} - -/// Manages installed `MempoolFilter`s. Each `install` calls the configured -/// [`MpoolSubscriber`] to obtain a fresh independent -/// `broadcast::Receiver`. Contexts without a real `MessagePool` -/// (tests, snapshot tools, offline server) pass a subscriber whose receivers -/// always yield `Empty`. +/// Manages installed [`MempoolFilter`]s. Each `install` opens a fresh independent +/// receiver on the shared [`MpoolSubscriber`] and spawns the filter's background +/// drain task. Contexts without a real `MessagePool` (tests, snapshot tools, the +/// offline server) pass a dummy subscriber whose receivers never yield events. +#[derive(Debug)] pub struct MempoolFilterManager { filters: RwLock>>, max_filter_results: usize, + eth_chain_id: EthChainIdType, subscriber: MpoolSubscriber, } -impl std::fmt::Debug for MempoolFilterManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MempoolFilterManager") - .field("max_filter_results", &self.max_filter_results) - .finish_non_exhaustive() - } -} - impl MempoolFilterManager { - pub fn new(max_filter_results: usize, subscriber: MpoolSubscriber) -> Arc { + pub fn new( + max_filter_results: usize, + eth_chain_id: EthChainIdType, + subscriber: MpoolSubscriber, + ) -> Arc { Arc::new(Self { filters: RwLock::new(HashMap::new()), max_filter_results, + eth_chain_id, subscriber, }) } @@ -177,8 +175,12 @@ impl MempoolFilterManager { impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let filter = MempoolFilter::new(self.max_filter_results, self.subscriber.subscribe()) - .context("Failed to create a new mempool filter")?; + let filter = MempoolFilter::new( + self.subscriber.subscribe(), + self.eth_chain_id, + self.max_filter_results, + ) + .context("Failed to create a new mempool filter")?; self.filters .write() .insert(filter.id().clone(), filter.clone()); @@ -193,9 +195,11 @@ impl FilterManager for MempoolFilterManager { #[cfg(test)] mod tests { use super::*; + use crate::message::SignedMessage; use crate::shim::address::Address; use crate::shim::econ::TokenAmount; use crate::shim::message::Message as ShimMessage; + use std::time::Duration; const TEST_CHAIN_ID: EthChainIdType = 314; @@ -214,87 +218,108 @@ mod tests { eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() } - /// Build a subscriber backed by `tx` so tests can drive - /// `MpoolUpdate` events through the manager. - fn subscriber_from(tx: broadcast::Sender) -> MpoolSubscriber { - MpoolSubscriber::new(move || tx.subscribe()) + /// Poll the filter until it has yielded at least `n` hashes in total, + /// accumulating across polls. Avoids racing the background drain task, which + /// collects asynchronously after an event is published. + async fn collect_at_least(filter: &MempoolFilter, n: usize) -> Vec { + let mut all = Vec::new(); + for _ in 0..200 { + all.extend(filter.drain()); + if all.len() >= n { + return all; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + panic!("collected only {} of {n} expected hashes", all.len()); } + // ---- Collected: the per-filter buffer logic (pure, deterministic) ---- + #[test] - fn drain_returns_empty_when_no_events() { - let (tx, _) = broadcast::channel::(1); - let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); - assert!(filter.drain(TEST_CHAIN_ID).is_empty()); + fn collected_dedups_and_preserves_insertion_order() { + let mut c = Collected::new(0); + c.push(hash_of(0)); + c.push(hash_of(1)); + c.push(hash_of(0)); // duplicate — ignored, keeps original position + assert_eq!(c.take(), vec![hash_of(0), hash_of(1)]); } #[test] - fn drain_add_remove_cancel_within_window() { - let (tx, _) = broadcast::channel::(16); - let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); - - tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); - tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); - tx.send(MpoolUpdate::Remove(make_smsg(0))).unwrap(); - tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); - - let hashes = filter.drain(TEST_CHAIN_ID); - assert!(!hashes.contains(&hash_of(0)), "Add+Remove should cancel"); - assert!(hashes.contains(&hash_of(1))); - assert!(hashes.contains(&hash_of(2))); - assert!(filter.drain(TEST_CHAIN_ID).is_empty(), "second drain empty"); + fn collected_take_clears_the_buffer() { + let mut c = Collected::new(0); + c.push(hash_of(0)); + assert_eq!(c.take(), vec![hash_of(0)]); + assert!( + c.take().is_empty(), + "a second take with no new pushes is empty" + ); } #[test] - fn drain_truncates_to_max_results() { - let (tx, _) = broadcast::channel::(64); - let filter = MempoolFilter::new(2, tx.subscribe()).unwrap(); - for seq in 0..5u64 { - tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); - } - assert_eq!(filter.drain(TEST_CHAIN_ID).len(), 2); + fn collected_evicts_oldest_at_cap() { + let mut c = Collected::new(2); + c.push(hash_of(0)); + c.push(hash_of(1)); + c.push(hash_of(2)); // overflow — oldest (0) evicted + assert_eq!(c.take(), vec![hash_of(1), hash_of(2)]); } #[test] - fn drain_handles_lag_and_returns_remaining() { - let (tx, _) = broadcast::channel::(4); - let filter = MempoolFilter::new(100, tx.subscribe()).unwrap(); - for seq in 0..10u64 { - tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); + fn collected_cap_zero_means_unbounded() { + let mut c = Collected::new(0); + for seq in 0..10 { + c.push(hash_of(seq)); } - // Buffer was 4; receiver lagged. Drain returns the remaining buffered - // events without panicking. - assert!(!filter.drain(TEST_CHAIN_ID).is_empty()); + assert_eq!(c.take().len(), 10, "cap == 0 never evicts"); } - #[test] - fn manager_subscribes_each_filter_to_independent_receiver() { - let (tx, _) = broadcast::channel::(16); - let manager = MempoolFilterManager::new(100, subscriber_from(tx.clone())); + // ---- pending_tx_added_hashes: the shared Add-only hash stream ---- - let f1 = manager.install().expect("install f1"); - let f2 = manager.install().expect("install f2"); + #[tokio::test] + async fn added_hashes_maps_adds_and_ignores_removes() { + let (tx, rx) = broadcast::channel::(16); + let stream = pending_tx_added_hashes(rx, TEST_CHAIN_ID); tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); - tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + tx.send(MpoolUpdate::Remove(make_smsg(1))).unwrap(); // ignored + tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); + drop(tx); // close the channel so the stream terminates + + let hashes: Vec = stream.collect().await; + assert_eq!(hashes, vec![hash_of(0), hash_of(2)]); + } + + // ---- MempoolFilter / manager wiring ---- - let f1 = f1.as_any().downcast_ref::().unwrap(); - let f2 = f2.as_any().downcast_ref::().unwrap(); + #[tokio::test] + async fn filter_collects_adds_from_its_receiver() { + let (tx, rx) = broadcast::channel::(16); + let filter = MempoolFilter::new(rx, TEST_CHAIN_ID, 100).unwrap(); - // Each receiver sees the full broadcast, independently. - let h1 = f1.drain(TEST_CHAIN_ID); - let h2 = f2.drain(TEST_CHAIN_ID); - assert_eq!(h1.len(), 2); - assert_eq!(h2.len(), 2); + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + + let hashes = collect_at_least(&filter, 2).await; + assert!(hashes.contains(&hash_of(0))); + assert!(hashes.contains(&hash_of(1))); + } - // Draining once empties only that receiver. - assert!(f1.drain(TEST_CHAIN_ID).is_empty()); + #[tokio::test] + async fn manager_installs_and_removes_filters() { + let manager = MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::dummy()); + let filter = manager.install().expect("install"); + let id = filter.id().clone(); + assert!(manager.remove(&id).is_some()); + assert!(manager.remove(&id).is_none(), "second remove finds nothing"); } - #[test] - fn manager_with_dummy_subscriber_yields_empty() { - let manager = MempoolFilterManager::new(100, MpoolSubscriber::dummy()); - let f = manager.install().expect("install"); - let f = f.as_any().downcast_ref::().unwrap(); - assert!(f.drain(TEST_CHAIN_ID).is_empty()); + #[tokio::test] + async fn dummy_subscriber_yields_no_hashes() { + // A standalone handler (no live mempool) installs fine but never collects. + let manager = MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::dummy()); + let filter = manager.install().expect("install"); + let filter = filter.as_any().downcast_ref::().unwrap(); + tokio::time::sleep(Duration::from_millis(20)).await; + assert!(filter.drain().is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index a6cd909182e..5a16c3d02c5 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -28,6 +28,8 @@ use crate::blocks::Tipset; use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; +use crate::eth::EthChainId; +use crate::message_pool::MpoolSubscriber; use crate::prelude::*; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; @@ -139,14 +141,23 @@ impl EthEventHandler { pub fn new() -> Self { // Standalone handler with no live mempool: subscribers see an empty // stream forever. Used in tests, snapshot tools, and other contexts - // where no `MessagePool` is available. - Self::from_config(&EventsConfig::default(), MpoolSubscriber::dummy()) + // where no `MessagePool` is available. The chain id is irrelevant here + // because the dummy subscriber never yields a message to hash. + Self::from_config( + &EventsConfig::default(), + crate::networks::mainnet::ETH_CHAIN_ID, + MpoolSubscriber::dummy(), + ) } /// Build a handler from `config`. Each `MempoolFilter` installed via the - /// returned handler invokes `mpool_subscriber` to obtain its own - /// independent broadcast receiver for pending-tx updates. - pub fn from_config(config: &EventsConfig, mpool_subscriber: MpoolSubscriber) -> Self { + /// returned handler opens its own independent receiver on `mpool_subscriber` + /// and hashes pending txs with `eth_chain_id`. + pub fn from_config( + config: &EventsConfig, + eth_chain_id: EthChainId, + mpool_subscriber: MpoolSubscriber, + ) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -174,6 +185,7 @@ impl EthEventHandler { let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); let mempool_filter_manager = Some(MempoolFilterManager::new( max_filter_results, + eth_chain_id, mpool_subscriber, )); @@ -1227,8 +1239,10 @@ mod tests { assert!(result.is_ok(), "Expected successful block filter creation"); } - #[test] - fn test_eth_new_pending_transaction_filter() { + // `async`: installing a pending-tx filter spawns its background drain task, + // which requires a Tokio runtime. + #[tokio::test] + async fn test_eth_new_pending_transaction_filter() { let eth_event_handler = EthEventHandler::new(); let result = eth_event_handler.eth_new_pending_transaction_filter(); @@ -1238,8 +1252,10 @@ mod tests { ); } - #[test] - fn test_eth_uninstall_filter() { + // `async`: uninstalling exercises a pending-tx filter, whose install spawns + // a background task that requires a Tokio runtime. + #[tokio::test] + async fn test_eth_uninstall_filter() { let event_handler = EthEventHandler::new(); let mut filter_ids = Vec::new(); let filter_spec = EthFilterSpec { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 37d505da672..e882393796c 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,15 +60,12 @@ //! use crate::blocks::Tipset; -use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; +use crate::rpc::eth::filter::mempool::pending_tx_added_hashes; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; -use crate::rpc::eth::{ - Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change, - eth_tx_hash_from_signed_message, -}; +use crate::rpc::eth::{Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change}; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; @@ -241,20 +238,10 @@ fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { } fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { - let mpool_rx = ctx.mpool.subscribe_to_updates(); - let eth_chain_id = ctx.chain_config().eth_chain_id; - let stream = subscription_stream(mpool_rx) - .filter_map(move |update| async move { - let MpoolUpdate::Add(msg) = update else { - return None; - }; - eth_tx_hash_from_signed_message(&msg, eth_chain_id) - .inspect_err(|e| { - tracing::error!("Failed to compute eth tx hash from mpool message: {e:#}") - }) - .ok() - }) - .boxed(); + let stream = pending_tx_added_hashes( + ctx.mpool.subscribe_to_updates(), + ctx.chain_config().eth_chain_id, + ); tokio::spawn(pipe_stream_to_sink(stream, sink)); } diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 57043f18c0b..d2bff582885 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -98,13 +98,11 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); - let eth_event_handler = { - let mp = message_pool.shallow_clone(); - let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { - mp.subscribe_to_updates() - }); - Arc::new(EthEventHandler::from_config(&events_config, subscriber)) - }; + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &events_config, + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); Ok(( RPCState { state_manager, diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 17405761ec6..6a3af3217b5 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -310,19 +310,59 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -/// Poll `MpoolPending` until `message_cid` is visible. Does not wait for -/// execution. -async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { +async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { + let tipset = client.call(ChainHead::request(())?).await?; let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; + if pending.0.iter().any(|msg| msg.cid() == message_cid) { + client + .call( + StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? + .with_timeout(Duration::from_secs(300)), + ) + .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; + + tokio::time::sleep(Duration::from_millis(10)).await; + } +} + +/// Poll `eth_getFilterChanges` until the hashes seen so far contain `want`. +/// +/// The mempool filter collects pending `txs` on a background task, so a single +/// poll right after a tx becomes visible in the mempool can race the collector. +/// Each poll consumes the filter's buffer, so hashes are accumulated across +/// polls and the full set seen is returned. +async fn poll_pending_filter_until( + client: &rpc::Client, + filter_id: &FilterID, + want: &EthHash, +) -> anyhow::Result> { + let mut seen: Vec = Vec::new(); + let mut retries = 100; + loop { + let result = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes) = result else { + anyhow::bail!("expected hashes, got {result:?}"); + }; + seen.extend(hashes); + if seen.contains(want) { + break Ok(seen); + } + ensure!( + retries != 0, + "filter did not return {want:?} in time; saw {seen:?}" + ); + retries -= 1; tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -1002,28 +1042,20 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - // Observe mempool state *before* the message is mined. - wait_in_mempool(&client, cid).await?; - - let filter_result = client - .call(EthGetFilterChanges::request((filter_id.clone(),))?) - .await?; + // Observe mempool state *before* the message is mined. The + // filter collects asynchronously, so poll until tx_hash appears. + wait_pending_message(&client, cid).await?; + let hashes = poll_pending_filter_until(&client, &filter_id, &tx_hash).await?; - if let EthFilterResult::Hashes(hashes) = filter_result { - anyhow::ensure!( - prev_hashes != hashes, - "prev_hashes={prev_hashes:?} hashes={hashes:?}" - ); - - anyhow::ensure!( - hashes.contains(&tx_hash), - "transaction hash missing from filter results: tx_hash={tx_hash:?} cid={cid:?} hashes={hashes:?}" - ); - - Ok(()) - } else { - Err(anyhow::anyhow!("expecting hashes")) - } + anyhow::ensure!( + prev_hashes != hashes, + "prev_hashes={prev_hashes:?} hashes={hashes:?}" + ); + anyhow::ensure!( + hashes.contains(&tx_hash), + "transaction hash missing from filter results: tx_hash={tx_hash:?} cid={cid:?} hashes={hashes:?}" + ); + Ok(()) } else { Err(anyhow::anyhow!("expecting transactions")) }; @@ -1060,41 +1092,24 @@ fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTest .call(EthGetFilterChanges::request((filter_id.clone(),))?) .await?; - // First tx. + // First tx — poll until it shows up (collection is async). let cid_a = invoke_contract(&client, &tx).await?; let hash_a = client .call(EthGetTransactionHashByCid::request((cid_a,))?) .await? .context("no Eth transaction hash for cid_a")?; - wait_in_mempool(&client, cid_a).await?; - let poll_a = client - .call(EthGetFilterChanges::request((filter_id.clone(),))?) - .await?; - let EthFilterResult::Hashes(hashes_a) = poll_a else { - anyhow::bail!("expected hashes, got {poll_a:?}"); - }; - anyhow::ensure!( - hashes_a.contains(&hash_a), - "first poll missing tx_a: hash_a={hash_a:?} hashes={hashes_a:?}" - ); + wait_pending_message(&client, cid_a).await?; + poll_pending_filter_until(&client, &filter_id, &hash_a).await?; - // Second tx. + // Second tx — the next polls return it but not the + // already-consumed tx_a. let cid_b = invoke_contract(&client, &tx).await?; let hash_b = client .call(EthGetTransactionHashByCid::request((cid_b,))?) .await? .context("no Eth transaction hash for cid_b")?; - wait_in_mempool(&client, cid_b).await?; - let poll_b = client - .call(EthGetFilterChanges::request((filter_id.clone(),))?) - .await?; - let EthFilterResult::Hashes(hashes_b) = poll_b else { - anyhow::bail!("expected hashes, got {poll_b:?}"); - }; - anyhow::ensure!( - hashes_b.contains(&hash_b), - "second poll missing tx_b: hash_b={hash_b:?} hashes={hashes_b:?}" - ); + wait_pending_message(&client, cid_b).await?; + let hashes_b = poll_pending_filter_until(&client, &filter_id, &hash_b).await?; anyhow::ensure!( !hashes_b.contains(&hash_a), "second poll should not return previously-consumed tx_a: \ diff --git a/src/utils/task/mod.rs b/src/utils/task/mod.rs index 23141b8f311..55204d270ae 100644 --- a/src/utils/task/mod.rs +++ b/src/utils/task/mod.rs @@ -4,7 +4,7 @@ use tokio::task::AbortHandle; /// Holds a collection of [`AbortHandle`] and aborts them automatically on drop -#[derive(Default, derive_more::Deref, derive_more::DerefMut)] +#[derive(Debug, Default, derive_more::Deref, derive_more::DerefMut)] pub struct AbortHandles(Vec); impl Drop for AbortHandles { From 11b230e8676759bf60c30ac5f5a74eeb04cebb36 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 29 Jun 2026 15:21:20 +0530 Subject: [PATCH 3/5] rmeove the previous msg pool update subscriber and cleanup --- CHANGELOG.md | 2 +- src/message_pool/msgpool/events.rs | 12 +- src/message_pool/msgpool/msg_pool.rs | 27 +- src/message_pool/msgpool/pending_store.rs | 16 +- src/message_pool/msgpool/selection.rs | 2 +- src/rpc/methods/eth/filter/mempool.rs | 266 ++++++++---------- src/rpc/methods/eth/filter/mod.rs | 11 - src/rpc/methods/eth/pubsub.rs | 2 +- .../subcommands/api_cmd/stateful_tests.rs | 10 +- 9 files changed, 134 insertions(+), 214 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac994cb557e..4eb6fe0a547 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,7 +123,7 @@ Non-mandatory release for all node operators. It includes a few bug fixes as wel ### Fixed -- [#7109](https://github.com/ChainSafe/forest/pull/7109): `eth_newPendingTransactionFilter` now returns genuinely-pending mempool transaction hashes instead of executed on-chain events. Each filter collects pending transactions continuously from the mempool (up to `max_filter_results`), so `eth_getFilterChanges` reports the transactions added since the previous poll. +- [#7109](https://github.com/ChainSafe/forest/pull/7109): `eth_newPendingTransactionFilter` now returns actually pending mempool transaction hashes instead of executed on-chain events. - [#7018](https://github.com/ChainSafe/forest/issues/7018): Fixed `forest-wallet set-default` failing when the keystore has no `default` entry. diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 16666f83621..f0f3cc9e18d 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -19,20 +19,18 @@ pub enum MpoolUpdate { /// Subscribe-only handle to the pending pool's [`MpoolUpdate`] broadcast bus. /// /// Hands out independent receivers via [`subscribe`](Self::subscribe), each with -/// its own cursor. The inner `Sender` is private and never leaves the message -/// pool, so holders can listen but cannot publish events — the send capability -/// stays with [`PendingStore`](super::pending_store::PendingStore). +/// its own cursor. #[derive(Clone, Debug)] pub struct MpoolSubscriber(broadcast::Sender); impl MpoolSubscriber { - pub(in crate::message_pool) fn new(events: broadcast::Sender) -> Self { + /// Wrap the pending pool's broadcast `Sender`. + pub(crate) fn new(events: broadcast::Sender) -> Self { Self(events) } - /// A detached handle with no producer behind it: its receivers never observe - /// any event. Used by standalone contexts (tests, snapshot tools, the - /// offline server) that have no live mempool attached. + /// A detached handle with no producer behind it, its receivers never observe + /// any event. pub fn dummy() -> Self { Self(broadcast::channel(1).0) } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 2d170e49bf4..44208bf3039 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -15,11 +15,8 @@ use crate::message_pool::{ config::MpoolConfig, errors::Error, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, - events::{MpoolSubscriber, MpoolUpdate}, - pending_store::PendingStore, - recover_sig, - republish::RepublishState, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolSubscriber, + pending_store::PendingStore, recover_sig, republish::RepublishState, }, provider::Provider, utils::get_base_fee_lower_bound, @@ -46,11 +43,7 @@ use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; use std::num::NonZeroUsize; use std::time::Duration; -use tokio::{ - sync::broadcast::{self, error::RecvError}, - task::JoinSet, - time::interval, -}; +use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval}; use tracing::warn; /// Maximum size of a serialized message in bytes. Anti-DoS measure to keep @@ -117,7 +110,7 @@ impl ShallowClone for Caches { /// transactions. pub struct MessagePool { /// Pending messages, keyed by resolved-key address, together with the - /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. + /// broadcast channel for [`MpoolUpdate`](super::events::MpoolUpdate) events. See [`PendingStore`]. pub(in crate::message_pool) pending: PendingStore, pub(in crate::message_pool) caches: Caches, /// Resolved-key senders of locally submitted messages. @@ -431,16 +424,8 @@ where ) } - /// Subscribe to [`MpoolUpdate`] events for every insertion into and - /// removal from the pending pool. - pub fn subscribe_to_updates(&self) -> broadcast::Receiver { - self.pending.subscribe() - } - - /// A subscribe-only handle to the [`MpoolUpdate`] bus that mints independent - /// receivers on demand. Unlike [`subscribe_to_updates`](Self::subscribe_to_updates), - /// the handle can be stored and re-used to open new receivers later (e.g. - /// one per installed pending-transaction filter). + /// A subscribe-only handle to the [`MpoolUpdate`](super::events::MpoolUpdate) bus, the single entry + /// point for observing insertions into and removals from the pending pool. pub fn subscriber(&self) -> MpoolSubscriber { self.pending.subscriber() } diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index a3f26739c79..9c66c721fde 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -126,12 +126,6 @@ impl PendingStore { self.inner.pending.read().get(addr).cloned() } - /// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is - /// independent; dropping it does not affect other subscribers. - pub fn subscribe(&self) -> broadcast::Receiver { - self.inner.events.subscribe() - } - /// A subscribe-only handle to the [`MpoolUpdate`] bus that can mint /// independent receivers on demand without exposing the send half. pub(in crate::message_pool) fn subscriber(&self) -> MpoolSubscriber { @@ -243,7 +237,7 @@ mod tests { #[test] fn insert_emits_add_and_stores_message() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -267,7 +261,7 @@ mod tests { #[test] fn rbf_replacement_emits_add_for_the_new_message() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -300,7 +294,7 @@ mod tests { #[test] fn remove_emits_remove_once_then_is_idempotent() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -328,7 +322,7 @@ mod tests { #[test] fn remove_of_unknown_sender_is_silent() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(42); assert!(store.remove(&addr, 0, true).is_none()); @@ -387,7 +381,7 @@ mod tests { // same pending map and the same broadcast channel. let store = PendingStore::new(TEST_LIMITS); let handle = store.shallow_clone(); - let mut rx = handle.subscribe(); + let mut rx = handle.subscriber().subscribe(); let addr = Address::new_id(7); store diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index 568b2e7856a..1e663dbe123 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -1090,7 +1090,7 @@ mod test_selection { // Subscribe AFTER the insert so we only observe events emitted by the // selection call below. - let mut rx = mpool.pending.subscribe(); + let mut rx = mpool.pending.subscriber().subscribe(); // Select against the non-current tipset. let _ = mpool.select_messages(&ts2, 1.0).unwrap(); diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 23f6dbf88b2..be532bb3afd 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -3,7 +3,7 @@ use crate::eth::EthChainId as EthChainIdType; use crate::message_pool::{MpoolSubscriber, MpoolUpdate}; -use crate::prelude::*; +use crate::prelude::ShallowClone; use crate::rpc::Arc; use crate::rpc::eth::eth_tx_hash_from_signed_message; use crate::rpc::eth::types::EthHash; @@ -13,24 +13,14 @@ use crate::utils::task::AbortHandles; use ahash::HashMap; use anyhow::{Context, Result}; use futures::{Stream, StreamExt as _}; -use indexmap::IndexSet; use parking_lot::{Mutex, RwLock}; use std::any::Any; +use std::collections::VecDeque; use std::pin::Pin; +use std::sync::OnceLock; use tokio::sync::broadcast; -/// Stream of the eth tx hash for every [`MpoolUpdate::Add`] published on the -/// mempool bus. -/// -/// Shared by the two pending-transaction surfaces — `eth_subscribe`'s -/// `newPendingTransactions` (see [`super::super::pubsub`]) and -/// `eth_newPendingTransactionFilter` — so both derive identical hashes by -/// construction and treat the feed as purely additive. -/// -/// [`MpoolUpdate::Remove`] is ignored: a tx leaves the pool only once it is -/// mined on-chain, and — like Lotus and Forest's own pending-tx subscription — -/// neither surface retracts an already-reported pending hash. Lagged and closed -/// receivers are handled by [`subscription_stream`]. +/// Stream of the eth tx hash for every [`MpoolUpdate::Add`]; `Remove`s are skipped. pub(crate) fn pending_tx_added_hashes( rx: broadcast::Receiver, eth_chain_id: EthChainIdType, @@ -49,90 +39,35 @@ pub(crate) fn pending_tx_added_hashes( .boxed() } -/// Pending-tx hashes a [`MempoolFilter`] has accumulated since the last poll. -/// -/// Insertion-ordered and de-duplicated. Bounded at `cap` hashes (`0` = no limit, -/// the subsystem-wide convention — see [`super::ensure_filter_cap`]); on overflow -/// the oldest hash is evicted, matching Lotus's mempool filter. -#[derive(Debug)] -struct Collected { - hashes: IndexSet, - cap: usize, -} - -impl Collected { - fn new(cap: usize) -> Self { - Self { - hashes: IndexSet::new(), - cap, - } - } - - /// Record a newly-seen pending tx hash. Duplicates are ignored; on overflow - /// the oldest hash is dropped to stay within `cap`. - fn push(&mut self, hash: EthHash) { - self.hashes.insert(hash); - if self.cap != 0 && self.hashes.len() > self.cap { - self.hashes.shift_remove_index(0); - } - } - - /// Take everything collected since the previous call, leaving the set empty. - fn take(&mut self) -> Vec { - std::mem::take(&mut self.hashes).into_iter().collect() - } -} - -/// Filter backing `eth_newPendingTransactionFilter`. -/// -/// A background task drains the mempool [`MpoolUpdate`] bus continuously into -/// `collected`, mirroring Lotus's `WaitForMpoolUpdates`/`CollectMessage`. Filling -/// the buffer *between* polls (rather than reading the bounded broadcast ring at -/// poll time) is what lets a poll return up to `max_filter_results` hashes -/// instead of just what happens to still be in the ring. -/// [`drain`](Self::drain) then takes and clears whatever accumulated since the -/// previous `eth_getFilterChanges`. The feed is additive — see -/// [`pending_tx_added_hashes`]. +/// A bounded FIFO of pending-tx hashes, backing `eth_newPendingTransactionFilter`. #[derive(Debug)] pub struct MempoolFilter { id: FilterID, - collected: Arc>, - /// Aborts the background drain task when the filter is dropped on uninstall. - _drain_task: AbortHandles, + hashes: Mutex>, + cap: usize, } impl MempoolFilter { - fn new( - rx: broadcast::Receiver, - eth_chain_id: EthChainIdType, - max_results: usize, - ) -> Result, uuid::Error> { - let collected = Arc::new(Mutex::new(Collected::new(max_results))); - - // Drain the bus into `collected` continuously, so polls are bounded by - // `max_results` rather than the broadcast ring's capacity. - let mut hashes = pending_tx_added_hashes(rx, eth_chain_id); - let task = { - let collected = collected.clone(); - tokio::spawn(async move { - while let Some(hash) = hashes.next().await { - collected.lock().push(hash); - } - }) - }; - let mut drain_task = AbortHandles::default(); - drain_task.push(task.abort_handle()); - + fn new(max_results: usize) -> Result, uuid::Error> { Ok(Arc::new(Self { id: FilterID::new()?, - collected, - _drain_task: drain_task, + hashes: Mutex::new(VecDeque::new()), + cap: max_results, })) } - /// Take the pending-tx hashes collected since the previous poll. + /// Append a hash, evicting the oldest when over `cap`. + fn push(&self, hash: EthHash) { + let mut hashes = self.hashes.lock(); + hashes.push_back(hash); + if self.cap != 0 && hashes.len() > self.cap { + hashes.pop_front(); + } + } + + /// Take and clear the hashes collected since the last poll. pub fn drain(&self) -> Vec { - self.collected.lock().take() + self.hashes.lock().drain(..).collect() } } @@ -146,16 +81,23 @@ impl Filter for MempoolFilter { } } -/// Manages installed [`MempoolFilter`]s. Each `install` opens a fresh independent -/// receiver on the shared [`MpoolSubscriber`] and spawns the filter's background -/// drain task. Contexts without a real `MessagePool` (tests, snapshot tools, the -/// offline server) pass a dummy subscriber whose receivers never yield events. +/// Push `hash` into every installed filter. +fn fan_out(filters: &HashMap>, hash: EthHash) { + for filter in filters.values() { + filter.push(hash); + } +} + +/// Manages installed [`MempoolFilter`]s and the single fan-out task that drains +/// the mempool bus and pushes each pending-tx hash into every filter. #[derive(Debug)] pub struct MempoolFilterManager { - filters: RwLock>>, - max_filter_results: usize, + filters: Arc>>>, eth_chain_id: EthChainIdType, + max_filter_results: usize, subscriber: MpoolSubscriber, + /// Aborts the fan-out task when the manager is dropped. + fanout_task: OnceLock, } impl MempoolFilterManager { @@ -165,22 +107,37 @@ impl MempoolFilterManager { subscriber: MpoolSubscriber, ) -> Arc { Arc::new(Self { - filters: RwLock::new(HashMap::new()), - max_filter_results, + filters: Arc::new(RwLock::new(HashMap::default())), eth_chain_id, + max_filter_results, subscriber, + fanout_task: OnceLock::new(), }) } + + /// Lazily start the fan-out task on the first install. + fn ensure_fanout_task(&self) { + self.fanout_task.get_or_init(|| { + let filters = self.filters.shallow_clone(); + let mut hashes = + pending_tx_added_hashes(self.subscriber.subscribe(), self.eth_chain_id); + let task = tokio::spawn(async move { + while let Some(hash) = hashes.next().await { + fan_out(&filters.read(), hash); + } + }); + let mut handles = AbortHandles::default(); + handles.push(task.abort_handle()); + handles + }); + } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let filter = MempoolFilter::new( - self.subscriber.subscribe(), - self.eth_chain_id, - self.max_filter_results, - ) - .context("Failed to create a new mempool filter")?; + self.ensure_fanout_task(); + let filter = MempoolFilter::new(self.max_filter_results) + .context("Failed to create a new mempool filter")?; self.filters .write() .insert(filter.id().clone(), filter.clone()); @@ -188,7 +145,10 @@ impl FilterManager for MempoolFilterManager { } fn remove(&self, id: &FilterID) -> Option> { - self.filters.write().remove(id) + self.filters + .write() + .remove(id) + .map(|f| -> Arc { f }) } } @@ -199,7 +159,6 @@ mod tests { use crate::shim::address::Address; use crate::shim::econ::TokenAmount; use crate::shim::message::Message as ShimMessage; - use std::time::Duration; const TEST_CHAIN_ID: EthChainIdType = 314; @@ -218,62 +177,58 @@ mod tests { eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() } - /// Poll the filter until it has yielded at least `n` hashes in total, - /// accumulating across polls. Avoids racing the background drain task, which - /// collects asynchronously after an event is published. - async fn collect_at_least(filter: &MempoolFilter, n: usize) -> Vec { - let mut all = Vec::new(); - for _ in 0..200 { - all.extend(filter.drain()); - if all.len() >= n { - return all; - } - tokio::time::sleep(Duration::from_millis(5)).await; - } - panic!("collected only {} of {n} expected hashes", all.len()); - } - - // ---- Collected: the per-filter buffer logic (pure, deterministic) ---- - #[test] - fn collected_dedups_and_preserves_insertion_order() { - let mut c = Collected::new(0); - c.push(hash_of(0)); - c.push(hash_of(1)); - c.push(hash_of(0)); // duplicate — ignored, keeps original position - assert_eq!(c.take(), vec![hash_of(0), hash_of(1)]); + fn filter_keeps_duplicates_in_order() { + // a re-added tx is a distinct event; duplicates are kept (like reth/Lotus/geth) + let f = MempoolFilter::new(0).unwrap(); + f.push(hash_of(0)); + f.push(hash_of(1)); + f.push(hash_of(0)); // same tx seen again — recorded again + assert_eq!(f.drain(), vec![hash_of(0), hash_of(1), hash_of(0)]); } #[test] - fn collected_take_clears_the_buffer() { - let mut c = Collected::new(0); - c.push(hash_of(0)); - assert_eq!(c.take(), vec![hash_of(0)]); + fn filter_drain_clears_the_buffer() { + let f = MempoolFilter::new(0).unwrap(); + f.push(hash_of(0)); + assert_eq!(f.drain(), vec![hash_of(0)]); assert!( - c.take().is_empty(), - "a second take with no new pushes is empty" + f.drain().is_empty(), + "a second drain with no new pushes is empty" ); } #[test] - fn collected_evicts_oldest_at_cap() { - let mut c = Collected::new(2); - c.push(hash_of(0)); - c.push(hash_of(1)); - c.push(hash_of(2)); // overflow — oldest (0) evicted - assert_eq!(c.take(), vec![hash_of(1), hash_of(2)]); + fn filter_evicts_oldest_at_cap() { + let f = MempoolFilter::new(2).unwrap(); + f.push(hash_of(0)); + f.push(hash_of(1)); + f.push(hash_of(2)); // overflow — oldest (0) evicted + assert_eq!(f.drain(), vec![hash_of(1), hash_of(2)]); } #[test] - fn collected_cap_zero_means_unbounded() { - let mut c = Collected::new(0); + fn filter_cap_zero_means_unbounded() { + let f = MempoolFilter::new(0).unwrap(); for seq in 0..10 { - c.push(hash_of(seq)); + f.push(hash_of(seq)); } - assert_eq!(c.take().len(), 10, "cap == 0 never evicts"); + assert_eq!(f.drain().len(), 10, "cap == 0 never evicts"); } - // ---- pending_tx_added_hashes: the shared Add-only hash stream ---- + #[test] + fn fan_out_pushes_hash_to_every_filter() { + let f1 = MempoolFilter::new(100).unwrap(); + let f2 = MempoolFilter::new(100).unwrap(); + let mut filters = HashMap::default(); + filters.insert(f1.id().clone(), f1.clone()); + filters.insert(f2.id().clone(), f2.clone()); + + fan_out(&filters, hash_of(0)); + + assert_eq!(f1.drain(), vec![hash_of(0)]); + assert_eq!(f2.drain(), vec![hash_of(0)]); + } #[tokio::test] async fn added_hashes_maps_adds_and_ignores_removes() { @@ -289,19 +244,23 @@ mod tests { assert_eq!(hashes, vec![hash_of(0), hash_of(2)]); } - // ---- MempoolFilter / manager wiring ---- - #[tokio::test] - async fn filter_collects_adds_from_its_receiver() { - let (tx, rx) = broadcast::channel::(16); - let filter = MempoolFilter::new(rx, TEST_CHAIN_ID, 100).unwrap(); + async fn dispatcher_fans_out_bus_events_to_all_filters() { + let (tx, _) = broadcast::channel::(16); + let manager = + MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::new(tx.clone())); + let f1 = manager.install().expect("install f1"); + let f2 = manager.install().expect("install f2"); tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); - tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); - - let hashes = collect_at_least(&filter, 2).await; - assert!(hashes.contains(&hash_of(0))); - assert!(hashes.contains(&hash_of(1))); + // The receiver is subscribed during install, so the event is already + // buffered; one yield lets the fan-out task drain it into both filters. + tokio::task::yield_now().await; + + let f1 = f1.as_any().downcast_ref::().unwrap(); + let f2 = f2.as_any().downcast_ref::().unwrap(); + assert_eq!(f1.drain(), vec![hash_of(0)]); + assert_eq!(f2.drain(), vec![hash_of(0)]); } #[tokio::test] @@ -315,11 +274,10 @@ mod tests { #[tokio::test] async fn dummy_subscriber_yields_no_hashes() { - // A standalone handler (no live mempool) installs fine but never collects. let manager = MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::dummy()); let filter = manager.install().expect("install"); let filter = filter.as_any().downcast_ref::().unwrap(); - tokio::time::sleep(Duration::from_millis(20)).await; + tokio::task::yield_now().await; // let the task run; the dummy produces nothing assert!(filter.drain().is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 5a16c3d02c5..0e0b821bbbe 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -139,10 +139,6 @@ pub enum EventRevertStatus { impl EthEventHandler { pub fn new() -> Self { - // Standalone handler with no live mempool: subscribers see an empty - // stream forever. Used in tests, snapshot tools, and other contexts - // where no `MessagePool` is available. The chain id is irrelevant here - // because the dummy subscriber never yields a message to hash. Self::from_config( &EventsConfig::default(), crate::networks::mainnet::ETH_CHAIN_ID, @@ -150,9 +146,6 @@ impl EthEventHandler { ) } - /// Build a handler from `config`. Each `MempoolFilter` installed via the - /// returned handler opens its own independent receiver on `mpool_subscriber` - /// and hashes pending txs with `eth_chain_id`. pub fn from_config( config: &EventsConfig, eth_chain_id: EthChainId, @@ -1239,8 +1232,6 @@ mod tests { assert!(result.is_ok(), "Expected successful block filter creation"); } - // `async`: installing a pending-tx filter spawns its background drain task, - // which requires a Tokio runtime. #[tokio::test] async fn test_eth_new_pending_transaction_filter() { let eth_event_handler = EthEventHandler::new(); @@ -1252,8 +1243,6 @@ mod tests { ); } - // `async`: uninstalling exercises a pending-tx filter, whose install spawns - // a background task that requires a Tokio runtime. #[tokio::test] async fn test_eth_uninstall_filter() { let event_handler = EthEventHandler::new(); diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index e882393796c..d7cdc29f347 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -239,7 +239,7 @@ fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { let stream = pending_tx_added_hashes( - ctx.mpool.subscribe_to_updates(), + ctx.mpool.subscriber().subscribe(), ctx.chain_config().eth_chain_id, ); tokio::spawn(pipe_stream_to_sink(stream, sink)); diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 6a3af3217b5..fa419f7ff22 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -336,8 +336,8 @@ async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow: /// Poll `eth_getFilterChanges` until the hashes seen so far contain `want`. /// -/// The mempool filter collects pending `txs` on a background task, so a single -/// poll right after a tx becomes visible in the mempool can race the collector. +/// The filter collects pending txs on a background task, so a poll right after a +/// tx appears in the mempool may run before the collector has processed it. /// Each poll consumes the filter's buffer, so hashes are accumulated across /// polls and the full set seen is returned. async fn poll_pending_filter_until( @@ -1033,16 +1033,12 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result { let cid = invoke_contract(&client, &tx).await?; - - // Get the Eth transaction hash for our CID directly, rather than - // reverse-mapping every hash from the filter results back to CIDs - // (which is fragile — the mapping can return None for recent txns). let tx_hash = client .call(EthGetTransactionHashByCid::request((cid,))?) .await? .context("no Eth transaction hash for CID")?; - // Observe mempool state *before* the message is mined. The + // Observe the mempool state before the message is mined. The // filter collects asynchronously, so poll until tx_hash appears. wait_pending_message(&client, cid).await?; let hashes = poll_pending_filter_until(&client, &filter_id, &tx_hash).await?; From 45f7c4b1cc4dc6ed3bba0921107ab859964e7538 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Mon, 29 Jun 2026 16:39:18 +0530 Subject: [PATCH 4/5] address comments --- src/message_pool/msgpool/events.rs | 3 +- src/rpc/methods/eth/filter/mod.rs | 1 + .../api_cmd/generate_test_snapshot.rs | 7 +++- .../subcommands/api_cmd/stateful_tests.rs | 34 +++++++++++-------- src/tool/subcommands/api_cmd/test_snapshot.rs | 7 +++- 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index f0f3cc9e18d..9c8e4bf5e27 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -31,7 +31,8 @@ impl MpoolSubscriber { /// A detached handle with no producer behind it, its receivers never observe /// any event. - pub fn dummy() -> Self { + #[cfg(test)] + pub(crate) fn dummy() -> Self { Self(broadcast::channel(1).0) } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 0e0b821bbbe..9a5c1097635 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -138,6 +138,7 @@ pub enum EventRevertStatus { } impl EthEventHandler { + #[cfg(test)] pub fn new() -> Self { Self::from_config( &EventsConfig::default(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 43395b6f6bc..2a55a1ae95a 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -137,13 +137,18 @@ async fn ctx( SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let (shutdown, shutdown_recv) = mpsc::channel(1); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &crate::cli_shared::cli::EventsConfig::default(), + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); let rpc_state = Arc::new(RPCState { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::new()), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index fa419f7ff22..425c51e25c8 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -310,33 +310,39 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { - let tipset = client.call(ChainHead::request(())?).await?; +/// Poll `MpoolPending` until `message_cid` is visible. Returns while the message +/// is still pending; it does not wait for on-chain inclusion. +async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; - if pending.0.iter().any(|msg| msg.cid() == message_cid) { - client - .call( - StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? - .with_timeout(Duration::from_secs(300)), - ) - .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; - tokio::time::sleep(Duration::from_millis(10)).await; } } +/// Wait for `message_cid` to appear in the mempool and then be included on chain. +async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { + let tipset = client.call(ChainHead::request(())?).await?; + wait_in_mempool(client, message_cid).await?; + client + .call( + StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? + .with_timeout(Duration::from_secs(300)), + ) + .await?; + Ok(()) +} + /// Poll `eth_getFilterChanges` until the hashes seen so far contain `want`. /// -/// The filter collects pending txs on a background task, so a poll right after a +/// The filter collects pending `txs` on a background task, so a poll right after a /// tx appears in the mempool may run before the collector has processed it. /// Each poll consumes the filter's buffer, so hashes are accumulated across /// polls and the full set seen is returned. @@ -1040,7 +1046,7 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { // Observe the mempool state before the message is mined. The // filter collects asynchronously, so poll until tx_hash appears. - wait_pending_message(&client, cid).await?; + wait_in_mempool(&client, cid).await?; let hashes = poll_pending_filter_until(&client, &filter_id, &tx_hash).await?; anyhow::ensure!( @@ -1094,7 +1100,7 @@ fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTest .call(EthGetTransactionHashByCid::request((cid_a,))?) .await? .context("no Eth transaction hash for cid_a")?; - wait_pending_message(&client, cid_a).await?; + wait_in_mempool(&client, cid_a).await?; poll_pending_filter_until(&client, &filter_id, &hash_a).await?; // Second tx — the next polls return it but not the @@ -1104,7 +1110,7 @@ fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTest .call(EthGetTransactionHashByCid::request((cid_b,))?) .await? .context("no Eth transaction hash for cid_b")?; - wait_pending_message(&client, cid_b).await?; + wait_in_mempool(&client, cid_b).await?; let hashes_b = poll_pending_filter_until(&client, &filter_id, &hash_b).await?; anyhow::ensure!( !hashes_b.contains(&hash_a), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 7607a50893d..b02307c2cbf 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -166,13 +166,18 @@ async fn ctx( SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let (shutdown, shutdown_recv) = mpsc::channel(1); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &crate::cli_shared::cli::EventsConfig::default(), + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); let rpc_state = Arc::new(RPCState { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::new()), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), From 4a9c911f0369e973cb3fb54980cf0fce6ab85d76 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 2 Jul 2026 15:20:27 +0530 Subject: [PATCH 5/5] address comments and fix merge issue --- src/message_pool/msgpool/events.rs | 2 +- src/rpc/methods/eth/filter/mempool.rs | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 9c8e4bf5e27..90633d53dd6 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -20,7 +20,7 @@ pub enum MpoolUpdate { /// /// Hands out independent receivers via [`subscribe`](Self::subscribe), each with /// its own cursor. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct MpoolSubscriber(broadcast::Sender); impl MpoolSubscriber { diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index be532bb3afd..44edaf85e8c 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -9,7 +9,6 @@ use crate::rpc::eth::eth_tx_hash_from_signed_message; use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; use crate::utils::broadcast::subscription_stream; -use crate::utils::task::AbortHandles; use ahash::HashMap; use anyhow::{Context, Result}; use futures::{Stream, StreamExt as _}; @@ -19,6 +18,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::sync::OnceLock; use tokio::sync::broadcast; +use tokio_util::task::AbortOnDropHandle; /// Stream of the eth tx hash for every [`MpoolUpdate::Add`]; `Remove`s are skipped. pub(crate) fn pending_tx_added_hashes( @@ -97,7 +97,7 @@ pub struct MempoolFilterManager { max_filter_results: usize, subscriber: MpoolSubscriber, /// Aborts the fan-out task when the manager is dropped. - fanout_task: OnceLock, + fanout_task: OnceLock>, } impl MempoolFilterManager { @@ -121,14 +121,11 @@ impl MempoolFilterManager { let filters = self.filters.shallow_clone(); let mut hashes = pending_tx_added_hashes(self.subscriber.subscribe(), self.eth_chain_id); - let task = tokio::spawn(async move { + AbortOnDropHandle::new(tokio::spawn(async move { while let Some(hash) = hashes.next().await { fan_out(&filters.read(), hash); } - }); - let mut handles = AbortHandles::default(); - handles.push(task.abort_handle()); - handles + })) }); } }