diff --git a/Cargo.lock b/Cargo.lock index a4b49afe7e..0a4801f4fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -985,9 +985,9 @@ dependencies = [ [[package]] name = "deno_core" -version = "0.112.0" +version = "0.118.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b0a532994e7d5ddfc029a33a67709c91e6c8926b97e4825e4d3056ee11abfc" +checksum = "68be6990e070141d1413797b84d8a05d3eb426fa18a9962ecf21d2f045014db4" dependencies = [ "anyhow", "futures", @@ -3110,7 +3110,7 @@ dependencies = [ [[package]] name = "router-bridge" version = "0.1.0" -source = "git+https://github.com/apollographql/federation.git?rev=950eb931e38746bb7cfed05382d6970a22e43cc4#950eb931e38746bb7cfed05382d6970a22e43cc4" +source = "git+https://github.com/apollographql/federation.git?rev=17b9ca519b54bad773b89c44a9ef033949788edb#17b9ca519b54bad773b89c44a9ef033949788edb" dependencies = [ "anyhow", "deno_core", @@ -3363,9 +3363,9 @@ dependencies = [ [[package]] name = "serde_v8" -version = "0.23.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487456399b8722492bdc4c1834397f91c935247590dae30f8e779b7f48e3181a" +checksum = "27266c014ef9b11fcf7f1a248f25603529432aab4d0ce777d6c6f6aea2b367bb" dependencies = [ "serde", "serde_bytes", @@ -4318,9 +4318,9 @@ dependencies = [ [[package]] name = "v8" -version = "0.36.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "506523e86ccc15982be412bdde87a142771c139e94a8ecedda1da051a079b81d" +checksum = "684e95fe02e0acfeaf630df3a1623f6ad02145f9a92c54ceae8a1923319d3273" dependencies = [ "bitflags", "fslock", diff --git a/apollo-router-core/Cargo.toml b/apollo-router-core/Cargo.toml index dfd75cd28e..1a05d30c3b 100644 --- a/apollo-router-core/Cargo.toml +++ b/apollo-router-core/Cargo.toml @@ -30,7 +30,7 @@ paste = "1.0.6" reqwest = { version = "0.11.9", features = ["json", "stream"] } reqwest-middleware = "0.1.3" reqwest-tracing = { version = "0.2", features = ["opentelemetry_0_16"] } -router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "950eb931e38746bb7cfed05382d6970a22e43cc4" } +router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "17b9ca519b54bad773b89c44a9ef033949788edb" } serde = { version = "1.0.136", features = ["derive", "rc"] } serde_json = { version = "1.0.78", features = ["preserve_order"] } serde_json_bytes = { version = "0.2.0", features = ["preserve_order"] } diff --git a/apollo-router-core/src/error.rs b/apollo-router-core/src/error.rs index 227c997e50..21a1964444 100644 --- a/apollo-router-core/src/error.rs +++ b/apollo-router-core/src/error.rs @@ -12,7 +12,7 @@ use tracing::level_filters::LevelFilter; /// /// Note that these are not actually returned to the client, but are instead converted to JSON for /// [`struct@Error`]. -#[derive(Error, Display, Debug, Serialize, Deserialize)] +#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] #[ignore_extra_doc_attributes] pub enum FetchError { diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index d7b58774fe..1aac741934 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -234,6 +234,9 @@ mod fetch { /// The GraphQL subquery that is used for the fetch. operation: String, + + /// The kind of operation (query, mutation or subscription) + operationKind: String, } struct Variables { diff --git a/apollo-router-core/src/request.rs b/apollo-router-core/src/request.rs index 9b71552474..fa630db966 100644 --- a/apollo-router-core/src/request.rs +++ b/apollo-router-core/src/request.rs @@ -10,7 +10,7 @@ use typed_builder::TypedBuilder; #[derive(Clone, Derivative, Serialize, Deserialize, TypedBuilder)] #[serde(rename_all = "camelCase")] #[builder(field_defaults(setter(into)))] -#[derivative(Debug, PartialEq)] +#[derivative(Debug, PartialEq, Eq, Hash)] pub struct Request { /// The graphql query. pub query: String, diff --git a/apollo-router-core/src/services/http_compat.rs b/apollo-router-core/src/services/http_compat.rs index 348a557169..c237f75c82 100644 --- a/apollo-router-core/src/services/http_compat.rs +++ b/apollo-router-core/src/services/http_compat.rs @@ -1,6 +1,10 @@ //! wrapper typpes for Request and Response from the http crate to improve their usability -use std::ops::{Deref, DerefMut}; +use std::{ + cmp::PartialEq, + hash::Hash, + ops::{Deref, DerefMut}, +}; #[derive(Debug, Default)] pub struct Request { @@ -61,6 +65,52 @@ impl DerefMut for Request { } } +impl Hash for Request { + fn hash(&self, state: &mut H) { + self.inner.method().hash(state); + self.inner.version().hash(state); + self.inner.uri().hash(state); + // this assumes headers are in the same order + for (name, value) in self.inner.headers() { + name.hash(state); + value.hash(state); + } + self.inner.body().hash(state); + } +} + +impl PartialEq for Request { + fn eq(&self, other: &Self) -> bool { + let mut res = self.inner.method().eq(other.inner.method()) + && self.inner.version().eq(&other.inner.version()) + && self.inner.uri().eq(other.inner.uri()); + + if !res { + return false; + } + if self.inner.headers().len() != other.inner.headers().len() { + return false; + } + + // this assumes headers are in the same order + for ((name, value), (other_name, other_value)) in self + .inner + .headers() + .iter() + .zip(other.inner.headers().iter()) + { + res = name.eq(other_name) && value.eq(other_value); + if !res { + return false; + } + } + + self.inner.body().eq(other.inner.body()) + } +} + +impl Eq for Request {} + impl From> for Request { fn from(inner: http::Request) -> Self { Request { inner } diff --git a/apollo-router/src/reqwest_subgraph_service.rs b/apollo-router/src/reqwest_subgraph_service.rs index d905ecd5a2..ef10a1abdb 100644 --- a/apollo-router/src/reqwest_subgraph_service.rs +++ b/apollo-router/src/reqwest_subgraph_service.rs @@ -1,7 +1,10 @@ use apollo_router_core::prelude::*; -use futures::future::BoxFuture; +use futures::lock::Mutex; +use futures::{future::BoxFuture, TryFutureExt}; +use std::collections::HashMap; use std::sync::Arc; use std::task::Poll; +use tokio::sync::broadcast::{self, Sender}; use tracing::Instrument; use typed_builder::TypedBuilder; @@ -15,6 +18,20 @@ pub struct ReqwestSubgraphService { // a url::Url instead of using the http crate // for now, to make things work, if the URL in the request is /, we use this URL url: reqwest::Url, + + wait_map: Arc< + Mutex< + HashMap< + apollo_router_core::http_compat::Request, + Sender< + Result< + apollo_router_core::http_compat::Response, + graphql::FetchError, + >, + >, + >, + >, + >, } impl ReqwestSubgraphService { @@ -34,8 +51,133 @@ impl ReqwestSubgraphService { .build(), service: Arc::new(service), url, + wait_map: Arc::new(Mutex::new(HashMap::new())), + } + } + + async fn dedup( + &self, + graphql::SubgraphRequest { + http_request, + context, + }: graphql::SubgraphRequest, + ) -> Result { + loop { + let mut locked_wait_map = self.wait_map.lock().await; + match locked_wait_map.get_mut(&http_request) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + + match receiver.recv().await { + Ok(value) => { + return value + .map(|response| graphql::SubgraphResponse { response, context }) + } + // there was an issue with the broadcast channel, retry fetching + Err(_) => continue, + } + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(http_request.clone(), tx.clone()); + drop(locked_wait_map); + + let res = self.fetch(http_request.clone()).await; + + { + let mut locked_wait_map = self.wait_map.lock().await; + locked_wait_map.remove(&http_request); + } + + // Let our waiters know + let broadcast_value = res.clone(); + // Our use case is very specific, so we are sure that + // we won't get any errors here. + tokio::task::spawn_blocking(move || { + tx.send(broadcast_value) + .expect("there is always at least one receiver alive, the _rx guard; qed") + }).await + .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed"); + return res.map(|response| graphql::SubgraphResponse { response, context }); + } + } } } + + async fn fetch( + &self, + http_request: apollo_router_core::http_compat::Request, + ) -> Result< + apollo_router_core::http_compat::Response, + graphql::FetchError, + > { + let http_client = self.http_client.clone(); + let target_url = if http_request.uri() == "/" { + self.url.clone() + } else { + reqwest::Url::parse(&http_request.uri().to_string()).expect("todo") + }; + let service_name = (*self.service).to_owned(); + + let ( + http::request::Parts { + method, + version, + headers, + extensions: _, + .. + }, + body, + ) = http_request.into_parts(); + + let mut request = http_client + .request(method, target_url) + .json(&body) + .build() + .map_err(|err| { + tracing::error!(fetch_error = err.to_string().as_str()); + graphql::FetchError::SubrequestHttpError { + service: (*self.service).to_owned(), + reason: err.to_string(), + } + })?; + request.headers_mut().extend(headers.into_iter()); + *request.version_mut() = version; + + let response = http_client.execute(request).await.map_err(|err| { + tracing::error!(fetch_error = err.to_string().as_str()); + graphql::FetchError::SubrequestHttpError { + service: (*self.service).to_owned(), + reason: err.to_string(), + } + })?; + let body = response + .bytes() + .instrument(tracing::debug_span!("aggregate_response_data")) + .await + .map_err(|err| { + tracing::error!(fetch_error = format!("{:?}", err).as_str()); + + graphql::FetchError::SubrequestHttpError { + service: service_name.clone(), + reason: err.to_string(), + } + })?; + + let graphql: graphql::Response = + tracing::debug_span!("parse_subgraph_response").in_scope(|| { + graphql::Response::from_bytes(&service_name, body).map_err(|error| { + graphql::FetchError::SubrequestMalformedResponse { + service: service_name.clone(), + reason: error.to_string(), + } + }) + })?; + + Ok(http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into()) + } } impl tower::Service for ReqwestSubgraphService { @@ -48,14 +190,9 @@ impl tower::Service for ReqwestSubgraphService { Poll::Ready(Ok(())) } - fn call( - &mut self, - graphql::SubgraphRequest { - http_request, - context, - }: graphql::SubgraphRequest, - ) -> Self::Future { - let http_client = self.http_client.clone(); + fn call(&mut self, request: graphql::SubgraphRequest) -> Self::Future { + Box::pin(self.dedup(request).map_err(|e| e.into())) + /*let http_client = self.http_client.clone(); let target_url = if http_request.uri() == "/" { self.url.clone() } else { @@ -110,7 +247,7 @@ impl tower::Service for ReqwestSubgraphService { response: http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into(), context, }) - }) + })*/ } }