Skip to content

Commit

Permalink
query deduplication
Browse files Browse the repository at this point in the history
not compiling for now, will be reimplemented as a layer
  • Loading branch information
Geal committed Feb 11, 2022
1 parent d234b2e commit 18c20c5
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 21 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apollo-router-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ paste = "1.0.6"
reqwest = { version = "0.11.9", features = ["json", "stream"] }
reqwest-middleware = "0.1.3"
reqwest-tracing = { version = "0.2", features = ["opentelemetry_0_16"] }
router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "950eb931e38746bb7cfed05382d6970a22e43cc4" }
router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "17b9ca519b54bad773b89c44a9ef033949788edb" }
serde = { version = "1.0.136", features = ["derive", "rc"] }
serde_json = { version = "1.0.78", features = ["preserve_order"] }
serde_json_bytes = { version = "0.2.0", features = ["preserve_order"] }
Expand Down
2 changes: 1 addition & 1 deletion apollo-router-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::level_filters::LevelFilter;
///
/// Note that these are not actually returned to the client, but are instead converted to JSON for
/// [`struct@Error`].
#[derive(Error, Display, Debug, Serialize, Deserialize)]
#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[ignore_extra_doc_attributes]
pub enum FetchError {
Expand Down
3 changes: 3 additions & 0 deletions apollo-router-core/src/query_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ mod fetch {

/// The GraphQL subquery that is used for the fetch.
operation: String,

/// The kind of operation (query, mutation or subscription)
operationKind: String,
}

struct Variables {
Expand Down
2 changes: 1 addition & 1 deletion apollo-router-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use typed_builder::TypedBuilder;
#[derive(Clone, Derivative, Serialize, Deserialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(setter(into)))]
#[derivative(Debug, PartialEq)]
#[derivative(Debug, PartialEq, Eq, Hash)]
pub struct Request {
/// The graphql query.
pub query: String,
Expand Down
52 changes: 51 additions & 1 deletion apollo-router-core/src/services/http_compat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! wrapper typpes for Request and Response from the http crate to improve their usability

use std::ops::{Deref, DerefMut};
use std::{
cmp::PartialEq,
hash::Hash,
ops::{Deref, DerefMut},
};

#[derive(Debug, Default)]
pub struct Request<T> {
Expand Down Expand Up @@ -61,6 +65,52 @@ impl<T> DerefMut for Request<T> {
}
}

impl<T: Hash> Hash for Request<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.inner.method().hash(state);
self.inner.version().hash(state);
self.inner.uri().hash(state);
// this assumes headers are in the same order
for (name, value) in self.inner.headers() {
name.hash(state);
value.hash(state);
}
self.inner.body().hash(state);
}
}

impl<T: PartialEq> PartialEq for Request<T> {
fn eq(&self, other: &Self) -> bool {
let mut res = self.inner.method().eq(other.inner.method())
&& self.inner.version().eq(&other.inner.version())
&& self.inner.uri().eq(other.inner.uri());

if !res {
return false;
}
if self.inner.headers().len() != other.inner.headers().len() {
return false;
}

// this assumes headers are in the same order
for ((name, value), (other_name, other_value)) in self
.inner
.headers()
.iter()
.zip(other.inner.headers().iter())
{
res = name.eq(other_name) && value.eq(other_value);
if !res {
return false;
}
}

self.inner.body().eq(other.inner.body())
}
}

impl<T: PartialEq> Eq for Request<T> {}

impl<T> From<http::Request<T>> for Request<T> {
fn from(inner: http::Request<T>) -> Self {
Request { inner }
Expand Down
157 changes: 147 additions & 10 deletions apollo-router/src/reqwest_subgraph_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use apollo_router_core::prelude::*;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::{future::BoxFuture, TryFutureExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Poll;
use tokio::sync::broadcast::{self, Sender};
use tracing::Instrument;
use typed_builder::TypedBuilder;

Expand All @@ -15,6 +18,20 @@ pub struct ReqwestSubgraphService {
// a url::Url instead of using the http crate
// for now, to make things work, if the URL in the request is /, we use this URL
url: reqwest::Url,

wait_map: Arc<
Mutex<
HashMap<
apollo_router_core::http_compat::Request<graphql::Request>,
Sender<
Result<
apollo_router_core::http_compat::Response<graphql::Response>,
graphql::FetchError,
>,
>,
>,
>,
>,
}

impl ReqwestSubgraphService {
Expand All @@ -34,8 +51,133 @@ impl ReqwestSubgraphService {
.build(),
service: Arc::new(service),
url,
wait_map: Arc::new(Mutex::new(HashMap::new())),
}
}

async fn dedup(
&self,
graphql::SubgraphRequest {
http_request,
context,
}: graphql::SubgraphRequest,
) -> Result<graphql::SubgraphResponse, graphql::FetchError> {
loop {
let mut locked_wait_map = self.wait_map.lock().await;
match locked_wait_map.get_mut(&http_request) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);

match receiver.recv().await {
Ok(value) => {
return value
.map(|response| graphql::SubgraphResponse { response, context })
}
// there was an issue with the broadcast channel, retry fetching
Err(_) => continue,
}
}
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(http_request.clone(), tx.clone());
drop(locked_wait_map);

let res = self.fetch(http_request.clone()).await;

{
let mut locked_wait_map = self.wait_map.lock().await;
locked_wait_map.remove(&http_request);
}

// Let our waiters know
let broadcast_value = res.clone();
// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
tx.send(broadcast_value)
.expect("there is always at least one receiver alive, the _rx guard; qed")
}).await
.expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
return res.map(|response| graphql::SubgraphResponse { response, context });
}
}
}
}

async fn fetch(
&self,
http_request: apollo_router_core::http_compat::Request<apollo_router_core::Request>,
) -> Result<
apollo_router_core::http_compat::Response<apollo_router_core::Response>,
graphql::FetchError,
> {
let http_client = self.http_client.clone();
let target_url = if http_request.uri() == "/" {
self.url.clone()
} else {
reqwest::Url::parse(&http_request.uri().to_string()).expect("todo")
};
let service_name = (*self.service).to_owned();

let (
http::request::Parts {
method,
version,
headers,
extensions: _,
..
},
body,
) = http_request.into_parts();

let mut request = http_client
.request(method, target_url)
.json(&body)
.build()
.map_err(|err| {
tracing::error!(fetch_error = err.to_string().as_str());
graphql::FetchError::SubrequestHttpError {
service: (*self.service).to_owned(),
reason: err.to_string(),
}
})?;
request.headers_mut().extend(headers.into_iter());
*request.version_mut() = version;

let response = http_client.execute(request).await.map_err(|err| {
tracing::error!(fetch_error = err.to_string().as_str());
graphql::FetchError::SubrequestHttpError {
service: (*self.service).to_owned(),
reason: err.to_string(),
}
})?;
let body = response
.bytes()
.instrument(tracing::debug_span!("aggregate_response_data"))
.await
.map_err(|err| {
tracing::error!(fetch_error = format!("{:?}", err).as_str());

graphql::FetchError::SubrequestHttpError {
service: service_name.clone(),
reason: err.to_string(),
}
})?;

let graphql: graphql::Response =
tracing::debug_span!("parse_subgraph_response").in_scope(|| {
graphql::Response::from_bytes(&service_name, body).map_err(|error| {
graphql::FetchError::SubrequestMalformedResponse {
service: service_name.clone(),
reason: error.to_string(),
}
})
})?;

Ok(http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into())
}
}

impl tower::Service<graphql::SubgraphRequest> for ReqwestSubgraphService {
Expand All @@ -48,14 +190,9 @@ impl tower::Service<graphql::SubgraphRequest> for ReqwestSubgraphService {
Poll::Ready(Ok(()))
}

fn call(
&mut self,
graphql::SubgraphRequest {
http_request,
context,
}: graphql::SubgraphRequest,
) -> Self::Future {
let http_client = self.http_client.clone();
fn call(&mut self, request: graphql::SubgraphRequest) -> Self::Future {
Box::pin(self.dedup(request).map_err(|e| e.into()))
/*let http_client = self.http_client.clone();
let target_url = if http_request.uri() == "/" {
self.url.clone()
} else {
Expand Down Expand Up @@ -110,7 +247,7 @@ impl tower::Service<graphql::SubgraphRequest> for ReqwestSubgraphService {
response: http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into(),
context,
})
})
})*/
}
}

Expand Down

0 comments on commit 18c20c5

Please sign in to comment.