Skip to content

Commit

Permalink
Merge pull request #264 from apollographql/igni/graph_reentry
Browse files Browse the repository at this point in the history
Tryouts with a two steps execution approach
  • Loading branch information
o0Ignition0o authored Dec 5, 2023
2 parents 28770ac + fba20f3 commit 178f3c7
Show file tree
Hide file tree
Showing 19 changed files with 966 additions and 121 deletions.
2 changes: 1 addition & 1 deletion apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2325,7 +2325,7 @@ async fn test_supergraph_timeout() {
let supergraph_creator = builder.build().await.unwrap();

let service = RouterCreator::new(
QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&conf)).await,
QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&conf)),
Arc::new(PersistedQueryLayer::new(&conf).await.unwrap()),
Arc::new(supergraph_creator),
conf.clone(),
Expand Down
165 changes: 146 additions & 19 deletions apollo-router/src/plugins/connectors/connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::bail;
Expand All @@ -8,6 +9,7 @@ use apollo_compiler::schema::ExtendedType;
use apollo_compiler::schema::FieldDefinition;
use apollo_compiler::NodeStr;
use apollo_compiler::Schema;
use tower::BoxError;

use super::directives::SourceAPI;
use super::directives::SourceField;
Expand All @@ -16,24 +18,30 @@ use super::join_spec_helpers::add_entities_field;
use super::join_spec_helpers::add_join_field_directive;
use super::join_spec_helpers::add_join_type_directive;
use super::join_spec_helpers::make_any_scalar;
use crate::json_ext::Object;
use crate::services::SubgraphRequest;
use crate::services::SubgraphResponse;
use crate::Context;

/// A connector wraps the API and type/field connector metadata and has
/// a unique name used to construct a "subgraph" in the inner supergraph
#[derive(Clone, Debug)]
pub(crate) struct Connector {
/// Internal name used to construct "subgraphs" in the inner supergraph
pub(crate) name: String,
api: SourceAPI,
ty: ConnectorType,
name: String,
api: Arc<SourceAPI>,
ty: Arc<ConnectorType>,
}

#[derive(Debug)]
pub(super) enum ConnectorType {
Type(SourceType),
Field(SourceField),
}

/// The list of the subgraph names that should use the inner query planner
/// instead of making a normal subgraph request.
pub(crate) fn connector_subgraph_names(connectors: HashMap<String, Connector>) -> HashSet<String> {
pub(crate) fn connector_subgraph_names(connectors: &HashMap<String, Connector>) -> HashSet<String> {
connectors
.values()
.map(|c| c.outer_subgraph_name())
Expand All @@ -42,7 +50,7 @@ pub(crate) fn connector_subgraph_names(connectors: HashMap<String, Connector>) -

impl Connector {
/// Generate a map of connectors with unique names
pub(super) fn from_schema(schema: &Schema) -> anyhow::Result<HashMap<String, Self>> {
pub(crate) fn from_schema(schema: &Schema) -> anyhow::Result<HashMap<String, Self>> {
let apis = SourceAPI::from_schema(schema)?;
let types = SourceType::from_schema(schema)?;
let fields = SourceField::from_schema(schema)?;
Expand All @@ -56,11 +64,12 @@ impl Connector {
connector_name.clone(),
Connector {
name: connector_name,
api: apis
.get(&directive.api_name())
.ok_or(anyhow!("missing API {}", directive.api_name()))? // TODO support default
.clone(),
ty: ConnectorType::Type(directive),
api: Arc::new(
apis.get(&directive.api_name())
.ok_or(anyhow!("missing API {}", directive.api_name()))? // TODO support default
.clone(),
),
ty: Arc::new(ConnectorType::Type(directive)),
},
);
}
Expand All @@ -76,11 +85,12 @@ impl Connector {
connector_name.clone(),
Connector {
name: connector_name,
api: apis
.get(&directive.api_name())
.ok_or(anyhow!("missing API {}", directive.api_name()))? // TODO support default
.clone(),
ty: ConnectorType::Field(directive),
api: Arc::new(
apis.get(&directive.api_name())
.ok_or(anyhow!("missing API {}", directive.api_name()))? // TODO support default
.clone(),
),
ty: Arc::new(ConnectorType::Field(directive)),
},
);
}
Expand All @@ -91,7 +101,7 @@ impl Connector {
/// Generate a list of changes to apply to the new schame
pub(super) fn changes(&self, schema: &Schema) -> anyhow::Result<Vec<Change>> {
let graph = self.name.clone();
match &self.ty {
match &*self.ty {
ConnectorType::Type(ty) => {
let mut changes = vec![
Change::Type {
Expand Down Expand Up @@ -151,11 +161,83 @@ impl Connector {
}

pub(super) fn outer_subgraph_name(&self) -> String {
match self.ty {
match &*self.ty {
ConnectorType::Type(ref ty) => ty.graph.clone(),
ConnectorType::Field(ref field) => field.graph.clone(),
}
}

pub(crate) fn name(&self) -> &str {
self.name.as_str()
}

pub(crate) fn create_request(
&self,
subgraph_request: SubgraphRequest,
) -> Result<(Context, http::Request<hyper::Body>), BoxError> {
println!(
"create request: self={self:?}, subgraph req={:?}",
subgraph_request.subgraph_request
);

let request = if let Some(http) = &self.api.http {
let mut builder = http::Request::builder()
.method("GET") //TODO: do we support others methods?
.uri(http.base_url.clone());

for header in &http.headers {
if let Some(value) = &header.value {
let name = header.r#as.as_ref().unwrap_or(&header.name).clone();
builder = builder.header(name, value.clone());
}
}
builder
.body(hyper::Body::empty())
.map_err(|e| BoxError::from(format!("couldn't create connector request {}", e)))?
} else {
let SubgraphRequest {
subgraph_request, ..
} = subgraph_request;

let (parts, body) = subgraph_request.into_parts();

let body = serde_json::to_string(&body)?;

http::request::Request::from_parts(parts, body.into())
};
println!("generated req: {request:?}");

Ok((subgraph_request.context, request))
}

pub(crate) async fn map_http_response(
&self,
response: http::Response<hyper::Body>,
context: Context,
) -> Result<SubgraphResponse, BoxError> {
// TODO (content type, status etc...) but I'll hardcode putting the JSON from ipinfo.io in the "data" section for this example
let (parts, body) = response.into_parts();
let graphql_entity: serde_json_bytes::Value = serde_json::from_slice(
&hyper::body::to_bytes(body)
.await
.map_err(|_| "couldn't retrieve http response body")?,
)
.map_err(|_| "couldn't deserialize response body")?;

// TODO: selection set parent + entities etc etc etc
let graphql_data = serde_json_bytes::json! {{
"serverNetworkInfo": graphql_entity
}};

let response = SubgraphResponse::builder()
.data(graphql_data)
.context(context)
.headers(parts.headers)
.extensions(Object::default())
.build();

Ok(response)
}
}

/// A "change" is a unit of work that can be applied to a schema. Each connector
Expand Down Expand Up @@ -365,7 +447,11 @@ fn recurse_selection(
let field = obj
.fields
.get(&NodeStr::new(selection.name.to_string().as_str()))
.ok_or(anyhow!("missing field"))?;
.ok_or(anyhow!(
"missing field {} for type {}",
selection.name.to_string().as_str(),
type_name
))?;

let field_type_name = field.ty.inner_named_type();

Expand All @@ -379,7 +465,7 @@ fn recurse_selection(
let field_type = schema
.types
.get(field_type_name)
.ok_or(anyhow!("missing type"))?;
.ok_or(anyhow!("missing type {}", field_type_name))?;

mutations.extend(recurse_selection(
graph.clone(),
Expand All @@ -402,3 +488,44 @@ fn recurse_selection(

Ok(mutations)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::plugins::connectors::directives::HTTPSourceAPI;
use crate::plugins::connectors::directives::HTTPSourceType;
use crate::services::subgraph;

#[test]
fn request() {
let subgraph_request = subgraph::Request::fake_builder().build();
let connector = Connector {
name: "API".to_string(),
api: Arc::new(SourceAPI {
graph: "B".to_string(),
name: "C".to_string(),
http: Some(HTTPSourceAPI {
base_url: "http://localhost/api".to_string(),
default: None,
headers: vec![],
}),
}),
ty: Arc::new(ConnectorType::Type(SourceType {
graph: "B".to_string(),
type_name: "TypeB".to_string(),
api: "API".to_string(),
http: Some(HTTPSourceType {
get: None,
post: None,
headers: vec![],
body: None,
}),
selection: None,
key_type_map: None,
})),
};

let (_context, request) = connector.create_request(subgraph_request).unwrap();
insta::assert_debug_snapshot!(request);
}
}
35 changes: 18 additions & 17 deletions apollo-router/src/plugins/connectors/directives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const SOURCE_FIELD_DIRECTIVE_NAME: &str = "sourceField";

#[derive(Clone, Debug, Serialize)]
pub(super) struct SourceAPI {
graph: String,
name: String,
http: Option<HTTPSourceAPI>,
pub(crate) graph: String,
pub(crate) name: String,
pub(crate) http: Option<HTTPSourceAPI>,
}

impl SourceAPI {
Expand Down Expand Up @@ -85,9 +85,9 @@ impl SourceAPI {

#[derive(Clone, Debug, Serialize)]
pub(super) struct HTTPSourceAPI {
base_url: String,
default: Option<bool>,
headers: Vec<HTTPHeaderMapping>,
pub(crate) base_url: String,
pub(crate) default: Option<bool>,
pub(crate) headers: Vec<HTTPHeaderMapping>,
}

impl HTTPSourceAPI {
Expand Down Expand Up @@ -155,9 +155,10 @@ impl HTTPSourceAPI {

#[derive(Clone, Debug, Serialize)]
pub(super) struct HTTPHeaderMapping {
name: String,
r#as: Option<String>,
value: Option<String>,
pub(crate) name: String,
//TODO: how to translate?
pub(crate) r#as: Option<String>,
pub(crate) value: Option<String>,
}

impl HTTPHeaderMapping {
Expand Down Expand Up @@ -220,10 +221,10 @@ impl HTTPHeaderMapping {
pub(super) struct SourceType {
pub(super) graph: String,
pub(super) type_name: String,
api: String,
http: Option<HTTPSourceType>,
selection: Option<JSONSelection>,
key_type_map: Option<KeyTypeMap>,
pub(crate) api: String,
pub(crate) http: Option<HTTPSourceType>,
pub(crate) selection: Option<JSONSelection>,
pub(crate) key_type_map: Option<KeyTypeMap>,
}

impl SourceType {
Expand Down Expand Up @@ -336,10 +337,10 @@ impl SourceType {
// TODO: impl tryfrom with XOR validation on methods
#[derive(Debug, Serialize)]
pub(super) struct HTTPSourceType {
get: Option<URLPathTemplate>,
post: Option<URLPathTemplate>,
headers: Vec<HTTPHeaderMapping>,
body: Option<JSONSelection>,
pub(crate) get: Option<URLPathTemplate>,
pub(crate) post: Option<URLPathTemplate>,
pub(crate) headers: Vec<HTTPHeaderMapping>,
pub(crate) body: Option<JSONSelection>,
}

impl HTTPSourceType {
Expand Down
3 changes: 3 additions & 0 deletions apollo-router/src/plugins/connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[allow(dead_code)]
mod connector;
pub(crate) use connector::connector_subgraph_names;
pub(crate) use connector::Connector;
#[allow(dead_code)]
mod directives;
#[allow(dead_code)]
Expand All @@ -8,4 +10,5 @@ mod selection_parser;
pub(crate) mod subgraph_connector;
#[allow(dead_code)]
mod supergraph;
pub(crate) use supergraph::generate_connector_supergraph;
mod url_path_parser;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: apollo-router/src/plugins/connectors/connector.rs
expression: request
---
Request {
method: GET,
uri: http://localhost/api,
version: HTTP/1.1,
headers: {},
body: Body(
Empty,
),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: apollo-router/src/plugins/connectors/subgraph_connector.rs
expression: response
---
{
"data": {
"serverNetworkInfo": {
"ip": "1.2.3.4",
"city": "Paris",
"country": "France"
}
}
}
Loading

0 comments on commit 178f3c7

Please sign in to comment.