Skip to content

Commit

Permalink
Add internal HTTP-level Tower service (#1582)
Browse files Browse the repository at this point in the history
Part of #1496
  • Loading branch information
SimonSapin authored Aug 25, 2022
1 parent d8f4a86 commit 1ffb4f4
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 92 deletions.
194 changes: 102 additions & 92 deletions apollo-router/src/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,106 @@ impl AxumHttpServerFactory {
}
}

pub(crate) fn make_axum_router<RF>(
service_factory: RF,
configuration: &Configuration,
plugin_handlers: HashMap<String, Handler>,
) -> Result<Router, ApolloRouterError>
where
RF: SupergraphServiceFactory,
{
let cors = configuration
.server
.cors
.clone()
.into_layer()
.map_err(|e| {
ApolloRouterError::ConfigError(
crate::configuration::ConfigurationError::LayerConfiguration {
layer: "Cors".to_string(),
error: e,
},
)
})?;
let graphql_endpoint = if configuration.server.endpoint.ends_with("/*") {
// Needed for axum (check the axum docs for more information about wildcards https://docs.rs/axum/latest/axum/struct.Router.html#wildcards)
format!("{}router_extra_path", configuration.server.endpoint)
} else {
configuration.server.endpoint.clone()
};
let mut router = Router::<hyper::Body>::new()
.route(
&graphql_endpoint,
get({
let display_landing_page = configuration.server.landing_page;
move |host: Host, Extension(service): Extension<RF>, http_request: Request<Body>| {
handle_get(
host,
service.new_service().boxed(),
http_request,
display_landing_page,
)
}
})
.post({
move |host: Host,
uri: OriginalUri,
request: Json<graphql::Request>,
Extension(service): Extension<RF>,
header_map: HeaderMap| {
handle_post(
host,
uri,
request,
service.new_service().boxed(),
header_map,
)
}
}),
)
.layer(middleware::from_fn(decompress_request_body))
.layer(
TraceLayer::new_for_http()
.make_span_with(PropagatingMakeSpan::new())
.on_response(|resp: &Response<_>, _duration: Duration, span: &Span| {
if resp.status() >= StatusCode::BAD_REQUEST {
span.record(
"otel.status_code",
&opentelemetry::trace::StatusCode::Error.as_str(),
);
} else {
span.record(
"otel.status_code",
&opentelemetry::trace::StatusCode::Ok.as_str(),
);
}
}),
)
.route(&configuration.server.health_check_path, get(health_check))
.layer(Extension(service_factory))
.layer(cors)
.layer(CompressionLayer::new()); // To compress response body

for (plugin_name, handler) in plugin_handlers {
router = router.route(
&format!("/plugins/{}/*path", plugin_name),
get({
let new_handler = handler.clone();
move |host: Host, request_parts: Request<Body>| {
custom_plugin_handler(host, request_parts, new_handler)
}
})
.post({
let new_handler = handler.clone();
move |host: Host, request_parts: Request<Body>| {
custom_plugin_handler(host, request_parts, new_handler)
}
}),
);
}
Ok(router)
}

impl HttpServerFactory for AxumHttpServerFactory {
type Future = Pin<Box<dyn Future<Output = Result<HttpServerHandle, ApolloRouterError>> + Send>>;

Expand All @@ -95,97 +195,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let listen_address = configuration.server.listen.clone();

let cors = configuration
.server
.cors
.clone()
.into_layer()
.map_err(|e| {
ApolloRouterError::ConfigError(
crate::configuration::ConfigurationError::LayerConfiguration {
layer: "Cors".to_string(),
error: e,
},
)
})?;
let graphql_endpoint = if configuration.server.endpoint.ends_with("/*") {
// Needed for axum (check the axum docs for more information about wildcards https://docs.rs/axum/latest/axum/struct.Router.html#wildcards)
format!("{}router_extra_path", configuration.server.endpoint)
} else {
configuration.server.endpoint.clone()
};
let mut router = Router::new()
.route(
&graphql_endpoint,
get({
let display_landing_page = configuration.server.landing_page;
move |host: Host,
Extension(service): Extension<RF>,
http_request: Request<Body>| {
handle_get(
host,
service.new_service().boxed(),
http_request,
display_landing_page,
)
}
})
.post({
move |host: Host,
uri: OriginalUri,
request: Json<graphql::Request>,
Extension(service): Extension<RF>,
header_map: HeaderMap| {
handle_post(
host,
uri,
request,
service.new_service().boxed(),
header_map,
)
}
}),
)
.layer(middleware::from_fn(decompress_request_body))
.layer(
TraceLayer::new_for_http()
.make_span_with(PropagatingMakeSpan::new())
.on_response(|resp: &Response<_>, _duration: Duration, span: &Span| {
if resp.status() >= StatusCode::BAD_REQUEST {
span.record(
"otel.status_code",
&opentelemetry::trace::StatusCode::Error.as_str(),
);
} else {
span.record(
"otel.status_code",
&opentelemetry::trace::StatusCode::Ok.as_str(),
);
}
}),
)
.route(&configuration.server.health_check_path, get(health_check))
.layer(Extension(service_factory))
.layer(cors)
.layer(CompressionLayer::new()); // To compress response body

for (plugin_name, handler) in plugin_handlers {
router = router.route(
&format!("/plugins/{}/*path", plugin_name),
get({
let new_handler = handler.clone();
move |host: Host, request_parts: Request<Body>| {
custom_plugin_handler(host, request_parts, new_handler)
}
})
.post({
let new_handler = handler.clone();
move |host: Host, request_parts: Request<Body>| {
custom_plugin_handler(host, request_parts, new_handler)
}
}),
);
}
let router = make_axum_router(service_factory, &configuration, plugin_handlers)?;

// if we received a TCP listener, reuse it, otherwise create a new one
#[cfg_attr(not(unix), allow(unused_mut))]
Expand Down Expand Up @@ -240,7 +250,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
max_open_file_warning = None;
}

tokio::task::spawn(async move{
tokio::task::spawn(async move {
match res {
NetworkStream::Tcp(stream) => {
stream
Expand Down
45 changes: 45 additions & 0 deletions apollo-router/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ use displaydoc::Display as DisplayDoc;
use futures::channel::oneshot;
use futures::prelude::*;
use futures::FutureExt;
use http_body::Body as _;
use hyper::Body;
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::spawn;
use tower::BoxError;
use tower::ServiceExt;
use tracing::subscriber::SetGlobalDefaultError;
use tracing_futures::WithSubscriber;
use url::Url;
Expand All @@ -29,15 +32,57 @@ use Event::Shutdown;
use Event::UpdateConfiguration;
use Event::UpdateSchema;

use crate::axum_http_server_factory::make_axum_router;
use crate::axum_http_server_factory::AxumHttpServerFactory;
use crate::configuration::validate_configuration;
use crate::configuration::Configuration;
use crate::configuration::ListenAddr;
use crate::plugin::DynPlugin;
use crate::router_factory::SupergraphServiceConfigurator;
use crate::router_factory::YamlSupergraphServiceFactory;
use crate::services::transport;
use crate::spec::Schema;
use crate::state_machine::StateMachine;

type SchemaStream = Pin<Box<dyn Stream<Item = String> + Send>>;

// For now this is unused:
#[allow(unused)]
// Later we might add a public API for this (probably a builder similar to `test_harness.rs`),
// see https://github.com/apollographql/router/issues/1496.
// In the meantime keeping this function helps make sure it still compiles.
async fn make_transport_service<RF>(
schema: &str,
configuration: Arc<Configuration>,
extra_plugins: Vec<(String, Box<dyn DynPlugin>)>,
) -> Result<transport::BoxCloneService, BoxError> {
let schema = Arc::new(Schema::parse(schema, &configuration)?);
let service_factory = YamlSupergraphServiceFactory
.create(configuration.clone(), schema, None, Some(extra_plugins))
.await?;
let extra = Default::default();
Ok(make_axum_router(service_factory, &configuration, extra)?
.map_response(|response| {
response.map(|body| {
// Axum makes this `body` have type:
// https://docs.rs/http-body/0.4.5/http_body/combinators/struct.UnsyncBoxBody.html
let mut body = Box::pin(body);
// We make a stream based on its `poll_data` method
// in order to create a `hyper::Body`.
Body::wrap_stream(stream::poll_fn(move |ctx| body.as_mut().poll_data(ctx)))
// … but we ignore the `poll_trailers` method:
// https://docs.rs/http-body/0.4.5/http_body/trait.Body.html#tymethod.poll_trailers
// Apparently HTTP/2 trailers are like headers, except after the response body.
// I (Simon) believe nothing in the Apollo Router uses trailers as of this writing,
// so ignoring `poll_trailers` is fine.
// If we want to use trailers, we may need remove this convertion to `hyper::Body`
// and return `UnsyncBoxBody` (a.k.a. `axum::BoxBody`) as-is.
})
})
.map_err(|error| match error {})
.boxed_clone())
}

/// Error types for FederatedServer.
#[derive(Error, Debug, DisplayDoc)]
pub enum ApolloRouterError {
Expand Down

0 comments on commit 1ffb4f4

Please sign in to comment.