Skip to content
Open
25 changes: 11 additions & 14 deletions core/src/amp_subgraph/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use graph::{
cheap_clone::CheapClone, components::store::DeploymentLocator, log::factory::LoggerFactory,
};
use slog::{debug, error, info, warn, Logger};
use tokio::{sync::mpsc, task::JoinHandle, time::timeout};
use tokio::{
sync::mpsc::{self, error::SendError},
task::JoinHandle,
time::timeout,
};
use tokio_util::sync::CancellationToken;

/// Represents the maximum amount of time a subgraph instance is allowed to run
Expand Down Expand Up @@ -137,7 +141,7 @@ impl Monitor {
.new(slog::o!("method" => "start"));

info!(logger, "Starting subgraph");
handle_send_result(
log_send_error(
&logger,
self.command_tx.send(Command::Start {
id: self.subgraph_instance_id.fetch_add(1, SeqCst),
Expand All @@ -164,7 +168,7 @@ impl Monitor {
.new(slog::o!("method" => "stop"));

info!(logger, "Stopping subgraph");
handle_send_result(&logger, self.command_tx.send(Command::Stop { deployment }));
log_send_error(&logger, self.command_tx.send(Command::Stop { deployment }));
}

/// Processes commands sent through the command channel.
Expand All @@ -184,10 +188,6 @@ impl Monitor {
loop {
tokio::select! {
Some(command) = command_rx.recv() => {
debug!(logger, "Processing a new command";
"command" => ?command
);

match &command {
Command::Start { .. } => {
Self::process_start_command(
Expand Down Expand Up @@ -390,7 +390,7 @@ impl Monitor {

if let Some(pending_start_command) = pending_start_commands.remove(&deployment) {
debug!(logger, "Resending a pending start command");
handle_send_result(&logger, command_tx.send(pending_start_command));
log_send_error(&logger, command_tx.send(pending_start_command));
}
}

Expand Down Expand Up @@ -476,7 +476,7 @@ impl Monitor {
}

debug!(logger, "Sending clear command");
handle_send_result(&logger, command_tx.send(Command::Clear { id, deployment }));
log_send_error(&logger, command_tx.send(Command::Clear { id, deployment }));
}
});

Expand Down Expand Up @@ -546,13 +546,10 @@ impl fmt::Debug for Command {
}
}

fn handle_send_result(
logger: &Logger,
result: Result<(), tokio::sync::mpsc::error::SendError<Command>>,
) {
fn log_send_error(logger: &Logger, result: Result<(), SendError<Command>>) {
match result {
Ok(()) => {
debug!(logger, "Command was sent successfully");
// No need to log anything
}

// This should only happen if the parent cancel token of the subgraph monitor was cancelled
Expand Down
20 changes: 6 additions & 14 deletions core/src/amp_subgraph/runner/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub(in super::super) struct Context<AC> {
pub(super) logger: Logger,
pub(super) client: Arc<AC>,
pub(super) store: Arc<dyn WritableStore>,
pub(super) max_buffer_size: usize,
pub(super) max_block_range: usize,
pub(super) buffer_size: usize,
pub(super) block_range: usize,
pub(super) backoff: ExponentialBackoff,
pub(super) deployment: DeploymentHash,
pub(super) manifest: Manifest,
Expand All @@ -45,8 +45,8 @@ impl<AC> Context<AC> {
logger,
client,
store,
max_buffer_size: env.max_buffer_size,
max_block_range: env.max_block_range,
buffer_size: env.buffer_size,
block_range: env.block_range,
backoff,
deployment,
manifest,
Expand Down Expand Up @@ -77,15 +77,7 @@ impl<AC> Context<AC> {
.map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat()))
}

pub(super) fn total_queries(&self) -> usize {
self.manifest
.data_sources
.iter()
.map(|data_source| data_source.transformer.tables.len())
.sum()
}

pub(super) fn min_start_block(&self) -> BlockNumber {
pub(super) fn start_block(&self) -> BlockNumber {
self.manifest
.data_sources
.iter()
Expand All @@ -94,7 +86,7 @@ impl<AC> Context<AC> {
.unwrap()
}

pub(super) fn max_end_block(&self) -> BlockNumber {
pub(super) fn end_block(&self) -> BlockNumber {
self.manifest
.data_sources
.iter()
Expand Down
2 changes: 1 addition & 1 deletion core/src/amp_subgraph/runner/data_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn process_record_batch_group<AC>(
entity_lfu_cache,
evict_stats: _,
} = entity_cache
.as_modifications(block_number.compat())
.as_modifications(block_number.compat(), &cx.metrics.stopwatch)
.await
.map_err(Error::from)
.map_err(|e| e.context("failed to extract entity modifications from the state"))?;
Expand Down
Loading