Skip to content

Commit

Permalink
instance_manager: Remove deployment head state and move stop block lo…
Browse files Browse the repository at this point in the history
…gic after it's processed (graphprotocol#3241)

This removes the deployment head state from the instance manager which
could be a point of failure since it wasn't being updated in reverts,
etc.

And moves the stop block logic right after we progress a block since we
don't store the state anymore.
  • Loading branch information
evaporei authored Feb 11, 2022
1 parent 6bb2be5 commit b744153
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ where

async fn new_block_stream<C: Blockchain>(
inputs: Arc<IndexingInputs<C>>,
start_block: Option<BlockPtr>,
filter: C::TriggerFilter,
block_stream_metrics: Arc<BlockStreamMetrics>,
) -> Result<Box<dyn BlockStream<C>>, Error> {
Expand All @@ -485,14 +484,18 @@ async fn new_block_stream<C: Blockchain>(
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
start_block,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => {
let current_ptr = inputs.store.block_ptr();

chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
current_ptr,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
)
}
}
.await?;

Expand Down Expand Up @@ -533,13 +536,10 @@ where
let metrics = ctx.block_stream_metrics.clone();
let filter = ctx.state.filter.clone();
let stream_inputs = inputs.clone();
let start_block = inputs.store.block_ptr();
let mut deployment_head = start_block.clone();
let mut block_stream =
new_block_stream(stream_inputs, start_block, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let mut block_stream = new_block_stream(stream_inputs, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let chain = inputs.chain.clone();
let chain_store = chain.chain_store();

Expand Down Expand Up @@ -625,16 +625,6 @@ where

let block_ptr = block.ptr();

match (&inputs.stop_block, &deployment_head) {
(Some(stop_block), Some(deployment_head)) => {
if deployment_head.number > *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
_ => {}
}

if block.trigger_count() > 0 {
subgraph_metrics
.block_trigger_count
Expand All @@ -652,7 +642,7 @@ where
if should_try_unfail_deterministic {
should_try_unfail_deterministic = false;

if let Some(current_ptr) = deployment_head {
if let Some(current_ptr) = inputs.store.block_ptr() {
if let Some(parent_ptr) =
inputs.triggers_adapter.parent_ptr(&current_ptr).await?
{
Expand Down Expand Up @@ -685,7 +675,6 @@ where

match res {
Ok(needs_restart) => {
deployment_head = Some(block_ptr.clone());
// Once synced, no need to try to update the status again.
if !synced && is_deployment_synced(&block_ptr, chain_store.cached_head_ptr()?) {
// Updating the sync status is an one way operation.
Expand Down Expand Up @@ -733,6 +722,13 @@ where
// And restart the subgraph
break;
}

if let Some(stop_block) = &inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
}
Err(BlockProcessingError::Canceled) => {
debug!(&logger, "Subgraph block stream shut down cleanly");
Expand Down

0 comments on commit b744153

Please sign in to comment.