Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ dev = "run --package qp-dev-cli"
subgraphs = "run --package subgraphs"
test_all = "test --workspace"
test_qp = "test --package query-planner -- --nocapture"
test_qpe = "test --package query-plan-executor -- --nocapture"
"clippy:fix" = "clippy --all --fix --allow-dirty --allow-staged"
29 changes: 25 additions & 4 deletions lib/query-plan-executor/benches/executor_benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use query_plan_executor::executors::map::SubgraphExecutorMap;
use query_planner::ast::selection_item::SelectionItem;
use query_planner::ast::selection_set::InlineFragmentSelection;
use query_planner::graph::PlannerOverrideContext;
use query_planner::planner::plan_nodes::FlattenNodePathSegment;
use std::hint::black_box;

use query_plan_executor::execute_query_plan;
Expand Down Expand Up @@ -242,8 +243,17 @@ fn project_data_by_operation(c: &mut Criterion) {

fn traverse_and_collect(c: &mut Criterion) {
let path = [
"users", "@", "reviews", "@", "product", "reviews", "@", "author", "reviews", "@",
"product",
FlattenNodePathSegment::Field("users".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("product".into()),
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("author".into()),
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("product".into()),
];
let mut result: Value = non_projected_result::get_result();
c.bench_function("traverse_and_collect", |b| {
Expand Down Expand Up @@ -328,8 +338,19 @@ fn deep_merge_with_complex(c: &mut Criterion) {

fn project_requires(c: &mut Criterion) {
let path = [
"users", "@", "reviews", "@", "product", "reviews", "@", "author", "reviews", "@",
"product",
FlattenNodePathSegment::Field("users".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("product".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("author".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("reviews".into()),
FlattenNodePathSegment::List,
FlattenNodePathSegment::Field("product".into()),
];
let mut result: Value = non_projected_result::get_result();
let data = result.get_mut("data").unwrap();
Expand Down
117 changes: 93 additions & 24 deletions lib/query-plan-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use query_planner::{
operation::OperationDefinition, selection_item::SelectionItem, selection_set::SelectionSet,
},
planner::plan_nodes::{
ConditionNode, FetchNode, FetchNodePathSegment, FetchRewrite, FlattenNode, KeyRenamer,
ParallelNode, PlanNode, QueryPlan, SequenceNode, ValueSetter,
ConditionNode, FetchNode, FetchNodePathSegment, FetchRewrite, FlattenNode, FlattenNodePath,
FlattenNodePathSegment, KeyRenamer, ParallelNode, PlanNode, QueryPlan, SequenceNode,
ValueSetter,
},
state::supergraph_state::OperationKind,
};
Expand Down Expand Up @@ -550,7 +551,7 @@ type ExecutionStepJob<'a, T> = BoxFuture<'a, T>;
struct ExecutionStep<'a> {
fetch_job: Option<ExecutionStepJob<'a, ExecutionResult>>,
flatten_job: Option<ExecutionStepJob<'a, ExecuteForRepresentationsResult>>,
flatten_path: Option<Vec<&'a str>>,
flatten_path: Option<&'a FlattenNodePath>,
}

fn create_execution_step<'a>(
Expand All @@ -572,12 +573,22 @@ fn create_execution_step<'a>(
return ExecutionStep::default();
};

let normalized_path: Vec<&str> = flatten_node.path.iter().map(String::as_str).collect();
let collected_representations = traverse_and_collect(data, &normalized_path);
let collected_representations =
traverse_and_collect(data, flatten_node.path.as_slice());

if collected_representations.is_empty() {
// No representations collected, skip execution
return ExecutionStep::default();
}

let project_result =
fetch_node.project_representations(execution_context, &collected_representations);

if project_result.representations.is_empty() {
// No representations collected, skip execution
return ExecutionStep::default();
}

let job = fetch_node.execute_for_projected_representations(
execution_context,
project_result.representations,
Expand All @@ -586,7 +597,7 @@ fn create_execution_step<'a>(

ExecutionStep {
flatten_job: Some(job),
flatten_path: Some(normalized_path),
flatten_path: Some(&flatten_node.path),
..Default::default()
}
}
Expand Down Expand Up @@ -643,7 +654,7 @@ impl ExecutablePlanNode for ParallelNode {
flatten_jobs.push(flatten_job);
}
if let Some(flatten_path) = res.flatten_path {
flatten_paths.push(flatten_path);
flatten_paths.push(flatten_path.as_slice());
}
}
trace!(
Expand Down Expand Up @@ -671,7 +682,7 @@ impl ExecutablePlanNode for ParallelNode {
for (result, path) in flatten_results.into_iter().zip(flatten_paths) {
// Process FlattenNode results
if let Some(entities) = result.entities {
let mut collected_representations = traverse_and_collect(data, &path);
let mut collected_representations = traverse_and_collect(data, path);
for (entity, index) in entities.into_iter().zip(result.indexes.into_iter()) {
if let Some(representation) = collected_representations.get_mut(index) {
// Merge the entity into the representation
Expand Down Expand Up @@ -739,14 +750,20 @@ impl ExecutablePlanNode for FlattenNode {
) {
// Execute the child node. `execution_context` can be borrowed mutably
// because `collected_representations` borrows `data_for_flatten`, not `execution_context.data`.
let normalized_path: Vec<&str> = self.path.iter().map(String::as_str).collect();
let now = std::time::Instant::now();
let mut representations = traverse_and_collect(data, normalized_path.as_slice());
let mut representations = traverse_and_collect(data, self.path.as_slice());
trace!(
"traversed and collected representations: {:?} in {:#?}",
representations.len(),
now.elapsed()
);

if representations.is_empty() {
// If there are no representations,
// return early without executing the child node.
return;
}

match self.node.as_ref() {
PlanNode::Fetch(fetch_node) => {
let now = std::time::Instant::now();
Expand All @@ -760,6 +777,12 @@ impl ExecutablePlanNode for FlattenNode {
now.elapsed()
);

if filtered_representations.is_empty() {
// If there are no filtered representations,
// return early without executing the child node.
return;
}

let now = std::time::Instant::now();
let result = fetch_node
.execute_for_projected_representations(
Expand Down Expand Up @@ -1033,26 +1056,72 @@ fn entity_satisfies_type_condition(
))]
pub fn traverse_and_collect<'a>(
current_data: &'a mut Value,
remaining_path: &[&str],
remaining_path: &[FlattenNodePathSegment],
) -> Vec<&'a mut Value> {
match (current_data, remaining_path) {
(Value::Array(arr), []) => arr.iter_mut().collect(), // Base case: No more path segments, return all items in the array
(current_data, []) => vec![current_data], // Base case: No more path segments,
(Value::Object(obj), [next_segment, next_remaining_path @ ..]) => {
if let Some(next_value) = obj.get_mut(*next_segment) {
traverse_and_collect(next_value, next_remaining_path)
} else {
vec![] // No valid path segment
// If the path is empty, we're done traversing.
let Some((segment, remaining_path)) = remaining_path.split_first() else {
return match current_data {
// If the final result is an array, return all its items.
Value::Array(arr) => arr.iter_mut().collect(),
// Otherwise, return the value itself in a vector.
_ => vec![current_data],
};
};

match segment {
FlattenNodePathSegment::Field(field) => {
// Attempt to access a field on an object
match current_data.get_mut(field) {
Some(next_value) => traverse_and_collect(next_value, remaining_path),
// Either the field doesn't exist, or it's is not an object
None => vec![],
}
}

FlattenNodePathSegment::List => {
match current_data {
Value::Array(arr) => arr
.iter_mut()
.flat_map(|item| traverse_and_collect(item, remaining_path))
.collect(),
// List is only valid for arrays
_ => vec![],
}
}

FlattenNodePathSegment::Cast(type_name) => {
match current_data {
Value::Object(_) => {
// For a single object, a missing `__typename` is a pass-through
if contains_typename(current_data, type_name, true) {
traverse_and_collect(current_data, remaining_path)
} else {
vec![]
}
}
Value::Array(arr) => {
// Filter an array based on matching `__typename`
arr.iter_mut()
.filter(|item| contains_typename(item, type_name, false))
.flat_map(|item| traverse_and_collect(item, remaining_path))
.collect()
}
// Cast is only valid for objects and arrays
_ => vec![],
}
}
(Value::Array(arr), ["@", next_remaining_path @ ..]) => arr
.iter_mut()
.flat_map(|item| traverse_and_collect(item, next_remaining_path))
.collect(),
_ => vec![], // No valid path segment
}
}

/// Checks if a serde_json::Value has a `__typename` that matches the given `type_name`.
fn contains_typename(value: &Value, type_name: &str, default_for_missing: bool) -> bool {
value
.as_object()
.and_then(|obj| obj.get("__typename"))
.and_then(Value::as_str)
.map_or(default_for_missing, |s| s == type_name)
}

// --- Helper Functions ---

// --- Main Function (for testing) ---
Expand Down
2 changes: 2 additions & 0 deletions lib/query-plan-executor/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use subgraphs::accounts;

use crate::executors::{common::SubgraphExecutor, map::SubgraphExecutorMap};

mod traverse_and_collect;

#[test]
fn query_executor_pipeline_locally() {
tokio_test::block_on(async {
Expand Down
Loading
Loading