diff --git a/CHANGELOG.md b/CHANGELOG.md index e05b32089e4d..46f21c6cda37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -133,6 +133,8 @@ Non-mandatory release for all node operators. It includes a few bug fixes as wel ### Fixed +- [#7109](https://github.com/ChainSafe/forest/pull/7109): `eth_newPendingTransactionFilter` now returns actually pending mempool transaction hashes instead of executed on-chain events. + - [#7018](https://github.com/ChainSafe/forest/issues/7018): Fixed `forest-wallet set-default` failing when the keystore has no `default` entry. - [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset. diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 273ebb4175f2..60045846cd30 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -572,7 +572,11 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path).map(Arc::new)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events)); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &config.events, + ctx.chain_config().eth_chain_id, + mpool.subscriber(), + )); if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 8c44d61d2a39..90633d53dd64 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -4,6 +4,7 @@ //! Event types published by the pending pool. use crate::message::SignedMessage; +use tokio::sync::broadcast; pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256; @@ -14,3 +15,30 @@ pub enum MpoolUpdate { #[allow(dead_code)] Remove(SignedMessage), } + +/// Subscribe-only handle to the pending pool's [`MpoolUpdate`] broadcast bus. +/// +/// Hands out independent receivers via [`subscribe`](Self::subscribe), each with +/// its own cursor. +#[derive(Debug)] +pub struct MpoolSubscriber(broadcast::Sender); + +impl MpoolSubscriber { + /// Wrap the pending pool's broadcast `Sender`. + pub(crate) fn new(events: broadcast::Sender) -> Self { + Self(events) + } + + /// A detached handle with no producer behind it, its receivers never observe + /// any event. + #[cfg(test)] + pub(crate) fn dummy() -> Self { + Self(broadcast::channel(1).0) + } + + /// Open a fresh receiver that observes every [`MpoolUpdate`] published from + /// this point on. + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.0.subscribe() + } +} diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 9202dd34e211..9aa6f75f3a2c 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -14,9 +14,7 @@ pub mod selection; pub mod test_provider; pub(in crate::message_pool) mod utils; -// TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941 -#[allow(unused_imports)] -pub use events::MpoolUpdate; +pub use events::{MpoolSubscriber, MpoolUpdate}; pub(in crate::message_pool) use utils::recover_sig; diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index c5c3ab636cb7..44208bf30392 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -15,8 +15,8 @@ use crate::message_pool::{ config::MpoolConfig, errors::Error, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, - recover_sig, republish::RepublishState, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolSubscriber, + pending_store::PendingStore, recover_sig, republish::RepublishState, }, provider::Provider, utils::get_base_fee_lower_bound, @@ -43,11 +43,7 @@ use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; use std::num::NonZeroUsize; use std::time::Duration; -use tokio::{ - sync::broadcast::{self, error::RecvError}, - task::JoinSet, - time::interval, -}; +use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval}; use tracing::warn; /// Maximum size of a serialized message in bytes. Anti-DoS measure to keep @@ -114,7 +110,7 @@ impl ShallowClone for Caches { /// transactions. pub struct MessagePool { /// Pending messages, keyed by resolved-key address, together with the - /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. + /// broadcast channel for [`MpoolUpdate`](super::events::MpoolUpdate) events. See [`PendingStore`]. pub(in crate::message_pool) pending: PendingStore, pub(in crate::message_pool) caches: Caches, /// Resolved-key senders of locally submitted messages. @@ -428,10 +424,10 @@ where ) } - /// Subscribe to [`MpoolUpdate`] events for every insertion into and - /// removal from the pending pool. - pub fn subscribe_to_updates(&self) -> broadcast::Receiver { - self.pending.subscribe() + /// A subscribe-only handle to the [`MpoolUpdate`](super::events::MpoolUpdate) bus, the single entry + /// point for observing insertions into and removals from the pending pool. + pub fn subscriber(&self) -> MpoolSubscriber { + self.pending.subscriber() } /// Return Vector of signed messages given a block header for self. diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index b04e2f393c93..9c66c721fde3 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -9,7 +9,9 @@ use tokio::sync::broadcast; use crate::message::SignedMessage; use crate::message_pool::errors::Error; -use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate}; +use crate::message_pool::msgpool::events::{ + MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolSubscriber, MpoolUpdate, +}; use crate::message_pool::msgpool::msg_pool::TrustPolicy; use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy}; use crate::prelude::*; @@ -124,10 +126,10 @@ impl PendingStore { self.inner.pending.read().get(addr).cloned() } - /// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is - /// independent; dropping it does not affect other subscribers. - pub fn subscribe(&self) -> broadcast::Receiver { - self.inner.events.subscribe() + /// A subscribe-only handle to the [`MpoolUpdate`] bus that can mint + /// independent receivers on demand without exposing the send half. + pub(in crate::message_pool) fn subscriber(&self) -> MpoolSubscriber { + MpoolSubscriber::new(self.inner.events.clone()) } } @@ -235,7 +237,7 @@ mod tests { #[test] fn insert_emits_add_and_stores_message() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -259,7 +261,7 @@ mod tests { #[test] fn rbf_replacement_emits_add_for_the_new_message() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -292,7 +294,7 @@ mod tests { #[test] fn remove_emits_remove_once_then_is_idempotent() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(1); store @@ -320,7 +322,7 @@ mod tests { #[test] fn remove_of_unknown_sender_is_silent() { let store = PendingStore::new(TEST_LIMITS); - let mut rx = store.subscribe(); + let mut rx = store.subscriber().subscribe(); let addr = Address::new_id(42); assert!(store.remove(&addr, 0, true).is_none()); @@ -379,7 +381,7 @@ mod tests { // same pending map and the same broadcast channel. let store = PendingStore::new(TEST_LIMITS); let handle = store.shallow_clone(); - let mut rx = handle.subscribe(); + let mut rx = handle.subscriber().subscribe(); let addr = Address::new_id(7); store diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index 568b2e7856ad..1e663dbe1239 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -1090,7 +1090,7 @@ mod test_selection { // Subscribe AFTER the insert so we only observe events emitted by the // selection call below. - let mut rx = mpool.pending.subscribe(); + let mut rx = mpool.pending.subscriber().subscribe(); // Select against the non-current tipset. let _ = mpool.select_messages(&ts2, 1.0).unwrap(); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index fe1532b721c6..5eb0a0472215 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2701,15 +2701,7 @@ impl EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - let hash = if smsg.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - smsg.message().cid().into() - }; - return Ok(Some(hash)); + return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?)); } let msg_result = crate::chain::get_chain_message(db, &cid); @@ -3271,29 +3263,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result> { - events - .iter() - .filter_map(|event| { - match eth_tx_hash_from_message_cid( - ctx.db(), - &event.msg_cid, - ctx.state_manager.chain_config().eth_chain_id, - ) { - Ok(Some(hash)) => Some(Ok(hash)), - Ok(None) => { - tracing::warn!("Ignoring event"); - None - } - Err(err) => Some(Err(err)), - } - }) - .collect() -} - fn eth_filter_logs_from_events( ctx: &Ctx, events: &[CollectedEvent], @@ -3374,15 +3343,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result { - Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages( - ctx, events, - )?)) -} - pub enum EthGetLogs {} impl RpcMethod<1> for EthGetLogs { const NAME: &'static str = "Filecoin.EthGetLogs"; @@ -3553,36 +3513,7 @@ impl RpcMethod<1> for EthGetFilterChanges { return Ok(eth_filter_result_from_tipsets(&events)?); } if let Some(mempool_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( - // heaviest tipset doesn't have events because its messages haven't been executed yet - RangeInclusive::new( - mempool_filter - .collected - .unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1), - // Use -1 to indicate that the range extends until the latest available tipset. - -1, - ), - ))), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let new_collected = events - .iter() - .max_by_key(|event| event.height) - .map(|e| e.height); - if let Some(height) = new_collected { - let filter = Arc::new(MempoolFilter { - id: mempool_filter.id.clone(), - max_results: mempool_filter.max_results, - collected: Some(height), - }); - store.update(filter); - } - return Ok(eth_filter_result_from_messages(&ctx, &events)?); + return Ok(EthFilterResult::Hashes(mempool_filter.drain())); } } Err(anyhow::anyhow!("method not supported").into()) diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 885979814432..44edaf85e8c1 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -1,36 +1,74 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::prelude::*; +use crate::eth::EthChainId as EthChainIdType; +use crate::message_pool::{MpoolSubscriber, MpoolUpdate}; +use crate::prelude::ShallowClone; +use crate::rpc::Arc; +use crate::rpc::eth::eth_tx_hash_from_signed_message; +use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; -use crate::shim::fvm_shared_latest::clock::ChainEpoch; +use crate::utils::broadcast::subscription_stream; use ahash::HashMap; -use anyhow::Result; -use parking_lot::RwLock; +use anyhow::{Context, Result}; +use futures::{Stream, StreamExt as _}; +use parking_lot::{Mutex, RwLock}; use std::any::Any; +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::OnceLock; +use tokio::sync::broadcast; +use tokio_util::task::AbortOnDropHandle; -/// Data structure for filtering and collecting pending transactions -/// from the mempool before they are confirmed in a block. -#[allow(dead_code)] -#[derive(Debug, PartialEq)] +/// Stream of the eth tx hash for every [`MpoolUpdate::Add`]; `Remove`s are skipped. +pub(crate) fn pending_tx_added_hashes( + rx: broadcast::Receiver, + eth_chain_id: EthChainIdType, +) -> Pin + Send>> { + subscription_stream(rx) + .filter_map(move |update| async move { + let MpoolUpdate::Add(msg) = update else { + return None; + }; + eth_tx_hash_from_signed_message(&msg, eth_chain_id) + .inspect_err(|e| { + tracing::error!("Failed to compute eth tx hash from mpool message: {e:#}") + }) + .ok() + }) + .boxed() +} + +/// A bounded FIFO of pending-tx hashes, backing `eth_newPendingTransactionFilter`. +#[derive(Debug)] pub struct MempoolFilter { - // Unique id used to identify the filter - pub id: FilterID, - // Maximum number of results to collect - pub max_results: usize, - // Epoch at which the results were collected - pub collected: Option, + id: FilterID, + hashes: Mutex>, + cap: usize, } impl MempoolFilter { - pub fn new(max_results: usize) -> Result, uuid::Error> { - let id = FilterID::new()?; + fn new(max_results: usize) -> Result, uuid::Error> { Ok(Arc::new(Self { - id, - max_results, - collected: None, + id: FilterID::new()?, + hashes: Mutex::new(VecDeque::new()), + cap: max_results, })) } + + /// Append a hash, evicting the oldest when over `cap`. + fn push(&self, hash: EthHash) { + let mut hashes = self.hashes.lock(); + hashes.push_back(hash); + if self.cap != 0 && hashes.len() > self.cap { + hashes.pop_front(); + } + } + + /// Take and clear the hashes collected since the last poll. + pub fn drain(&self) -> Vec { + self.hashes.lock().drain(..).collect() + } } impl Filter for MempoolFilter { @@ -43,76 +81,200 @@ impl Filter for MempoolFilter { } } -/// `MempoolFilterManager` uses a `RwLock` to handle concurrent access to a collection of `MempoolFilter` -/// instances, each identified by a `FilterID`. The number of results returned by the filters is capped by `max_filter_results`. +/// Push `hash` into every installed filter. +fn fan_out(filters: &HashMap>, hash: EthHash) { + for filter in filters.values() { + filter.push(hash); + } +} + +/// Manages installed [`MempoolFilter`]s and the single fan-out task that drains +/// the mempool bus and pushes each pending-tx hash into every filter. #[derive(Debug)] pub struct MempoolFilterManager { - filters: RwLock>>, + filters: Arc>>>, + eth_chain_id: EthChainIdType, max_filter_results: usize, + subscriber: MpoolSubscriber, + /// Aborts the fan-out task when the manager is dropped. + fanout_task: OnceLock>, } impl MempoolFilterManager { - pub fn new(max_filter_results: usize) -> Arc { + pub fn new( + max_filter_results: usize, + eth_chain_id: EthChainIdType, + subscriber: MpoolSubscriber, + ) -> Arc { Arc::new(Self { - filters: RwLock::new(HashMap::new()), + filters: Arc::new(RwLock::new(HashMap::default())), + eth_chain_id, max_filter_results, + subscriber, + fanout_task: OnceLock::new(), }) } + + /// Lazily start the fan-out task on the first install. + fn ensure_fanout_task(&self) { + self.fanout_task.get_or_init(|| { + let filters = self.filters.shallow_clone(); + let mut hashes = + pending_tx_added_hashes(self.subscriber.subscribe(), self.eth_chain_id); + AbortOnDropHandle::new(tokio::spawn(async move { + while let Some(hash) = hashes.next().await { + fan_out(&filters.read(), hash); + } + })) + }); + } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { + self.ensure_fanout_task(); let filter = MempoolFilter::new(self.max_filter_results) .context("Failed to create a new mempool filter")?; - let id = filter.id().clone(); - - self.filters.write().insert(id, filter.clone()); - + self.filters + .write() + .insert(filter.id().clone(), filter.clone()); Ok(filter) } fn remove(&self, id: &FilterID) -> Option> { - let mut filters = self.filters.write(); - filters.remove(id) + self.filters + .write() + .remove(id) + .map(|f| -> Arc { f }) } } #[cfg(test)] mod tests { use super::*; + use crate::message::SignedMessage; + use crate::shim::address::Address; + use crate::shim::econ::TokenAmount; + use crate::shim::message::Message as ShimMessage; + + const TEST_CHAIN_ID: EthChainIdType = 314; + + fn make_smsg(seq: u64) -> SignedMessage { + SignedMessage::mock_bls_signed_message(ShimMessage { + from: Address::new_id(1), + to: Address::new_id(2), + sequence: seq, + gas_premium: TokenAmount::from_atto(100u64), + gas_limit: 1_000_000, + ..ShimMessage::default() + }) + } + + fn hash_of(seq: u64) -> EthHash { + eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() + } #[test] - fn test_mempool_filter() { - // Test case 1: Create a mempool filter - let max_results = 10; - let filter = MempoolFilter::new(max_results).expect("Failed to create mempool filter"); - assert_eq!(filter.max_results, max_results); - - // Test case 2: Create a mempool filter manager and install the mempool filter - let mempool_manager = MempoolFilterManager::new(max_results); - let installed_filter = mempool_manager - .install() - .expect("Failed to install mempool filter"); - - // Verify that the filter has been added to the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(filters.contains_key(installed_filter.id())); - } + fn filter_keeps_duplicates_in_order() { + // a re-added tx is a distinct event; duplicates are kept (like reth/Lotus/geth) + let f = MempoolFilter::new(0).unwrap(); + f.push(hash_of(0)); + f.push(hash_of(1)); + f.push(hash_of(0)); // same tx seen again — recorded again + assert_eq!(f.drain(), vec![hash_of(0), hash_of(1), hash_of(0)]); + } - // Test case 3: Remove the installed mempool filter - let filter_id = installed_filter.id().clone(); - let removed = mempool_manager.remove(&filter_id); - assert_eq!( - removed.map(|f| f.id().clone()), - Some(installed_filter.id().clone()), - "Filter should be successfully removed" + #[test] + fn filter_drain_clears_the_buffer() { + let f = MempoolFilter::new(0).unwrap(); + f.push(hash_of(0)); + assert_eq!(f.drain(), vec![hash_of(0)]); + assert!( + f.drain().is_empty(), + "a second drain with no new pushes is empty" ); + } + + #[test] + fn filter_evicts_oldest_at_cap() { + let f = MempoolFilter::new(2).unwrap(); + f.push(hash_of(0)); + f.push(hash_of(1)); + f.push(hash_of(2)); // overflow — oldest (0) evicted + assert_eq!(f.drain(), vec![hash_of(1), hash_of(2)]); + } - // Verify that the filter is no longer in the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(!filters.contains_key(&filter_id)); + #[test] + fn filter_cap_zero_means_unbounded() { + let f = MempoolFilter::new(0).unwrap(); + for seq in 0..10 { + f.push(hash_of(seq)); } + assert_eq!(f.drain().len(), 10, "cap == 0 never evicts"); + } + + #[test] + fn fan_out_pushes_hash_to_every_filter() { + let f1 = MempoolFilter::new(100).unwrap(); + let f2 = MempoolFilter::new(100).unwrap(); + let mut filters = HashMap::default(); + filters.insert(f1.id().clone(), f1.clone()); + filters.insert(f2.id().clone(), f2.clone()); + + fan_out(&filters, hash_of(0)); + + assert_eq!(f1.drain(), vec![hash_of(0)]); + assert_eq!(f2.drain(), vec![hash_of(0)]); + } + + #[tokio::test] + async fn added_hashes_maps_adds_and_ignores_removes() { + let (tx, rx) = broadcast::channel::(16); + let stream = pending_tx_added_hashes(rx, TEST_CHAIN_ID); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Remove(make_smsg(1))).unwrap(); // ignored + tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); + drop(tx); // close the channel so the stream terminates + + let hashes: Vec = stream.collect().await; + assert_eq!(hashes, vec![hash_of(0), hash_of(2)]); + } + + #[tokio::test] + async fn dispatcher_fans_out_bus_events_to_all_filters() { + let (tx, _) = broadcast::channel::(16); + let manager = + MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::new(tx.clone())); + let f1 = manager.install().expect("install f1"); + let f2 = manager.install().expect("install f2"); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + // The receiver is subscribed during install, so the event is already + // buffered; one yield lets the fan-out task drain it into both filters. + tokio::task::yield_now().await; + + let f1 = f1.as_any().downcast_ref::().unwrap(); + let f2 = f2.as_any().downcast_ref::().unwrap(); + assert_eq!(f1.drain(), vec![hash_of(0)]); + assert_eq!(f2.drain(), vec![hash_of(0)]); + } + + #[tokio::test] + async fn manager_installs_and_removes_filters() { + let manager = MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::dummy()); + let filter = manager.install().expect("install"); + let id = filter.id().clone(); + assert!(manager.remove(&id).is_some()); + assert!(manager.remove(&id).is_none(), "second remove finds nothing"); + } + + #[tokio::test] + async fn dummy_subscriber_yields_no_hashes() { + let manager = MempoolFilterManager::new(100, TEST_CHAIN_ID, MpoolSubscriber::dummy()); + let filter = manager.install().expect("install"); + let filter = filter.as_any().downcast_ref::().unwrap(); + tokio::task::yield_now().await; // let the task run; the dummy produces nothing + assert!(filter.drain().is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 3943e1a89ced..cb35708aa349 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -28,6 +28,8 @@ use crate::blocks::Tipset; use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; +use crate::eth::EthChainId; +use crate::message_pool::MpoolSubscriber; use crate::prelude::*; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; @@ -136,12 +138,20 @@ pub enum EventRevertStatus { } impl EthEventHandler { + #[cfg(test)] pub fn new() -> Self { - let config = EventsConfig::default(); - Self::from_config(&config) + Self::from_config( + &EventsConfig::default(), + crate::networks::mainnet::ETH_CHAIN_ID, + MpoolSubscriber::dummy(), + ) } - pub fn from_config(config: &EventsConfig) -> Self { + pub fn from_config( + config: &EventsConfig, + eth_chain_id: EthChainId, + mpool_subscriber: MpoolSubscriber, + ) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -167,7 +177,11 @@ impl EthEventHandler { Some(MemFilterStore::new(max_filters) as Arc); let event_filter_manager = Some(EventFilterManager::new(max_filter_results)); let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); - let mempool_filter_manager = Some(MempoolFilterManager::new(max_filter_results)); + let mempool_filter_manager = Some(MempoolFilterManager::new( + max_filter_results, + eth_chain_id, + mpool_subscriber, + )); Self { filter_store, @@ -1217,8 +1231,8 @@ mod tests { assert!(result.is_ok(), "Expected successful block filter creation"); } - #[test] - fn test_eth_new_pending_transaction_filter() { + #[tokio::test] + async fn test_eth_new_pending_transaction_filter() { let eth_event_handler = EthEventHandler::new(); let result = eth_event_handler.eth_new_pending_transaction_filter(); @@ -1228,8 +1242,8 @@ mod tests { ); } - #[test] - fn test_eth_uninstall_filter() { + #[tokio::test] + async fn test_eth_uninstall_filter() { let event_handler = EthEventHandler::new(); let mut filter_ids = Vec::new(); let filter_spec = EthFilterSpec { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 37d505da6726..d7cdc29f3470 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,15 +60,12 @@ //! use crate::blocks::Tipset; -use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; +use crate::rpc::eth::filter::mempool::pending_tx_added_hashes; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; -use crate::rpc::eth::{ - Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change, - eth_tx_hash_from_signed_message, -}; +use crate::rpc::eth::{Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change}; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; @@ -241,20 +238,10 @@ fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { } fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { - let mpool_rx = ctx.mpool.subscribe_to_updates(); - let eth_chain_id = ctx.chain_config().eth_chain_id; - let stream = subscription_stream(mpool_rx) - .filter_map(move |update| async move { - let MpoolUpdate::Add(msg) = update else { - return None; - }; - eth_tx_hash_from_signed_message(&msg, eth_chain_id) - .inspect_err(|e| { - tracing::error!("Failed to compute eth tx hash from mpool message: {e:#}") - }) - .ok() - }) - .boxed(); + let stream = pending_tx_added_hashes( + ctx.mpool.subscriber().subscribe(), + ctx.chain_config().eth_chain_id, + ); tokio::spawn(pipe_stream_to_sink(stream, sink)); } diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 2bacc33897e3..d2bff582885c 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -98,6 +98,11 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &events_config, + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); Ok(( RPCState { state_manager, @@ -105,7 +110,7 @@ pub async fn offline_rpc_state( mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 43395b6f6bc3..2a55a1ae95a6 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -137,13 +137,18 @@ async fn ctx( SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let (shutdown, shutdown_recv) = mpsc::channel(1); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &crate::cli_shared::cli::EventsConfig::default(), + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); let rpc_state = Arc::new(RPCState { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::new()), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 00c7f98d4af6..93ce55925dff 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -310,26 +310,65 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { - let tipset = client.call(ChainHead::request(())?).await?; +/// Poll `MpoolPending` until `message_cid` is visible. Returns while the message +/// is still pending; it does not wait for on-chain inclusion. +async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; - if pending.0.iter().any(|msg| msg.cid() == message_cid) { - client - .call( - StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? - .with_timeout(Duration::from_secs(300)), - ) - .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; + tokio::time::sleep(Duration::from_millis(10)).await; + } +} +/// Wait for `message_cid` to appear in the mempool and then be included on chain. +async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { + let tipset = client.call(ChainHead::request(())?).await?; + wait_in_mempool(client, message_cid).await?; + client + .call( + StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? + .with_timeout(Duration::from_secs(300)), + ) + .await?; + Ok(()) +} + +/// Poll `eth_getFilterChanges` until the hashes seen so far contain `want`. +/// +/// The filter collects pending `txs` on a background task, so a poll right after a +/// tx appears in the mempool may run before the collector has processed it. +/// Each poll consumes the filter's buffer, so hashes are accumulated across +/// polls and the full set seen is returned. +async fn poll_pending_filter_until( + client: &rpc::Client, + filter_id: &FilterID, + want: &EthHash, +) -> anyhow::Result> { + let mut seen: Vec = Vec::new(); + let mut retries = 100; + loop { + let result = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes) = result else { + anyhow::bail!("expected hashes, got {result:?}"); + }; + seen.extend(hashes); + if seen.contains(want) { + break Ok(seen); + } + ensure!( + retries != 0, + "filter did not return {want:?} in time; saw {seen:?}" + ); + retries -= 1; tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -1002,39 +1041,88 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result { let cid = invoke_contract(&client, &tx).await?; - - // Get the Eth transaction hash for our CID directly, rather than - // reverse-mapping every hash from the filter results back to CIDs - // (which is fragile — the mapping can return None for recent txns). let tx_hash = client .call(EthGetTransactionHashByCid::request((cid,))?) .await? .context("no Eth transaction hash for CID")?; - wait_pending_message(&client, cid).await?; + // Observe the mempool state before the message is mined. The + // filter collects asynchronously, so poll until tx_hash appears. + wait_in_mempool(&client, cid).await?; + let hashes = poll_pending_filter_until(&client, &filter_id, &tx_hash).await?; - let filter_result = client + anyhow::ensure!( + prev_hashes != hashes, + "prev_hashes={prev_hashes:?} hashes={hashes:?}" + ); + anyhow::ensure!( + hashes.contains(&tx_hash), + "transaction hash missing from filter results: tx_hash={tx_hash:?} cid={cid:?} hashes={hashes:?}" + ); + Ok(()) + } else { + Err(anyhow::anyhow!("expecting transactions")) + }; + + let removed = client + .call(EthUninstallFilter::request((filter_id,))?) + .await?; + anyhow::ensure!(removed); + + result + } + }) +} + +/// Verify that successive `eth_getFilterChanges` polls return only the +/// pending transactions added since the previous poll. +/// +/// 1. Install a pending-tx filter. +/// 2. Drain any baseline state with an initial poll. +/// 3. Submit tx A, wait for it in the mempool, poll — assert hash A present. +/// 4. Submit tx B, wait for it in the mempool, poll — assert hash B present +/// and hash A absent (it was already consumed by the previous poll). +fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let filter_id = client + .call(EthNewPendingTransactionFilter::request(())?) + .await?; + + let result = async { + // Baseline: clear any pre-existing pending state. + let _ = client .call(EthGetFilterChanges::request((filter_id.clone(),))?) .await?; - if let EthFilterResult::Hashes(hashes) = filter_result { - anyhow::ensure!( - prev_hashes != hashes, - "prev_hashes={prev_hashes:?} hashes={hashes:?}" - ); - - anyhow::ensure!( - hashes.contains(&tx_hash), - "transaction hash missing from filter results: tx_hash={tx_hash:?} cid={cid:?} hashes={hashes:?}" - ); + // First tx — poll until it shows up (collection is async). + let cid_a = invoke_contract(&client, &tx).await?; + let hash_a = client + .call(EthGetTransactionHashByCid::request((cid_a,))?) + .await? + .context("no Eth transaction hash for cid_a")?; + wait_in_mempool(&client, cid_a).await?; + poll_pending_filter_until(&client, &filter_id, &hash_a).await?; + + // Second tx — the next polls return it but not the + // already-consumed tx_a. + let cid_b = invoke_contract(&client, &tx).await?; + let hash_b = client + .call(EthGetTransactionHashByCid::request((cid_b,))?) + .await? + .context("no Eth transaction hash for cid_b")?; + wait_in_mempool(&client, cid_b).await?; + let hashes_b = poll_pending_filter_until(&client, &filter_id, &hash_b).await?; + anyhow::ensure!( + !hashes_b.contains(&hash_a), + "second poll should not return previously-consumed tx_a: \ + hash_a={hash_a:?} hashes={hashes_b:?}" + ); - Ok(()) - } else { - Err(anyhow::anyhow!("expecting hashes")) - } - } else { - Err(anyhow::anyhow!("expecting transactions")) - }; + anyhow::Ok(()) + } + .await; let removed = client .call(EthUninstallFilter::request((filter_id,))?) @@ -1155,6 +1243,14 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid, EthUninstallFilter ), + with_methods!( + eth_new_pending_transaction_filter_multi_poll(tx.clone()) + .name("eth_getFilterChanges returns only new pending txs per poll"), + EthNewPendingTransactionFilter, + EthGetFilterChanges, + EthGetTransactionHashByCid, + EthUninstallFilter + ), with_methods!( eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"), EthNewFilter, diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 7607a50893d8..b02307c2cbff 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -166,13 +166,18 @@ async fn ctx( SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let (shutdown, shutdown_recv) = mpsc::channel(1); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &crate::cli_shared::cli::EventsConfig::default(), + state_manager.chain_config().eth_chain_id, + message_pool.subscriber(), + )); let rpc_state = Arc::new(RPCState { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::new()), + eth_event_handler, eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/utils/task/mod.rs b/src/utils/task/mod.rs new file mode 100644 index 000000000000..e69de29bb2d1