Skip to content

Commit 0634883

Browse files
committed
core: Extract TriggerRunner component for unified trigger processing
Eliminate the duplicated trigger processing loops in runner.rs by extracting a TriggerRunner component that handles the execution of triggers against runtime hosts. This component is now used for both initial trigger processing and dynamic data source trigger processing. This is Phase 1 of the runner refactor as described in docs/plans/runner-refactor.md.
1 parent 40d9f8a commit 0634883

File tree

3 files changed

+126
-67
lines changed

3 files changed

+126
-67
lines changed
Lines changed: 43 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
mod trigger_runner;
2+
13
use crate::subgraph::context::IndexingContext;
24
use crate::subgraph::error::{
35
ClassifyErrorHelper as _, DetailHelper as _, NonDeterministicErrorHelper as _, ProcessingError,
@@ -42,6 +44,8 @@ use std::sync::Arc;
4244
use std::time::{Duration, Instant};
4345
use std::vec;
4446

47+
use self::trigger_runner::TriggerRunner;
48+
4549
const MINUTE: Duration = Duration::from_secs(60);
4650

4751
const SKIP_PTR_UPDATES_THRESHOLD: Duration = Duration::from_secs(60 * 5);
@@ -615,44 +619,32 @@ where
615619

616620
// Match and decode all triggers in the block
617621
let hosts_filter = |trigger: &TriggerData<C>| self.ctx.instance.hosts_for_trigger(trigger);
618-
let match_res = self
622+
let runnables = self
619623
.match_and_decode_many(&logger, &block, triggers, hosts_filter)
620624
.await;
621625

622626
// Process events one after the other, passing in entity operations
623627
// collected previously to every new event being processed
624-
let mut res = Ok(block_state);
625-
match match_res {
628+
let trigger_runner = TriggerRunner::new(
629+
self.ctx.trigger_processor.as_ref(),
630+
&self.logger,
631+
&self.metrics.subgraph,
632+
&self.inputs.debug_fork,
633+
self.inputs.instrument,
634+
);
635+
let res = match runnables {
626636
Ok(runnables) => {
627-
for runnable in runnables {
628-
let process_res = self
629-
.ctx
630-
.trigger_processor
631-
.process_trigger(
632-
&self.logger,
633-
runnable.hosted_triggers,
634-
&block,
635-
res.unwrap(),
636-
&proof_of_indexing,
637-
&causality_region,
638-
&self.inputs.debug_fork,
639-
&self.metrics.subgraph,
640-
self.inputs.instrument,
641-
)
642-
.await
643-
.map_err(|e| e.add_trigger_context(&runnable.trigger));
644-
match process_res {
645-
Ok(state) => res = Ok(state),
646-
Err(e) => {
647-
res = Err(e);
648-
break;
649-
}
650-
}
651-
}
652-
}
653-
Err(e) => {
654-
res = Err(e);
637+
trigger_runner
638+
.execute(
639+
&block,
640+
runnables,
641+
block_state,
642+
&proof_of_indexing,
643+
&causality_region,
644+
)
645+
.await
655646
}
647+
Err(e) => Err(e),
656648
};
657649

658650
match res {
@@ -751,43 +743,31 @@ where
751743
let hosts_filter = |_: &'_ TriggerData<C>| -> Box<dyn Iterator<Item = _> + Send> {
752744
Box::new(runtime_hosts.iter().map(Arc::as_ref))
753745
};
754-
let match_res: Result<Vec<_>, _> = self
746+
let runnables = self
755747
.match_and_decode_many(&logger, &block, triggers, hosts_filter)
756748
.await;
757749

758-
let mut res = Ok(block_state);
759-
match match_res {
750+
let trigger_runner = TriggerRunner::new(
751+
self.ctx.trigger_processor.as_ref(),
752+
&self.logger,
753+
&self.metrics.subgraph,
754+
&self.inputs.debug_fork,
755+
self.inputs.instrument,
756+
);
757+
let res = match runnables {
760758
Ok(runnables) => {
761-
for runnable in runnables {
762-
let process_res = self
763-
.ctx
764-
.trigger_processor
765-
.process_trigger(
766-
&self.logger,
767-
runnable.hosted_triggers,
768-
&block,
769-
res.unwrap(),
770-
&proof_of_indexing,
771-
&causality_region,
772-
&self.inputs.debug_fork,
773-
&self.metrics.subgraph,
774-
self.inputs.instrument,
775-
)
776-
.await
777-
.map_err(|e| e.add_trigger_context(&runnable.trigger));
778-
match process_res {
779-
Ok(state) => res = Ok(state),
780-
Err(e) => {
781-
res = Err(e);
782-
break;
783-
}
784-
}
785-
}
786-
}
787-
Err(e) => {
788-
res = Err(e);
759+
trigger_runner
760+
.execute(
761+
&block,
762+
runnables,
763+
block_state,
764+
&proof_of_indexing,
765+
&causality_region,
766+
)
767+
.await
789768
}
790-
}
769+
Err(e) => Err(e),
770+
};
791771

792772
block_state = res.map_err(|e| {
793773
// This treats a `PossibleReorg` as an ordinary error which will fail the subgraph.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use std::sync::Arc;
2+
3+
use graph::blockchain::Blockchain;
4+
use graph::components::store::SubgraphFork;
5+
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
6+
use graph::components::trigger_processor::RunnableTriggers;
7+
use graph::prelude::{BlockState, RuntimeHostBuilder, SubgraphInstanceMetrics, TriggerProcessor};
8+
use graph::slog::Logger;
9+
10+
/// Handles the execution of triggers against runtime hosts, accumulating state.
11+
///
12+
/// This component unifies the trigger processing loop that was previously duplicated
13+
/// for initial triggers and dynamically created data source triggers.
14+
pub struct TriggerRunner<'a, C: Blockchain, T: RuntimeHostBuilder<C>> {
15+
processor: &'a dyn TriggerProcessor<C, T>,
16+
logger: &'a Logger,
17+
metrics: &'a Arc<SubgraphInstanceMetrics>,
18+
debug_fork: &'a Option<Arc<dyn SubgraphFork>>,
19+
instrument: bool,
20+
}
21+
22+
impl<'a, C, T> TriggerRunner<'a, C, T>
23+
where
24+
C: Blockchain,
25+
T: RuntimeHostBuilder<C>,
26+
{
27+
/// Create a new TriggerRunner with the given dependencies.
28+
pub fn new(
29+
processor: &'a dyn TriggerProcessor<C, T>,
30+
logger: &'a Logger,
31+
metrics: &'a Arc<SubgraphInstanceMetrics>,
32+
debug_fork: &'a Option<Arc<dyn SubgraphFork>>,
33+
instrument: bool,
34+
) -> Self {
35+
Self {
36+
processor,
37+
logger,
38+
metrics,
39+
debug_fork,
40+
instrument,
41+
}
42+
}
43+
44+
/// Execute a sequence of runnable triggers, accumulating state changes.
45+
///
46+
/// Processes each trigger in order. If any trigger fails with a non-deterministic
47+
/// error, processing stops and the error is returned. Deterministic errors are
48+
/// accumulated in the block state.
49+
pub async fn execute(
50+
&self,
51+
block: &Arc<C::Block>,
52+
runnables: Vec<RunnableTriggers<'a, C>>,
53+
block_state: BlockState,
54+
proof_of_indexing: &SharedProofOfIndexing,
55+
causality_region: &str,
56+
) -> Result<BlockState, MappingError> {
57+
let mut state = block_state;
58+
59+
for runnable in runnables {
60+
state = self
61+
.processor
62+
.process_trigger(
63+
self.logger,
64+
runnable.hosted_triggers,
65+
block,
66+
state,
67+
proof_of_indexing,
68+
causality_region,
69+
self.debug_fork,
70+
self.metrics,
71+
self.instrument,
72+
)
73+
.await
74+
.map_err(|e| e.add_trigger_context(&runnable.trigger))?;
75+
}
76+
77+
Ok(state)
78+
}
79+
}

docs/plans/runner-refactor.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,10 @@ Each phase is complete when:
211211

212212
### Phase 1: Extract TriggerRunner Component
213213

214-
- [ ] Create `TriggerRunner` struct with execute method
215-
- [ ] Replace first trigger loop (lines 616-656)
216-
- [ ] Replace second trigger loop (lines 754-790)
217-
- [ ] Verify tests pass
214+
- [x] Create `TriggerRunner` struct with execute method
215+
- [x] Replace first trigger loop (lines 616-656)
216+
- [x] Replace second trigger loop (lines 754-790)
217+
- [x] Verify tests pass
218218

219219
### Phase 2: Define RunnerState Enum
220220

0 commit comments

Comments
 (0)