Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch errors from the primary response to deferred responses #2192

Merged
merged 34 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8b8c8b9
split queries between primary and deferred parts
Geal Nov 15, 2022
994dac2
handle named fragments
Geal Nov 15, 2022
84e9d83
cleanup
Geal Nov 16, 2022
5f993b9
do nnot add the defer directive to the generated query
Geal Nov 16, 2022
597fbc8
logs
Geal Nov 16, 2022
62a3fb7
Merge branch 'dev' into geal/split-queries
Geal Dec 5, 2022
73c24fb
update router-bridge and apollo-encoder
Geal Dec 5, 2022
405803c
handle queryPath in deferred nodes
Geal Dec 5, 2022
02bdb78
split arrays
Geal Dec 6, 2022
0364c60
Merge branch 'dev' into geal/split-queries
Geal Dec 6, 2022
0563c78
missing snapshots
Geal Dec 6, 2022
f903ee5
missing snapshot
Geal Dec 6, 2022
ecb2dcb
lint
Geal Dec 6, 2022
93a7ec2
handle arrays
Geal Dec 6, 2022
9b222bf
remove unused code
Geal Dec 6, 2022
606dcbd
add comments
Geal Dec 6, 2022
cdf1b03
Merge branch 'dev' into geal/split-queries
Geal Dec 6, 2022
3a1bc19
update router-bridge to fix CI builds
Geal Dec 6, 2022
e794955
update router-bridge
Geal Dec 12, 2022
223bc97
Merge branch 'dev' into geal/split-queries
Geal Dec 12, 2022
20eeefe
Merge branch 'dev' into geal/split-queries
Geal Dec 12, 2022
55e6ad9
filter the path for valueCompletion extensions
Geal Dec 1, 2022
45c5ce8
dispatch errors from the primary query
Geal Dec 1, 2022
ca55832
add a test for expected behaviour
Geal Dec 1, 2022
68b9b18
we need to check that a path is handled by a query
Geal Dec 2, 2022
d268507
lint
Geal Dec 6, 2022
0fe7cbf
rename test
Geal Dec 6, 2022
7c606d7
check that a query contains an error path
Geal Dec 6, 2022
d564e36
changelog & lint
Geal Dec 6, 2022
b2fb376
remove debug print
Geal Dec 12, 2022
1744aa6
Merge branch 'dev' into geal/dispatch-errors-from-primary
Geal Dec 12, 2022
38954e1
fix broken english
Geal Dec 12, 2022
dce1978
Apply suggestions from code review
Geal Dec 12, 2022
60a3b2e
This is a combination of 2 commits.
Geal Dec 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p
When we drop Telemetry we spawn a thread to perform the global opentelemetry trace provider shutdown. The documentation of this function indicates that "This will invoke the shutdown method on all span processors. span processors should export remaining spans before return". We should give that process some time to complete (5 seconds currently) before returning from the `drop`. This will provide more opportunity for spans to be exported.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2191
### Dispatch errors from the primary response to deferred responses ([Issue #1818](https://github.com/apollographql/router/issues/1818), [Issue #2185](https://github.com/apollographql/router/issues/2185))

When errors are generated during the primary execution, some of them can be assigned to
deferred responses.

By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/pull/2192

### Reconstruct deferred queries with knowledge about fragments ([Issue #2105](https://github.com/apollographql/router/issues/2105))

Expand Down
21 changes: 14 additions & 7 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ impl PlanNode {
HashMap::new();
let mut futures = Vec::new();

let (primary_sender, _) = tokio::sync::broadcast::channel::<Value>(1);
let (primary_sender, _) =
tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);

for deferred_node in deferred {
let fut = deferred_node
Expand Down Expand Up @@ -266,10 +267,10 @@ impl PlanNode {
errors.extend(err.into_iter());
subselection = primary_subselection.clone();

let _ = primary_sender.send(value.clone());
let _ = primary_sender.send((value.clone(), errors.clone()));
} else {
subselection = primary_subselection.clone();
let _ = primary_sender.send(value.clone());
let _ = primary_sender.send((value.clone(), errors.clone()));
}
}
.instrument(tracing::info_span!(
Expand Down Expand Up @@ -357,7 +358,7 @@ impl DeferredNode {
parameters: &'a ExecutionParameters<'a, SF>,
parent_value: &Value,
sender: futures::channel::mpsc::Sender<Response>,
primary_sender: &Sender<Value>,
primary_sender: &Sender<(Value, Vec<Error>)>,
deferred_fetches: &mut HashMap<String, Sender<(Value, Vec<Error>)>>,
) -> impl Future<Output = ()>
where
Expand Down Expand Up @@ -406,8 +407,10 @@ impl DeferredNode {
let mut errors = Vec::new();

if is_depends_empty {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
value.deep_merge(primary_value);
errors.extend(primary_errors.into_iter())
} else {
while let Some((v, _remaining)) = stream.next().await {
// a Err(RecvError) means either that the fetch was not performed and the
Expand Down Expand Up @@ -449,8 +452,10 @@ impl DeferredNode {
.await;

if !is_depends_empty {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
v.deep_merge(primary_value);
errors.extend(primary_errors.into_iter())
}

if let Err(e) = tx
Expand All @@ -473,8 +478,10 @@ impl DeferredNode {
};
tx.disconnect();
} else {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
value.deep_merge(primary_value);
errors.extend(primary_errors.into_iter());

if let Err(e) = tx
.send(
Expand Down
57 changes: 54 additions & 3 deletions apollo-router/src/services/execution_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::subgraph_service::SubgraphServiceFactory;
use super::Plugins;
use crate::graphql::IncrementalResponse;
use crate::graphql::Response;
use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::json_ext::PathElement;
use crate::json_ext::ValueExt;
Expand Down Expand Up @@ -128,6 +129,10 @@ where
response.has_next = Some(has_next);
}

response.errors.retain(|error| match &error.path {
None => true,
Some(error_path) => query.contains_error_path(operation_name.as_deref(), response.subselection.as_deref(), response.path.as_ref(), error_path),
});
ready(Some(response))
}
// if the deferred response specified a path, we must extract the
Expand Down Expand Up @@ -157,6 +162,9 @@ where
}
});

let query = query.clone();
let operation_name = operation_name.clone();

let incremental = sub_responses
.into_iter()
.filter_map(move |(path, data)| {
Expand All @@ -166,25 +174,67 @@ where
.iter()
.filter(|error| match &error.path {
None => false,
Some(err_path) => err_path.starts_with(&path),
Some(error_path) =>query.contains_error_path(operation_name.as_deref(), response.subselection.as_deref(), response.path.as_ref(), error_path) && error_path.starts_with(&path),

})
.cloned()
.collect::<Vec<_>>();

let extensions: Object = response
.extensions
.iter()
.map(|(key, value)| {
if key.as_str() == "valueCompletion" {
let value = match value.as_array() {
None => Value::Null,
Some(v) => Value::Array(
v.iter()
.filter(|ext| {
match ext
.as_object()
.as_ref()
.and_then(|ext| {
ext.get("path")
})
.and_then(|v| {
let p:Option<Path> = serde_json_bytes::from_value(v.clone()).ok();
p
}) {
None => false,
Some(ext_path) => {
ext_path
.starts_with(
&path,
)
}
}
})
.cloned()
.collect(),
),
};

(key.clone(), value)
} else {
(key.clone(), value.clone())
}
})
.collect();

// an empty response should not be sent
// still, if there's an error or extension to show, we should
// send it
if !data.is_null()
|| !errors.is_empty()
|| !response.extensions.is_empty()
|| !extensions.is_empty()
{
Some(
IncrementalResponse::builder()
.and_label(response.label.clone())
.data(data)
.path(path)
.errors(errors)
.extensions(response.extensions.clone())
.extensions(extensions)
.build(),
)
} else {
Expand All @@ -199,6 +249,7 @@ where
.incremental(incremental)
.build(),
))

}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,6 @@ expression: stream.next_response().await.unwrap()
"suborga",
0
]
},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is removing the redundant parts of the snapshot, now that error dispatching is fixed

{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
1
]
},
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
2
]
}
]
}
Expand All @@ -55,15 +37,6 @@ expression: stream.next_response().await.unwrap()
],
"extensions": {
"valueCompletion": [
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
0
]
},
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
Expand All @@ -72,15 +45,6 @@ expression: stream.next_response().await.unwrap()
"suborga",
1
]
},
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
2
]
}
]
}
Expand All @@ -95,24 +59,6 @@ expression: stream.next_response().await.unwrap()
],
"extensions": {
"valueCompletion": [
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
0
]
},
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
"currentUser",
"activeOrganization",
"suborga",
1
]
},
{
"message": "Cannot return null for non-nullable field Organization.nonNullId",
"path": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: apollo-router/src/services/supergraph_service.rs
expression: stream.next_response().await.unwrap()
---
{
"hasNext": false,
"incremental": [
{
"data": {
"errorField": null
},
"path": [
"computer"
],
"errors": [
{
"message": "Error field",
"locations": [
{
"line": 1,
"column": 93
}
],
"path": [
"computer",
"errorField"
]
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
source: apollo-router/src/services/supergraph_service.rs
expression: stream.next_response().await.unwrap()
---
{
"data": {
"computer": {
"id": "Computer1"
}
},
"hasNext": true
}
Loading