chain,graph,store: added code to properly resume from a subgraph block ptr and no firehose cursor

Firehose now uses the subgraph block ptr if no cursor is found. To ensure we do not create inconsistencies in the data due to how Firehose resolves the current block number, a sanity check has been added: it verifies that the parent of the first block received from Firehose is the same as the subgraph pointer block. If it is not, we revert to the last known final block so we are sure to start from a correct chain segment.
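A minimal sketch of that sanity check, under simplified assumptions: `BlockPtr`, `resume_ptr` and the values below are illustrative stand-ins, not the crate's actual types or API.

```rust
/// Simplified block pointer: hash plus number (illustrative, not graph-node's type).
#[derive(Clone, PartialEq, Debug)]
struct BlockPtr {
    hash: Vec<u8>,
    number: u64,
}

/// If the first Firehose block's parent matches the subgraph pointer, we are on
/// the same chain segment and keep streaming from the subgraph pointer.
/// Otherwise we restart from the last known final block so that no data from a
/// diverging fork is written.
fn resume_ptr(
    first_block_parent: Option<&BlockPtr>,
    subgraph_ptr: &BlockPtr,
    last_final: BlockPtr,
) -> BlockPtr {
    match first_block_parent {
        Some(parent) if parent == subgraph_ptr => subgraph_ptr.clone(),
        _ => last_final,
    }
}

fn main() {
    let subgraph_ptr = BlockPtr { hash: vec![0xaa], number: 100 };
    // Firehose reports a parent with a different hash at the same height, i.e.
    // the subgraph head sits on a fork: fall back to the last final block.
    let forked_parent = BlockPtr { hash: vec![0xcc], number: 100 };
    let last_final = BlockPtr { hash: vec![0xdd], number: 42 };
    assert_eq!(
        resume_ptr(Some(&forked_parent), &subgraph_ptr, last_final.clone()),
        last_final
    );
}
```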

Which final block to pick is decided by the chain itself. On Ethereum, we use `block.number - reorg_threshold` (guarded against underflow), while on NEAR, we use the last final block height defined by the block itself.
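A rough sketch of that per-chain decision, again with illustrative types: `ChainBlock`, `final_block_number` and the explicit `reorg_threshold` parameter are assumptions for the example (the Ethereum code in this commit hard-codes a 200-block threshold matching Firehose's finality window).

```rust
/// Illustrative stand-ins for the per-chain data the decision needs.
enum ChainBlock {
    /// Ethereum: only the block number matters here.
    Ethereum { number: u64 },
    /// NEAR: the block header itself carries the last final block height.
    Near { last_final_block_height: u64 },
}

/// Pick the final block number to fall back to: `number - reorg_threshold`
/// (guarded against underflow near genesis) on Ethereum, or the final height
/// reported by the block header on NEAR.
fn final_block_number(block: &ChainBlock, reorg_threshold: u64) -> u64 {
    match block {
        ChainBlock::Ethereum { number } => number.saturating_sub(reorg_threshold),
        ChainBlock::Near { last_final_block_height } => *last_final_block_height,
    }
}

fn main() {
    assert_eq!(final_block_number(&ChainBlock::Ethereum { number: 150 }, 200), 0);
    assert_eq!(final_block_number(&ChainBlock::Ethereum { number: 1_000 }, 200), 800);
    assert_eq!(
        final_block_number(&ChainBlock::Near { last_final_block_height: 950 }, 200),
        950
    );
}
```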
maoueh authored and leoyvens committed Mar 3, 2022
1 parent 28f9c40 commit 7c030f5
Showing 22 changed files with 500 additions and 112 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -19,6 +19,7 @@
/tests/integration-tests/**/generated
/tests/integration-tests/**/node_modules
/tests/integration-tests/**/yarn.lock
/tests/integration-tests/**/yarn-error.log

# Built solidity contracts.
/tests/integration-tests/**/bin
2 changes: 1 addition & 1 deletion chain/ethereum/build.rs
@@ -4,5 +4,5 @@ fn main() {
.out_dir("src/protobuf")
.format(true)
.compile(&["proto/codec.proto"], &["proto"])
.expect("Failed to compile StreamingFast Ethereum proto(s)");
.expect("Failed to compile Firehose Ethereum proto(s)");
}
10 changes: 10 additions & 0 deletions chain/ethereum/proto/codec.proto
@@ -40,6 +40,16 @@ message Block {
string filtering_exclude_filter_expr = 42;
}

// HeaderOnlyBlock is a standard [Block] structure where all other fields are
// removed, so that hydrating that object from a [Block] bytes payload
// drastically reduces the memory allocated to hold the full block.
//
// This can be used to unpack a [Block] when only the [BlockHeader] information
// is required, greatly reducing the required memory.
message HeaderOnlyBlock {
BlockHeader header = 5;
}

// BlockWithRefs is a lightweight block, with traces and transactions
// purged from the `block` within, and only. It is used in transports
// to pass block data around.
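As a side note on why `HeaderOnlyBlock` saves memory: prost, like most protobuf decoders, skips fields whose tags are not declared on the target message, so decoding full `Block` bytes into a header-only message materializes just the header. The sketch below demonstrates the idea with miniature prost types; they are illustrative stand-ins, not the generated `sf.ethereum.codec.v1` types.

```rust
use prost::Message;

/// Miniature "full" message standing in for the real Block.
#[derive(Clone, PartialEq, Message)]
struct FullBlock {
    #[prost(bytes = "vec", tag = "1")]
    hash: Vec<u8>,
    #[prost(uint64, tag = "2")]
    number: u64,
    #[prost(message, optional, tag = "5")]
    header: Option<Header>,
}

#[derive(Clone, PartialEq, Message)]
struct Header {
    #[prost(uint64, tag = "1")]
    number: u64,
}

/// Header-only view: declares only tag 5, everything else is skipped on decode.
#[derive(Clone, PartialEq, Message)]
struct HeaderOnly {
    #[prost(message, optional, tag = "5")]
    header: Option<Header>,
}

fn main() {
    let full = FullBlock {
        hash: vec![0xab; 32],
        number: 7,
        header: Some(Header { number: 7 }),
    };
    let bytes = full.encode_to_vec();

    // Unknown fields (tags 1 and 2) are skipped; only the header is allocated.
    let slim = HeaderOnly::decode(bytes.as_slice()).unwrap();
    assert_eq!(slim.header.unwrap().number, 7);
}
```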
30 changes: 27 additions & 3 deletions chain/ethereum/src/chain.rs
@@ -2,7 +2,7 @@ use anyhow::{Context, Error};
use graph::blockchain::BlockchainKind;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::env_var;
use graph::firehose::{FirehoseEndpoints, ForkStep};
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep};
use graph::prelude::{
EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics,
};
@@ -187,6 +187,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
block_cursor: Option<String>,
start_blocks: Vec<BlockNumber>,
subgraph_start_block: Option<BlockPtr>,
filter: Arc<Self::TriggerFilter>,
metrics: Arc<BlockStreamMetrics>,
unified_api_version: UnifiedMappingApiVersion,
@@ -214,10 +215,13 @@
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
firehose_endpoint,
subgraph_start_block,
block_cursor,
firehose_mapper,
adapter,
@@ -519,7 +523,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {}
pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -585,4 +591,22 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
}
}
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
block: &BlockFinality,
) -> Result<BlockPtr, Error> {
// Firehose for Ethereum has a hard-coded finality threshold set to 200 blocks
// behind the current block. The magic value 200 here comes from that hard-coded
// Firehose value.
let final_block_number = match block.number() {
x if x >= 200 => x - 200,
_ => 0,
};

self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
.await
}
}
68 changes: 56 additions & 12 deletions chain/ethereum/src/codec.rs
@@ -302,9 +302,21 @@ impl Into<EthereumBlockWithCalls> for &Block {
}
}

impl From<Block> for BlockPtr {
fn from(b: Block) -> BlockPtr {
(&b).into()
impl BlockHeader {
pub fn parent_ptr(&self) -> Option<BlockPtr> {
match self.parent_hash.len() {
0 => None,
_ => Some(BlockPtr::from((
H256::from_slice(self.parent_hash.as_ref()),
self.number - 1,
))),
}
}
}

impl<'a> From<&'a BlockHeader> for BlockPtr {
fn from(b: &'a BlockHeader) -> BlockPtr {
BlockPtr::from((H256::from_slice(b.hash.as_ref()), b.number))
}
}

@@ -314,24 +326,56 @@ impl<'a> From<&'a Block> for BlockPtr {
}
}

impl Block {
pub fn header(&self) -> &BlockHeader {
self.header.as_ref().unwrap()
}

pub fn ptr(&self) -> BlockPtr {
BlockPtr::from(self.header())
}

pub fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}
}

impl BlockchainBlock for Block {
fn number(&self) -> i32 {
BlockNumber::try_from(self.number).unwrap()
BlockNumber::try_from(self.header().number).unwrap()
}

fn ptr(&self) -> BlockPtr {
self.into()
}

fn parent_ptr(&self) -> Option<BlockPtr> {
let parent_hash = &self.header.as_ref().unwrap().parent_hash;
self.parent_ptr()
}
}

match parent_hash.len() {
0 => None,
_ => Some(BlockPtr::from((
H256::from_slice(parent_hash.as_ref()),
self.number - 1,
))),
}
impl HeaderOnlyBlock {
pub fn header(&self) -> &BlockHeader {
self.header.as_ref().unwrap()
}
}

impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr {
fn from(b: &'a HeaderOnlyBlock) -> BlockPtr {
BlockPtr::from(b.header())
}
}

impl BlockchainBlock for HeaderOnlyBlock {
fn number(&self) -> i32 {
BlockNumber::try_from(self.header().number).unwrap()
}

fn ptr(&self) -> BlockPtr {
self.into()
}

fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}
}
32 changes: 30 additions & 2 deletions chain/ethereum/src/ethereum_adapter.rs
@@ -1796,7 +1796,21 @@ async fn fetch_transaction_receipts_in_batch_with_retry(
fetch_transaction_receipts_in_batch(web3, hashes, block_hash, logger).boxed()
})
.await
.map_err(|_timeout| anyhow!(block_hash).into())
.map_err(|timeout| {
anyhow::format_err!(
"batch eth_getTransactionReceipt RPC call at {}: {:?}",
block_hash,
timeout
)
.into()

// Error::new(timeout)
// .context(format!(
// "batch eth_getTransactionReceipt RPC call at {}",
// block_hash
// ))
// .into()
})
}

/// Deprecated. Attempts to fetch multiple transaction receipts in a batching context.
@@ -1846,7 +1860,21 @@ async fn fetch_transaction_receipt_with_retry(
.timeout_secs(*JSON_RPC_TIMEOUT)
.run(move || web3.eth().transaction_receipt(transaction_hash).boxed())
.await
.map_err(|_timeout| anyhow!(block_hash).into())
.map_err(|timeout| {
anyhow::format_err!(
"eth_getTransactionReceipt RPC call at {}: {:?}",
block_hash,
timeout
)
.into()

// Error::new(timeout)
// .context(format!(
// "eth_getTransactionReceipt RPC call at {}",
// block_hash
// ))
// .into()
})
.and_then(move |some_receipt| {
resolve_transaction_receipt(some_receipt, transaction_hash, block_hash, logger)
})
5 changes: 2 additions & 3 deletions chain/ethereum/src/ingestor.rs
@@ -118,8 +118,7 @@ impl BlockIngestor {
None => {
info!(
self.logger,
"Downloading latest blocks from Ethereum. \
This may take a few minutes..."
"Downloading latest blocks from Ethereum, this may take a few minutes..."
);
}
Some(head_block_ptr) => {
@@ -135,7 +134,7 @@
if distance > 0 {
info!(
self.logger,
"Syncing {} blocks from Ethereum.",
"Syncing {} blocks from Ethereum",
blocks_needed;
"current_block_head" => head_number,
"latest_block_head" => latest_number,
11 changes: 11 additions & 0 deletions chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs
@@ -40,6 +40,17 @@ pub struct Block {
#[prost(string, tag = "42")]
pub filtering_exclude_filter_expr: ::prost::alloc::string::String,
}
/// HeaderOnlyBlock is a standard [Block] structure where all other fields are
/// removed, so that hydrating that object from a [Block] bytes payload
/// drastically reduces the memory allocated to hold the full block.
///
/// This can be used to unpack a [Block] when only the [BlockHeader] information
/// is required, greatly reducing the required memory.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeaderOnlyBlock {
#[prost(message, optional, tag = "5")]
pub header: ::core::option::Option<BlockHeader>,
}
/// BlockWithRefs is a lightweight block, with traces and transactions
/// purged from the `block` within, and only. It is used in transports
/// to pass block data around.
2 changes: 1 addition & 1 deletion chain/near/build.rs
@@ -4,5 +4,5 @@ fn main() {
.out_dir("src/protobuf")
.format(true)
.compile(&["proto/codec.proto"], &["proto"])
.expect("Failed to compile StreamingFast NEAR proto(s)");
.expect("Failed to compile Firehose NEAR proto(s)");
}
46 changes: 34 additions & 12 deletions chain/near/src/chain.rs
@@ -1,8 +1,8 @@
use graph::blockchain::BlockchainKind;
use graph::cheap_clone::CheapClone;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::FirehoseEndpoints;
use graph::prelude::StopwatchMetrics;
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
use graph::prelude::{StopwatchMetrics, TryFutureExt};
use graph::{
anyhow,
blockchain::{
@@ -102,6 +102,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
block_cursor: Option<String>,
start_blocks: Vec<BlockNumber>,
subgraph_start_block: Option<BlockPtr>,
filter: Arc<Self::TriggerFilter>,
metrics: Arc<BlockStreamMetrics>,
unified_api_version: UnifiedMappingApiVersion,
@@ -117,18 +118,21 @@ impl Blockchain for Chain {

let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};

let logger = self
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
firehose_endpoint,
subgraph_start_block,
block_cursor,
firehose_mapper,
adapter,
@@ -156,14 +160,18 @@ impl Blockchain for Chain {

async fn block_pointer_from_number(
&self,
_logger: &Logger,
_number: BlockNumber,
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
// FIXME (NEAR): Hmmm, what to do with this?
Ok(BlockPtr {
hash: BlockHash::from(vec![0xff; 32]),
number: 0,
})
let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available").into()),
};

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.map_err(Into::into)
.await
}

fn runtime_adapter(&self) -> Arc<Self::RuntimeAdapter> {
@@ -261,7 +269,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {}
pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -322,4 +332,16 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
}
}
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
block: &codec::Block,
) -> Result<BlockPtr, Error> {
let final_block_number = block.header().last_final_block_height as BlockNumber;

self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
.await
}
}