Skip to content

Commit

Permalink
Remove SubgraphRunner Clones 🔫 (graphprotocol#3420)
Browse files (browse the repository at this point in the history)
* refactor: Remove unnecessary Logger clones/parameters/variables

* refactor: Remove unnecessary input and filter clones for core/stream module

* refactor: Remove unnecessary triggers_adapter parameter in process_block

* refactor: Remove unnecessary clone of store in SubgraphRunner

* refactor: Remove all unnecessary metrics variables/parameters from SubgraphRunner

* refactor: Remove unnecessary clone of stopwatch metrics

* refactor: Move a few clones from Runner to Instance

* refactor: Pass DataSource by value and keep clones in Runner

Co-authored-by: Eva Pace <eba.pachi.com>
  • Loading branch information
evaporei authored Apr 2, 2022
1 parent 2127e67 commit 14d4d2e
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 86 deletions.
3 changes: 1 addition & 2 deletions core/src/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ where
<= data_source.creation_block()
);

let host =
Arc::new(self.new_host(logger.clone(), data_source, templates, metrics.clone())?);
let host = Arc::new(self.new_host(logger.clone(), data_source, templates, metrics)?);

Ok(if self.hosts.contains(&host) {
None
Expand Down
98 changes: 36 additions & 62 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ where
}

pub async fn run(mut self) -> Result<(), Error> {
// Clone a few things for different parts of the async processing
let logger = self.logger.cheap_clone();

// If a subgraph failed for deterministic reasons, then before indexing starts we first
// revert the deployment head. It should lead to the same result since the error was
// deterministic.
Expand All @@ -90,25 +87,22 @@ where
}

loop {
debug!(logger, "Starting or restarting subgraph");
debug!(self.logger, "Starting or restarting subgraph");

let block_stream_canceler = CancelGuard::new();
let block_stream_cancel_handle = block_stream_canceler.handle();

let stream_metrics = self.metrics.stream.clone();
let filter = self.ctx.filter.clone();
let stream_inputs = self.inputs.clone();
let mut block_stream = new_block_stream(stream_inputs, filter)
let mut block_stream = new_block_stream(&self.inputs, &self.ctx.filter)
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));

debug!(logger, "Starting block stream");
debug!(self.logger, "Starting block stream");

// Process events from the stream as long as no restart is needed
loop {
let event = {
let _section = stream_metrics.stopwatch.start_section("scan_blocks");
let _section = self.metrics.stream.stopwatch.start_section("scan_blocks");

block_stream.next().await
};
Expand All @@ -131,7 +125,6 @@ where
/// whether new dynamic data sources have been added to the subgraph.
async fn process_block(
&mut self,
triggers_adapter: Arc<C::TriggersAdapter>,
block_stream_cancel_handle: &CancelHandle,
block: BlockWithTriggers<C>,
firehose_cursor: Option<String>,
Expand All @@ -155,15 +148,7 @@ where
);
}

let subgraph_metrics = self.metrics.subgraph.clone();

let proof_of_indexing = if self
.inputs
.store
.clone()
.supports_proof_of_indexing()
.await?
{
let proof_of_indexing = if self.inputs.store.supports_proof_of_indexing().await? {
Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
block_ptr.number,
))))
Expand Down Expand Up @@ -205,7 +190,6 @@ where
// If new data sources have been created, restart the subgraph after this block.
// This is necessary to re-create the block stream.
let needs_restart = block_state.has_created_data_sources();
let host_metrics = self.metrics.host.clone();

// This loop will:
// 1. Instantiate created data sources.
Expand All @@ -217,16 +201,15 @@ where
// very contrived subgraph would be able to observe this.
while block_state.has_created_data_sources() {
// Instantiate dynamic data sources, removing them from the block state.
let (data_sources, runtime_hosts) = self.create_dynamic_data_sources(
logger.clone(),
host_metrics.clone(),
block_state.drain_created_data_sources(),
)?;
let (data_sources, runtime_hosts) =
self.create_dynamic_data_sources(block_state.drain_created_data_sources())?;

let filter = C::TriggerFilter::from_data_sources(data_sources.iter());

// Reprocess the triggers from this block that match the new data sources
let block_with_triggers = triggers_adapter
let block_with_triggers = self
.inputs
.triggers_adapter
.triggers_in_block(&logger, block.as_ref().clone(), &filter)
.await?;

Expand All @@ -247,11 +230,7 @@ where

// Add entity operations for the new data sources to the block state
// and add runtimes for the data sources to the subgraph instance.
self.persist_dynamic_data_sources(
logger.clone(),
&mut block_state.entity_cache,
data_sources,
);
self.persist_dynamic_data_sources(&mut block_state.entity_cache, data_sources);

// Process the triggers in each host in the same order the
// corresponding data sources have been created.
Expand Down Expand Up @@ -299,14 +278,18 @@ where
let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner();
update_proof_of_indexing(
proof_of_indexing,
&host_metrics.stopwatch,
&self.metrics.host.stopwatch,
&self.inputs.deployment.hash,
&mut block_state.entity_cache,
)
.await?;
}

let section = host_metrics.stopwatch.start_section("as_modifications");
let section = self
.metrics
.host
.stopwatch
.start_section("as_modifications");
let ModificationsAndCache {
modifications: mut mods,
data_sources,
Expand Down Expand Up @@ -336,8 +319,7 @@ where

// Transact entity operations into the store and update the
// subgraph's block stream pointer
let _section = host_metrics.stopwatch.start_section("transact_block");
let stopwatch = host_metrics.stopwatch.clone();
let _section = self.metrics.host.stopwatch.start_section("transact_block");
let start = Instant::now();

let store = &self.inputs.store;
Expand Down Expand Up @@ -365,7 +347,7 @@ where
block_ptr,
firehose_cursor,
mods,
stopwatch,
&self.metrics.host.stopwatch,
data_sources,
deterministic_errors,
) {
Expand All @@ -383,7 +365,8 @@ where
}

let elapsed = start.elapsed().as_secs_f64();
subgraph_metrics
self.metrics
.subgraph
.block_ops_transaction_duration
.observe(elapsed);

Expand Down Expand Up @@ -456,8 +439,6 @@ where

fn create_dynamic_data_sources(
&mut self,
logger: Logger,
host_metrics: Arc<HostMetrics>,
created_data_sources: Vec<DataSourceTemplateInfo<C>>,
) -> Result<(Vec<C::DataSource>, Vec<Arc<T::Host>>), Error> {
let mut data_sources = vec![];
Expand All @@ -469,10 +450,10 @@ where

// Try to create a runtime host for the data source
let host = self.ctx.instance.add_dynamic_data_source(
&logger,
&self.logger,
data_source.clone(),
self.inputs.templates.clone(),
host_metrics.clone(),
self.metrics.host.clone(),
)?;

match host {
Expand All @@ -483,7 +464,7 @@ where
None => {
fail_point!("error_on_duplicate_ds", |_| Err(anyhow!("duplicate ds")));
warn!(
logger,
self.logger,
"no runtime hosted created, there is already a runtime host instantiated for \
this data source";
"name" => &data_source.name(),
Expand All @@ -500,13 +481,12 @@ where

fn persist_dynamic_data_sources(
&mut self,
logger: Logger,
entity_cache: &mut EntityCache,
data_sources: Vec<C::DataSource>,
) {
if !data_sources.is_empty() {
debug!(
logger,
self.logger,
"Creating {} dynamic data source(s)",
data_sources.len()
);
Expand All @@ -516,7 +496,7 @@ where
// the dynamic data sources
for data_source in data_sources.iter() {
debug!(
logger,
self.logger,
"Persisting data_source";
"name" => &data_source.name(),
"address" => &data_source.address().map(|address| hex::encode(address)).unwrap_or("none".to_string()),
Expand Down Expand Up @@ -596,7 +576,6 @@ where
cursor: Option<String>,
cancel_handle: &CancelHandle,
) -> Result<Action, Error> {
let logger = self.logger.cheap_clone();
let block_ptr = block.ptr();
self.metrics
.stream
Expand All @@ -620,15 +599,9 @@ where
}

let start = Instant::now();
let deployment_failed = self.metrics.stream.deployment_failed.clone();

let res = self
.process_block(
self.inputs.triggers_adapter.cheap_clone(),
&cancel_handle,
block,
cursor.into(),
)
.process_block(&cancel_handle, block, cursor.into())
.await;

let elapsed = start.elapsed().as_secs_f64();
Expand Down Expand Up @@ -671,7 +644,7 @@ where
if let UnfailOutcome::Unfailed = outcome {
// Stop trying to unfail.
self.state.should_try_unfail_non_deterministic = false;
deployment_failed.set(0.0);
self.metrics.stream.deployment_failed.set(0.0);
self.state.backoff.reset();
}
}
Expand All @@ -690,21 +663,20 @@ where

if let Some(stop_block) = &self.inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(&logger, "stop block reached for subgraph");
info!(self.logger, "stop block reached for subgraph");
return Ok(Action::Stop);
}
}

return Ok(Action::Continue);
}
Err(BlockProcessingError::Canceled) => {
debug!(&logger, "Subgraph block stream shut down cleanly");
debug!(self.logger, "Subgraph block stream shut down cleanly");
return Ok(Action::Stop);
}

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
let store_for_err = self.inputs.store.cheap_clone();
// Clear entity cache when a subgraph fails.
//
// This is done to be safe and sure that there's no state that's
Expand All @@ -714,7 +686,7 @@ where
// and be transacted incorrectly in the next run.
self.state.entity_lfu_cache = LfuCache::new();

deployment_failed.set(1.0);
self.metrics.stream.deployment_failed.set(1.0);

let message = format!("{:#}", e).replace("\n", "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
Expand All @@ -733,7 +705,8 @@ where
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
self.inputs
.store
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
Expand All @@ -757,7 +730,8 @@ where
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
self.inputs
.store
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
Expand All @@ -773,7 +747,7 @@ where
.remove(&self.inputs.deployment.id);

let message = format!("{:#}", e).replace("\n", "\t");
error!(logger, "Subgraph failed with non-deterministic error: {}", message;
error!(self.logger, "Subgraph failed with non-deterministic error: {}", message;
"attempt" => self.state.backoff.attempt,
"retry_delay_s" => self.state.backoff.delay().as_secs());

Expand Down
13 changes: 6 additions & 7 deletions core/src/subgraph/stream.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use crate::subgraph::inputs::IndexingInputs;
use graph::blockchain::block_stream::{BlockStream, BufferedBlockStream};
use graph::blockchain::Blockchain;
use graph::prelude::{CheapClone, Error};
use graph::prelude::Error;
use std::sync::Arc;

const BUFFERED_BLOCK_STREAM_SIZE: usize = 100;
const BUFFERED_FIREHOSE_STREAM_SIZE: usize = 1;

pub async fn new_block_stream<C: Blockchain>(
inputs: Arc<IndexingInputs<C>>,
filter: C::TriggerFilter,
inputs: &IndexingInputs<C>,
filter: &C::TriggerFilter,
) -> Result<Box<dyn BlockStream<C>>, Error> {
let chain = inputs.chain.cheap_clone();
let is_firehose = chain.is_firehose_supported();
let is_firehose = inputs.chain.is_firehose_supported();

let buffer_size = match is_firehose {
true => BUFFERED_FIREHOSE_STREAM_SIZE,
Expand All @@ -22,15 +21,15 @@ pub async fn new_block_stream<C: Blockchain>(
let current_ptr = inputs.store.block_ptr().await;

let block_stream = match is_firehose {
true => chain.new_firehose_block_stream(
true => inputs.chain.new_firehose_block_stream(
inputs.deployment.clone(),
inputs.store.block_cursor().await,
inputs.start_blocks.clone(),
current_ptr,
Arc::new(filter.clone()),
inputs.unified_api_version.clone(),
),
false => chain.new_polling_block_stream(
false => inputs.chain.new_polling_block_stream(
inputs.deployment.clone(),
inputs.start_blocks.clone(),
current_ptr,
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub trait WritableStore: Send + Sync + 'static {
block_ptr_to: BlockPtr,
firehose_cursor: Option<String>,
mods: Vec<EntityModification>,
stopwatch: StopwatchMetrics,
stopwatch: &StopwatchMetrics,
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
) -> Result<(), StoreError>;
Expand Down
2 changes: 1 addition & 1 deletion graph/tests/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl WritableStore for MockStore {
_: BlockPtr,
_: Option<String>,
_: Vec<EntityModification>,
_: StopwatchMetrics,
_: &StopwatchMetrics,
_: Vec<StoredDynamicDataSource>,
_: Vec<SubgraphError>,
) -> Result<(), StoreError> {
Expand Down
Loading

0 comments on commit 14d4d2e

Please sign in to comment.