diff --git a/CHANGELOG.md b/CHANGELOG.md index 849dafe262d..4bf6463e4a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,10 +35,14 @@ - [#7164](https://github.com/ChainSafe/forest/issues/7164): JSON-RPC authentication is now performed once per connection (e.g. at the WebSocket upgrade) instead of on every request, matching Lotus. Note that token expiry is no longer re-checked for the lifetime of an established connection. +- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` filters now match with go-ethereum's semantics: the event data payload no longer participates in topic matching, empty topic positions act as wildcards, and logs with fewer topics than the filter has positions never match. + ### Removed ### Fixed +- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` now re-emits the logs of reorg-reverted tipsets with `removed: true`, ahead of the logs of the replacing tipsets. + - [#7129](https://github.com/ChainSafe/forest/pull/7129): Fixed a few inaccurate cache size metrics. ## Forest v0.33.6 "Ebb" diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 1f0af64dc41..29757fcb6c2 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -598,6 +598,7 @@ fn maybe_start_rpc_service( bad_blocks, sync_status, eth_event_handler, + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown, diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 1367d04b855..26745ac0958 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -35,7 +35,10 @@ use crate::rpc::{ error::ServerError, eth::{ errors::EthErrors, - filter::{SkipEvent, event::EventFilter, mempool::MempoolFilter, tipset::TipSetFilter}, + filter::{ + EventRevertStatus, SkipEvent, event::EventFilter, mempool::MempoolFilter, + tipset::TipSetFilter, + }, utils::decode_revert_reason, }, methods::chain::ChainGetTipSetV2, @@ -1410,17 +1413,30 @@ pub async fn eth_logs_for_block_and_transaction( eth_filter_logs_from_events(ctx, &events) } -pub async fn eth_logs_with_filter( +/// Collects the Ethereum logs of the message tipset whose receipts live in `receipt_ts` +/// (its child), marking them as removed when the head change that surfaced them was a reorg +/// revert. The receipt tipset is passed explicitly because reverted tipsets can no longer be +/// resolved through the canonical chain. +pub(in crate::rpc) async fn eth_logs_for_head_change( ctx: &Ctx, - ts: &Tipset, - spec: Option, + receipt_ts: &Tipset, + revert_status: EventRevertStatus, ) -> anyhow::Result> { + let msg_ts = ctx + .chain_index() + .load_required_tipset(receipt_ts.parents())?; + let executed_ts = ctx + .state_manager + .load_executed_tipset_with_receipt(&msg_ts, receipt_ts) + .await?; let mut events = vec![]; - EthEventHandler::collect_events( + EthEventHandler::collect_events_from_messages( &ctx.state_manager, - ts, - spec.as_ref(), + &msg_ts, + &executed_ts.executed_messages, + None::<&ParsedFilter>, SkipEvent::OnUnresolvedAddress, + revert_status, &mut events, ) .await?; @@ -3028,6 +3044,11 @@ pub struct CollectedEvent { pub(crate) msg_cid: Cid, } +/// Positions `(message index, event index)` of collected events, grouped by tipset. +/// Identifies events without retaining their entry payloads; grouping by tipset lets +/// membership checks borrow the tipset key and stores each distinct key only once. +pub type SeenEventPositions = HashMap>; + fn match_key(key: &str) -> Option { match key.get(0..2) { Some("t1") => Some(0), @@ -3289,6 +3310,59 @@ impl RpcMethod<1> for EthGetLogs { } } +/// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event +/// filters: collects the filter's full result set from the canonical chain, returns only the +/// events that were not present in the previous poll, and stores the latest set as the new +/// baseline. +async fn poll_event_filter( + ctx: &Ctx, + event_filter: &EventFilter, +) -> anyhow::Result> { + let events = ctx + .eth_event_handler + .get_events_for_parsed_filter( + ctx, + &Arc::new(event_filter.into()), + SkipEvent::OnUnresolvedAddress, + ) + .await?; + // An event's position identifies it uniquely, so the poll baseline stores positions + // instead of whole `CollectedEvent`s and the filter does not pin entry payloads between + // polls. A re-orged duplicate lands under a different tipset key and is correctly + // reported again. + let mut seen_positions = SeenEventPositions::default(); + let mut recent_events = Vec::new(); + for event in events { + let position = (event.msg_idx, event.event_idx); + let already_seen = event_filter + .seen_positions + .get(&event.tipset_key) + .is_some_and(|positions| positions.contains(&position)); + match seen_positions.get_mut(&event.tipset_key) { + Some(positions) => { + positions.insert(position); + } + None => { + seen_positions.insert(event.tipset_key.clone(), HashSet::from_iter([position])); + } + } + if !already_seen { + recent_events.push(event); + } + } + if let Some(store) = &ctx.eth_event_handler.filter_store { + store.update(Arc::new(EventFilter { + id: event_filter.id.clone(), + tipsets: event_filter.tipsets.clone(), + addresses: event_filter.addresses.clone(), + keys_with_codec: event_filter.keys_with_codec.clone(), + max_results: event_filter.max_results, + seen_positions, + })); + } + Ok(recent_events) +} + pub enum EthGetFilterLogs {} impl RpcMethod<1> for EthGetFilterLogs { const NAME: &'static str = "Filecoin.EthGetFilterLogs"; @@ -3308,28 +3382,7 @@ impl RpcMethod<1> for EthGetFilterLogs { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } } @@ -3359,28 +3412,7 @@ impl RpcMethod<1> for EthGetFilterChanges { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } if let Some(tipset_filter) = filter.as_any().downcast_ref::() { diff --git a/src/rpc/methods/eth/filter/event.rs b/src/rpc/methods/eth/filter/event.rs index 605ba069c94..14b871bd358 100644 --- a/src/rpc/methods/eth/filter/event.rs +++ b/src/rpc/methods/eth/filter/event.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets}; -use crate::rpc::eth::{CollectedEvent, FilterID, filter::Filter}; +use crate::rpc::eth::{FilterID, SeenEventPositions, filter::Filter}; use crate::shim::address::Address; use ahash::HashMap; use anyhow::Result; @@ -22,8 +22,8 @@ pub struct EventFilter { pub keys_with_codec: HashMap>, // Maximum number of results to collect pub max_results: usize, - // Collected events - pub collected: Vec, + // Positions of the events returned by the last poll, used to compute the next poll's delta + pub seen_positions: SeenEventPositions, } impl From<&EventFilter> for ParsedFilter { @@ -72,7 +72,7 @@ impl EventFilterManager { addresses: pf.addresses, keys_with_codec: pf.keys, max_results: self.max_filter_results, - collected: vec![], + seen_positions: Default::default(), }); self.filters.write().insert(id, filter.clone()); diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index b97a14bf7a3..721ea52b8ac 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -29,7 +29,6 @@ use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; use crate::prelude::*; -use crate::rpc::eth::EVM_WORD_LENGTH; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; use crate::rpc::eth::filter::mempool::*; @@ -130,6 +129,15 @@ pub enum SkipEvent { Never, } +/// Whether the events being collected belong to a tipset that was applied to the canonical +/// chain, or to one that was reverted from it by a reorg. Reverted events surface as +/// `removed: true` in the Ethereum log APIs. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum EventRevertStatus { + Applied, + Reverted, +} + impl EthEventHandler { pub fn new() -> Self { let config = EventsConfig::default(); @@ -341,12 +349,33 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { - let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); - let height = tipset.epoch(); - let tipset_key = tipset.key(); let ExecutedTipset { executed_messages, .. } = state_manager.load_executed_tipset_for_rpc(tipset).await?; + Self::collect_events_from_messages( + state_manager, + tipset, + &executed_messages, + spec, + skip_event, + EventRevertStatus::Applied, + collected_events, + ) + .await + } + + pub async fn collect_events_from_messages( + state_manager: &StateManager, + tipset: &Tipset, + executed_messages: &[ExecutedMessage], + spec: Option<&impl Matcher>, + skip_event: SkipEvent, + revert_status: EventRevertStatus, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); + let height = tipset.epoch(); + let tipset_key = tipset.key(); let mut resolved_id_addrs = HashMap::default(); let mut event_count = 0; for ( @@ -417,7 +446,7 @@ impl EthEventHandler { entries, emitter_addr: resolved, event_idx, - reverted: false, + reverted: matches!(revert_status, EventRevertStatus::Reverted), height, tipset_key: tipset_key.clone(), msg_idx: msg_idx as u64, @@ -565,50 +594,6 @@ impl EthFilterSpec { } } -impl Matcher for EthFilterSpec { - fn matches( - &self, - emitter_addr: &crate::shim::address::Address, - entries: &[Entry], - ) -> anyhow::Result { - fn get_word(value: &[u8]) -> Option<&[u8; EVM_WORD_LENGTH]> { - value.get(..EVM_WORD_LENGTH)?.try_into().ok() - } - - let eth_emitter_addr = EthAddress::from_filecoin_address(emitter_addr)?; - - let match_addr = match self.address { - Some(ref address_list) => { - if address_list.is_empty() { - true - } else { - address_list.iter().any(|other| other == ð_emitter_addr) - } - } - None => true, - }; - - let match_topics = if let Some(spec) = self.topics.as_ref() { - entries.iter().enumerate().all(|(i, entry)| { - if let Some(slice) = get_word(entry.value()) { - let hash: EthHash = (*slice).into(); - match spec.0.get(i) { - Some(EthHashList::List(vec)) => vec.contains(&hash), - Some(EthHashList::Single(Some(h))) => h == &hash, - _ => true, /* wildcard */ - } - } else { - // Drop events with mis-sized topics - false - } - }) - } else { - true - }; - Ok(match_addr && match_topics) - } -} - // TODO(forest): https://github.com/ChainSafe/forest/issues/6411 fn parse_block_range( heaviest: ChainEpoch, @@ -883,19 +868,6 @@ mod tests { ); } - #[test] - fn test_empty_address_list() { - let empty_list_spec = EthFilterSpec { - address: Some(vec![].into()), // Empty list, not None - ..Default::default() - }; - - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Updated to match Lotus behavior: empty list = wildcard (matches all) - assert!(empty_list_spec.matches(&addr, &[]).unwrap()); - } - #[test] fn test_parse_eth_filter_spec_with_none_address() { let eth_filter_spec = EthFilterSpec { @@ -935,51 +907,6 @@ mod tests { ); } - #[test] - fn test_lotus_compatible_address_behavior() { - // Test the Lotus-compatible behavior: empty list = wildcard - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Case 1: None (omitted) = wildcard - let none_spec = EthFilterSpec { - address: None, - ..Default::default() - }; - assert!( - none_spec.matches(&addr, &[]).unwrap(), - "None should match all addresses" - ); - - // Case 2: Empty list = wildcard (Lotus behavior) - let empty_spec = EthFilterSpec { - address: Some(vec![].into()), - ..Default::default() - }; - assert!( - empty_spec.matches(&addr, &[]).unwrap(), - "Empty list should match all addresses (Lotus compatible)" - ); - - // Case 3: Specific address = only that address - let eth_addr = EthAddress::from_filecoin_address(&addr).unwrap(); - let specific_spec = EthFilterSpec { - address: Some(vec![eth_addr].into()), - ..Default::default() - }; - assert!( - specific_spec.matches(&addr, &[]).unwrap(), - "Specific address should match itself" - ); - - // Case 4: Different address = no match - let different_addr = - Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - assert!( - !specific_spec.matches(&different_addr, &[]).unwrap(), - "Specific address should not match different address" - ); - } - #[test] fn test_eth_filter_spec_default_has_none_values() { let default_spec = EthFilterSpec::default(); @@ -1005,14 +932,6 @@ mod tests { default_spec.block_hash.is_none(), "Default EthFilterSpec should have None block_hash" ); - - // Verify that the default spec matches any address (wildcard behavior) - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - - // Test with no entries - assert!(default_spec.matches(&addr0, &[]).unwrap()); - assert!(default_spec.matches(&addr1, &[]).unwrap()); } #[test] @@ -1345,198 +1264,6 @@ mod tests { } } - #[test] - fn test_do_match_address() { - let empty_spec = EthFilterSpec::default(); - - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let eth_addr0 = EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(); - - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - let eth_addr1 = EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - // Matching an empty spec - assert!(empty_spec.matches(&addr0, &[]).unwrap()); - - assert!(empty_spec.matches(&addr0, &entries0).unwrap()); - - // Matching the given address 0 - let spec0 = EthFilterSpec { - address: Some(vec![eth_addr0].into()), - ..Default::default() - }; - - assert!(spec0.matches(&addr0, &[]).unwrap()); - - assert!(!spec0.matches(&addr1, &[]).unwrap()); - - // Matching the given address 0 or 1 - let spec1 = EthFilterSpec { - address: Some(vec![eth_addr0, eth_addr1].into()), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &[]).unwrap()); - - assert!(spec1.matches(&addr1, &[]).unwrap()); - } - - #[test] - fn test_do_match_topic() { - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - let topic0 = - EthHash::from_str("0xe24720f45cb74f2d55f1deebb6098f50f10b511dab8a7d47c4819a08dcd0b895") - .unwrap(); - - let topic1 = - EthHash::from_str("0x7404e3d104ea7841c3d9e6fd20adfe99b4ad586bc08d8f3bd3afef894cf184de") - .unwrap(); - - let topic2 = - EthHash::from_str("0x000000000000000000000000d0fb381fc644cdd5d694d35e1afb445527b9244b") - .unwrap(); - - let topic3 = - EthHash::from_str("0x00000000000000000000000092c3b379c217fdf8603884770e83fded7b7410f8") - .unwrap(); - - let spec1 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(None)])), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(None), - EthHashList::Single(None), - ])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic0))])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec3 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic0])])), - ..Default::default() - }; - - assert!(spec3.matches(&addr0, &entries0).unwrap()); - - let spec4 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic1, topic0])])), - ..Default::default() - }; - - assert!(spec4.matches(&addr0, &entries0).unwrap()); - - let spec5 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic1))])), - ..Default::default() - }; - - assert!(!spec5.matches(&addr0, &entries0).unwrap()); - - let spec6 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic2, topic3])])), - ..Default::default() - }; - - assert!(!spec6.matches(&addr0, &entries0).unwrap()); - - let spec7 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic1)), - ])), - ..Default::default() - }; - - assert!(!spec7.matches(&addr0, &entries0).unwrap()); - - let spec8 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic0)), - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic3)), - ])), - ..Default::default() - }; - - assert!(!spec8.matches(&addr0, &entries0).unwrap()); - } - #[test] fn test_parsed_filter_match_address() { // Note that all the following addresses and topics (base64-encoded strings) come from real data on Calibnet, @@ -1743,12 +1470,6 @@ mod tests { assert!(!filter3.matches(&addr0, &entries0).unwrap()); } - #[test] - fn test_eth_filter_spec_msg_cid_filter_default_none() { - let spec = EthFilterSpec::default(); - assert!(spec.msg_cid_filter().is_none()); - } - #[test] fn test_parsed_filter_msg_cid_filter_returns_field() { let pf_none = ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(0..=0)); @@ -1794,4 +1515,73 @@ mod tests { assert!(ensure_filter_cap(100, 3, 101).is_err()); } + + #[tokio::test] + async fn test_collect_events_from_messages_sets_revert_status() { + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; + use crate::chain::ChainStore; + use crate::db::MemoryDB; + use crate::message::ChainMessage; + use crate::networks::ChainConfig; + use crate::shim::executor::Receipt; + use crate::shim::message::Message; + + let db = Arc::new(MemoryDB::default()); + let genesis_header = CachingBlockHeader::new(RawBlockHeader { + miner_address: Address::new_id(0), + // A zero genesis timestamp is rejected by the beacon schedule. + timestamp: 7777, + ..Default::default() + }); + let chain_store = + ChainStore::new(db, Arc::new(ChainConfig::default()), genesis_header).unwrap(); + let tipset = chain_store.heaviest_tipset(); + let state_manager = StateManager::new(chain_store).unwrap(); + + let event = StampedEvent::V4(fvm_shared4::event::StampedEvent::new( + 1234, + fvm_shared4::event::ActorEvent { + entries: vec![fvm_shared4::event::Entry { + flags: Flags::FLAG_INDEXED_ALL, + key: "t1".into(), + codec: IPLD_RAW, + value: vec![0xab; 32], + }], + }, + )); + let executed_messages = vec![ExecutedMessage { + message: ChainMessage::Unsigned(Message::default().into()), + receipt: Receipt::V4(fvm_shared4::receipt::Receipt { + exit_code: fvm_shared4::error::ExitCode::OK, + return_data: Default::default(), + gas_used: 0, + events_root: None, + }), + events: Some(vec![event]), + }]; + + for (revert_status, expected_reverted) in [ + (EventRevertStatus::Applied, false), + (EventRevertStatus::Reverted, true), + ] { + let mut events = vec![]; + EthEventHandler::collect_events_from_messages( + &state_manager, + &tipset, + &executed_messages, + None::<&ParsedFilter>, + // The test genesis has no state tree, so the emitter cannot be resolved; + // fall back to its ID address instead of skipping the event. + SkipEvent::Never, + revert_status, + &mut events, + ) + .await + .unwrap(); + assert_eq!(events.len(), 1); + let event = events.first().unwrap(); + assert_eq!(event.reverted, expected_reverted); + assert_eq!(event.emitter_addr, Address::new_id(1234)); + } + } } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index a1e47b64fe7..7149a8e9d22 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,19 +60,29 @@ //! use crate::blocks::Tipset; +use crate::chain::HeadChanges; use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; +use crate::rpc::chain::PathChange; +use crate::rpc::eth::filter::EventRevertStatus; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; -use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; +use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; use crate::rpc::eth::{ - Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, + Block as EthBlock, EthLog, TxInfo, eth_logs_for_head_change, eth_tx_hash_from_signed_message, }; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; use std::sync::Arc; +use tokio::sync::broadcast; + +/// A cap on the number of in-flight per-tipset log batches in the shared logs feed. +const LOGS_FEED_CAP: usize = 256; + +/// Sender half of the shared logs feed; see [`RPCState::eth_logs_feed`]. +pub type LogsFeed = broadcast::Sender>>; #[derive(derive_more::Constructor)] pub struct EthPubSub { @@ -102,8 +112,9 @@ impl EthPubSubApiServer for EthPubSub { } } -/// Stream of "message tipsets", the parent of each newly applied tipset. -/// Reverts are ignored; lagged events are dropped (and logged) by [`subscription_stream`]. +/// Stream of "message tipsets", the parent of each newly applied tipset; only used by the +/// `newHeads` subscription. Reverts are ignored; lagged events are dropped (and logged) by +/// [`subscription_stream`]. fn head_message_tipsets(ctx: &Arc) -> impl Stream + Send + use<> { let rx = ctx.chain_store().subscribe_head_changes(); let ctx = ctx.shallow_clone(); @@ -147,25 +158,95 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { tokio::spawn(pipe_stream_to_sink(stream, sink)); } -fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { - let stream = head_message_tipsets(&ctx) - .filter_map(move |ts| { - let ctx = ctx.shallow_clone(); - let filter = filter.clone(); - async move { - eth_logs_with_filter(&ctx, &ts, filter) - .await - .inspect_err(|e| { - tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()) - }) - .ok() +fn flatten_head_changes(changes: HeadChanges) -> impl Iterator { + changes + .into_change_vec() + .into_iter() + .map(|change| match change { + PathChange::Revert(tipset) => (tipset, EventRevertStatus::Reverted), + PathChange::Apply(tipset) => (tipset, EventRevertStatus::Applied), + }) +} + +/// Drives the shared logs feed: for every chain head change, collects the Ethereum logs of +/// the affected tipsets — reorg-reverted ones (marked `removed: true`) before applied ones — +/// and broadcasts each tipset's logs to all live `eth_subscribe("logs")` subscriptions. +async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender>>) { + let mut head_changes = subscription_stream(ctx.chain_store().subscribe_head_changes()); + while let Some(changes) = head_changes.next().await { + // Collecting events is not free; skip the work entirely while no subscription is live. + if feed.receiver_count() == 0 { + continue; + } + for (tipset, revert_status) in flatten_head_changes(changes) { + if tipset.epoch() == 0 { + continue; + } + match eth_logs_for_head_change(&ctx, &tipset, revert_status).await { + Ok(logs) if !logs.is_empty() => { + // An error only means every receiver vanished since the check above. + let _ = feed.send(Arc::new(logs)); + } + Ok(_) => {} + Err(e) => { + tracing::error!( + "Failed to collect logs for tipset {} ({revert_status:?}): {e:#}", + tipset.key() + ); + } } + } + } +} + +fn subscribe_logs_feed(ctx: &Arc) -> broadcast::Receiver>> { + ctx.eth_logs_feed + .get_or_init(|| { + let (tx, _) = broadcast::channel(LOGS_FEED_CAP); + tokio::spawn(run_logs_feed(ctx.clone(), tx.clone())); + tx + }) + .subscribe() +} + +fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { + let rx = subscribe_logs_feed(&ctx); + let stream = subscription_stream(rx) + .flat_map(move |logs| { + let matched: Vec = logs + .iter() + .filter(|log| filter.as_ref().is_none_or(|spec| log_matches(spec, log))) + .cloned() + .collect(); + futures::stream::iter(matched) }) - .flat_map(futures::stream::iter) .boxed(); tokio::spawn(pipe_stream_to_sink(stream, sink)); } +/// Standard Ethereum log filtering (go-ethereum's `filterLogs`) over an already-converted +/// log: any address in the list may match, with an absent or empty list acting as a +/// wildcard; topic positions are ANDed across positions and ORed within one, with absent or +/// null positions acting as wildcards. A log with fewer topics than the filter has positions +/// never matches. The filter's block range does not apply to subscriptions. +fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { + let address_matches = spec + .address + .as_ref() + .is_none_or(|addresses| addresses.is_empty() || addresses.contains(&log.address)); + let topics_match = spec.topics.as_ref().is_none_or(|EthTopicSpec(positions)| { + positions.len() <= log.topics.len() + && positions + .iter() + .zip(&log.topics) + .all(|(position, topic)| match position { + EthHashList::List(hashes) => hashes.is_empty() || hashes.contains(topic), + EthHashList::Single(hash) => hash.as_ref().is_none_or(|h| h == topic), + }) + }); + address_matches && topics_match +} + fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { let mpool_rx = ctx.mpool.subscribe_to_updates(); let eth_chain_id = ctx.chain_config().eth_chain_id; @@ -218,3 +299,192 @@ where } tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id()); } + +#[cfg(test)] +mod tests { + use super::*; + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; + use crate::rpc::eth::{EthAddress, EthHash}; + use crate::shim::clock::ChainEpoch; + use std::str::FromStr as _; + + fn tipset(epoch: ChainEpoch) -> Tipset { + Tipset::from(&CachingBlockHeader::new(RawBlockHeader { + epoch, + ..Default::default() + })) + } + + #[test] + fn flatten_head_changes_emits_reverts_before_applies() { + // `chain_get_path` produces reverts newest-first and applies oldest-first; the + // flattened order must preserve that and put every revert before any apply. + let changes = HeadChanges { + reverts: vec![tipset(5), tipset(4)], + applies: vec![tipset(14), tipset(15)], + }; + let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) + .map(|(ts, status)| (ts.epoch(), status)) + .collect(); + assert_eq!( + flattened, + vec![ + (5, EventRevertStatus::Reverted), + (4, EventRevertStatus::Reverted), + (14, EventRevertStatus::Applied), + (15, EventRevertStatus::Applied), + ] + ); + } + + #[test] + fn flatten_head_changes_plain_apply() { + let changes = HeadChanges { + reverts: vec![], + applies: vec![tipset(7)], + }; + let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) + .map(|(ts, status)| (ts.epoch(), status)) + .collect(); + assert_eq!(flattened, vec![(7, EventRevertStatus::Applied)]); + } + + fn eth_log(address: &EthAddress, topics: Vec) -> EthLog { + EthLog { + address: *address, + topics, + ..Default::default() + } + } + + fn address_0() -> EthAddress { + EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap() + } + + fn address_1() -> EthAddress { + EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap() + } + + fn topic(byte: u8) -> EthHash { + EthHash(ethereum_types::H256::from_slice(&[byte; 32])) + } + + #[test] + fn log_matches_address() { + let log = eth_log(&address_0(), vec![]); + + // Absent and empty address lists are wildcards (Lotus/go-ethereum behavior). + assert!(log_matches(&EthFilterSpec::default(), &log)); + let empty = EthFilterSpec { + address: Some(vec![].into()), + ..Default::default() + }; + assert!(log_matches(&empty, &log)); + + let specific = EthFilterSpec { + address: Some(vec![address_0()].into()), + ..Default::default() + }; + assert!(log_matches(&specific, &log)); + assert!(!log_matches(&specific, ð_log(&address_1(), vec![]))); + + // Any address in the list may match. + let either = EthFilterSpec { + address: Some(vec![address_0(), address_1()].into()), + ..Default::default() + }; + assert!(log_matches(&either, &log)); + assert!(log_matches(&either, ð_log(&address_1(), vec![]))); + } + + #[test] + fn log_matches_topics() { + let log = eth_log(&address_0(), vec![topic(1), topic(2)]); + + let with_topics = |positions: Vec| EthFilterSpec { + topics: Some(EthTopicSpec(positions)), + ..Default::default() + }; + + // Wildcards: null position, empty list position, fewer positions than topics. + assert!(log_matches(&with_topics(vec![]), &log)); + assert!(log_matches( + &with_topics(vec![EthHashList::Single(None)]), + &log + )); + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![])]), + &log + )); + + // Value in the first position. + assert!(log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(1)))]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(2)))]), + &log + )); + + // OR within a position. + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![topic(9), topic(1)])]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::List(vec![topic(8), topic(9)])]), + &log + )); + + // AND across positions. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + ]), + &log + )); + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(9))), + ]), + &log + )); + + // More filter positions than log topics never match, even with wildcards + // (go-ethereum's `filterLogs` semantics). + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + EthHashList::Single(None), + ]), + &log + )); + } + + #[test] + fn log_matches_address_and_topics_combined() { + let log = eth_log(&address_0(), vec![topic(1)]); + let spec = EthFilterSpec { + address: Some(vec![address_0()].into()), + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(1)))])), + ..Default::default() + }; + assert!(log_matches(&spec, &log)); + + let wrong_address = EthFilterSpec { + address: Some(vec![address_1()].into()), + ..spec.clone() + }; + assert!(!log_matches(&wrong_address, &log)); + + let wrong_topic = EthFilterSpec { + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(9)))])), + ..spec + }; + assert!(!log_matches(&wrong_topic, &log)); + } +} diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4d6075d69ca..253dc236ac3 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -234,6 +234,7 @@ mod tests { bad_blocks: Some(Default::default()), sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown: mpsc::channel(1).0, // dummy for tests diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 8160ae76b3d..a63f0948936 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -500,6 +500,11 @@ pub struct RPCState { pub bad_blocks: Option, pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, + /// Broadcast of per-tipset Ethereum logs derived from chain head changes, covering both + /// applied tipsets and reorg-reverted ones (whose logs carry `removed: true`). Started + /// lazily by the first `eth_subscribe("logs")` subscription and shared by all of them, so + /// events are collected and converted once per tipset regardless of the subscriber count. + pub eth_logs_feed: std::sync::OnceLock, pub sync_network_context: SyncNetworkContext, pub tipset_send: flume::Sender, pub start_time: chrono::DateTime, diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index bc75428e850..b4978f8903e 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -32,11 +32,9 @@ impl StateManager { } } - /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. - pub async fn load_executed_tipset_for_rpc( - &self, - ts: &Tipset, - ) -> anyhow::Result { + /// State recomputation policy for RPC methods: recomputation is disabled unless explicitly + /// enabled via the environment. + fn rpc_state_recompute_policy() -> StateRecomputePolicy { crate::def_is_env_truthy!( enable_state_computation, "FOREST_ETH_RPC_COMPUTE_STATE_ON_INDEX_MISS" @@ -54,7 +52,37 @@ impl StateManager { StateRecomputePolicy::Allowed }; - self.load_executed_tipset_with_cache(ts, policy).await + policy + } + + /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. + pub async fn load_executed_tipset_for_rpc( + &self, + ts: &Tipset, + ) -> anyhow::Result { + self.load_executed_tipset_with_cache(ts, Self::rpc_state_recompute_policy()) + .await + } + + /// Load an executed tipset using an explicitly provided receipt (child) tipset instead of + /// resolving the child on the current heaviest chain. This is required when serving events + /// for tipsets that are no longer canonical — e.g. the divergent segment of a reorg — where + /// a canonical-chain lookup would find no child, or a different one. + pub async fn load_executed_tipset_with_receipt( + &self, + msg_ts: &Tipset, + receipt_ts: &Tipset, + ) -> anyhow::Result { + self.cache + .get_or_insert_async(msg_ts.key(), async move { + self.load_executed_tipset_inner( + msg_ts, + Some(receipt_ts), + Self::rpc_state_recompute_policy(), + ) + .await + }) + .await } /// Load an executed tipset, including state root, message receipts and events with caching. diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index d655bd6990c..9ce633474b7 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -104,6 +104,7 @@ pub async fn offline_rpc_state( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 0e439b7fd5e..0278d7a44b0 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -143,6 +143,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 4963fce3f8f..840a8bb8b09 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -172,6 +172,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown,