diff --git a/Cargo.lock b/Cargo.lock index 8628a93293..dd64d03fc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,7 @@ dependencies = [ "sha2", "shellexpand", "static_assertions", + "strum_macros", "sys-info", "tempfile", "test-log", diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index e7e354a8ef..6ba91edc27 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -84,9 +84,35 @@ By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router ## 🥼 Experimental - ### Introduce a `router_service` ([Issue #1496](https://github.com/apollographql/router/issues/1496)) A `router_service` is now part of our service stack, which allows plugin developers to process raw http requests and raw http responses, that wrap the already available `supergraph_service` By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/2170 + +### Introduce an externalization mechanism based on `router_service` ([Issue #1916](https://github.com/apollographql/router/issues/1916)) + +If external extensibility is configured, then a block of data is transmitted (encoded as JSON) to an endpoint via an HTTP POST request. The router will process the response to the POST request before resuming execution. + +Conceptually, an external co-processor performs the same functionality as you may provide via a rust plugin or a rhai script within the router. The difference is the protocol which governs the interaction between the router and the co-processor. + +Sample configuration: + +```yaml +plugins: + experimental.external: + url: http://127.0.0.1:8081 # mandatory URL which is the address of the co-processor + timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second + stages: # In future, multiple stages may be configurable + router: # Currently, the only valid value is router + request: # What data should we transmit to the co-processor from the router request? + headers: true # All of these data content attributes are optional and false by default. + context: true + body: true + sdl: true + response: # What data should we transmit to the co-processor from the router response? + headers: true + context: true +``` + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2229 diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index e11a68d2e1..0c500bedb7 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -164,6 +164,7 @@ serde_json = { version = "1.0.89", features = ["preserve_order"] } serde_urlencoded = "0.7.1" serde_yaml = "0.8.26" static_assertions = "1.1.0" +strum_macros = "0.24.3" sys-info = "0.9.1" thiserror = "1.0.37" tokio = { version = "1.23.0", features = ["full"] } diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 5824417d76..bd5f7f40f7 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -549,6 +549,81 @@ expression: "&schema" "properties": { "experimental.expose_query_plan": { "type": "boolean" + }, + "experimental.external": { + "type": "object", + "required": [ + "url" + ], + "properties": { + "stages": { + "default": null, + "type": "object", + "properties": { + "router": { + "default": null, + "type": "object", + "properties": { + "request": { + "default": null, + "type": "object", + "properties": { + "body": { + "default": false, + "type": "boolean" + }, + "context": { + "default": false, + "type": "boolean" + }, + "headers": { + "default": false, + "type": "boolean" + }, + "sdl": { + "default": false, + "type": "boolean" + } + }, + "nullable": true + }, + "response": { + "default": null, + "type": "object", + "properties": { + "body": { + "default": false, + "type": "boolean" + }, + "context": { + "default": false, + "type": "boolean" + }, + "headers": { + "default": false, + "type": "boolean" + }, + "sdl": { + "default": false, + "type": "boolean" + } + }, + "nullable": true + } + }, + "nullable": true + } + }, + "nullable": true + }, + "timeout": { + "default": null, + "type": "string" + }, + "url": { + "type": "string" + } + } } }, "additionalProperties": false diff --git a/apollo-router/src/context.rs b/apollo-router/src/context.rs index d726c4d4fa..eedc4e42d3 100644 --- a/apollo-router/src/context.rs +++ b/apollo-router/src/context.rs @@ -9,6 +9,7 @@ use std::time::Instant; use dashmap::mapref::multiple::RefMulti; use dashmap::mapref::multiple::RefMutMulti; use dashmap::DashMap; +use serde::Deserialize; use serde::Serialize; use tower::BoxError; @@ -27,12 +28,14 @@ pub(crate) type Entries = Arc>; /// provide [`crate::SubgraphRequest`] or [`crate::SubgraphResponse`] processing. At such times, /// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`] /// functions to minimise the possibility of mis-sequenced updates. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct Context { // Allows adding custom entries to the context. entries: Entries, /// Creation time + #[serde(skip)] + #[serde(default = "Instant::now")] pub(crate) created_at: Instant, } diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs index 50ae884bc4..8495b93fdc 100644 --- a/apollo-router/src/error.rs +++ b/apollo-router/src/error.rs @@ -443,3 +443,12 @@ impl ParseErrors { }; } } + +/// Error types for licensing. +#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)] +pub(crate) enum LicenseError { + /// Apollo graph reference is missing + MissingGraphReference, + /// Apollo key is missing + MissingKey, +} diff --git a/apollo-router/src/plugins/external.rs b/apollo-router/src/plugins/external.rs new file mode 100644 index 0000000000..661182c742 --- /dev/null +++ b/apollo-router/src/plugins/external.rs @@ -0,0 +1,505 @@ +//! Externalization plugin +// With regards to ELv2 licensing, this entire file is license key functionality + +use std::collections::HashMap; +use std::fmt; +use std::ops::ControlFlow; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use http::header::HeaderName; +use http::HeaderMap; +use http::HeaderValue; +use hyper::body; +use hyper::Body; +use schemars::JsonSchema; +use serde::de::DeserializeOwned; +use serde::Deserialize; +use serde::Serialize; +use tower::util::MapFutureLayer; +use tower::BoxError; +use tower::ServiceBuilder; +use tower::ServiceExt; + +use crate::error::Error; +use crate::external::Externalizable; +use crate::external::PipelineStep; +use crate::layers::async_checkpoint::AsyncCheckpointLayer; +use crate::layers::ServiceBuilderExt; +use crate::plugin::Plugin; +use crate::plugin::PluginInit; +use crate::register_plugin; +use crate::services::external::Control; +use crate::services::router; +use crate::Context; + +pub(crate) const EXTERNAL_SPAN_NAME: &str = "external plugin"; + +#[derive(Debug)] +struct ExternalPlugin { + configuration: Conf, + sdl: Arc, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +struct BaseConf { + #[serde(default)] + headers: bool, + #[serde(default)] + context: bool, + #[serde(default)] + body: bool, + #[serde(default)] + sdl: bool, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +struct RouterStage { + #[serde(default)] + request: Option, + #[serde(default)] + response: Option, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +struct Stages { + #[serde(default)] + router: Option, +} + +#[derive(Clone, Debug, Default, Deserialize, JsonSchema)] +struct Conf { + // The url you'd like to offload processing to + url: String, + // The timeout for external requests + #[serde(deserialize_with = "humantime_serde::deserialize", default)] + #[schemars(with = "String", default)] + timeout: Option, + // The stages request/response configuration + #[serde(default)] + stages: Option, +} + +#[async_trait::async_trait] +impl Plugin for ExternalPlugin { + type Config = Conf; + + async fn new(init: PluginInit) -> Result { + Ok(ExternalPlugin { + configuration: init.config, + sdl: init.supergraph_sdl, + }) + } + + fn router_service(&self, service: router::BoxService) -> router::BoxService { + let request_sdl = self.sdl.clone(); + let response_sdl = self.sdl.clone(); + + let request_full_config = self.configuration.clone(); + let response_full_config = self.configuration.clone(); + + let request_layer = if self + .configuration + .stages + .as_ref() + .and_then(|x| x.router.as_ref()) + .and_then(|x| x.request.as_ref()) + .is_some() + { + // Safe to unwrap here because we just confirmed that all optional elements are present + let request_config = request_full_config + .stages + .unwrap() + .router + .unwrap() + .request + .unwrap(); + Some(AsyncCheckpointLayer::new( + move |mut request: router::Request| { + let my_sdl = request_sdl.to_string(); + let proto_url = request_full_config.url.clone(); + let timeout = request_full_config.timeout; + let request_config = request_config.clone(); + async move { + // Call into our out of process processor with a body of our body + // First, extract the data we need from our request and prepare our + // external call. Use our configuration to figure out which data to send. + + let (parts, body) = request.router_request.into_parts(); + let b_bytes = body::to_bytes(body).await?; + + let (headers, payload, context, sdl) = prepare_external_params( + &request_config, + &parts.headers, + &b_bytes, + &request.context, + my_sdl, + )?; + + // Second, call our co-processor and get a reply. + let co_processor_output = call_external( + proto_url, + timeout, + PipelineStep::RouterRequest, + headers, + payload, + context, + sdl, + ) + .await?; + + tracing::debug!(?co_processor_output, "co-processor returned"); + + // Thirdly, we need to interpret the control flow which may have been + // updated by our co-processor and decide if we should proceed or stop. + + if matches!(co_processor_output.control, Control::Break(_)) { + // Ensure the code is a valid http status code + let code = co_processor_output.control.get_http_status()?; + + let res = if !code.is_success() { + router::Response::error_builder() + .errors(vec![Error { + message: co_processor_output + .body + .unwrap_or(serde_json::Value::Null) + .to_string(), + ..Default::default() + }]) + .status_code(code) + .context(request.context) + .build()? + } else { + router::Response::builder() + .data( + co_processor_output + .body + .unwrap_or(serde_json::Value::Null) + .to_string(), + ) + .status_code(code) + .context(request.context) + .build()? + }; + return Ok(ControlFlow::Break(res)); + } + + // Finally, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming request with the updated bits if they + // are present in our co_processor_output. + + let new_body = match co_processor_output.body { + Some(bytes) => Body::from(serde_json::to_vec(&bytes)?), + None => Body::from(b_bytes), + }; + + request.router_request = http::Request::from_parts(parts, new_body); + + if let Some(context) = co_processor_output.context { + request.context = context; + } + + if let Some(headers) = co_processor_output.headers { + *request.router_request.headers_mut() = + internalize_header_map(headers)?; + } + + Ok(ControlFlow::Continue(request)) + } + }, + )) + } else { + None + }; + + let response_layer = if self + .configuration + .stages + .as_ref() + .and_then(|x| x.router.as_ref()) + .and_then(|x| x.response.as_ref()) + .is_some() + { + // Safe to unwrap here because we just confirmed that all optional elements are present + let response_config = response_full_config + .stages + .unwrap() + .router + .unwrap() + .response + .unwrap(); + Some(MapFutureLayer::new(move |fut| { + let my_sdl = response_sdl.to_string(); + let proto_url = response_full_config.url.clone(); + let timeout = response_full_config.timeout; + let response_config = response_config.clone(); + async move { + let mut response: router::Response = fut.await?; + + // Call into our out of process processor with a body of our body + // First, extract the data we need from our response and prepare our + // external call. Use our configuration to figure out which data to send. + + let (parts, body) = response.response.into_parts(); + let b_bytes = body::to_bytes(body).await?; + + let (headers, payload, context, sdl) = prepare_external_params( + &response_config, + &parts.headers, + &b_bytes, + &response.context, + my_sdl, + )?; + + // Second, call our co-processor and get a reply. + let co_processor_output = call_external( + proto_url, + timeout, + PipelineStep::RouterResponse, + headers, + payload, + context, + sdl, + ) + .await?; + + tracing::debug!(?co_processor_output, "co-processor returned"); + + // Third, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming response with the updated bits if they + // are present in our co_processor_output. If they aren't present, just use the + // bits that we sent to the co_processor. + + let new_body = match co_processor_output.body { + Some(bytes) => Body::from(serde_json::to_vec(&bytes)?), + None => Body::from(b_bytes), + }; + + response.response = http::Response::from_parts(parts, new_body); + + if let Some(context) = co_processor_output.context { + response.context = context; + } + + if let Some(headers) = co_processor_output.headers { + *response.response.headers_mut() = internalize_header_map(headers)?; + } + + Ok::(response) + } + })) + } else { + None + }; + + fn external_service_span() -> impl Fn(&router::Request) -> tracing::Span + Clone { + move |_request: &router::Request| { + tracing::info_span!( + EXTERNAL_SPAN_NAME, + "external service" = stringify!(router::Request), + "otel.kind" = "INTERNAL" + ) + } + } + + ServiceBuilder::new() + .instrument(external_service_span()) + .option_layer(request_layer) + .option_layer(response_layer) + .buffer(20_000) + .service(service) + .boxed() + } +} + +type ExternalParams<'a> = ( + Option<&'a HeaderMap>, + Option, + Option, + Option, +); + +fn prepare_external_params<'a>( + config: &'a BaseConf, + headers: &'a HeaderMap, + bytes: &'a Bytes, + context: &'a Context, + sdl: String, +) -> Result, BoxError> { + let mut headers_opt = None; + // Note: We have to specify the json type or it won't deserialize correctly... + // let mut payload_opt: Option = None; + let mut payload_opt: Option = None; + let mut context_opt = None; + let mut sdl_opt = None; + + if config.body || config.headers { + if config.body { + payload_opt = Some(serde_json::from_slice(bytes)?); + } + if config.headers { + headers_opt = Some(headers); + } + } + if config.context { + context_opt = Some(context.clone()); + } + if config.sdl { + sdl_opt = Some(sdl); + } + Ok((headers_opt, payload_opt, context_opt, sdl_opt)) +} + +async fn call_external( + url: String, + timeout: Option, + stage: PipelineStep, + headers: Option<&HeaderMap>, + payload: Option, + context: Option, + sdl: Option, +) -> Result, BoxError> +where + T: fmt::Debug + DeserializeOwned + Serialize + Send + Sync + 'static, +{ + let mut converted_headers = None; + if let Some(hdrs) = headers { + converted_headers = Some(externalize_header_map(hdrs)?); + }; + let output = Externalizable::new(stage, converted_headers, payload, context, sdl); + tracing::debug!(?output, "externalized output"); + output.call(&url, timeout).await +} + +/// Convert a HeaderMap into a HashMap +fn externalize_header_map( + input: &HeaderMap, +) -> Result>, BoxError> { + let mut output = HashMap::new(); + for (k, v) in input { + let k = k.as_str().to_owned(); + let v = String::from_utf8(v.as_bytes().to_vec()).map_err(|e| e.to_string())?; + output.entry(k).or_insert_with(Vec::new).push(v) + } + Ok(output) +} + +/// Convert a HashMap into a HeaderMap +fn internalize_header_map( + input: HashMap>, +) -> Result, BoxError> { + let mut output = HeaderMap::new(); + for (k, values) in input { + for v in values { + let key = HeaderName::from_str(k.as_ref())?; + let value = HeaderValue::from_str(v.as_ref())?; + output.append(key, value); + } + } + Ok(output) +} + +// This macro allows us to use it in our plugin registry! +// register_plugin takes a group name, and a plugin name. +// +// In order to keep the plugin names consistent, +// we use using the `Reverse domain name notation` +register_plugin!("experimental", "external", ExternalPlugin); + +#[cfg(test)] +mod tests { + use http::header::ACCEPT; + use http::header::CONTENT_TYPE; + use http::HeaderMap; + use http::HeaderValue; + use mime::APPLICATION_JSON; + use mime::TEXT_HTML; + + use super::*; + + #[tokio::test] + async fn load_plugin() { + let config = serde_json::json!({ + "plugins": { + "experimental.external": { + "url": "http://127.0.0.1:8081" + } + } + }); + // Build a test harness. Usually we'd use this and send requests to + // it, but in this case it's enough to build the harness to see our + // output when our service registers. + let _test_harness = crate::TestHarness::builder() + .configuration_json(config) + .unwrap() + .build_router() + .await + .unwrap(); + } + + #[test] + fn it_externalizes_headers() { + // Build our expected HashMap + let mut expected = HashMap::new(); + + expected.insert( + "content-type".to_string(), + vec![APPLICATION_JSON.essence_str().to_string()], + ); + + expected.insert( + "accept".to_string(), + vec![ + APPLICATION_JSON.essence_str().to_string(), + TEXT_HTML.essence_str().to_string(), + ], + ); + + let mut external_form = HeaderMap::new(); + + external_form.insert( + CONTENT_TYPE, + HeaderValue::from_static(APPLICATION_JSON.essence_str()), + ); + + external_form.insert( + ACCEPT, + HeaderValue::from_static(APPLICATION_JSON.essence_str()), + ); + + external_form.append(ACCEPT, HeaderValue::from_static(TEXT_HTML.essence_str())); + + let actual = externalize_header_map(&external_form).expect("externalized header map"); + + assert_eq!(expected, actual); + } + + #[test] + fn it_internalizes_headers() { + // Build our expected HeaderMap + let mut expected = HeaderMap::new(); + + expected.insert( + ACCEPT, + HeaderValue::from_static(APPLICATION_JSON.essence_str()), + ); + + expected.append(ACCEPT, HeaderValue::from_static(TEXT_HTML.essence_str())); + + let mut external_form = HashMap::new(); + + external_form.insert( + "accept".to_string(), + vec![ + APPLICATION_JSON.essence_str().to_string(), + TEXT_HTML.essence_str().to_string(), + ], + ); + + let actual = internalize_header_map(external_form).expect("internalized header map"); + + assert_eq!(expected, actual); + } +} diff --git a/apollo-router/src/plugins/mod.rs b/apollo-router/src/plugins/mod.rs index 7c4401d275..a5cb444c8f 100644 --- a/apollo-router/src/plugins/mod.rs +++ b/apollo-router/src/plugins/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod csrf; mod expose_query_plan; +mod external; mod forbid_mutations; mod headers; mod include_subgraph_errors; diff --git a/apollo-router/src/plugins/telemetry/apollo.rs b/apollo-router/src/plugins/telemetry/apollo.rs index e6b826e178..5a1d3324d2 100644 --- a/apollo-router/src/plugins/telemetry/apollo.rs +++ b/apollo-router/src/plugins/telemetry/apollo.rs @@ -25,6 +25,8 @@ use crate::plugins::telemetry::apollo_exporter::proto::reports::StatsContext; use crate::plugins::telemetry::apollo_exporter::proto::reports::Trace; use crate::plugins::telemetry::config::SamplerOption; use crate::plugins::telemetry::tracing::BatchProcessorConfig; +use crate::services::apollo_graph_reference; +use crate::services::apollo_key; pub(crate) const ENDPOINT_DEFAULT: &str = "https://usage-reporting.api.apollographql.com/api/ingress/traces"; @@ -96,28 +98,6 @@ fn default_field_level_instrumentation_sampler() -> SamplerOption { SamplerOption::TraceIdRatioBased(0.01) } -#[cfg(test)] -fn apollo_key() -> Option { - // During tests we don't want env variables to affect defaults - None -} - -#[cfg(not(test))] -fn apollo_key() -> Option { - std::env::var("APOLLO_KEY").ok() -} - -#[cfg(test)] -fn apollo_graph_reference() -> Option { - // During tests we don't want env variables to affect defaults - None -} - -#[cfg(not(test))] -fn apollo_graph_reference() -> Option { - std::env::var("APOLLO_GRAPH_REF").ok() -} - fn endpoint_default() -> Url { Url::parse(ENDPOINT_DEFAULT).expect("must be valid url") } diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs new file mode 100644 index 0000000000..d5a36f2d87 --- /dev/null +++ b/apollo-router/src/services/external.rs @@ -0,0 +1,140 @@ +// With regards to ELv2 licensing, this entire file is license key functionality +#![allow(missing_docs)] // FIXME + +use std::collections::HashMap; +use std::fmt::Debug; +use std::string::ToString; +use std::time::Duration; + +use http::header::ACCEPT; +use http::header::CONTENT_TYPE; +use http::StatusCode; +use once_cell::sync::Lazy; +use reqwest::Client; +use schemars::JsonSchema; +use serde::de::DeserializeOwned; +use serde::Deserialize; +use serde::Serialize; +use strum_macros::Display; +use tower::BoxError; + +use crate::error::LicenseError; +use crate::services::apollo_graph_reference; +use crate::services::apollo_key; +use crate::tracer::TraceId; +use crate::Context; + +const DEFAULT_EXTERNALIZATION_TIMEOUT: Duration = Duration::from_secs(1); + +static CLIENT: Lazy> = Lazy::new(|| { + apollo_graph_reference().ok_or(LicenseError::MissingGraphReference)?; + apollo_key().ok_or(LicenseError::MissingKey)?; + Ok(Client::new()) +}); + +/// Version of our externalised data. Rev this if it changes +const EXTERNALIZABLE_VERSION: u8 = 1; + +#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)] +pub(crate) enum PipelineStep { + RouterRequest, + RouterResponse, + SupergraphRequest, + SupergraphResponse, + ExecutionRequest, + ExecutionResponse, + SubgraphRequest, + SubgraphResponse, +} + +#[derive(Clone, Debug, Display, Deserialize, PartialEq, Serialize, JsonSchema)] +pub(crate) enum Control { + Continue, + Break(u16), +} + +impl Default for Control { + fn default() -> Self { + Control::Continue + } +} + +impl Control { + #[allow(dead_code)] + fn new(status: u16) -> Self { + Control::Break(status) + } + + pub(crate) fn get_http_status(&self) -> Result { + match self { + Control::Continue => Ok(StatusCode::OK), + Control::Break(code) => StatusCode::from_u16(*code).map_err(|e| e.into()), + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct Externalizable { + pub(crate) version: u8, + pub(crate) stage: String, + pub(crate) control: Control, + pub(crate) id: Option, + pub(crate) headers: Option>>, + pub(crate) body: Option, + pub(crate) context: Option, + pub(crate) sdl: Option, +} + +impl Externalizable +where + T: Debug + DeserializeOwned + Serialize + Send + Sync, +{ + pub(crate) fn new( + stage: PipelineStep, + headers: Option>>, + body: Option, + context: Option, + sdl: Option, + ) -> Self { + Self { + version: EXTERNALIZABLE_VERSION, + stage: stage.to_string(), + control: Control::default(), + id: TraceId::maybe_new().map(|id| id.to_string()), + headers, + body, + context, + sdl, + } + } + + pub(crate) async fn call(self, url: &str, timeout: Option) -> Result { + let my_client = CLIENT.as_ref().map_err(|e| e.to_string())?.clone(); + let t = timeout.unwrap_or(DEFAULT_EXTERNALIZATION_TIMEOUT); + + tracing::debug!("forwarding json: {}", serde_json::to_string(&self)?); + let response = my_client + .post(url) + .json(&self) + .header(ACCEPT, "application/json") + .header(CONTENT_TYPE, "application/json") + .timeout(t) + .send() + .await?; + + // Let's process our response + let response: Self = response.json().await?; + + Ok(response) + } +} + +#[cfg(test)] +mod tests { + use super::CLIENT; + + #[test] + fn it_will_not_externalize_without_environment() { + assert!(CLIENT.as_ref().is_err()); + } +} diff --git a/apollo-router/src/services/mod.rs b/apollo-router/src/services/mod.rs index fe3488481f..c99b245c60 100644 --- a/apollo-router/src/services/mod.rs +++ b/apollo-router/src/services/mod.rs @@ -24,6 +24,7 @@ pub(crate) use crate::services::supergraph_service::SupergraphCreator; pub mod execution; mod execution_service; +pub(crate) mod external; pub(crate) mod layers; pub(crate) mod new_service; pub(crate) mod query_planner; @@ -47,6 +48,28 @@ impl AsRef for Arc> { } } +#[cfg(test)] +pub(crate) fn apollo_key() -> Option { + // During tests we don't want env variables to affect defaults + None +} + +#[cfg(not(test))] +pub(crate) fn apollo_key() -> Option { + std::env::var("APOLLO_KEY").ok() +} + +#[cfg(test)] +pub(crate) fn apollo_graph_reference() -> Option { + // During tests we don't want env variables to affect defaults + None +} + +#[cfg(not(test))] +pub(crate) fn apollo_graph_reference() -> Option { + std::env::var("APOLLO_GRAPH_REF").ok() +} + // set the supported `@defer` specification version to https://github.com/graphql/graphql-spec/pull/742/commits/01d7b98f04810c9a9db4c0e53d3c4d54dbf10b82 pub(crate) const MULTIPART_DEFER_SPEC_PARAMETER: &str = "deferSpec"; pub(crate) const MULTIPART_DEFER_SPEC_VALUE: &str = "20220824"; diff --git a/apollo-router/src/services/router.rs b/apollo-router/src/services/router.rs index 15df5550ea..e865e56d9c 100644 --- a/apollo-router/src/services/router.rs +++ b/apollo-router/src/services/router.rs @@ -3,13 +3,24 @@ use bytes::Bytes; use futures::Stream; use futures::StreamExt; +use http::header::HeaderName; +use http::HeaderValue; use http::Method; +use http::StatusCode; use multer::Multipart; +use multimap::MultiMap; +use serde_json_bytes::ByteString; +use serde_json_bytes::Map as JsonMap; +use serde_json_bytes::Value; use static_assertions::assert_impl_all; use tower::BoxError; use super::supergraph; +use crate::graphql; +use crate::json_ext::Path; use crate::Context; +use crate::TryIntoHeaderName; +use crate::TryIntoHeaderValue; pub type BoxService = tower::util::BoxService; pub type BoxCloneService = tower::util::BoxCloneService; @@ -96,6 +107,7 @@ impl TryFrom for Request { assert_impl_all!(Response: Send); #[non_exhaustive] +#[derive(Debug)] pub struct Response { pub response: http::Response, pub context: Context, @@ -110,6 +122,7 @@ impl From> for Response { } } +#[buildstructor::buildstructor] impl Response { pub async fn next_response(&mut self) -> Option> { self.response.body_mut().next().await @@ -125,6 +138,72 @@ impl Response { } } + /// This is the constructor (or builder) to use when constructing a real Response.. + /// + /// Required parameters are required in non-testing code to create a Response.. + #[allow(clippy::too_many_arguments)] + #[builder(visibility = "pub")] + fn new( + label: Option, + data: Option, + path: Option, + errors: Vec, + // Skip the `Object` type alias in order to use buildstructor’s map special-casing + extensions: JsonMap, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + // Build a response + let b = graphql::Response::builder() + .and_label(label) + .and_path(path) + .errors(errors) + .extensions(extensions); + let res = match data { + Some(data) => b.data(data).build(), + None => b.build(), + }; + + // Build an http Response + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (key, values) in headers { + let header_name: HeaderName = key.try_into()?; + for value in values { + let header_value: HeaderValue = value.try_into()?; + builder = builder.header(header_name.clone(), header_value); + } + } + + // let response = builder.body(once(ready(res)).boxed())?; + + let response = builder.body(hyper::Body::from(serde_json::to_vec(&res)?))?; + + Ok(Self { response, context }) + } + + /// This is the constructor (or builder) to use when constructing a Response that represents a global error. + /// It has no path and no response data. + /// This is useful for things such as authentication errors. + #[builder(visibility = "pub")] + fn error_new( + errors: Vec, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + Response::new( + Default::default(), + Default::default(), + None, + errors, + Default::default(), + status_code, + headers, + context, + ) + } + /// EXPERIMENTAL: this is function is experimental and subject to potentially change. pub async fn into_graphql_response_stream( self, diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index a79a373322..eb36b9c242 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -65,6 +65,9 @@ impl Display for Compression { /// Client for interacting with subgraphs. #[derive(Clone)] pub(crate) struct SubgraphService { + // Note: We use hyper::Client here in preference to reqwest to avoid expensive URL translation + // in the hot path. We use reqwest elsewhere because it's convenient and some of the + // opentelemetry crate require reqwest clients to work correctly (at time of writing). client: Decompression>>, service: Arc, } diff --git a/apollo-router/src/tracer.rs b/apollo-router/src/tracer.rs index 59130f34c3..93758d892a 100644 --- a/apollo-router/src/tracer.rs +++ b/apollo-router/src/tracer.rs @@ -5,12 +5,13 @@ use std::fmt; use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceId as OtelTraceId; +use serde::Deserialize; use serde::Serialize; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; /// Trace ID -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct TraceId([u8; 16]); impl TraceId { diff --git a/docs/source/config.json b/docs/source/config.json index e0092db222..5be2b78458 100644 --- a/docs/source/config.json +++ b/docs/source/config.json @@ -10,8 +10,10 @@ "Federation version support": "/federation-version-support", "Configuration": { "Overview": "/configuration/overview", + "Caching": "/configuration/caching", "CORS": "/configuration/cors", "CSRF prevention": "/configuration/csrf", + "External extensibility": "/configuration/external", "Logging": "/configuration/logging", "Header propagation": "/configuration/header-propagation", "Traffic shaping": "/configuration/traffic-shaping", diff --git a/docs/source/configuration/external.mdx b/docs/source/configuration/external.mdx new file mode 100644 index 0000000000..11870c114f --- /dev/null +++ b/docs/source/configuration/external.mdx @@ -0,0 +1,348 @@ +--- +title: External Extensibility in the Apollo Router +sidebar_title: External extensibility +description: Configure external co-processing for the router +--- + +> This is part of an experimental feature, it means any time until it's stabilized (without the prefix `experimental_`) we might change the configuration shape, add or remove features. +> If you want to give feedback or participate in that feature feel free to join [this discussion on GitHub](https://github.com/apollographql/router/discussions/2266). + +The router provides extensive in-process customization functionality: + - configuration + - rhai script + - rust plugin + +However, you may find that you have existing components in your system which can't be re-worked to any of these options for a variety of reasons, including, but not limited to: + - choice of implementation language + - bespoke packages/libs + - access to existing services/infrastructure + +For these situations, you can use external extensibility to interact with data flowing through the router. + +If external extensibility is configured, then a block of data is transmitted (encoded as JSON) to an endpoint via an HTTP POST request. Details of the encoding and expected communication protocol are documented below. The router will process the response to the POST request before resuming execution. + +Conceptually, an external co-processor performs the same functionality as you may provide via a rust plugin or a rhai script within the router. The difference is the protocol which governs the interaction between the router and the co-processor. + +```mermaid +sequenceDiagram + actor Client + participant RouterService + participant CoProcessor + participant SupergraphService + participant ExecutionService + participant SubgraphService(s) + + Client->>RouterService: Sends HTTP request + alt has coprocessor + RouterService->>CoProcessor: HTTP Post + CoProcessor-->>RouterService: HTTP Post Response + alt continue + RouterService->>SupergraphService: Continues request + else break + RouterService-->>Client: Breaks and returns HTTP response + end + else no coprocessor + RouterService->>SupergraphService: Processes request + end + SupergraphService->>ExecutionService: Initiates query
plan execution + par + ExecutionService->>SubgraphService(s): Initiates
sub-operation + SubgraphService(s)-->>ExecutionService: + and + ExecutionService->>SubgraphService(s): Initiates
sub-operation + SubgraphService(s)-->>ExecutionService: + and + ExecutionService->>SubgraphService(s): Initiates
sub-operation + SubgraphService(s)-->>ExecutionService: + end + ExecutionService-->>SupergraphService: Assembles and
returns response + SupergraphService-->>RouterService: Returns response + alt has coprocessor + RouterService->>CoProcessor: HTTP Post + CoProcessor-->>RouterService: HTTP Post Response + RouterService-->>Client: Returns HTTP response + else no coprocessor + RouterService-->>Client: Returns HTTP response + end +``` + +The external interface makes the following data available in the HTTP Post request: + + - control + - control + - stage + - trace id + - version + - data + - body + - context + - headers + - sdl (schema) + +The "control" attributes, provide information about the context of the specific router request or response and provide a mechanism to influence the flow of data within the router. + +The "data" attributes, provide information about the substance of a request or response. Apart from the SDL, this data is all modifiable. All of these fields are "optional" (via configuration) to provide control over what data is externalized. + +## Terminology + +It can be confusing to talk about requests/responses external processes within the context of an externalized interaction. For the purposes of clarity, this is how we will talk about the various components in this process: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
TextDescription
+ +##### `request` + + + +A request from a client being processed by a router + +
+ +##### `response` + + + +A request to a client being processed by a router + +
+ +##### `co-processor` + + + +An external program, processing information about a router request or response + +
+ +##### `co-processor request` + + + +A POST from a router to a co-processor containing information about a router request or response + +
+ +##### `co-processor response` + + + +A POST reply from a co-processor to a router containing information about a router request or response + +
+ +## Configuration + +The plugin is configured like other plugins. The typical configuration would look something like this. This is sending all available data related to a router request to a co-processor at the router stage and also sending the headers and context at the router response stage. A timeout is always in force for all co-processor requests. + +```yaml title="typical.yaml" +plugins: + experimental.external: + url: http://127.0.0.1:8081 # mandatory URL which is the address of the co-processor + timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second + stages: # In future, multiple stages may be configurable + router: # Currently, the only valid value is router + request: # What data should we transmit to the co-processor from the router request? + headers: true # All of these data content attributes are optional and false by default. + context: true + body: true + sdl: true + response: # What data should we transmit to the co-processor from the router response? + headers: true + context: true +``` + +The minimal data to transfer would be nothing on every router request. This is sending no data from the router request (just the control data) to a co-processor. This probably wouldn't be very useful in production, but could be in testing. + +```yaml title="minimal.yaml" +plugins: + experimental.external: + url: http://127.0.0.1:8081 # mandatory URL which is the POST target + stages: # In future, multiple stages may be configurable + router: # Currently, the only valid value is router + request: # What data should we transmit from the request? +``` + +## Encoding + +The data is encoded into JSON. Here's what the data would look like for a request at the router stage. This would be delivered as a POST request to the configured URL. + +```json title="router_request.json" +{ + "version": 1, + "stage": "RouterRequest", + "control": "Continue", + "id": "1b19c05fdafc521016df33148ad63c1b", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "query": "query Long {\n me {\n name\n}\n}" + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false + } + }, + "sdl": "schema\n @core(feature: \"https://specs.apollo.dev/core/v0.1\"),\n @core(feature: \"https://specs.apollo.dev/join/v0.1\")\n{\n query: Query\n mutation: Mutation\n}\n\ndirective @core(feature: String!) repeatable on SCHEMA\n\ndirective @join__field(graph: join__Graph, requires: join__FieldSet, provides: join__FieldSet) on FIELD_DEFINITION\n\ndirective @join__type(graph: join__Graph!, key: join__FieldSet) repeatable on OBJECT | INTERFACE\n\ndirective @join__owner(graph: join__Graph!) on OBJECT | INTERFACE\n\ndirective @join__graph(name: String!, url: String!) on ENUM_VALUE\n\nscalar join__FieldSet\n\nenum join__Graph {\n ACCOUNTS @join__graph(name: \"accounts\" url: \"http://localhost:4001\")\n INVENTORY @join__graph(name: \"inventory\" url: \"http://localhost:4004\")\n PRODUCTS @join__graph(name: \"products\" url: \"http://localhost:4003\")\n REVIEWS @join__graph(name: \"reviews\" url: \"http://localhost:4002\")\n}\n\ntype Mutation {\n createProduct(name: String, upc: ID!): Product @join__field(graph: PRODUCTS)\n createReview(body: String, id: ID!, upc: ID!): Review @join__field(graph: REVIEWS)\n}\n\ntype Product\n @join__owner(graph: PRODUCTS)\n @join__type(graph: PRODUCTS, key: \"upc\")\n @join__type(graph: INVENTORY, key: \"upc\")\n @join__type(graph: REVIEWS, key: \"upc\")\n{\n inStock: Boolean @join__field(graph: INVENTORY)\n name: String @join__field(graph: PRODUCTS)\n price: Int @join__field(graph: PRODUCTS)\n reviews: [Review] @join__field(graph: REVIEWS)\n reviewsForAuthor(authorID: ID!): [Review] @join__field(graph: REVIEWS)\n shippingEstimate: Int @join__field(graph: INVENTORY, requires: \"price weight\")\n upc: String! @join__field(graph: PRODUCTS)\n weight: Int @join__field(graph: PRODUCTS)\n}\n\ntype Query {\n me: User @join__field(graph: ACCOUNTS)\n topProducts(first: Int = 5): [Product] @join__field(graph: PRODUCTS)\n}\n\ntype Review\n @join__owner(graph: REVIEWS)\n @join__type(graph: REVIEWS, key: \"id\")\n{\n author: User @join__field(graph: REVIEWS, provides: \"username\")\n body: String @join__field(graph: REVIEWS)\n id: ID! @join__field(graph: REVIEWS)\n product: Product @join__field(graph: REVIEWS)\n}\n\ntype User\n @join__owner(graph: ACCOUNTS)\n @join__type(graph: ACCOUNTS, key: \"id\")\n @join__type(graph: REVIEWS, key: \"id\")\n{\n id: ID! @join__field(graph: ACCOUNTS)\n name: String @join__field(graph: ACCOUNTS)\n reviews: [Review] @join__field(graph: REVIEWS)\n username: String @join__field(graph: ACCOUNTS)\n}\n" +} +``` + +And here's what a router stage response may look like: + +```json title="router_response.json" +{ + "version": 1, + "stage": "RouterResponse", + "control": "Continue", + "id": "1b19c05fdafc521016df33148ad63c1b", + "headers": { + "vary": [ + "origin" + ], + "content-type": [ + "application/json" + ] + }, + "body": { + "data": { + "me": { + "name": "Ada Lovelace" + } + } + }, + "context": { + "entries": { + "apollo_telemetry::subgraph_metrics_attributes": {}, + "accepts-json": false, + "accepts-multipart": false, + "apollo_telemetry::client_name": "manual", + "apollo_telemetry::usage_reporting": { + "statsReportKey": "# Long\nquery Long{me{name}}", + "referencedFieldsByType": { + "User": { + "fieldNames": [ + "name" + ], + "isInterface": false + }, + "Query": { + "fieldNames": [ + "me" + ], + "isInterface": false + } + } + }, + "apollo_telemetry::client_version": "", + "accepts-wildcard": true + } + }, + "sdl": "schema\n @core(feature: \"https://specs.apollo.dev/core/v0.1\"),\n @core(feature: \"https://specs.apollo.dev/join/v0.1\")\n{\n query: Query\n mutation: Mutation\n}\n\ndirective @core(feature: String!) repeatable on SCHEMA\n\ndirective @join__field(graph: join__Graph, requires: join__FieldSet, provides: join__FieldSet) on FIELD_DEFINITION\n\ndirective @join__type(graph: join__Graph!, key: join__FieldSet) repeatable on OBJECT | INTERFACE\n\ndirective @join__owner(graph: join__Graph!) on OBJECT | INTERFACE\n\ndirective @join__graph(name: String!, url: String!) on ENUM_VALUE\n\nscalar join__FieldSet\n\nenum join__Graph {\n ACCOUNTS @join__graph(name: \"accounts\" url: \"http://localhost:4001\")\n INVENTORY @join__graph(name: \"inventory\" url: \"http://localhost:4004\")\n PRODUCTS @join__graph(name: \"products\" url: \"http://localhost:4003\")\n REVIEWS @join__graph(name: \"reviews\" url: \"http://localhost:4002\")\n}\n\ntype Mutation {\n createProduct(name: String, upc: ID!): Product @join__field(graph: PRODUCTS)\n createReview(body: String, id: ID!, upc: ID!): Review @join__field(graph: REVIEWS)\n}\n\ntype Product\n @join__owner(graph: PRODUCTS)\n @join__type(graph: PRODUCTS, key: \"upc\")\n @join__type(graph: INVENTORY, key: \"upc\")\n @join__type(graph: REVIEWS, key: \"upc\")\n{\n inStock: Boolean @join__field(graph: INVENTORY)\n name: String @join__field(graph: PRODUCTS)\n price: Int @join__field(graph: PRODUCTS)\n reviews: [Review] @join__field(graph: REVIEWS)\n reviewsForAuthor(authorID: ID!): [Review] @join__field(graph: REVIEWS)\n shippingEstimate: Int @join__field(graph: INVENTORY, requires: \"price weight\")\n upc: String! @join__field(graph: PRODUCTS)\n weight: Int @join__field(graph: PRODUCTS)\n}\n\ntype Query {\n me: User @join__field(graph: ACCOUNTS)\n topProducts(first: Int = 5): [Product] @join__field(graph: PRODUCTS)\n}\n\ntype Review\n @join__owner(graph: REVIEWS)\n @join__type(graph: REVIEWS, key: \"id\")\n{\n author: User @join__field(graph: REVIEWS, provides: \"username\")\n body: String @join__field(graph: REVIEWS)\n id: ID! @join__field(graph: REVIEWS)\n product: Product @join__field(graph: REVIEWS)\n}\n\ntype User\n @join__owner(graph: ACCOUNTS)\n @join__type(graph: ACCOUNTS, key: \"id\")\n @join__type(graph: REVIEWS, key: \"id\")\n{\n id: ID! @join__field(graph: ACCOUNTS)\n name: String @join__field(graph: ACCOUNTS)\n reviews: [Review] @join__field(graph: REVIEWS)\n username: String @join__field(graph: ACCOUNTS)\n}\n" +} +``` + +The co-processor can make use of this data however they choose, the response will dictate how the data continues to flow through the router. + +## Protocol + +The protocol is very simple. A payload is transmitted to a co-processor and a response is expected. The payload of the response must comply to the defined format and is interpreted as follows. + +### Control + +Control field changes are ignored with the exception of the control field. The control field indicates to the router how processing of a request/response should proceed. Valid values are: + + - "Continue" + - { "Break": <status code> } + +If the value of the "control" field is "Continue", then processing will continue as normal through the router. If the value is a map with a key of "Break" and a numeric status code, then the router will stop processing and return a response to the client. The data returned to the client is the "body" of the co-processor response to the router. + +If the Break code is a "success" code (2xx), then the body of the co-processor response should be valid data that the client is expecting. If the Break code is a "failure" code, then the body of the co-processor response will be formatted into an error response for the client. + +> In the case of an externalized response, the router is already returning to the client, so the flow isn't altered, but the status code may still be changed. + + +For example, if the co-processor updated a received request so that the control block of the co-processor response data looked like this: + +```json title="router_request_break.json" +{ + "version": 1, + "stage": "RouterRequest", + "control": { "Break": 403 }, + "id": "1b19c05fdafc521016df33148ad63c1b", + "body": "You are not allowed to do this", + (etc...) +} +``` + +then the router would stop processing the request and return a response to the client with an HTTP status code of 403 and the error message "You are not allowed to do this" (read from the body). + +### Data + +The router will read back the values of "body", "headers" and "context" and assign the returned values to the request/response. If the values are absent from the co-processor response, then the values are left unchanged. Any changes to "sdl" are always ignored by the router. + +## Reliability + +The router is written to try and maintain the highest standards of fidelity as a federated GraphQL processor. Externalization weakens that guarantee to some extent, since the responsibility for formatting data/errors is now partially devolved to a co-processor. It is the responsibility of the co-processor author to ensure that the externalization protocol is followed carefully to minimize the opportunity for unexpected failures. + +## Performance + +There is a cost to sending data from the router to a co-processor and waiting for the response and then processing any changes. We've done some basic benchmarking where we externalized all data on a request to a co-processor which was on the same machine and simply echoed back the data to the router. This reduced performance of the router (in terms of transaction throughput) by ~33%. If we then did the same thing and also called the co-processor with the router response, transaction throughput was reduced by ~50%. + +Most co-processors will do substantially more than just echo a response back to a router, so expect the performance impact to be proportionately greater. diff --git a/docs/source/configuration/overview.mdx b/docs/source/configuration/overview.mdx index e15a3574df..13f3160636 100644 --- a/docs/source/configuration/overview.mdx +++ b/docs/source/configuration/overview.mdx @@ -377,6 +377,10 @@ See [Sending HTTP headers to subgraphs](./header-propagation/). See [Configuring CORS in the Apollo Router](./cors). +### External Extensibility + +See [External Extensibility in the Apollo Router](./external). + ### OpenTelemetry tracing See [Tracing in the Apollo Router](./tracing/). diff --git a/examples/external/README.md b/examples/external/README.md new file mode 100644 index 0000000000..937cad4027 --- /dev/null +++ b/examples/external/README.md @@ -0,0 +1,15 @@ +# External co-processing + +Demonstrates router request/response externalization can be performed via yaml configuration. + +Possible operations include externalizing: +- Headers +- Body +- Context +- SDL (schema) + +## Usage + +```bash +cargo run -- -s ../graphql/supergraph.graphql -c ./router.yaml +``` diff --git a/examples/external/router.yaml b/examples/external/router.yaml new file mode 100644 index 0000000000..a749a77f08 --- /dev/null +++ b/examples/external/router.yaml @@ -0,0 +1,33 @@ +### +# This feature is experimental and subject to change without notice. +# +# The only mandatory parameter is the url, which specifies the target of an HTTP post payload +# +# optional parameters: +# - timeout: defaults to 1s if not specified +# - stages: which stages of the router to externalize +# (if specified, must include at least one stage. The only valid value right now is "router") +# - request: which data to populate in the request externalization payload +# - headers: all of the headers received in the client request +# - context: the context of the request/response +# - body: the body of the client request +# - sdl: the sdl (schema) of the router +# - response: which data to populate in the response externalization payload +# - headers: all of the headers about to be returned to the client +# - context: the context of the request/response +# - body: the body of the client response +# - sdl: the sdl (schema) of the router +### +plugins: + experimental.external: + url: http://127.0.0.1:8081 + timeout: 5s + stages: + router: + request: + headers: true + context: true + body: true + sdl: true + response: + headers: true diff --git a/examples/header-manipulation/README.md b/examples/header-manipulation/README.md index d8618aa7bb..987b1ecd7f 100644 --- a/examples/header-manipulation/README.md +++ b/examples/header-manipulation/README.md @@ -11,5 +11,5 @@ Possible operations are: ## Usage ```bash -cargo run -- -s ../../graphql/supergraph.graphql -c ./router.yaml +cargo run -- -s ../graphql/supergraph.graphql -c ./router.yaml ```