Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{EthereumAdapter, EthereumAdapterTrait as _};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockchainKind;
use graph::components::network_provider::ChainName;
use graph::slog::o;
use graph::slog::{debug, o};
use graph::util::backoff::ExponentialBackoff;
use graph::{
blockchain::{BlockHash, BlockIngestor, BlockPtr, IngestorError},
Expand Down Expand Up @@ -138,6 +138,8 @@ impl PollingBlockIngestor {
.ingest_block(&logger, &eth_adapter, &latest_block.hash)
.await?;

debug!(&logger, "Fetched missing block hash"; "missing_block_hash" => ?missing_block_hash);

// Repeatedly fetch missing parent blocks, and ingest them.
// ingest_blocks will continue to tell us about more missing parent
// blocks until we have filled in all missing pieces of the
Expand All @@ -156,9 +158,13 @@ impl PollingBlockIngestor {
// most block number N, then the missing parents in the next
// iteration will have at most block number N-1.
// - Therefore, the loop will iterate at most ancestor_count times.
let mut ingested_blocks = 1;
while let Some(hash) = missing_block_hash {
missing_block_hash = self.ingest_block(&logger, &eth_adapter, &hash).await?;
debug!(&logger, "Fetched missing block hash parent"; "missing_block_hash" => ?missing_block_hash, "ingested_blocks" => ingested_blocks);
ingested_blocks += 1;
}
info!(&logger, "Synced {} blocks from Ethereum", ingested_blocks);
Ok(())
}

Expand All @@ -178,6 +184,8 @@ impl PollingBlockIngestor {
.ok_or(IngestorError::BlockUnavailable(block_hash))?;
let ethereum_block = eth_adapter.load_full_block(&logger, block).await?;

trace!(logger, "PollingBlockIngestor::ingest_block - fetched block from adapter"; "block_hash" => ?block_hash);

// We need something that implements `Block` to store the block; the
// store does not care whether the block is final or not
let ethereum_block = BlockFinality::NonFinal(EthereumBlockWithCalls {
Expand All @@ -190,11 +198,16 @@ impl PollingBlockIngestor {
.upsert_block(Arc::new(ethereum_block))
.await?;

trace!(logger, "PollingBlockIngestor::ingest_block - stored block in DB"; "block_hash" => ?block_hash);

self.chain_store
.cheap_clone()
.attempt_chain_head_update(self.ancestor_count)
.await
.map(|missing| missing.map(|h256| h256.into()))
.map(|missing| {
trace!(&logger, "PollingBlockIngestor::ingest_block - chain head updated"; "block_hash" => ?block_hash, "next_missing" => ?missing);
missing.map(|h256| h256.into())
})
.map_err(|e| {
error!(logger, "failed to update chain head");
IngestorError::Unknown(e)
Expand Down
11 changes: 7 additions & 4 deletions chain/ethereum/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,13 @@ impl<'a> EthereumEventData<'a> {
}

pub fn transaction_log_index(&self) -> &U256 {
self.log
.transaction_log_index
.as_ref()
.unwrap_or(&U256_DEFAULT)
// We purposely use the `log_index` here. Geth does not support
// `transaction_log_index`, and subgraphs that use it only care that
// it identifies the log, the specific value is not important. Still
// this will change the output of subgraphs that use this field.
//
// This was initially changed in commit b95c6953
self.log.log_index.as_ref().unwrap_or(&U256_DEFAULT)
}

pub fn log_type(&self) -> &Option<String> {
Expand Down
18 changes: 15 additions & 3 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::data::store::ethereum::call;
use graph::derive::CheapClone;
use graph::env::ENV_VARS;
use graph::parking_lot::RwLock;
use graph::prelude::MetricsRegistry;
use graph::prelude::{trace, MetricsRegistry};
use graph::prometheus::{CounterVec, GaugeVec};
use graph::slog::Logger;
use graph::stable_hash::crypto_stable_hash;
Expand Down Expand Up @@ -2018,16 +2018,23 @@ impl ChainStoreTrait for ChainStore {
ancestor_count: BlockNumber,
) -> Result<Option<H256>, Error> {
use public::ethereum_networks as n;
let logger = self.logger.cheap_clone();
trace!(&logger, "ChainStore::attempt_chain_head_update - begin"; "ancestor_count" => ancestor_count);

let (missing, ptr) = {
let chain_store = self.clone();

let genesis_block_ptr = self.genesis_block_ptr()?.hash_as_h256();
self.pool
trace!(&logger, "ChainStore::attempt_chain_head_update - got genesis_block_ptr"; "genesis_block_ptr" => ?genesis_block_ptr);

let res = self.pool
.with_conn(move |conn, _| {
let candidate = chain_store
.storage
.chain_head_candidate(conn, &chain_store.chain)
.map_err(CancelableError::from)?;

trace!(&logger, "ChainStore::attempt_chain_head_update - got chain head candidate"; "candidate" => ?candidate);
let (ptr, first_block) = match &candidate {
None => return Ok((None, None)),
Some(ptr) => (ptr, 0.max(ptr.number.saturating_sub(ancestor_count))),
Expand All @@ -2045,6 +2052,7 @@ impl ChainStoreTrait for ChainStore {
.map_err(CancelableError::from)?
{
Some(missing) => {
trace!(&logger, "ChainStore::attempt_chain_head_update - got missing parent"; "missing" => ?missing);
return Ok((Some(missing), None));
}
None => { /* we have a complete chain, no missing parents */ }
Expand All @@ -2053,6 +2061,7 @@ impl ChainStoreTrait for ChainStore {
let hash = ptr.hash_hex();
let number = ptr.number as i64;

trace!(&logger, "ChainStore::attempt_chain_head_update - begin head block update transaction");
conn.transaction(
|conn| -> Result<(Option<H256>, Option<(String, i64)>), StoreError> {
update(n::table.filter(n::name.eq(&chain_store.chain)))
Expand All @@ -2066,7 +2075,10 @@ impl ChainStoreTrait for ChainStore {
)
.map_err(CancelableError::from)
})
.await?
.await;

trace!(&self.logger, "ChainStore::attempt_chain_head_update - transaction complete with result"; "res" => ?res);
res?
};
if let Some((hash, number)) = ptr {
self.chain_head_update_sender.send(&hash, number)?;
Expand Down