Skip to content

Commit dc81f04

Browse files
committed
core: Extract FSM state transition methods for runner
Extract the initialize(), await_block(), restart(), and finalize() methods that will be used to drive the state machine loop in the next phase of the runner refactor. These methods are currently marked as dead_code since they're not yet wired into the main loop. Each method handles a specific state transition: - initialize(): Pre-loop setup including unfailing deterministic errors - await_block(): Wait for block stream events and determine next state - restart(): Handle store restart and block stream creation - finalize(): Clean up when runner reaches terminal state This is Phase 3.1 of the runner refactor plan.
1 parent 615d5e3 commit dc81f04

File tree

2 files changed

+230
-5
lines changed

2 files changed

+230
-5
lines changed

core/src/subgraph/runner/mod.rs

Lines changed: 226 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use std::sync::Arc;
4545
use std::time::{Duration, Instant};
4646
use std::vec;
4747

48-
use self::state::RunnerState;
48+
use self::state::{RestartReason, RunnerState, StopReason};
4949
use self::trigger_runner::TriggerRunner;
5050

5151
const MINUTE: Duration = Duration::from_secs(60);
@@ -253,6 +253,231 @@ where
253253
}
254254
}
255255

256+
/// Initialize the runner by performing pre-loop setup.
257+
///
258+
/// This method handles:
259+
/// - Updating the deployment synced metric
260+
/// - Attempting to unfail deterministic errors from the previous run
261+
/// - Checking if the subgraph has already reached its max end block
262+
///
263+
/// Returns the next state to transition to:
264+
/// - `Restarting` to start the block stream (normal case)
265+
/// - `Stopped` if the max end block was already reached
266+
///
267+
/// NOTE: This method is part of the Phase 3 runner refactor. It will be used
268+
/// to drive the state machine loop once all extraction methods are complete.
269+
#[allow(dead_code)]
270+
async fn initialize(&mut self) -> Result<RunnerState<C>, SubgraphRunnerError> {
271+
self.update_deployment_synced_metric();
272+
273+
// If a subgraph failed for deterministic reasons, before start indexing, we first
274+
// revert the deployment head. It should lead to the same result since the error was
275+
// deterministic.
276+
if let Some(current_ptr) = self.inputs.store.block_ptr() {
277+
if let Some(parent_ptr) = self
278+
.inputs
279+
.triggers_adapter
280+
.parent_ptr(&current_ptr)
281+
.await?
282+
{
283+
// This reverts the deployment head to the parent_ptr if
284+
// deterministic errors happened.
285+
//
286+
// There's no point in calling it if we have no current or parent block
287+
// pointers, because there would be: no block to revert to or to search
288+
// errors from (first execution).
289+
//
290+
// We attempt to unfail deterministic errors to mitigate deterministic
291+
// errors caused by wrong data being consumed from the providers. It has
292+
// been a frequent case in the past so this helps recover on a larger scale.
293+
let _outcome = self
294+
.inputs
295+
.store
296+
.unfail_deterministic_error(&current_ptr, &parent_ptr)
297+
.await?;
298+
}
299+
300+
// Stop subgraph when we reach maximum endblock.
301+
if let Some(max_end_block) = self.inputs.max_end_block {
302+
if max_end_block <= current_ptr.block_number() {
303+
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
304+
"max_end_block" => max_end_block,
305+
"current_block" => current_ptr.block_number());
306+
self.inputs.store.flush().await?;
307+
return Ok(RunnerState::Stopped {
308+
reason: StopReason::MaxEndBlockReached,
309+
});
310+
}
311+
}
312+
}
313+
314+
// Normal case: proceed to start the block stream
315+
Ok(RunnerState::Restarting {
316+
reason: RestartReason::StoreError, // Initial start uses the same path as restart
317+
})
318+
}
319+
320+
/// Await the next block stream event and transition to the appropriate state.
321+
///
322+
/// This method waits for the next event from the block stream and determines
323+
/// which state the runner should transition to:
324+
/// - `ProcessingBlock` for new blocks to process
325+
/// - `Reverting` for revert events
326+
/// - `Stopped` when the stream ends or is canceled
327+
/// - Returns back to `AwaitingBlock` for non-fatal errors that allow continuation
328+
///
329+
/// NOTE: This method is part of the Phase 3 runner refactor. It will be used
330+
/// to drive the state machine loop once all extraction methods are complete.
331+
#[allow(dead_code)]
332+
async fn await_block(
333+
&mut self,
334+
mut block_stream: Cancelable<Box<dyn BlockStream<C>>>,
335+
) -> Result<RunnerState<C>, SubgraphRunnerError> {
336+
let event = {
337+
let _section = self.metrics.stream.stopwatch.start_section("scan_blocks");
338+
block_stream.next().await
339+
};
340+
341+
// Check for cancellation after receiving the event
342+
if self.is_canceled() {
343+
if self.ctx.instances.contains(&self.inputs.deployment.id) {
344+
warn!(
345+
self.logger,
346+
"Terminating the subgraph runner because a newer one is active. \
347+
Possible reassignment detected while the runner was in a non-cancellable pending state",
348+
);
349+
return Err(SubgraphRunnerError::Duplicate);
350+
}
351+
352+
warn!(
353+
self.logger,
354+
"Terminating the subgraph runner because subgraph was unassigned",
355+
);
356+
return Ok(RunnerState::Stopped {
357+
reason: StopReason::Unassigned,
358+
});
359+
}
360+
361+
match event {
362+
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => {
363+
Ok(RunnerState::ProcessingBlock { block, cursor })
364+
}
365+
Some(Ok(BlockStreamEvent::Revert(to_ptr, cursor))) => {
366+
Ok(RunnerState::Reverting { to_ptr, cursor })
367+
}
368+
// Log and drop the errors from the block_stream
369+
// The block stream will continue attempting to produce blocks
370+
Some(Err(e)) => {
371+
// Handle fatal errors by stopping
372+
if let CancelableError::Error(BlockStreamError::Fatal(msg)) = &e {
373+
error!(
374+
&self.logger,
375+
"The block stream encountered a substreams fatal error and will not retry: {}",
376+
msg
377+
);
378+
379+
self.inputs
380+
.store
381+
.fail_subgraph(SubgraphError {
382+
subgraph_id: self.inputs.deployment.hash.clone(),
383+
message: msg.clone(),
384+
block_ptr: None,
385+
handler: None,
386+
deterministic: true,
387+
})
388+
.await
389+
.context("Failed to set subgraph status to `failed`")?;
390+
391+
return Ok(RunnerState::Stopped {
392+
reason: StopReason::DeterministicError,
393+
});
394+
}
395+
396+
// Non-fatal error: log and continue waiting for blocks
397+
debug!(
398+
&self.logger,
399+
"Block stream produced a non-fatal error";
400+
"error" => format!("{}", e),
401+
);
402+
Ok(RunnerState::AwaitingBlock { block_stream })
403+
}
404+
// If the block stream ends, that means that there is no more indexing to do.
405+
None => Ok(RunnerState::Stopped {
406+
reason: StopReason::StreamEnded,
407+
}),
408+
}
409+
}
410+
411+
/// Handle a restart by potentially restarting the store and starting a new block stream.
412+
///
413+
/// This method handles:
414+
/// - Restarting the store if there were errors (to clear error state)
415+
/// - Reverting state to the last good block if the store was restarted
416+
/// - Starting a new block stream with updated filters
417+
///
418+
/// Returns the next state to transition to:
419+
/// - `AwaitingBlock` with the new block stream (normal case)
420+
///
421+
/// NOTE: This method is part of the Phase 3 runner refactor. It will be used
422+
/// to drive the state machine loop once all extraction methods are complete.
423+
#[allow(dead_code)]
424+
async fn restart(
425+
&mut self,
426+
reason: RestartReason,
427+
) -> Result<RunnerState<C>, SubgraphRunnerError> {
428+
debug!(self.logger, "Starting or restarting subgraph"; "reason" => ?reason);
429+
430+
// If restarting due to store error, try to restart the store
431+
if matches!(reason, RestartReason::StoreError) {
432+
let store = self.inputs.store.cheap_clone();
433+
if let Some(store) = store.restart().await? {
434+
let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0);
435+
self.revert_state_to(last_good_block)?;
436+
self.inputs = Arc::new(self.inputs.with_store(store));
437+
}
438+
}
439+
440+
let block_stream = self.start_block_stream().await?;
441+
442+
debug!(self.logger, "Started block stream");
443+
self.metrics.subgraph.deployment_status.running();
444+
self.update_deployment_synced_metric();
445+
446+
Ok(RunnerState::AwaitingBlock { block_stream })
447+
}
448+
449+
/// Finalize the runner when it reaches a terminal state.
450+
///
451+
/// This method handles cleanup tasks when the runner stops:
452+
/// - Flushing the store to ensure all changes are persisted
453+
/// - Logging the stop reason
454+
///
455+
/// NOTE: This method is part of the Phase 3 runner refactor. It will be used
456+
/// to drive the state machine loop once all extraction methods are complete.
457+
#[allow(dead_code)]
458+
async fn finalize(self, reason: StopReason) -> Result<Self, SubgraphRunnerError> {
459+
match reason {
460+
StopReason::MaxEndBlockReached => {
461+
info!(self.logger, "Stopping subgraph - max end block reached");
462+
}
463+
StopReason::Canceled => {
464+
info!(self.logger, "Stopping subgraph - canceled");
465+
}
466+
StopReason::Unassigned => {
467+
info!(self.logger, "Stopping subgraph - unassigned");
468+
}
469+
StopReason::StreamEnded => {
470+
info!(self.logger, "Stopping subgraph - stream ended");
471+
}
472+
StopReason::DeterministicError => {
473+
info!(self.logger, "Stopping subgraph - deterministic error");
474+
}
475+
}
476+
477+
self.inputs.store.flush().await?;
478+
Ok(self)
479+
}
480+
256481
pub async fn run(self) -> Result<(), SubgraphRunnerError> {
257482
self.run_inner(false).await.map(|_| ())
258483
}

docs/plans/runner-refactor.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,10 @@ Each phase is complete when:
226226

227227
### Phase 3: Refactor run_inner to State Machine
228228

229-
- [ ] Extract `initialize()` method
230-
- [ ] Extract `await_block()` method
231-
- [ ] Extract `restart()` method
232-
- [ ] Extract `finalize()` method
229+
- [x] Extract `initialize()` method
230+
- [x] Extract `await_block()` method
231+
- [x] Extract `restart()` method
232+
- [x] Extract `finalize()` method
233233
- [ ] Rewrite `run_inner` as state machine loop
234234
- [ ] Remove nested loop structure
235235
- [ ] Verify tests pass

0 commit comments

Comments
 (0)