Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deployment head state from instance manager #3241

Merged
merged 1 commit into from
Feb 11, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 24 additions & 28 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ where

async fn new_block_stream<C: Blockchain>(
inputs: Arc<IndexingInputs<C>>,
start_block: Option<BlockPtr>,
filter: C::TriggerFilter,
block_stream_metrics: Arc<BlockStreamMetrics>,
) -> Result<Box<dyn BlockStream<C>>, Error> {
Expand All @@ -485,14 +484,18 @@ async fn new_block_stream<C: Blockchain>(
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
start_block,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
),
false => {
let current_ptr = inputs.store.block_ptr();

chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
current_ptr,
Arc::new(filter.clone()),
block_stream_metrics.clone(),
inputs.unified_api_version.clone(),
)
}
}
.await?;

Expand Down Expand Up @@ -533,13 +536,10 @@ where
let metrics = ctx.block_stream_metrics.clone();
let filter = ctx.state.filter.clone();
let stream_inputs = inputs.clone();
let start_block = inputs.store.block_ptr();
let mut deployment_head = start_block.clone();
let mut block_stream =
new_block_stream(stream_inputs, start_block, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let mut block_stream = new_block_stream(stream_inputs, filter, metrics.cheap_clone())
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let chain = inputs.chain.clone();
let chain_store = chain.chain_store();

Expand Down Expand Up @@ -625,16 +625,6 @@ where

let block_ptr = block.ptr();

match (&inputs.stop_block, &deployment_head) {
(Some(stop_block), Some(deployment_head)) => {
if deployment_head.number > *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
_ => {}
}

if block.trigger_count() > 0 {
subgraph_metrics
.block_trigger_count
Expand All @@ -652,7 +642,7 @@ where
if should_try_unfail_deterministic {
should_try_unfail_deterministic = false;

if let Some(current_ptr) = deployment_head {
if let Some(current_ptr) = inputs.store.block_ptr() {
if let Some(parent_ptr) =
inputs.triggers_adapter.parent_ptr(&current_ptr).await?
{
Expand Down Expand Up @@ -685,7 +675,6 @@ where

match res {
Ok(needs_restart) => {
deployment_head = Some(block_ptr.clone());
// Once synced, no need to try to update the status again.
if !synced && is_deployment_synced(&block_ptr, chain_store.cached_head_ptr()?) {
// Updating the sync status is an one way operation.
Expand Down Expand Up @@ -733,6 +722,13 @@ where
// And restart the subgraph
break;
}

if let Some(stop_block) = &inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(&logger, "stop block reached for subgraph");
return Ok(());
}
}
}
Err(BlockProcessingError::Canceled) => {
debug!(&logger, "Subgraph block stream shut down cleanly");
Expand Down