Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Non-mandatory release for all node operators. It includes a few bug fixes as wel

### Fixed

- [#7109](https://github.com/ChainSafe/forest/pull/7109): `eth_newPendingTransactionFilter` now returns actually pending mempool transaction hashes instead of executed on-chain events.

- [#7018](https://github.com/ChainSafe/forest/issues/7018): Fixed `forest-wallet set-default` failing when the keystore has no `default` entry.

- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset.
Expand Down
6 changes: 5 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,11 @@ fn maybe_start_rpc_service(
.map(|path| crate::rpc::FilterList::new_from_file(path).map(Arc::new))
.transpose()?;
info!("JSON-RPC endpoint will listen at {rpc_address}");
let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
let eth_event_handler = Arc::new(EthEventHandler::from_config(
&config.events,
ctx.chain_config().eth_chain_id,
mpool.subscriber(),
));
if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
warn!(
"JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
Expand Down
28 changes: 28 additions & 0 deletions src/message_pool/msgpool/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Event types published by the pending pool.

use crate::message::SignedMessage;
use tokio::sync::broadcast;

pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256;

Expand All @@ -14,3 +15,30 @@ pub enum MpoolUpdate {
#[allow(dead_code)]
Remove(SignedMessage),
}

/// Subscribe-only handle to the pending pool's [`MpoolUpdate`] broadcast bus.
///
/// Hands out independent receivers via [`subscribe`](Self::subscribe), each with
/// its own cursor.
#[derive(Debug)]
pub struct MpoolSubscriber(broadcast::Sender<MpoolUpdate>);

impl MpoolSubscriber {
/// Wrap the pending pool's broadcast `Sender`.
pub(crate) fn new(events: broadcast::Sender<MpoolUpdate>) -> Self {
Self(events)
}

/// A detached handle with no producer behind it, its receivers never observe
/// any event.
#[cfg(test)]
pub(crate) fn dummy() -> Self {
Self(broadcast::channel(1).0)
}

/// Open a fresh receiver that observes every [`MpoolUpdate`] published from
/// this point on.
pub(crate) fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
self.0.subscribe()
}
}
4 changes: 1 addition & 3 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ pub mod selection;
pub mod test_provider;
pub(in crate::message_pool) mod utils;

// TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941
#[allow(unused_imports)]
pub use events::MpoolUpdate;
pub use events::{MpoolSubscriber, MpoolUpdate};

pub(in crate::message_pool) use utils::recover_sig;

Expand Down
20 changes: 8 additions & 12 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::message_pool::{
config::MpoolConfig,
errors::Error,
msgpool::{
BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore,
recover_sig, republish::RepublishState,
BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolSubscriber,
pending_store::PendingStore, recover_sig, republish::RepublishState,
},
provider::Provider,
utils::get_base_fee_lower_bound,
Expand All @@ -43,11 +43,7 @@ use nonzero_ext::nonzero;
use parking_lot::RwLock as SyncRwLock;
use std::num::NonZeroUsize;
use std::time::Duration;
use tokio::{
sync::broadcast::{self, error::RecvError},
task::JoinSet,
time::interval,
};
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
use tracing::warn;

/// Maximum size of a serialized message in bytes. Anti-DoS measure to keep
Expand Down Expand Up @@ -114,7 +110,7 @@ impl ShallowClone for Caches {
/// transactions.
pub struct MessagePool<T> {
/// Pending messages, keyed by resolved-key address, together with the
/// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`].
/// broadcast channel for [`MpoolUpdate`](super::events::MpoolUpdate) events. See [`PendingStore`].
pub(in crate::message_pool) pending: PendingStore,
pub(in crate::message_pool) caches: Caches,
/// Resolved-key senders of locally submitted messages.
Expand Down Expand Up @@ -428,10 +424,10 @@ where
)
}

/// Subscribe to [`MpoolUpdate`] events for every insertion into and
/// removal from the pending pool.
pub fn subscribe_to_updates(&self) -> broadcast::Receiver<MpoolUpdate> {
self.pending.subscribe()
/// A subscribe-only handle to the [`MpoolUpdate`](super::events::MpoolUpdate) bus, the single entry
/// point for observing insertions into and removals from the pending pool.
pub fn subscriber(&self) -> MpoolSubscriber {
self.pending.subscriber()
}

/// Return Vector of signed messages given a block header for self.
Expand Down
22 changes: 12 additions & 10 deletions src/message_pool/msgpool/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use tokio::sync::broadcast;

use crate::message::SignedMessage;
use crate::message_pool::errors::Error;
use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate};
use crate::message_pool::msgpool::events::{
MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolSubscriber, MpoolUpdate,
};
use crate::message_pool::msgpool::msg_pool::TrustPolicy;
use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy};
use crate::prelude::*;
Expand Down Expand Up @@ -124,10 +126,10 @@ impl PendingStore {
self.inner.pending.read().get(addr).cloned()
}

/// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is
/// independent; dropping it does not affect other subscribers.
pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
self.inner.events.subscribe()
/// A subscribe-only handle to the [`MpoolUpdate`] bus that can mint
/// independent receivers on demand without exposing the send half.
pub(in crate::message_pool) fn subscriber(&self) -> MpoolSubscriber {
MpoolSubscriber::new(self.inner.events.clone())
}
}

Expand Down Expand Up @@ -235,7 +237,7 @@ mod tests {
#[test]
fn insert_emits_add_and_stores_message() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let mut rx = store.subscriber().subscribe();
let addr = Address::new_id(1);

store
Expand All @@ -259,7 +261,7 @@ mod tests {
#[test]
fn rbf_replacement_emits_add_for_the_new_message() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let mut rx = store.subscriber().subscribe();
let addr = Address::new_id(1);

store
Expand Down Expand Up @@ -292,7 +294,7 @@ mod tests {
#[test]
fn remove_emits_remove_once_then_is_idempotent() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let mut rx = store.subscriber().subscribe();
let addr = Address::new_id(1);

store
Expand Down Expand Up @@ -320,7 +322,7 @@ mod tests {
#[test]
fn remove_of_unknown_sender_is_silent() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let mut rx = store.subscriber().subscribe();
let addr = Address::new_id(42);

assert!(store.remove(&addr, 0, true).is_none());
Expand Down Expand Up @@ -379,7 +381,7 @@ mod tests {
// same pending map and the same broadcast channel.
let store = PendingStore::new(TEST_LIMITS);
let handle = store.shallow_clone();
let mut rx = handle.subscribe();
let mut rx = handle.subscriber().subscribe();
let addr = Address::new_id(7);

store
Expand Down
2 changes: 1 addition & 1 deletion src/message_pool/msgpool/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ mod test_selection {

// Subscribe AFTER the insert so we only observe events emitted by the
// selection call below.
let mut rx = mpool.pending.subscribe();
let mut rx = mpool.pending.subscriber().subscribe();

// Select against the non-current tipset.
let _ = mpool.select_messages(&ts2, 1.0).unwrap();
Expand Down
73 changes: 2 additions & 71 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2701,15 +2701,7 @@ impl EthGetTransactionHashByCid {
if let Ok(smsgs) = smsgs_result
&& let Some(smsg) = smsgs.first()
{
let hash = if smsg.is_delegated() {
let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?;
tx.eth_hash()?.into()
} else if smsg.is_secp256k1() {
smsg.cid().into()
} else {
smsg.message().cid().into()
};
return Ok(Some(hash));
return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?));
}

let msg_result = crate::chain::get_chain_message(db, &cid);
Expand Down Expand Up @@ -3271,29 +3263,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result<Vec
.collect()
}

fn eth_filter_logs_from_messages(
ctx: &Ctx,
events: &[CollectedEvent],
) -> anyhow::Result<Vec<EthHash>> {
events
.iter()
.filter_map(|event| {
match eth_tx_hash_from_message_cid(
ctx.db(),
&event.msg_cid,
ctx.state_manager.chain_config().eth_chain_id,
) {
Ok(Some(hash)) => Some(Ok(hash)),
Ok(None) => {
tracing::warn!("Ignoring event");
None
}
Err(err) => Some(Err(err)),
}
})
.collect()
}

fn eth_filter_logs_from_events(
ctx: &Ctx,
events: &[CollectedEvent],
Expand Down Expand Up @@ -3374,15 +3343,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result<E
)?))
}

fn eth_filter_result_from_messages(
ctx: &Ctx,
events: &[CollectedEvent],
) -> anyhow::Result<EthFilterResult> {
Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages(
ctx, events,
)?))
}

pub enum EthGetLogs {}
impl RpcMethod<1> for EthGetLogs {
const NAME: &'static str = "Filecoin.EthGetLogs";
Expand Down Expand Up @@ -3553,36 +3513,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
return Ok(eth_filter_result_from_tipsets(&events)?);
}
if let Some(mempool_filter) = filter.as_any().downcast_ref::<MempoolFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
// heaviest tipset doesn't have events because its messages haven't been executed yet
RangeInclusive::new(
mempool_filter
.collected
.unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1),
// Use -1 to indicate that the range extends until the latest available tipset.
-1,
),
))),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let new_collected = events
.iter()
.max_by_key(|event| event.height)
.map(|e| e.height);
if let Some(height) = new_collected {
let filter = Arc::new(MempoolFilter {
id: mempool_filter.id.clone(),
max_results: mempool_filter.max_results,
collected: Some(height),
});
store.update(filter);
}
return Ok(eth_filter_result_from_messages(&ctx, &events)?);
return Ok(EthFilterResult::Hashes(mempool_filter.drain()));
}
}
Err(anyhow::anyhow!("method not supported").into())
Expand Down
Loading
Loading