Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@

- [#7164](https://github.com/ChainSafe/forest/issues/7164): JSON-RPC authentication is now performed once per connection (e.g. at the WebSocket upgrade) instead of on every request, matching Lotus. Note that token expiry is no longer re-checked for the lifetime of an established connection.

- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` filters now match with go-ethereum's semantics: the event data payload no longer participates in topic matching, empty topic positions act as wildcards, and logs with fewer topics than the filter has positions never match.

### Removed

### Fixed

- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` now re-emits the logs of reorg-reverted tipsets with `removed: true`, ahead of the logs of the replacing tipsets.

- [#7129](https://github.com/ChainSafe/forest/pull/7129): Fixed a few inaccurate cache size metrics.

## Forest v0.33.6 "Ebb"
Expand Down
1 change: 1 addition & 0 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ fn maybe_start_rpc_service(
bad_blocks,
sync_status,
eth_event_handler,
eth_logs_feed: Default::default(),
sync_network_context,
start_time,
shutdown,
Expand Down
134 changes: 83 additions & 51 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use crate::rpc::{
error::ServerError,
eth::{
errors::EthErrors,
filter::{SkipEvent, event::EventFilter, mempool::MempoolFilter, tipset::TipSetFilter},
filter::{
EventRevertStatus, SkipEvent, event::EventFilter, mempool::MempoolFilter,
tipset::TipSetFilter,
},
utils::decode_revert_reason,
},
methods::chain::ChainGetTipSetV2,
Expand Down Expand Up @@ -1410,17 +1413,30 @@ pub async fn eth_logs_for_block_and_transaction(
eth_filter_logs_from_events(ctx, &events)
}

pub async fn eth_logs_with_filter(
/// Collects the Ethereum logs of the message tipset whose receipts live in `receipt_ts`
/// (its child), marking them as removed when the head change that surfaced them was a reorg
/// revert. The receipt tipset is passed explicitly because reverted tipsets can no longer be
/// resolved through the canonical chain.
pub(in crate::rpc) async fn eth_logs_for_head_change(
ctx: &Ctx,
ts: &Tipset,
spec: Option<EthFilterSpec>,
receipt_ts: &Tipset,
revert_status: EventRevertStatus,
) -> anyhow::Result<Vec<EthLog>> {
let msg_ts = ctx
.chain_index()
.load_required_tipset(receipt_ts.parents())?;
let executed_ts = ctx
.state_manager
.load_executed_tipset_with_receipt(&msg_ts, receipt_ts)
.await?;
let mut events = vec![];
EthEventHandler::collect_events(
EthEventHandler::collect_events_from_messages(
&ctx.state_manager,
ts,
spec.as_ref(),
&msg_ts,
&executed_ts.executed_messages,
None::<&ParsedFilter>,
SkipEvent::OnUnresolvedAddress,
revert_status,
&mut events,
)
.await?;
Expand Down Expand Up @@ -3028,6 +3044,11 @@ pub struct CollectedEvent {
pub(crate) msg_cid: Cid,
}

/// Positions `(message index, event index)` of collected events, grouped by tipset.
/// Identifies events without retaining their entry payloads; grouping by tipset lets
/// membership checks borrow the tipset key and stores each distinct key only once.
pub type SeenEventPositions = HashMap<TipsetKey, HashSet<(u64, u64)>>;

fn match_key(key: &str) -> Option<usize> {
match key.get(0..2) {
Some("t1") => Some(0),
Expand Down Expand Up @@ -3289,6 +3310,59 @@ impl RpcMethod<1> for EthGetLogs {
}
}

/// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event
/// filters: collects the filter's full result set from the canonical chain, returns only the
/// events that were not present in the previous poll, and stores the latest set as the new
/// baseline.
async fn poll_event_filter(
ctx: &Ctx,
event_filter: &EventFilter,
) -> anyhow::Result<Vec<CollectedEvent>> {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
// An event's position identifies it uniquely, so the poll baseline stores positions
// instead of whole `CollectedEvent`s and the filter does not pin entry payloads between
// polls. A re-orged duplicate lands under a different tipset key and is correctly
// reported again.
let mut seen_positions = SeenEventPositions::default();
let mut recent_events = Vec::new();
for event in events {
let position = (event.msg_idx, event.event_idx);
let already_seen = event_filter
.seen_positions
.get(&event.tipset_key)
.is_some_and(|positions| positions.contains(&position));
match seen_positions.get_mut(&event.tipset_key) {
Some(positions) => {
positions.insert(position);
}
None => {
seen_positions.insert(event.tipset_key.clone(), HashSet::from_iter([position]));
}
}
if !already_seen {
recent_events.push(event);
}
}
if let Some(store) = &ctx.eth_event_handler.filter_store {
store.update(Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
seen_positions,
}));
}
Ok(recent_events)
}

pub enum EthGetFilterLogs {}
impl RpcMethod<1> for EthGetFilterLogs {
const NAME: &'static str = "Filecoin.EthGetFilterLogs";
Expand All @@ -3308,28 +3382,7 @@ impl RpcMethod<1> for EthGetFilterLogs {
if let Some(store) = &eth_event_handler.filter_store {
let filter = store.get(&filter_id)?;
if let Some(event_filter) = filter.as_any().downcast_ref::<EventFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let recent_events: Vec<CollectedEvent> = events
.clone()
.into_iter()
.filter(|event| !event_filter.collected.contains(event))
.collect();
let filter = Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
collected: events.clone(),
});
store.update(filter);
let recent_events = poll_event_filter(&ctx, event_filter).await?;
return Ok(eth_filter_result_from_events(&ctx, &recent_events)?);
}
}
Expand Down Expand Up @@ -3359,28 +3412,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
if let Some(store) = &eth_event_handler.filter_store {
let filter = store.get(&filter_id)?;
if let Some(event_filter) = filter.as_any().downcast_ref::<EventFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let recent_events: Vec<CollectedEvent> = events
.clone()
.into_iter()
.filter(|event| !event_filter.collected.contains(event))
.collect();
let filter = Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
collected: events.clone(),
});
store.update(filter);
let recent_events = poll_event_filter(&ctx, event_filter).await?;
return Ok(eth_filter_result_from_events(&ctx, &recent_events)?);
}
if let Some(tipset_filter) = filter.as_any().downcast_ref::<TipSetFilter>() {
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/methods/eth/filter/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::prelude::*;
use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets};
use crate::rpc::eth::{CollectedEvent, FilterID, filter::Filter};
use crate::rpc::eth::{FilterID, SeenEventPositions, filter::Filter};
use crate::shim::address::Address;
use ahash::HashMap;
use anyhow::Result;
Expand All @@ -22,8 +22,8 @@ pub struct EventFilter {
pub keys_with_codec: HashMap<String, Vec<ActorEventBlock>>,
// Maximum number of results to collect
pub max_results: usize,
// Collected events
pub collected: Vec<CollectedEvent>,
// Positions of the events returned by the last poll, used to compute the next poll's delta
pub seen_positions: SeenEventPositions,
}

impl From<&EventFilter> for ParsedFilter {
Expand Down Expand Up @@ -72,7 +72,7 @@ impl EventFilterManager {
addresses: pf.addresses,
keys_with_codec: pf.keys,
max_results: self.max_filter_results,
collected: vec![],
seen_positions: Default::default(),
});

self.filters.write().insert(id, filter.clone());
Expand Down
Loading
Loading