Skip to content

Commit

Permalink
instance_manager: Remove deployment head state and move stop block lo…
Browse files Browse the repository at this point in the history
…gic after it's processed

This removes the deployment head state from the instance manager which
could be a point of failure since it wasn't being updated in reverts,
etc.

And moves the stop block logic right after we progress a block since we
don't store the state anymore.
  • Loading branch information
evaporei committed Feb 11, 2022
1 parent 6bb2be5 commit dddfbb7
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ where

async fn new_block_stream<C: Blockchain>(
inputs: Arc<IndexingInputs<C>>,
start_block: Option<BlockPtr>,
filter: C::TriggerFilter,
block_stream_metrics: Arc<BlockStreamMetrics>,
) -> Result<Box<dyn BlockStream<C>>, Error> {
Expand All @@ -485,14 +484,18 @@ async fn new_block_stream<C: Blockchain>(
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
start_block,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => {
let current_ptr = inputs.store.block_ptr();

chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
current_ptr,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
)
}
}
.await?;

Expand Down Expand Up @@ -533,13 +536,10 @@ where
let metrics = ctx.block_stream_metrics.clone();
let filter = ctx.state.filter.clone();
let stream_inputs = inputs.clone();
let start_block = inputs.store.block_ptr();
let mut deployment_head = start_block.clone();
let mut block_stream =
new_block_stream(stream_inputs, start_block, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let mut block_stream = new_block_stream(stream_inputs, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let chain = inputs.chain.clone();
let chain_store = chain.chain_store();

Expand Down Expand Up @@ -625,16 +625,6 @@ where

let block_ptr = block.ptr();

match (&inputs.stop_block, &deployment_head) {
(Some(stop_block), Some(deployment_head)) => {
if deployment_head.number > *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
_ => {}
}

if block.trigger_count() > 0 {
subgraph_metrics
.block_trigger_count
Expand All @@ -652,7 +642,7 @@ where
if should_try_unfail_deterministic {
should_try_unfail_deterministic = false;

if let Some(current_ptr) = deployment_head {
if let Some(current_ptr) = inputs.store.block_ptr() {
if let Some(parent_ptr) =
inputs.triggers_adapter.parent_ptr(&current_ptr).await?
{
Expand Down Expand Up @@ -685,7 +675,6 @@ where

match res {
Ok(needs_restart) => {
deployment_head = Some(block_ptr.clone());
// Once synced, no need to try to update the status again.
if !synced && is_deployment_synced(&block_ptr, chain_store.cached_head_ptr()?) {
// Updating the sync status is an one way operation.
Expand Down Expand Up @@ -733,6 +722,13 @@ where
// And restart the subgraph
break;
}

if let Some(stop_block) = &inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
}
Err(BlockProcessingError::Canceled) => {
debug!(&logger, "Subgraph block stream shut down cleanly");
Expand Down

0 comments on commit dddfbb7

Please sign in to comment.