Skip to content

Commit

Permalink
feat: request and response mapping for inner/connector fetch nodes (#276
Browse files Browse the repository at this point in the history
)

* feat: request and response mapping for inner/connector fetch notes

This change inspects subgraph requests from the inner query plan and constructs a vec of requests for the SubgraphConnector to call.

It handles:
* root fields (including aliased fields)
* entity resolvers
* fields on entities (by basically providing a stub entity resolver)

For each request, we collect the info necessary for mapping the response (JSON selection transforms and key path). The response handling function applies this information to construct a single response from multiple JSON objects.

---------

Co-authored-by: o0Ignition0o <jeremy.lempereur@gmail.com>
  • Loading branch information
lennyburdette and o0Ignition0o authored Dec 8, 2023
1 parent 0c946f8 commit a4f4cb5
Show file tree
Hide file tree
Showing 12 changed files with 1,536 additions and 228 deletions.
207 changes: 101 additions & 106 deletions apollo-router/src/plugins/connectors/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use std::sync::Arc;

use anyhow::anyhow;
use anyhow::bail;
use apollo_compiler::ast;
use apollo_compiler::ast::Selection;
use apollo_compiler::schema::ExtendedType;
use apollo_compiler::schema::FieldDefinition;
use apollo_compiler::NodeStr;
use apollo_compiler::schema::Name;
use apollo_compiler::validation::Valid;
use apollo_compiler::Schema;
use tower::BoxError;

Expand All @@ -21,7 +23,11 @@ use super::join_spec_helpers::make_any_scalar;
use super::join_spec_helpers::parameters_to_selection_set;
use super::join_spec_helpers::selection_set_to_string;
use super::join_spec_helpers::Key;
use crate::json_ext::Object;
use super::request_response::handle_responses;
use super::request_response::make_requests;
use super::request_response::ResponseParams;
use super::selection_parser::Selection as JSONSelection;
use super::url_path_parser::URLPathTemplate;
use crate::services::SubgraphRequest;
use crate::services::SubgraphResponse;
use crate::Context;
Expand Down Expand Up @@ -132,13 +138,13 @@ impl Connector {
ConnectorType::RootField(field) => {
let mut changes = vec![
Change::Type {
name: field.parent_type_name.clone().into(),
name: ast::Name::new(field.parent_type_name.clone())?,
graph: graph.clone(),
key: Key::None,
},
Change::Field {
type_name: field.parent_type_name.clone().into(),
field_name: field.field_name.clone().into(),
type_name: ast::Name::new(field.parent_type_name.clone())?,
field_name: ast::Name::new(field.field_name.clone())?,
graph: graph.clone(),
},
];
Expand All @@ -165,12 +171,12 @@ impl Connector {

let mut changes = vec![
Change::Type {
name: ty.type_name.clone().into(),
name: ast::Name::new(ty.type_name.clone())?,
graph: graph.clone(),
key: Key::Resolvable(key_string),
},
Change::MagicFinder {
type_name: ty.type_name.clone().into(),
type_name: ast::Name::new(ty.type_name.clone())?,
graph: graph.clone(),
},
];
Expand Down Expand Up @@ -211,17 +217,17 @@ impl Connector {

let mut changes = vec![
Change::Type {
name: field.parent_type_name.clone().into(),
name: ast::Name::new(field.parent_type_name.clone())?,
graph: graph.clone(),
key: Key::Resolvable(key_string),
},
Change::Field {
type_name: field.parent_type_name.clone().into(),
field_name: field.field_name.clone().into(),
type_name: ast::Name::new(field.parent_type_name.clone())?,
field_name: ast::Name::new(field.field_name.clone())?,
graph: graph.clone(),
},
Change::MagicFinder {
type_name: field.parent_type_name.clone().into(),
type_name: ast::Name::new(field.parent_type_name.clone())?,
graph: graph.clone(),
},
];
Expand Down Expand Up @@ -267,72 +273,48 @@ impl Connector {
self.name.as_str()
}

pub(crate) fn create_request(
&self,
subgraph_request: SubgraphRequest,
) -> Result<(Context, http::Request<hyper::Body>), BoxError> {
println!(
"create request: self={self:?}, subgraph req={:?}",
subgraph_request.subgraph_request
);

let request = if let Some(http) = &self.api.http {
let mut builder = http::Request::builder()
.method("GET") //TODO: do we support others methods?
.uri(http.base_url.clone());

for header in &http.headers {
if let Some(value) = &header.value {
let name = header.r#as.as_ref().unwrap_or(&header.name).clone();
builder = builder.header(name, value.clone());
}
}
builder
.body(hyper::Body::empty())
.map_err(|e| BoxError::from(format!("couldn't create connector request {}", e)))?
} else {
let SubgraphRequest {
subgraph_request, ..
} = subgraph_request;
pub(super) fn base_uri(&self) -> Result<http::Uri, http::uri::InvalidUri> {
self.api.base_uri()
}

let (parts, body) = subgraph_request.into_parts();
pub(super) fn path_template(&self) -> &URLPathTemplate {
match self.ty.as_ref() {
ConnectorType::Entity(source_type) => source_type.path_template(),
ConnectorType::RootField(source_field) => source_field.path_template(),
ConnectorType::EntityField(source_field) => source_field.path_template(),
}
}

let body = serde_json::to_string(&body)?;
pub(super) fn method(&self) -> http::Method {
match self.ty.as_ref() {
ConnectorType::Entity(source_type) => source_type.method().clone(),
ConnectorType::RootField(source_field) => source_field.method().clone(),
ConnectorType::EntityField(source_field) => source_field.method().clone(),
}
}

http::request::Request::from_parts(parts, body.into())
};
println!("generated req: {request:?}");
pub(super) fn json_selection(&self) -> JSONSelection {
match self.ty.as_ref() {
ConnectorType::Entity(source_type) => source_type.selection.clone(),
ConnectorType::RootField(source_field) => source_field.selection.clone(),
ConnectorType::EntityField(source_field) => source_field.selection.clone(),
}
}

Ok((subgraph_request.context, request))
pub(crate) fn create_requests(
&self,
subgraph_request: SubgraphRequest,
schema: Arc<Valid<Schema>>,
) -> Result<Vec<(http::Request<hyper::Body>, ResponseParams)>, BoxError> {
make_requests(subgraph_request, self, schema.clone())
}

pub(crate) async fn map_http_response(
pub(crate) async fn map_http_responses(
&self,
response: http::Response<hyper::Body>,
responses: Vec<http::Response<hyper::Body>>,
context: Context,
) -> Result<SubgraphResponse, BoxError> {
// TODO (content type, status etc...) but I'll hardcode putting the JSON from ipinfo.io in the "data" section for this example
let (parts, body) = response.into_parts();
let graphql_entity: serde_json_bytes::Value = serde_json::from_slice(
&hyper::body::to_bytes(body)
.await
.map_err(|_| "couldn't retrieve http response body")?,
)
.map_err(|_| "couldn't deserialize response body")?;

// TODO: selection set parent + entities etc etc etc
let graphql_data = serde_json_bytes::json! {{
"serverNetworkInfo": graphql_entity
}};

let response = SubgraphResponse::builder()
.data(graphql_data)
.context(context)
.headers(parts.headers)
.extensions(Object::default())
.build();

Ok(response)
handle_responses(context, self, responses).await
}
}

Expand All @@ -343,20 +325,16 @@ impl Connector {
#[derive(Debug)]
pub(super) enum Change {
/// Include a type in the schema and add the `@join__type` directive
Type {
name: NodeStr,
graph: String,
key: Key,
},
Type { name: Name, graph: String, key: Key },
/// Include a field on a type in the schema and add the `@join__field` directive
/// TODO: currently assumes that the type already exists (order matters!)
Field {
type_name: NodeStr,
field_name: NodeStr,
type_name: Name,
field_name: Name,
graph: String,
},
/// Add a special field to Query that we can use instead of `_entities`
MagicFinder { type_name: NodeStr, graph: String },
MagicFinder { type_name: Name, graph: String },
}

impl Change {
Expand Down Expand Up @@ -403,8 +381,8 @@ impl Change {
fn upsert_field<'a>(
source: &Schema,
dest: &'a mut Schema,
type_name: &NodeStr,
field_name: &NodeStr,
type_name: &Name,
field_name: &Name,
) -> anyhow::Result<&'a mut FieldDefinition> {
let new_ty = dest
.types
Expand Down Expand Up @@ -447,20 +425,20 @@ fn upsert_type<'a>(
.map(|op| op.as_str() == name)
.unwrap_or(false)
{
dest.schema_definition.make_mut().query = Some(name.into());
dest.schema_definition.make_mut().query = Some(ast::Name::new(name)?.into());
}

if source
.root_operation(apollo_compiler::executable::OperationType::Mutation)
.map(|op| op.as_str() == name)
.unwrap_or(false)
{
dest.schema_definition.make_mut().mutation = Some(name.into());
dest.schema_definition.make_mut().mutation = Some(ast::Name::new(name)?.into());
}

let ty = dest
.types
.entry(name.into())
.entry(ast::Name::new(name)?)
.or_insert_with(|| clean_copy_of_type(original));

Ok(ty)
Expand All @@ -471,7 +449,10 @@ fn add_type<'a>(
name: &str,
ty: ExtendedType,
) -> anyhow::Result<&'a mut ExtendedType> {
Ok(dest.types.entry(NodeStr::new(name)).or_insert_with(|| ty))
Ok(dest
.types
.entry(ast::Name::new(name)?)
.or_insert_with(|| ty))
}

fn clean_copy_of_field(f: &FieldDefinition) -> FieldDefinition {
Expand Down Expand Up @@ -530,7 +511,7 @@ fn recurse_selection(
let mut mutations = Vec::new();

mutations.push(Change::Type {
name: NodeStr::new(&type_name.clone()),
name: ast::Name::new(type_name.clone())?,
graph: graph.clone(),
key: Key::None,
});
Expand All @@ -540,19 +521,16 @@ fn recurse_selection(
for selection in selections {
match selection {
Selection::Field(selection) => {
let field = obj
.fields
.get(&NodeStr::new(selection.name.to_string().as_str()))
.ok_or(anyhow!(
"missing field {} for type {}",
selection.name.to_string().as_str(),
type_name
))?;
let field = obj.fields.get(&selection.name).ok_or(anyhow!(
"missing field {} for type {}",
selection.name.to_string().as_str(),
type_name
))?;

let field_type_name = field.ty.inner_named_type();

mutations.push(Change::Field {
type_name: NodeStr::new(&type_name.clone()),
type_name: ast::Name::new(&type_name.clone())?,
field_name: selection.name.clone(),
graph: graph.clone(),
});
Expand Down Expand Up @@ -589,41 +567,58 @@ fn recurse_selection(
mod tests {
use super::*;
use crate::plugins::connectors::directives::HTTPSourceAPI;
use crate::plugins::connectors::directives::HTTPSourceType;
use crate::plugins::connectors::directives::HTTPSourceField;
use crate::plugins::connectors::selection_parser::Selection as JSONSelection;
use crate::plugins::connectors::url_path_parser::URLPathTemplate;
use crate::services::subgraph;

#[test]
fn request() {
let subgraph_request = subgraph::Request::fake_builder().build();
let subgraph_request = subgraph::Request::fake_builder()
.subgraph_request(
http::Request::builder()
.body(crate::graphql::Request::builder().query("{field}").build())
.unwrap(),
)
.build();
let connector = Connector {
name: "API".to_string(),
name: "CONNECTOR_QUERY_FIELDB".to_string(),
api: Arc::new(SourceAPI {
graph: "B".to_string(),
name: "C".to_string(),
name: "API".to_string(),
http: Some(HTTPSourceAPI {
base_url: "http://localhost/api".to_string(),
default: false,
default: true,
headers: vec![],
}),
}),
ty: Arc::new(ConnectorType::Entity(SourceType {
ty: Arc::new(ConnectorType::RootField(SourceField {
graph: "B".to_string(),
type_name: "TypeB".to_string(),
parent_type_name: "Query".to_string(),
field_name: "field".to_string(),
output_type_name: "String".to_string(),
api: "API".to_string(),
http: Some(HTTPSourceType {
path_template: URLPathTemplate::parse("/path").unwrap(),
http: Some(HTTPSourceField {
method: http::Method::GET,
headers: vec![],
path_template: URLPathTemplate::parse("/path").unwrap(),
body: None,
}),
selection: JSONSelection::parse("id").unwrap().1,
key_type_map: None,
selection: JSONSelection::parse(".data").unwrap().1,
})),
};

let (_context, request) = connector.create_request(subgraph_request).unwrap();
insta::assert_debug_snapshot!(request);
let requests_and_params = connector
.create_requests(
subgraph_request,
Arc::new(
Schema::parse_and_validate(
"type Query { field: String }".to_string(),
"schema.graphql",
)
.unwrap(),
),
)
.unwrap();
insta::assert_debug_snapshot!(requests_and_params);
}
}
Loading

0 comments on commit a4f4cb5

Please sign in to comment.