Skip to content

Commit db95c7d

Browse files
authored
chain/ethereum, graph: Replace AnyNetwork with AnyNetworkBare to remove WithOtherFields overhead (#6398)
AnyNetwork wraps every RPC response (block, transaction, receipt) in WithOtherFields<T>, which uses #[serde(flatten)] to capture unknown JSON keys into a BTreeMap. This forces serde to buffer entire JSON objects into intermediate Value maps and re-serialize them to diff keys on every block and every transaction. graph-node never reads these extra fields. Define AnyNetworkBare, a custom Network impl that uses the same AnyTxEnvelope/AnyReceiptEnvelope types (for L2/sidechain tx support) but without the WithOtherFields wrapper on response types. This eliminates the flatten buffering, BTreeMap allocation, and re-serialization from the deserialization hot path.
1 parent 4270a64 commit db95c7d

File tree

13 files changed

+298
-96
lines changed

13 files changed

+298
-96
lines changed

chain/ethereum/src/codec.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ use graph::{
88
blockchain::{
99
self, Block as BlockchainBlock, BlockPtr, BlockTime, ChainStoreBlock, ChainStoreData,
1010
},
11-
components::ethereum::{AnyBlock, AnyHeader, AnyRpcHeader, AnyRpcTransaction, AnyTxEnvelope},
11+
components::ethereum::{
12+
AnyBlock, AnyHeader, AnyRpcHeader, AnyTransactionReceiptBare, AnyTxEnvelope,
13+
},
1214
prelude::{
1315
alloy::{
1416
self,
1517
consensus::{ReceiptWithBloom, TxEnvelope, TxType},
1618
network::AnyReceiptEnvelope,
1719
primitives::{aliases::B2048, Address, Bloom, Bytes, LogData, B256, U256},
1820
rpc::types::{self as alloy_rpc_types, AccessList, AccessListItem, Transaction},
19-
serde::WithOtherFields,
2021
},
2122
BlockNumber, Error, EthereumBlock, EthereumBlockWithCalls, EthereumCall,
2223
LightEthereumBlock,
@@ -556,19 +557,12 @@ impl TryInto<AnyBlock> for &Block {
556557

557558
let any_header: AnyRpcHeader = rpc_header.map(AnyHeader::from);
558559

559-
let any_transactions: Vec<AnyRpcTransaction> = transactions
560-
.into_iter()
561-
.map(|tx| AnyRpcTransaction::new(WithOtherFields::new(tx)))
562-
.collect();
563-
564-
let any_block = Block {
560+
Ok(Block {
565561
header: any_header,
566-
transactions: alloy::rpc::types::BlockTransactions::Full(any_transactions),
562+
transactions: alloy::rpc::types::BlockTransactions::Full(transactions),
567563
uncles,
568564
withdrawals: None,
569-
};
570-
571-
Ok(AnyBlock::new(WithOtherFields::new(any_block)))
565+
})
572566
}
573567
}
574568

@@ -619,7 +613,7 @@ impl TryInto<EthereumBlockWithCalls> for &Block {
619613
fn transaction_trace_to_alloy_txn_reciept(
620614
t: &TransactionTrace,
621615
block: &Block,
622-
) -> Result<Option<alloy::network::AnyTransactionReceipt>, Error> {
616+
) -> Result<Option<AnyTransactionReceiptBare>, Error> {
623617
use alloy::consensus::{Eip658Value, Receipt};
624618
let r = t.receipt.as_ref();
625619

@@ -719,7 +713,7 @@ fn transaction_trace_to_alloy_txn_reciept(
719713
inner: any_envelope,
720714
};
721715

722-
Ok(Some(WithOtherFields::new(receipt)))
716+
Ok(Some(receipt))
723717
}
724718

725719
impl BlockHeader {
@@ -1014,7 +1008,7 @@ mod test {
10141008

10151009
let receipt = receipt_opt.unwrap();
10161010

1017-
assert_eq!(receipt.inner.inner.r#type, 126);
1011+
assert_eq!(receipt.inner.r#type, 126);
10181012
assert_eq!(receipt.gas_used, 21000);
10191013
assert_eq!(receipt.transaction_index, Some(0));
10201014
}

chain/ethereum/src/data_source.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,6 @@ fn create_dummy_transaction(
444444
transaction_index: Option<u64>,
445445
transaction_hash: Option<B256>,
446446
) -> Result<AnyTransaction, anyhow::Error> {
447-
use alloy::serde::WithOtherFields;
448447
use graph::components::ethereum::AnyTxEnvelope;
449448
use graph::prelude::alloy::{
450449
consensus::transaction::Recovered, consensus::Signed, primitives::Signature,
@@ -465,15 +464,13 @@ fn create_dummy_transaction(
465464

466465
let recovered = Recovered::new_unchecked(any_envelope, Address::ZERO);
467466

468-
let inner_tx = Transaction {
467+
Ok(Transaction {
469468
inner: recovered,
470469
block_hash: Some(block_hash),
471470
block_number: Some(block_number),
472471
transaction_index,
473472
effective_gas_price: None,
474-
};
475-
476-
Ok(AnyTransaction::new(WithOtherFields::new(inner_tx)))
473+
})
477474
}
478475

479476
impl DataSource {
@@ -812,7 +809,7 @@ impl DataSource {
812809
let logging_extras = Arc::new(o! {
813810
"signature" => event_handler.event.to_string(),
814811
"address" => format!("{}", &log.address()),
815-
"transaction" => format!("{}", &transaction.inner.tx_hash()),
812+
"transaction" => format!("{}", &transaction.tx_hash()),
816813
});
817814
let handler = event_handler.handler.clone();
818815
let calls = DeclaredCall::from_log_trigger_with_event(
@@ -921,7 +918,7 @@ impl DataSource {
921918
let logging_extras = Arc::new(o! {
922919
"function" => handler.function.to_string(),
923920
"to" => format!("{}", &call.to),
924-
"transaction" => format!("{}", &transaction.inner.tx_hash()),
921+
"transaction" => format!("{}", &transaction.tx_hash()),
925922
});
926923
Ok(Some(TriggerWithHandler::<Chain>::new_with_logging_extras(
927924
MappingTrigger::Call {

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use graph::futures03::{
2525
use graph::prelude::{
2626
alloy::{
2727
self,
28-
network::{AnyNetwork, TransactionResponse},
28+
network::TransactionResponse,
2929
primitives::{Address, B256},
3030
providers::{
3131
ext::TraceApi,
@@ -86,8 +86,8 @@ type AlloyProvider = FillProvider<
8686
Identity,
8787
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
8888
>,
89-
RootProvider<AnyNetwork>,
90-
AnyNetwork,
89+
RootProvider<AnyNetworkBare>,
90+
AnyNetworkBare,
9191
>;
9292

9393
#[derive(Clone)]
@@ -167,22 +167,22 @@ impl EthereumAdapter {
167167
) -> Self {
168168
let alloy = match &transport {
169169
Transport::RPC(client) => Arc::new(
170-
alloy::providers::ProviderBuilder::<_, _, AnyNetwork>::default()
171-
.network::<AnyNetwork>()
170+
alloy::providers::ProviderBuilder::<_, _, AnyNetworkBare>::default()
171+
.network::<AnyNetworkBare>()
172172
.with_recommended_fillers()
173173
.connect_client(client.clone()),
174174
),
175175
Transport::IPC(ipc_connect) => Arc::new(
176-
alloy::providers::ProviderBuilder::<_, _, AnyNetwork>::default()
177-
.network::<AnyNetwork>()
176+
alloy::providers::ProviderBuilder::<_, _, AnyNetworkBare>::default()
177+
.network::<AnyNetworkBare>()
178178
.with_recommended_fillers()
179179
.connect_ipc(ipc_connect.clone())
180180
.await
181181
.expect("Failed to connect to Ethereum IPC"),
182182
),
183183
Transport::WS(ws_connect) => Arc::new(
184-
alloy::providers::ProviderBuilder::<_, _, AnyNetwork>::default()
185-
.network::<AnyNetwork>()
184+
alloy::providers::ProviderBuilder::<_, _, AnyNetworkBare>::default()
185+
.network::<AnyNetworkBare>()
186186
.with_recommended_fillers()
187187
.connect_ws(ws_connect.clone())
188188
.await
@@ -2062,7 +2062,7 @@ pub(crate) fn parse_block_triggers(
20622062
async fn fetch_receipt_from_ethereum_client(
20632063
eth: &EthereumAdapter,
20642064
transaction_hash: B256,
2065-
) -> anyhow::Result<alloy::network::AnyTransactionReceipt> {
2065+
) -> anyhow::Result<AnyTransactionReceiptBare> {
20662066
match eth.alloy.get_transaction_receipt(transaction_hash).await {
20672067
Ok(Some(receipt)) => Ok(receipt),
20682068
Ok(None) => bail!("Could not find transaction receipt"),
@@ -2215,7 +2215,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry(
22152215
hashes: Vec<B256>,
22162216
block_hash: B256,
22172217
logger: ProviderLogger,
2218-
) -> Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError> {
2218+
) -> Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError> {
22192219
let retry_log_message = format!(
22202220
"batch eth_getTransactionReceipt RPC call for block {:?}",
22212221
block_hash
@@ -2241,7 +2241,7 @@ async fn fetch_transaction_receipts_in_batch(
22412241
hashes: Vec<B256>,
22422242
block_hash: B256,
22432243
logger: ProviderLogger,
2244-
) -> Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError> {
2244+
) -> Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError> {
22452245
// Use the batch method to get all receipts at once
22462246
let receipts = batch_get_transaction_receipts(alloy, hashes.clone())
22472247
.await
@@ -2270,17 +2270,16 @@ async fn fetch_transaction_receipts_in_batch(
22702270
async fn batch_get_transaction_receipts(
22712271
provider: Arc<AlloyProvider>,
22722272
tx_hashes: Vec<B256>,
2273-
) -> Result<Vec<Option<alloy::network::AnyTransactionReceipt>>, Box<dyn std::error::Error>> {
2273+
) -> Result<Vec<Option<AnyTransactionReceiptBare>>, Box<dyn std::error::Error>> {
22742274
let mut batch = alloy::rpc::client::BatchRequest::new(provider.client());
22752275
let mut receipt_futures = Vec::new();
22762276

22772277
// Add all receipt requests to batch
22782278
for tx_hash in &tx_hashes {
2279-
let receipt_future = batch
2280-
.add_call::<(B256,), Option<alloy::network::AnyTransactionReceipt>>(
2281-
"eth_getTransactionReceipt",
2282-
&(*tx_hash,),
2283-
)?;
2279+
let receipt_future = batch.add_call::<(B256,), Option<AnyTransactionReceiptBare>>(
2280+
"eth_getTransactionReceipt",
2281+
&(*tx_hash,),
2282+
)?;
22842283
receipt_futures.push(receipt_future);
22852284
}
22862285

@@ -2332,7 +2331,7 @@ async fn fetch_receipts_with_retry(
23322331
block_hash: B256,
23332332
logger: ProviderLogger,
23342333
supports_block_receipts: bool,
2335-
) -> Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError> {
2334+
) -> Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError> {
23362335
if supports_block_receipts {
23372336
return fetch_block_receipts_with_retry(alloy, hashes, block_hash, logger).await;
23382337
}
@@ -2345,7 +2344,7 @@ async fn fetch_individual_receipts_with_retry(
23452344
hashes: Vec<B256>,
23462345
block_hash: B256,
23472346
logger: ProviderLogger,
2348-
) -> Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError> {
2347+
) -> Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError> {
23492348
if ENV_VARS.fetch_receipts_in_batches {
23502349
return fetch_transaction_receipts_in_batch_with_retry(alloy, hashes, block_hash, logger)
23512350
.await;
@@ -2364,9 +2363,9 @@ async fn fetch_individual_receipts_with_retry(
23642363
})
23652364
.buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls);
23662365

2367-
tokio_stream::StreamExt::collect::<
2368-
Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError>,
2369-
>(receipt_stream)
2366+
tokio_stream::StreamExt::collect::<Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError>>(
2367+
receipt_stream,
2368+
)
23702369
.await
23712370
}
23722371

@@ -2376,7 +2375,7 @@ async fn fetch_block_receipts_with_retry(
23762375
hashes: Vec<B256>,
23772376
block_hash: B256,
23782377
logger: ProviderLogger,
2379-
) -> Result<Vec<Arc<alloy::network::AnyTransactionReceipt>>, IngestorError> {
2378+
) -> Result<Vec<Arc<AnyTransactionReceiptBare>>, IngestorError> {
23802379
use graph::prelude::alloy::rpc::types::BlockId;
23812380
let retry_log_message = format!("eth_getBlockReceipts RPC call for block {:?}", block_hash);
23822381

@@ -2420,7 +2419,7 @@ async fn fetch_transaction_receipt_with_retry(
24202419
transaction_hash: B256,
24212420
block_hash: B256,
24222421
logger: ProviderLogger,
2423-
) -> Result<Arc<alloy::network::AnyTransactionReceipt>, IngestorError> {
2422+
) -> Result<Arc<AnyTransactionReceiptBare>, IngestorError> {
24242423
let retry_log_message = format!(
24252424
"eth_getTransactionReceipt RPC call for transaction {:?}",
24262425
transaction_hash
@@ -2443,11 +2442,11 @@ async fn fetch_transaction_receipt_with_retry(
24432442
}
24442443

24452444
fn resolve_transaction_receipt(
2446-
transaction_receipt: Option<alloy::network::AnyTransactionReceipt>,
2445+
transaction_receipt: Option<AnyTransactionReceiptBare>,
24472446
transaction_hash: B256,
24482447
block_hash: B256,
24492448
logger: ProviderLogger,
2450-
) -> Result<alloy::network::AnyTransactionReceipt, IngestorError> {
2449+
) -> Result<AnyTransactionReceiptBare, IngestorError> {
24512450
match transaction_receipt {
24522451
// A receipt might be missing because the block was uncled, and the transaction never
24532452
// made it back into the main chain.
@@ -2580,11 +2579,10 @@ async fn get_transaction_receipts_for_transaction_hashes(
25802579
transaction_hashes_by_block: &HashMap<B256, HashSet<B256>>,
25812580
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
25822581
logger: ProviderLogger,
2583-
) -> Result<HashMap<B256, Arc<alloy::network::AnyTransactionReceipt>>, anyhow::Error> {
2582+
) -> Result<HashMap<B256, Arc<AnyTransactionReceiptBare>>, anyhow::Error> {
25842583
use std::collections::hash_map::Entry::Vacant;
25852584

2586-
let mut receipts_by_hash: HashMap<B256, Arc<alloy::network::AnyTransactionReceipt>> =
2587-
HashMap::new();
2585+
let mut receipts_by_hash: HashMap<B256, Arc<AnyTransactionReceiptBare>> = HashMap::new();
25882586

25892587
// Return early if input set is empty
25902588
if transaction_hashes_by_block.is_empty() {
@@ -2665,8 +2663,7 @@ mod tests {
26652663
EthereumBlockWithCalls,
26662664
};
26672665
use graph::blockchain::BlockPtr;
2668-
use graph::components::ethereum::AnyBlock;
2669-
use graph::prelude::alloy::network::AnyNetwork;
2666+
use graph::components::ethereum::AnyNetworkBare;
26702667
use graph::prelude::alloy::primitives::{Address, Bytes, B256};
26712668
use graph::prelude::alloy::providers::mock::Asserter;
26722669
use graph::prelude::alloy::providers::ProviderBuilder;
@@ -2682,7 +2679,7 @@ mod tests {
26822679

26832680
let block = EthereumBlockWithCalls {
26842681
ethereum_block: EthereumBlock {
2685-
block: Arc::new(LightEthereumBlock::new(AnyBlock::from(block))),
2682+
block: Arc::new(LightEthereumBlock::new(block)),
26862683
..Default::default()
26872684
},
26882685
calls: Some(vec![EthereumCall {
@@ -2743,8 +2740,8 @@ mod tests {
27432740
let json_value: Value = serde_json::from_str(json_response).unwrap();
27442741

27452742
let asserter = Asserter::new();
2746-
let provider = ProviderBuilder::<_, _, AnyNetwork>::default()
2747-
.network::<AnyNetwork>()
2743+
let provider = ProviderBuilder::<_, _, AnyNetworkBare>::default()
2744+
.network::<AnyNetworkBare>()
27482745
.with_recommended_fillers()
27492746
.connect_mocked_client(asserter.clone());
27502747

@@ -2827,7 +2824,7 @@ mod tests {
28272824
#[allow(unreachable_code)]
28282825
let block = EthereumBlockWithCalls {
28292826
ethereum_block: EthereumBlock {
2830-
block: Arc::new(LightEthereumBlock::new(AnyBlock::from(block))),
2827+
block: Arc::new(LightEthereumBlock::new(block)),
28312828
..Default::default()
28322829
},
28332830
calls: Some(vec![EthereumCall {
@@ -2858,7 +2855,7 @@ mod tests {
28582855
#[allow(unreachable_code)]
28592856
let block = EthereumBlockWithCalls {
28602857
ethereum_block: EthereumBlock {
2861-
block: Arc::new(LightEthereumBlock::new(AnyBlock::from(block))),
2858+
block: Arc::new(LightEthereumBlock::new(block)),
28622859
..Default::default()
28632860
},
28642861
calls: Some(vec![EthereumCall {

chain/ethereum/src/runtime/abi.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use graph::prelude::alloy;
99
use graph::prelude::alloy::consensus::TxReceipt;
1010
use graph::prelude::alloy::network::ReceiptResponse;
1111
use graph::prelude::alloy::rpc::types::{Log, TransactionReceipt};
12-
use graph::prelude::alloy::serde::WithOtherFields;
1312
use graph::{
1413
prelude::BigInt,
1514
runtime::{
@@ -587,10 +586,7 @@ where
587586

588587
#[async_trait]
589588
impl<'a, T, B, Inner> ToAscObj<AscEthereumEvent_0_0_7<T, B>>
590-
for (
591-
EthereumEventData<'a>,
592-
Option<&WithOtherFields<TransactionReceipt<Inner>>>,
593-
)
589+
for (EthereumEventData<'a>, Option<&TransactionReceipt<Inner>>)
594590
where
595591
T: AscType + AscIndexId + Send,
596592
B: AscType + AscIndexId + Send,
@@ -615,7 +611,7 @@ where
615611
params,
616612
} = event_data.to_asc_obj(heap, gas).await?;
617613
let receipt = if let Some(receipt_data) = optional_receipt {
618-
asc_new(heap, &receipt_data.inner(), gas).await?
614+
asc_new(heap, receipt_data, gas).await?
619615
} else {
620616
AscPtr::null()
621617
};

chain/ethereum/src/trigger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ use graph::abi;
33
use graph::blockchain::MappingTriggerTrait;
44
use graph::blockchain::TriggerData;
55
use graph::components::ethereum::AnyTransaction;
6+
use graph::components::ethereum::AnyTransactionReceiptBare as AlloyTransactionReceipt;
67
use graph::data::subgraph::API_VERSION_0_0_2;
78
use graph::data::subgraph::API_VERSION_0_0_6;
89
use graph::data::subgraph::API_VERSION_0_0_7;
910
use graph::data_source::common::DeclaredCall;
1011
use graph::prelude::alloy::consensus::Transaction as TransactionTrait;
11-
use graph::prelude::alloy::network::AnyTransactionReceipt as AlloyTransactionReceipt;
1212
use graph::prelude::alloy::network::TransactionResponse;
1313
use graph::prelude::alloy::primitives::{Address, B256, U256};
1414
use graph::prelude::alloy::rpc::types::Log;
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
mod network;
12
mod types;
23

4+
pub use self::network::AnyNetworkBare;
35
pub use self::types::{
4-
AnyBlock, AnyTransaction, EthereumBlock, EthereumBlockWithCalls, EthereumCall,
5-
LightEthereumBlock, LightEthereumBlockExt,
6+
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls,
7+
EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
68
};
79

810
// Re-export Alloy network types for convenience
9-
pub use alloy::network::{AnyHeader, AnyRpcBlock, AnyRpcHeader, AnyRpcTransaction, AnyTxEnvelope};
11+
pub use alloy::network::{AnyHeader, AnyRpcHeader, AnyTxEnvelope};

0 commit comments

Comments
 (0)