Query deduplication #285

Merged
merged 13 commits into from
Feb 16, 2022
2 changes: 1 addition & 1 deletion apollo-router-core/src/context.rs
@@ -4,7 +4,7 @@ use futures::Future;
use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Context<T = Arc<http_compat::Request<Request>>> {
/// Original request to the Router.
pub request: T,
2 changes: 1 addition & 1 deletion apollo-router-core/src/error.rs
@@ -12,7 +12,7 @@ use tracing::level_filters::LevelFilter;
///
/// Note that these are not actually returned to the client, but are instead converted to JSON for
/// [`struct@Error`].
#[derive(Error, Display, Debug, Serialize, Deserialize)]
#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[ignore_extra_doc_attributes]
pub enum FetchError {
130 changes: 130 additions & 0 deletions apollo-router-core/src/layers/deduplication.rs
@@ -0,0 +1,130 @@
use crate::{fetch::OperationKind, http_compat, Request, SubgraphRequest, SubgraphResponse};
use futures::{future::BoxFuture, lock::Mutex};
use std::{collections::HashMap, sync::Arc, task::Poll};
use tokio::sync::broadcast::{self, Sender};
use tower::{BoxError, Layer, ServiceExt};

pub struct QueryDeduplicationLayer;

impl<S> Layer<S> for QueryDeduplicationLayer
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError> + Clone,
{
type Service = QueryDeduplicationService<S>;

fn layer(&self, service: S) -> Self::Service {
QueryDeduplicationService::new(service)
}
}

type WaitMap =
Arc<Mutex<HashMap<http_compat::Request<Request>, Sender<Result<SubgraphResponse, String>>>>>;

pub struct QueryDeduplicationService<S> {
service: S,
wait_map: WaitMap,
}

impl<S> QueryDeduplicationService<S>
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError> + Clone,
{
fn new(service: S) -> Self {
QueryDeduplicationService {
service,
wait_map: Arc::new(Mutex::new(HashMap::new())),
}
}

async fn dedup(
service: S,
wait_map: WaitMap,
request: SubgraphRequest,
) -> Result<SubgraphResponse, BoxError> {
loop {
let mut locked_wait_map = wait_map.lock().await;
match locked_wait_map.get_mut(&request.http_request) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);

match receiver.recv().await {
Ok(value) => {
return value
.map(|response| SubgraphResponse {
response: response.response,
context: request.context,
})
.map_err(|e| e.into())
}
// there was an issue with the broadcast channel, retry fetching
Err(_) => continue,
}
}
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(request.http_request.clone(), tx.clone());
drop(locked_wait_map);

let context = request.context.clone();
let http_request = request.http_request.clone();
let res = service.ready_oneshot().await?.call(request).await;

{
let mut locked_wait_map = wait_map.lock().await;
locked_wait_map.remove(&http_request);
}

// Let our waiters know
let broadcast_value = res
.as_ref()
.map(|response| response.clone())
.map_err(|e| e.to_string());
Contributor Author

We use BoxError all over the code (mainly because Buffer enforces BoxError), and BoxError is not Clone. But here we want to send a copy of the response to every waiting query, even if we got an error. Since we cannot access the underlying error type, the best we can do is convert it to a String and pass that around.

Contributor

Is it OK as a String? Would it be better as a JSON Value?

Contributor Author

We could serialize a JSON value to a string and pass it as the error. Unfortunately, the only way to interact with std::error::Error is through strings: https://doc.rust-lang.org/nightly/std/error/trait.Error.html

Contributor

Could we convert to String and preserve the error as source()? If we did, would it deliver any value?
(I'm just trying to avoid lowest common denominator of error type, but I guess we don't have many options.)

Contributor Author

IIRC we discussed (with @BrynCooke I think?) using FetchError as the error type for all subgraph services and layers, and converting to BoxError just before passing it back to the Buffer layer, but deferred that to later as it would be a big change to introduce in this PR.
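
To make the trade-off in this thread concrete, here is a minimal std-only sketch, not code from this PR. `BoxError` is redefined locally with the same shape as `tower::BoxError`; `to_broadcast_value`, `from_broadcast_value`, `SharedError`, and `share` are hypothetical names introduced only for illustration. The first pair mirrors the String approach used above; `SharedError` is one possible way to get a cloneable error that still exposes the original through `source()`, as the reviewer asked.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

/// Same shape as `tower::BoxError`, redefined so the sketch needs only `std`.
type BoxError = Box<dyn Error + Send + Sync>;

/// Leader side: flatten the error to a `String` so the result becomes `Clone`.
/// The concrete error type and its `source()` chain are lost at this point.
fn to_broadcast_value<T: Clone>(res: &Result<T, BoxError>) -> Result<T, String> {
    res.as_ref().cloned().map_err(|e| e.to_string())
}

/// Waiter side: turn the broadcast value back into the service's result type.
fn from_broadcast_value<T>(value: Result<T, String>) -> Result<T, BoxError> {
    value.map_err(|e| e.into())
}

/// Hypothetical alternative: share the original error behind an `Arc`.
/// Cloning is cheap and the original error stays reachable for downcasting.
#[derive(Clone, Debug)]
struct SharedError(Arc<dyn Error + Send + Sync>);

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl Error for SharedError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Drop the `Send + Sync` bounds to match the trait's signature.
        let inner: &(dyn Error + 'static) = &*self.0;
        Some(inner)
    }
}

/// Wrap a boxed error so it can be cloned once per waiter.
fn share(err: BoxError) -> SharedError {
    SharedError(Arc::from(err))
}
```

With the String route, a waiter only ever sees the message. With the `Arc` route, a waiter could still call `source()` and `downcast_ref` to recover the concrete type, at the cost of one more wrapper type in the error path.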


// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
tx.send(broadcast_value)
.expect("there is always at least one receiver alive, the _rx guard; qed")
}).await
.expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");

return res.map(|response| SubgraphResponse {
response: response.response,
context,
});
}
}
}
}
}

impl<S> tower::Service<SubgraphRequest> for QueryDeduplicationService<S>
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
<S as tower::Service<SubgraphRequest>>::Future: Send + 'static,
{
type Response = SubgraphResponse;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: SubgraphRequest) -> Self::Future {
let mut service = self.service.clone();

if request.operation_kind == OperationKind::Query {
let wait_map = self.wait_map.clone();

Box::pin(async move { Self::dedup(service, wait_map, request).await })
} else {
Box::pin(async move { service.call(request).await })
}
}
}
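
The control flow of `dedup` above (first caller for a key does the work, later callers for the same in-flight key wait for its result, and the entry is removed once the work finishes) can be sketched with only `std`. This is an illustrative analogue under stated assumptions, not the PR's implementation: it uses blocking threads and a `Condvar` where the PR uses an async mutex and a tokio broadcast channel, and `SingleFlight`, `Slot`, and `run` are hypothetical names.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Condvar, Mutex};

/// One in-flight computation: the leader fills `value`, waiters block on `ready`.
struct Slot<V> {
    value: Mutex<Option<V>>,
    ready: Condvar,
}

/// Hypothetical blocking analogue of the wait map used by the layer.
pub struct SingleFlight<K, V> {
    in_flight: Mutex<HashMap<K, Arc<Slot<V>>>>,
}

impl<K: Hash + Eq + Clone, V: Clone> SingleFlight<K, V> {
    pub fn new() -> Self {
        SingleFlight {
            in_flight: Mutex::new(HashMap::new()),
        }
    }

    /// Runs `fetch` unless an identical key is already in flight, in which
    /// case the caller waits for the leader's result and `fetch` is dropped.
    pub fn run<F: FnOnce() -> V>(&self, key: K, fetch: F) -> V {
        // Look up or register the key while holding the map lock, then release it.
        let (slot, is_leader) = {
            let mut map = self.in_flight.lock().unwrap();
            match map.get(&key) {
                Some(slot) => (Arc::clone(slot), false),
                None => {
                    let slot = Arc::new(Slot {
                        value: Mutex::new(None),
                        ready: Condvar::new(),
                    });
                    map.insert(key.clone(), Arc::clone(&slot));
                    (slot, true)
                }
            }
        };

        if is_leader {
            let value = fetch();
            // Remove the entry first so later requests start a fresh fetch,
            // then publish the value to everyone already waiting.
            self.in_flight.lock().unwrap().remove(&key);
            *slot.value.lock().unwrap() = Some(value.clone());
            slot.ready.notify_all();
            value
        } else {
            let mut guard = slot.value.lock().unwrap();
            while guard.is_none() {
                guard = slot.ready.wait(guard).unwrap();
            }
            guard.clone().unwrap()
        }
    }
}
```

As in the layer, nothing is cached: two sequential calls with the same key both run their fetch, and only overlapping calls share a result. Unlike the layer, this sketch has no retry path, so a leader that panics inside `fetch` would leave its waiters blocked.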
2 changes: 2 additions & 0 deletions apollo-router-core/src/layers/headers.rs
@@ -271,6 +271,7 @@ mod test {
use tower::{BoxError, Layer};
use tower::{Service, ServiceExt};

use crate::fetch::OperationKind;
use crate::headers::{
InsertConfig, InsertLayer, PropagateConfig, PropagateLayer, RemoveConfig, RemoveLayer,
};
@@ -471,6 +472,7 @@ mod test {
.body(Request::builder().query("query").build())
.unwrap()
.into(),
operation_kind: OperationKind::Query,
context: example_originating_request(),
}
}
1 change: 1 addition & 0 deletions apollo-router-core/src/layers/mod.rs
@@ -1,3 +1,4 @@
pub mod cache;
pub mod deduplication;
pub mod forbid_http_get_mutations;
pub mod headers;
6 changes: 4 additions & 2 deletions apollo-router-core/src/query_planner/mod.rs
@@ -265,9 +265,9 @@ pub(crate) mod fetch {
operation_kind: OperationKind,
}

#[derive(Debug, PartialEq, Deserialize)]
#[derive(Copy, Clone, Debug, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) enum OperationKind {
pub enum OperationKind {
Query,
Mutation,
Subscription,
@@ -349,6 +349,7 @@ pub(crate) mod fetch {
) -> Result<Value, FetchError> {
let FetchNode {
operation,
operation_kind,
service_name,
..
} = self;
@@ -375,6 +376,7 @@ pub(crate) mod fetch {
.unwrap()
.into(),
context: context.clone(),
operation_kind: *operation_kind,
};

let service = service_registry
@@ -1,6 +1,6 @@
---
source: apollo-router-core/src/query_planner/mod.rs
assertion_line: 473
assertion_line: 467
expression: query_plan

---
2 changes: 1 addition & 1 deletion apollo-router-core/src/request.rs
@@ -10,7 +10,7 @@ use typed_builder::TypedBuilder;
#[derive(Clone, Derivative, Serialize, Deserialize, TypedBuilder, Default)]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(setter(into)))]
#[derivative(Debug, PartialEq)]
#[derivative(Debug, PartialEq, Eq, Hash)]
pub struct Request {
/// The graphql query.
pub query: String,
52 changes: 51 additions & 1 deletion apollo-router-core/src/services/http_compat.rs
@@ -1,6 +1,10 @@
//! wrapper types for Request and Response from the http crate to improve their usability

use std::ops::{Deref, DerefMut};
use std::{
cmp::PartialEq,
hash::Hash,
ops::{Deref, DerefMut},
};

#[derive(Debug, Default)]
pub struct Request<T> {
@@ -61,6 +65,52 @@ impl<T> DerefMut for Request<T> {
}
}

impl<T: Hash> Hash for Request<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.inner.method().hash(state);
self.inner.version().hash(state);
self.inner.uri().hash(state);
// this assumes headers are in the same order
for (name, value) in self.inner.headers() {
name.hash(state);
value.hash(state);
}
Comment on lines +73 to +77
Contributor

I really dislike HeaderMap. We can't even sort it...

I suppose the downside of not spotting duplicates is that our query can't be de-duplicated. Not a bug, but defeats the purpose of de-duplicating.

We can do something a bit crafty here. HeaderValue does implement Ord and HeaderName always converts to &str, so:

        // Map Header names into &str so we can sort
        let mut tmp: Vec<(&str, &HeaderValue)> = self
            .inner
            .headers()
            .iter()
            .map(|(k, v)| (k.as_str(), v))
            .collect();
        tmp.sort();
        for (name, value) in tmp {
            name.hash(state);
            value.hash(state);
        }

would give us a consistent ordering for hashing purposes. I think we could do the same for Eq as well.

Contributor Author

We should refrain from sorting the headers, since their order can have an impact. Example: the Accept header can have multiple values, which can come either comma-separated in one header value or as multiple separate Accept headers. In that second case, if we sort the headers, that might reorder the values and change the behaviour.

The assumption for the cache key here is that similar queries coming from the same client will have the same shape (same user agent, same list of headers in the same order...).

What we could do though is decide on which headers we consider for the cache. Once we do that, we would get stronger guarantees, and could expose in the docs the issues around ordering. Could we explore that in another PR though?

Contributor

We are only sorting our copy, the original headers are un-impacted. I agree that we can't touch the original headers, which we couldn't sort anyway since HeaderName doesn't implement Ord.

Does the original order of headers matter for hashing purposes? i.e.: don't we want the ordering to be consistent to help improve our de-duplication chances?

I'm fine with moving this discussion to a follow-up. I think it's important to make the comparison less fragile, but it doesn't need to be decided before merging to the tower branch. If you don't want to put my suggestion in, perhaps preserve it in the follow-up issue?

Contributor Author

Yes, it's definitely a good idea to revisit this and find a robust solution.
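
For whoever picks up the follow-up, the two positions in this thread can be compared in a small std-only sketch. It is not the PR's code and does not use `http::HeaderMap`: a slice of `(name, value)` string pairs stands in for the headers, and `hash_in_wire_order` and `hash_sorted_by_name` are hypothetical names. The second function hashes a sorted copy, as suggested above, but sorts by name only with a stable sort, which is one possible way to keep repeated headers such as Accept in their original relative order.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes headers exactly as they arrive, so the key depends on header order
/// (the behaviour of the `Hash` impl in this diff).
fn hash_in_wire_order(headers: &[(&str, &str)]) -> u64 {
    let mut state = DefaultHasher::new();
    for (name, value) in headers {
        name.hash(&mut state);
        value.hash(&mut state);
    }
    state.finish()
}

/// Hashes a copy sorted by header name only. The caller's headers are untouched.
fn hash_sorted_by_name(headers: &[(&str, &str)]) -> u64 {
    let mut sorted: Vec<&(&str, &str)> = headers.iter().collect();
    // `sort_by` is stable: entries with the same name keep their relative order.
    sorted.sort_by(|a, b| a.0.cmp(b.0));
    let mut state = DefaultHasher::new();
    for (name, value) in sorted {
        name.hash(&mut state);
        value.hash(&mut state);
    }
    state.finish()
}
```

Under this sketch, two requests whose distinct headers arrive in a different order get the same key, while two requests whose repeated Accept values arrive in a different order still get different keys. Whether that is the right equivalence for de-duplication is exactly the question deferred to the follow-up, and a matching change to `PartialEq` would be needed to keep `Hash` and `Eq` consistent.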

self.inner.body().hash(state);
}
}

impl<T: PartialEq> PartialEq for Request<T> {
fn eq(&self, other: &Self) -> bool {
let mut res = self.inner.method().eq(other.inner.method())
&& self.inner.version().eq(&other.inner.version())
&& self.inner.uri().eq(other.inner.uri());

if !res {
return false;
}
if self.inner.headers().len() != other.inner.headers().len() {
return false;
}

// this assumes headers are in the same order
for ((name, value), (other_name, other_value)) in self
.inner
.headers()
.iter()
.zip(other.inner.headers().iter())
{
res = name.eq(other_name) && value.eq(other_value);
if !res {
return false;
}
}

self.inner.body().eq(other.inner.body())
}
}

impl<T: PartialEq> Eq for Request<T> {}

impl<T> From<http::Request<T>> for Request<T> {
fn from(inner: http::Request<T>) -> Self {
Request { inner }
4 changes: 4 additions & 0 deletions apollo-router-core/src/services/mod.rs
@@ -1,5 +1,6 @@
pub use self::execution_service::*;
pub use self::router_service::*;
use crate::fetch::OperationKind;
use crate::layers::cache::CachingLayer;
use crate::prelude::graphql::*;
use moka::sync::Cache;
@@ -84,9 +85,12 @@ pub struct SubgraphRequest {
pub http_request: http_compat::Request<Request>,

pub context: Context,

pub operation_kind: OperationKind,
Contributor Author

I'm adding the operation kind here for now, as I am not sure yet if it should be in the Context object: this is data that's only needed for this request and does not need to be shared with other requests, and there's no place in the HTTP request to put it.

Contributor

Oh, that's interesting: we have it deep in the query planner. I use a rough variant of this in the HTTP GET related PR, but it doesn't fit your use case.

Contributor Author

It has to be in the query planner, and from there it's used in subgraph queries. So both use cases are linked

}

assert_impl_all!(SubgraphResponse: Send);
#[derive(Clone, Debug)]
pub struct SubgraphResponse {
pub response: http_compat::Response<Response>,

22 changes: 15 additions & 7 deletions apollo-router/src/reqwest_subgraph_service.rs
@@ -2,6 +2,7 @@ use apollo_router_core::prelude::*;
use futures::future::BoxFuture;
use std::sync::Arc;
use std::task::Poll;
use tower::BoxError;
use tracing::Instrument;
use typed_builder::TypedBuilder;

@@ -40,21 +41,21 @@ impl ReqwestSubgraphService {

impl tower::Service<graphql::SubgraphRequest> for ReqwestSubgraphService {
type Response = graphql::SubgraphResponse;
type Error = tower::BoxError;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
//TODO backpressure
Poll::Ready(Ok(()))
}

fn call(
&mut self,
graphql::SubgraphRequest {
fn call(&mut self, request: graphql::SubgraphRequest) -> Self::Future {
let graphql::SubgraphRequest {
http_request,
context,
}: graphql::SubgraphRequest,
) -> Self::Future {
..
} = request;

let http_client = self.http_client.clone();
let target_url = if http_request.uri() == "/" {
self.url.clone()
@@ -82,7 +83,14 @@ impl tower::Service<graphql::SubgraphRequest> for ReqwestSubgraphService {
request.headers_mut().extend(headers.into_iter());
*request.version_mut() = version;

let response = http_client.execute(request).await?;
let response = http_client.execute(request).await.map_err(|err| {
tracing::error!(fetch_error = format!("{:?}", err).as_str());

graphql::FetchError::SubrequestHttpError {
service: service_name.clone(),
reason: err.to_string(),
}
})?;
let body = response
.bytes()
.instrument(tracing::debug_span!("aggregate_response_data"))
7 changes: 4 additions & 3 deletions apollo-router/src/router_factory.rs
@@ -1,5 +1,6 @@
use crate::configuration::{Configuration, ConfigurationError};
use crate::reqwest_subgraph_service::ReqwestSubgraphService;
use apollo_router_core::deduplication::QueryDeduplicationLayer;
use apollo_router_core::{
http_compat::{Request, Response},
PluggableRouterServiceBuilder, ResponseBody, RouterRequest, Schema,
@@ -70,9 +71,9 @@ impl RouterServiceFactory for YamlRouterServiceFactory {
let mut builder = PluggableRouterServiceBuilder::new(schema, buffer, dispatcher.clone());

for (name, subgraph) in &configuration.subgraphs {
let mut subgraph_service = BoxService::new(ReqwestSubgraphService::new(
name.to_string(),
subgraph.routing_url.clone(),
let dedup_layer = QueryDeduplicationLayer;
let mut subgraph_service = BoxService::new(dedup_layer.layer(
Contributor Author

Adding the dedup layer here has less impact on the types than making the subgraph a BoxCloneService, adding the layer, and then boxing it again.

ReqwestSubgraphService::new(name.to_string(), subgraph.routing_url.clone()),
));

for layer in &subgraph.layers {