Skip to content

Commit

Permalink
refactor the APQ implementation (#2129)
Browse files Browse the repository at this point in the history
remove duplicated code
  • Loading branch information
Geal authored Nov 24, 2022
1 parent 9da6890 commit c42f0d8
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 127 deletions.
9 changes: 9 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ to validate the data sent back to the client. Those query shapes were invalid fo
By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2102

## 🛠 Maintenance


### Refactor APQ ([PR #2129](https://github.com/apollographql/router/pull/2129))

Remove duplicated code.

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2129


## 📚 Documentation

### Docs: Update cors match regex example ([Issue #2151](https://github.com/apollographql/router/issues/2151))
Expand Down
198 changes: 71 additions & 127 deletions apollo-router/src/services/layers/apq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,69 +50,9 @@ impl APQLayer {

pub(crate) async fn apq_request(
&self,
mut request: SupergraphRequest,
request: SupergraphRequest,
) -> Result<SupergraphRequest, SupergraphResponse> {
let maybe_query_hash: Option<(String, Vec<u8>)> = request
.supergraph_request
.body()
.extensions
.get("persistedQuery")
.and_then(|value| serde_json_bytes::from_value::<PersistedQuery>(value.clone()).ok())
.and_then(|persisted_query| {
hex::decode(persisted_query.sha256hash.as_bytes())
.ok()
.map(|decoded| (persisted_query.sha256hash, decoded))
});

let body_query = request.supergraph_request.body().query.clone();

match (maybe_query_hash, body_query) {
(Some((query_hash, query_hash_bytes)), Some(query)) => {
if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) {
tracing::trace!("apq: cache insert");
let _ = request.context.insert("persisted_query_hit", false);
self.cache.insert(format!("apq|{query_hash}"), query).await;
} else {
tracing::warn!("apq: graphql request doesn't match provided sha256Hash");
}
Ok(request)
}
(Some((apq_hash, _)), _) => {
if let Ok(cached_query) =
self.cache.get(&format!("apq|{apq_hash}")).await.get().await
{
let _ = request.context.insert("persisted_query_hit", true);
tracing::trace!("apq: cache hit");
request.supergraph_request.body_mut().query = Some(cached_query);
Ok(request)
} else {
tracing::trace!("apq: cache miss");
let errors = vec![crate::error::Error {
message: "PersistedQueryNotFound".to_string(),
locations: Default::default(),
path: Default::default(),
extensions: serde_json_bytes::from_value(json!({
"code": "PERSISTED_QUERY_NOT_FOUND",
"exception": {
"stacktrace": [
"PersistedQueryNotFoundError: PersistedQueryNotFound",
],
},
}))
.unwrap(),
}];
let res = SupergraphResponse::builder()
.data(Value::default())
.errors(errors)
.context(request.context)
.build()
.expect("response is valid");

Err(res)
}
}
_ => Ok(request),
}
apq_request(&self.cache, request).await
}
}

Expand All @@ -136,73 +76,12 @@ where
fn layer(&self, service: S) -> Self::Service {
let cache = self.cache.clone();
AsyncCheckpointService::new(
move |mut req| {
move |request| {
let cache = cache.clone();
Box::pin(async move {
let maybe_query_hash: Option<(String, Vec<u8>)> = req
.supergraph_request
.body()
.extensions
.get("persistedQuery")
.and_then(|value| {
serde_json_bytes::from_value::<PersistedQuery>(value.clone()).ok()
})
.and_then(|persisted_query| {
hex::decode(persisted_query.sha256hash.as_bytes())
.ok()
.map(|decoded| (persisted_query.sha256hash, decoded))
});

let body_query = req.supergraph_request.body().query.clone();

match (maybe_query_hash, body_query) {
(Some((query_hash, query_hash_bytes)), Some(query)) => {
if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) {
tracing::trace!("apq: cache insert");
let _ = req.context.insert("persisted_query_hit", false);
cache.insert(format!("apq|{query_hash}"), query).await;
} else {
tracing::warn!(
"apq: graphql request doesn't match provided sha256Hash"
);
}
Ok(ControlFlow::Continue(req))
}
(Some((apq_hash, _)), _) => {
if let Ok(cached_query) =
cache.get(&format!("apq|{apq_hash}")).await.get().await
{
let _ = req.context.insert("persisted_query_hit", true);
tracing::trace!("apq: cache hit");
req.supergraph_request.body_mut().query = Some(cached_query);
Ok(ControlFlow::Continue(req))
} else {
tracing::trace!("apq: cache miss");
let errors = vec![crate::error::Error {
message: "PersistedQueryNotFound".to_string(),
locations: Default::default(),
path: Default::default(),
extensions: serde_json_bytes::from_value(json!({
"code": "PERSISTED_QUERY_NOT_FOUND",
"exception": {
"stacktrace": [
"PersistedQueryNotFoundError: PersistedQueryNotFound",
],
},
}))
.unwrap(),
}];
let res = SupergraphResponse::builder()
.data(Value::default())
.errors(errors)
.context(req.context)
.build()
.expect("response is valid");

Ok(ControlFlow::Break(res))
}
}
_ => Ok(ControlFlow::Continue(req)),
match apq_request(&cache, request).await {
Ok(request) => Ok(ControlFlow::Continue(request)),
Err(response) => Ok(ControlFlow::Break(response)),
}
})
as BoxFuture<
Expand All @@ -221,6 +100,71 @@ where
}
}

pub(crate) async fn apq_request(
cache: &DeduplicatingCache<String, String>,
mut request: SupergraphRequest,
) -> Result<SupergraphRequest, SupergraphResponse> {
let maybe_query_hash: Option<(String, Vec<u8>)> = request
.supergraph_request
.body()
.extensions
.get("persistedQuery")
.and_then(|value| serde_json_bytes::from_value::<PersistedQuery>(value.clone()).ok())
.and_then(|persisted_query| {
hex::decode(persisted_query.sha256hash.as_bytes())
.ok()
.map(|decoded| (persisted_query.sha256hash, decoded))
});

let body_query = request.supergraph_request.body().query.clone();

match (maybe_query_hash, body_query) {
(Some((query_hash, query_hash_bytes)), Some(query)) => {
if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) {
tracing::trace!("apq: cache insert");
let _ = request.context.insert("persisted_query_hit", false);
cache.insert(format!("apq|{query_hash}"), query).await;
} else {
tracing::warn!("apq: graphql request doesn't match provided sha256Hash");
}
Ok(request)
}
(Some((apq_hash, _)), _) => {
if let Ok(cached_query) = cache.get(&format!("apq|{apq_hash}")).await.get().await {
let _ = request.context.insert("persisted_query_hit", true);
tracing::trace!("apq: cache hit");
request.supergraph_request.body_mut().query = Some(cached_query);
Ok(request)
} else {
tracing::trace!("apq: cache miss");
let errors = vec![crate::error::Error {
message: "PersistedQueryNotFound".to_string(),
locations: Default::default(),
path: Default::default(),
extensions: serde_json_bytes::from_value(json!({
"code": "PERSISTED_QUERY_NOT_FOUND",
"exception": {
"stacktrace": [
"PersistedQueryNotFoundError: PersistedQueryNotFound",
],
},
}))
.unwrap(),
}];
let res = SupergraphResponse::builder()
.data(Value::default())
.errors(errors)
.context(request.context)
.build()
.expect("response is valid");

Err(res)
}
}
_ => Ok(request),
}
}

fn query_matches_hash(query: &str, hash: &[u8]) -> bool {
let mut digest = Sha256::new();
digest.update(query.as_bytes());
Expand Down

0 comments on commit c42f0d8

Please sign in to comment.