From 5a97236a96520e3ffe570021756d926692253ee1 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 18 Nov 2021 15:11:16 -0800 Subject: [PATCH] outbound: Use per-route services in routing stack (#1380) The outbound proxy currently maintains a `Proxy` instance for each profile-defined `Route`. This allows the router to use a single underlying logical service, but this prevents per-route logic from influencing the logical target to which the request is dispatched. This change modifies the outbound stack to use an alternate profile router that dispatches over per-route `Service`s instead of per-route `Proxy`s. This is possible because the outbound logical stack is buffered (and implements `Clone`), while the inbound stack is not. To support this, the following changes have been made: * The `linkerd_app_core::dst` module has been eliminated in favor of inbound- and outbound-specific Route target types; * `linkerd_http_classify::Classify` now implements `Service` in addition to `Proxy`; * `linkerd_retry::Retry` now implements `Service` instead of `Proxy`; * `linkerd_service_profiles`now implements two caching routers: `NewServiceRouter` for `Service`s and `NewProxyRouter` for `Proxy`s; * The `linkerd_stack::ProxyService` helper has been removed, as it's not used; and * Various unneeded `http::BoxResponse` layers have been removed from the outound stack. --- Cargo.lock | 2 +- linkerd/app/core/src/dst.rs | 28 ---- linkerd/app/core/src/lib.rs | 1 - linkerd/app/core/src/metrics.rs | 24 ++- linkerd/app/core/src/svc.rs | 6 +- linkerd/app/inbound/src/http/router.rs | 108 ++++++++------ linkerd/app/outbound/src/http.rs | 51 +++++-- linkerd/app/outbound/src/http/logical.rs | 78 +++++----- linkerd/app/outbound/src/http/retry.rs | 11 +- linkerd/http-classify/src/service.rs | 28 +++- linkerd/retry/Cargo.toml | 2 +- linkerd/retry/src/lib.rs | 84 ++++------- linkerd/service-profiles/src/http.rs | 17 ++- linkerd/service-profiles/src/http/proxy.rs | 137 ++++++++++++++++++ .../src/http/route_request.rs | 134 ----------------- linkerd/service-profiles/src/http/service.rs | 116 +++++++++++++++ linkerd/stack/src/lib.rs | 4 +- linkerd/stack/src/proxy.rs | 50 +------ linkerd/stack/src/router.rs | 2 +- 19 files changed, 498 insertions(+), 385 deletions(-) delete mode 100644 linkerd/app/core/src/dst.rs create mode 100644 linkerd/service-profiles/src/http/proxy.rs delete mode 100644 linkerd/service-profiles/src/http/route_request.rs create mode 100644 linkerd/service-profiles/src/http/service.rs diff --git a/Cargo.lock b/Cargo.lock index b2fe226448..d6b06a370d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1478,9 +1478,9 @@ dependencies = [ name = "linkerd-retry" version = "0.1.0" dependencies = [ + "futures", "linkerd-error", "linkerd-stack", - "pin-project", "tower", "tracing", ] diff --git a/linkerd/app/core/src/dst.rs b/linkerd/app/core/src/dst.rs deleted file mode 100644 index 369d7ec0d0..0000000000 --- a/linkerd/app/core/src/dst.rs +++ /dev/null @@ -1,28 +0,0 @@ -use super::classify; -use crate::profiles; -use linkerd_http_classify::CanClassify; -use linkerd_proxy_http as http; -use linkerd_stack::Param; - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Route { - pub addr: profiles::LogicalAddr, - pub route: profiles::http::Route, - pub direction: super::metrics::Direction, -} - -// === impl Route === - -impl CanClassify for Route { - type Classify = classify::Request; - - fn classify(&self) -> classify::Request { - self.route.response_classes().clone().into() - } -} - -impl Param for Route { - fn param(&self) -> http::ResponseTimeout { - http::ResponseTimeout(self.route.timeout()) - } -} diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index 9df24fe07e..1229552eca 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -36,7 +36,6 @@ pub mod classify; pub mod config; pub mod control; pub mod dns; -pub mod dst; pub mod errors; pub mod http_tracing; pub mod metrics; diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 1647242332..0f5fb60fbe 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -1,7 +1,7 @@ pub use crate::transport::labels::{TargetAddr, TlsAccept}; use crate::{ classify::{Class, SuccessOrFailure}, - control, dst, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics, + control, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics, svc::Param, telemetry, tls, transport::{self, labels::TlsConnect}, @@ -216,12 +216,22 @@ impl FmtLabels for ControlLabels { // === impl RouteLabels === -impl Param for dst::Route { - fn param(&self) -> RouteLabels { - RouteLabels { - addr: self.addr.clone(), - direction: self.direction, - labels: prefix_labels("rt", self.route.labels().iter()), +impl RouteLabels { + pub fn inbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self { + let labels = prefix_labels("rt", route.labels().iter()); + Self { + addr, + labels, + direction: Direction::In, + } + } + + pub fn outbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self { + let labels = prefix_labels("rt", route.labels().iter()); + Self { + addr, + labels, + direction: Direction::Out, } } } diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 882a9b1f27..94e143fbfb 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -6,9 +6,9 @@ use linkerd_error::Recover; use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; pub use linkerd_reconnect::NewReconnect; pub use linkerd_stack::{ - self as stack, layer, ArcNewService, BoxService, BoxServiceLayer, Either, ExtractParam, Fail, - FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter, NewService, Param, Predicate, - UnwrapOr, + self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either, + ExtractParam, Fail, FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter, + NewService, Param, Predicate, UnwrapOr, }; pub use linkerd_stack_tracing::{NewInstrument, NewInstrumentLayer}; use std::{ diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 89a08c3108..21b05ae6e6 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -1,6 +1,6 @@ use crate::{policy, stack_labels, Inbound}; use linkerd_app_core::{ - classify, dst, errors, http_tracing, io, metrics, + classify, errors, http_tracing, io, metrics, profiles::{self, DiscoveryRejected}, proxy::{http, tap}, svc::{self, ExtractParam, Param}, @@ -8,7 +8,7 @@ use linkerd_app_core::{ transport::{self, ClientAddr, Remote, ServerAddr}, Error, Infallible, NameAddr, Result, }; -use std::{borrow::Borrow, net::SocketAddr}; +use std::net::SocketAddr; use tracing::{debug, debug_span}; /// Describes an HTTP client target. @@ -48,6 +48,12 @@ struct Profile { profiles: profiles::Receiver, } +#[derive(Clone, Debug)] +struct Route { + profile: Profile, + route: profiles::http::Route, +} + #[derive(Copy, Clone, Debug)] struct ClientRescue; @@ -112,16 +118,18 @@ impl Inbound { .http_endpoint .to_layer::(), ) - .push_on_service(http_tracing::client( - rt.span_sink.clone(), - super::trace_labels(), - )) - .push_on_service(svc::layers() - .push(http::BoxResponse::layer()) - // This box is needed to reduce compile times on recent (2021-10-17) nightlies, - // though this may be fixed by https://github.com/rust-lang/rust/pull/89831. It - // should be removed when possible. - .push(svc::BoxService::layer()) + .push_on_service( + svc::layers() + .push(http_tracing::client( + rt.span_sink.clone(), + super::trace_labels(), + )) + .push(http::BoxResponse::layer()) + // This box is needed to reduce compile times on recent + // (2021-10-17) nightlies, though this may be fixed by + // https://github.com/rust-lang/rust/pull/89831. It should + // be removed when possible. + .push(svc::BoxService::layer()) ); // Attempts to discover a service profile for each logical target (as @@ -129,32 +137,23 @@ impl Inbound { // request has not been received for `cache_max_idle_age`. http.clone() .check_new_service::>() - // The HTTP stack doesn't use the profile resolution, so drop it. - .push_map_target(Logical::from) - .push_on_service(http::BoxResponse::layer()) - .push(profiles::http::route_request::layer( + .push_map_target(|p: Profile| p.logical) + .push(profiles::http::NewProxyRouter::layer( + // If the request matches a route, use a per-route proxy to + // wrap the inner service. svc::proxies() + .push_map_target(|r: Route| r.profile.logical) .push_on_service(http::BoxRequest::layer()) - // Records per-route metrics. .push( - rt.metrics.proxy + rt.metrics + .proxy .http_route - .to_layer::(), + .to_layer::(), ) - // Sets the per-route response classifier as a request - // extension. - .push(classify::NewClassify::layer()) - // Sets the route as a request extension so that it can be used - // by tap. - .push_http_insert_target::() - .push_map_target(|(route, logical): (profiles::http::Route, Profile)| { - dst::Route { - route, - addr: logical.addr, - direction: metrics::Direction::In, - } - }) .push_on_service(http::BoxResponse::layer()) + .push(classify::NewClassify::layer()) + .push_http_insert_target::() + .push_map_target(|(route, profile)| Route { route, profile }) .into_inner(), )) .push_switch( @@ -163,7 +162,7 @@ impl Inbound { // the underlying target stack directly. |(rx, logical): (Option, Logical)| -> Result<_, Infallible> { if let Some(rx) = rx { - if let Some(addr) = rx.borrow().logical_addr() { + if let Some(addr) = rx.logical_addr() { return Ok(svc::Either::A(Profile { addr, logical, @@ -174,7 +173,6 @@ impl Inbound { Ok(svc::Either::B(logical)) }, http.clone() - .push_on_service(http::BoxResponse::layer()) .check_new_service::>() .into_inner(), ) @@ -196,19 +194,13 @@ impl Inbound { Ok(profiles::LookupAddr(addr.into())) })) .instrument(|_: &Logical| debug_span!("profile")) - .push_on_service( - svc::layers() - .push(http::BoxResponse::layer()) - .push(svc::layer::mk(svc::SpawnReady::new)), - ) + .push_on_service(svc::layer::mk(svc::SpawnReady::new)) // Skip the profile stack if it takes too long to become ready. .push_when_unready( config.profile_idle_timeout, - http.clone() - .push_on_service(svc::layer::mk(svc::SpawnReady::new)) + http.push_on_service(svc::layer::mk(svc::SpawnReady::new)) .into_inner(), ) - .check_new_service::>() .push_on_service( svc::layers() .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical"))) @@ -311,6 +303,34 @@ impl svc::stack::RecognizeRoute> for LogicalPerRequest { } } +// === impl Route === + +impl Param for Route { + fn param(&self) -> profiles::http::Route { + self.route.clone() + } +} + +impl Param for Route { + fn param(&self) -> metrics::RouteLabels { + metrics::RouteLabels::inbound(self.profile.addr.clone(), &self.route) + } +} + +impl classify::CanClassify for Route { + type Classify = classify::Request; + + fn classify(&self) -> classify::Request { + self.route.response_classes().clone().into() + } +} + +impl Param for Route { + fn param(&self) -> http::ResponseTimeout { + http::ResponseTimeout(self.route.timeout()) + } +} + // === impl Profile === impl Param for Profile { @@ -387,8 +407,8 @@ impl tap::Inspect for Logical { fn route_labels(&self, req: &http::Request) -> Option { req.extensions() - .get::() - .map(|r| r.route.labels().clone()) + .get::() + .map(|r| r.labels().clone()) } fn is_outbound(&self, _: &http::Request) -> bool { diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index d13de90695..8a8c39103e 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -15,7 +15,7 @@ pub(crate) use self::{require_id_header::IdentityRequired, server::ServerRescue} use crate::tcp; pub use linkerd_app_core::proxy::http::*; use linkerd_app_core::{ - dst, + classify, metrics, profiles::{self, LogicalAddr}, proxy::{api_resolve::ProtocolHint, tap}, svc::Param, @@ -30,6 +30,12 @@ pub type Logical = crate::logical::Logical; pub type Concrete = crate::logical::Concrete; pub type Endpoint = crate::endpoint::Endpoint; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct Route { + logical: Logical, + route: profiles::http::Route, +} + #[derive(Clone, Debug)] pub struct CanonicalDstHeader(pub Addr); @@ -85,17 +91,6 @@ impl Param for Logical { } } -impl Logical { - pub fn mk_route((route, logical): (profiles::http::Route, Self)) -> dst::Route { - use linkerd_app_core::metrics::Direction; - dst::Route { - route, - addr: logical.logical_addr, - direction: Direction::Out, - } - } -} - impl Param for Logical { fn param(&self) -> normalize_uri::DefaultAuthority { normalize_uri::DefaultAuthority(Some( @@ -190,11 +185,39 @@ impl tap::Inspect for Endpoint { fn route_labels(&self, req: &Request) -> Option { req.extensions() - .get::() - .map(|r| r.route.labels().clone()) + .get::() + .map(|r| r.labels().clone()) } fn is_outbound(&self, _: &Request) -> bool { true } } + +// === impl Route === + +impl Param for Route { + fn param(&self) -> profiles::http::Route { + self.route.clone() + } +} + +impl Param for Route { + fn param(&self) -> metrics::RouteLabels { + metrics::RouteLabels::outbound(self.logical.logical_addr.clone(), &self.route) + } +} + +impl Param for Route { + fn param(&self) -> ResponseTimeout { + ResponseTimeout(self.route.timeout()) + } +} + +impl classify::CanClassify for Route { + type Classify = classify::Request; + + fn classify(&self) -> classify::Request { + self.route.response_classes().clone().into() + } +} diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index b64aaee37b..dc91a2b35a 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -1,7 +1,7 @@ -use super::{retry, CanonicalDstHeader, Concrete, Endpoint, Logical}; +use super::{retry, CanonicalDstHeader, Concrete, Endpoint, Logical, Route}; use crate::{endpoint, resolve, stack_labels, Outbound}; use linkerd_app_core::{ - classify, config, dst, profiles, + classify, config, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, @@ -13,10 +13,8 @@ use linkerd_app_core::{ use tracing::debug_span; impl Outbound { - pub fn push_http_logical(self, resolve: R) -> Outbound> + pub fn push_http_logical(self, resolve: R) -> Outbound> where - B: http::HttpBody + std::fmt::Debug + Default + Unpin + Send + 'static, - B::Data: Send + 'static, E: svc::NewService + Clone + Send + Sync + 'static, ESvc: svc::Service, Response = http::Response> + Send @@ -54,7 +52,7 @@ impl Outbound { .check_service::() .into_inner(); - endpoint + let concrete = endpoint .clone() .check_new_service::>() .push_on_service( @@ -103,13 +101,15 @@ impl Outbound { // concrete address. .instrument(|c: &Concrete| debug_span!("concrete", addr = %c.resolve)) .push_map_target(Concrete::from) - .push(svc::ArcNewService::layer()) - // Distribute requests over a distribution of balancers via a - // traffic split. - // - // If the traffic split is empty/unavailable, eagerly fail requests. - // When the split is in failfast, spawn the service in a background - // task so it becomes ready without new requests. + .push(svc::ArcNewService::layer()); + + // Distribute requests over a distribution of balancers via a + // traffic split. + // + // If the traffic split is empty/unavailable, eagerly fail requests. + // When the split is in failfast, spawn the service in a background + // task so it becomes ready without new requests. + let logical = concrete .check_new_service::<(ConcreteAddr, Logical), _>() .push(profiles::split::layer()) .push_on_service( @@ -124,25 +124,33 @@ impl Outbound { .push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) .push_spawn_buffer(buffer_capacity), ) - .push_cache(cache_max_idle_age) - .push_on_service(http::BoxResponse::layer()) - // Note: routes can't exert backpressure. - .push(profiles::http::route_request::layer( - svc::proxies() + .push_cache(cache_max_idle_age); + + // If there's no route, use the logical service directly; otherwise + // use the per-route stack. + logical + .clone() + .push_switch( + |(route, logical): (Option, Logical)| -> Result<_, Infallible> { + match route { + None => Ok(svc::Either::A(logical)), + Some(route) => Ok(svc::Either::B(Route { route, logical })), + } + }, + logical + .push_map_target(|r: Route| r.logical) .push_on_service(http::BoxRequest::layer()) .push( rt.metrics .proxy .http_route_actual - .to_layer::(), + .to_layer::(), ) - // Depending on whether or not the request can be retried, - // it may have one of two `Body` types. This layer unifies - // any `Body` type into `BoxBody` so that the rest of the - // stack doesn't have to implement `Service` for requests - // with both body types. + // Depending on whether or not the request can be + // retried, it may have one of two `Body` types. This + // layer unifies any `Body` type into `BoxBody`. .push_on_service(http::BoxRequest::erased()) - .push_http_insert_target::() + .push_http_insert_target::() // Sets an optional retry policy. .push(retry::layer(rt.metrics.proxy.http_route_retry.clone())) // Sets an optional request timeout. @@ -157,17 +165,19 @@ impl Outbound { // Sets the per-route response classifier as a request // extension. .push(classify::NewClassify::layer()) - .push_map_target(Logical::mk_route) - .push_on_service(http::BoxResponse::layer()) + .push_on_service( + svc::layers() + .push(http::BoxResponse::layer()) + .push(svc::BoxCloneService::layer()) + ) .into_inner(), - )) - .check_new_service::>() - .push_on_service(http::BoxRequest::layer()) - // Strips headers that may be set by this proxy and add an outbound - // canonical-dst-header. The response body is boxed unify the profile - // stack's response type with that of to endpoint stack. + ) + .push(profiles::http::NewServiceRouter::layer()) + // Strips headers that may be set by this proxy and add an + // outbound canonical-dst-header. The response body is boxed + // unify the profile stack's response type with that of to + // endpoint stack. .push(http::NewHeaderFromTarget::::layer()) - .push_on_service(http::BoxResponse::layer()) .instrument(|l: &Logical| debug_span!("logical", dst = %l.logical_addr)) .push_on_service(svc::BoxService::layer()) .push(svc::ArcNewService::layer()) diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index 0da2ba9e9a..87fe11779d 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -1,10 +1,9 @@ +use super::Route; use futures::future; use linkerd_app_core::{ classify, - dst::Route, http_metrics::retries::Handle, - metrics::HttpRouteRetry, - profiles, + metrics, profiles, proxy::http::{ClientHandle, HttpBody}, svc::{layer, Either, Param}, Error, @@ -15,14 +14,14 @@ use linkerd_retry as retry; use std::sync::Arc; pub fn layer( - metrics: HttpRouteRetry, + metrics: metrics::HttpRouteRetry, ) -> impl layer::Layer> + Clone { retry::NewRetry::<_, N>::layer(NewRetryPolicy::new(metrics)) } #[derive(Clone, Debug)] pub struct NewRetryPolicy { - metrics: HttpRouteRetry, + metrics: metrics::HttpRouteRetry, } #[derive(Clone, Debug)] @@ -38,7 +37,7 @@ const MAX_BUFFERED_BYTES: usize = 64 * 1024; // === impl NewRetryPolicy === impl NewRetryPolicy { - pub fn new(metrics: HttpRouteRetry) -> Self { + pub fn new(metrics: metrics::HttpRouteRetry) -> Self { Self { metrics } } } diff --git a/linkerd/http-classify/src/service.rs b/linkerd/http-classify/src/service.rs index 212cf7db38..4cd387f7ae 100644 --- a/linkerd/http-classify/src/service.rs +++ b/linkerd/http-classify/src/service.rs @@ -1,4 +1,5 @@ -use linkerd_stack::{layer, NewService, Proxy}; +use linkerd_stack::{layer, NewService, Proxy, Service}; +use std::task::{Context, Poll}; #[derive(Clone, Debug)] pub struct NewClassify { @@ -44,7 +45,30 @@ where fn proxy(&self, svc: &mut S, mut req: http::Request) -> Self::Future { let classify_rsp = self.classify.classify(&req); - let _ = req.extensions_mut().insert(classify_rsp); + let prior = req.extensions_mut().insert(classify_rsp); + debug_assert!(prior.is_none(), "classification extension already existed"); self.inner.proxy(svc, req) } } + +impl Service> for Classify +where + C: super::Classify, + S: tower::Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + let classify_rsp = self.classify.classify(&req); + let prior = req.extensions_mut().insert(classify_rsp); + debug_assert!(prior.is_none(), "classification extension already existed"); + self.inner.call(req) + } +} diff --git a/linkerd/retry/Cargo.toml b/linkerd/retry/Cargo.toml index ecd4ba636c..6f27c068b4 100644 --- a/linkerd/retry/Cargo.toml +++ b/linkerd/retry/Cargo.toml @@ -7,8 +7,8 @@ edition = "2021" publish = false [dependencies] +futures = { version = "0.3", default-features = false } linkerd-error = { path = "../error" } linkerd-stack = { path = "../stack" } -pin-project = "1" tower = { version = "0.4.11", default-features = false, features = ["retry"] } tracing = "0.1.29" diff --git a/linkerd/retry/src/lib.rs b/linkerd/retry/src/lib.rs index 37f1f5fa22..384a6a8685 100644 --- a/linkerd/retry/src/lib.rs +++ b/linkerd/retry/src/lib.rs @@ -1,14 +1,10 @@ #![deny(warnings, rust_2018_idioms)] #![forbid(unsafe_code)] +use futures::future; use linkerd_error::Error; -use linkerd_stack::{layer, Either, NewService, Oneshot, Proxy, ProxyService, ServiceExt}; -use pin_project::pin_project; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use linkerd_stack::{layer, Either, NewService, Oneshot, Service, ServiceExt}; +use std::task::{Context, Poll}; pub use tower::retry::{budget::Budget, Policy}; use tracing::trace; @@ -53,18 +49,6 @@ pub struct Retry { inner: S, } -#[pin_project(project = ResponseFutureProj)] -pub enum ResponseFuture -where - R: tower::retry::Policy + Clone, - P: Proxy + Clone, - S: tower::Service + Clone, - S::Error: Into, -{ - Disabled(#[pin] P::Future), - Retry(#[pin] Oneshot>, Req>), -} - // === impl NewRetry === impl NewRetry { @@ -94,52 +78,38 @@ where // === impl Retry === -impl Proxy for Retry +impl Service for Retry where - R: PrepareRequest + Clone, - P: Proxy - + Proxy + P: PrepareRequest + Clone, + S: Service + + Service + Clone, - S: tower::Service + Clone, - S::Error: Into, + Fut: std::future::Future>, { - type Request = PReq; - type Response = PRsp; + type Response = Rsp; type Error = Error; - type Future = ResponseFuture; + type Future = future::Either, P::RetryRequest>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + >::poll_ready(&mut self.inner, cx) + } - fn proxy(&self, svc: &mut S, req: Req) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { trace!(retryable = %self.policy.is_some()); - if let Some(policy) = self.policy.as_ref() { - return match policy.prepare_request(req) { - Either::A(retry_req) => { - let inner = - Proxy::::wrap_service(self.inner.clone(), svc.clone()); - let retry = tower::retry::Retry::new(policy.clone(), inner); - ResponseFuture::Retry(retry.oneshot(retry_req)) - } - Either::B(req) => ResponseFuture::Disabled(self.inner.proxy(svc, req)), - }; - } - - ResponseFuture::Disabled(self.inner.proxy(svc, req)) - } -} + let policy = match self.policy.as_ref() { + None => return future::Either::Left(self.inner.call(req)), + Some(p) => p, + }; -impl Future for ResponseFuture -where - R: tower::retry::Policy + Clone, - P: Proxy + Clone, - S: tower::Service + Clone, - S::Error: Into, -{ - type Output = Result; + let retry_req = match policy.prepare_request(req) { + Either::A(retry_req) => retry_req, + Either::B(req) => return future::Either::Left(self.inner.call(req)), + }; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - ResponseFutureProj::Disabled(f) => f.poll(cx).map_err(Into::into), - ResponseFutureProj::Retry(f) => f.poll(cx).map_err(Into::into), - } + let inner = self.inner.clone(); + let retry = tower::retry::Retry::new(policy.clone(), inner); + future::Either::Right(retry.oneshot(retry_req)) } } diff --git a/linkerd/service-profiles/src/http.rs b/linkerd/service-profiles/src/http.rs index 6b0d40c39a..80dfa5bcc8 100644 --- a/linkerd/service-profiles/src/http.rs +++ b/linkerd/service-profiles/src/http.rs @@ -1,3 +1,6 @@ +mod proxy; +mod service; + use regex::Regex; use std::{ fmt, @@ -8,7 +11,7 @@ use std::{ }; use tower::retry::budget::Budget; -pub mod route_request; +pub use self::{proxy::NewProxyRouter, service::NewServiceRouter}; #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct Route { @@ -55,6 +58,18 @@ pub struct Retries { #[derive(Clone, Default)] struct Labels(Arc>); +fn route_for_request<'r, B>( + http_routes: &'r [(RequestMatch, Route)], + request: &http::Request, +) -> Option<&'r Route> { + for (request_match, route) in http_routes { + if request_match.is_match(request) { + return Some(route); + } + } + None +} + // === impl Route === impl Route { diff --git a/linkerd/service-profiles/src/http/proxy.rs b/linkerd/service-profiles/src/http/proxy.rs new file mode 100644 index 0000000000..07fd75f321 --- /dev/null +++ b/linkerd/service-profiles/src/http/proxy.rs @@ -0,0 +1,137 @@ +use super::{RequestMatch, Route}; +use crate::{Profile, Receiver, ReceiverStream}; +use futures::{future, prelude::*}; +use linkerd_error::{Error, Result}; +use linkerd_stack::{layer, NewService, Param, Proxy, Service}; +use std::{ + collections::{hash_map, HashMap, HashSet}, + task::{Context, Poll}, +}; +use tracing::{debug, trace}; + +/// A router that uses a per-route `Proxy` to wrap a common underlying +/// `Service`. +/// +/// This router is similar to `linkerd_stack::NewRouter` and +/// `linkerd_cache::Cache` with a few differences: +/// +/// * It's `Proxy`-specific; +/// * Routes are constructed eagerly as the profile updates; +/// * Routes are removed eagerly as the profile updates (i.e. there's no +/// idle-oriented eviction). +#[derive(Clone, Debug)] +pub struct NewProxyRouter { + new_proxy: M, + new_service: N, +} + +#[derive(Debug)] +pub struct ProxyRouter { + new_proxy: N, + inner: S, + target: T, + rx: ReceiverStream, + http_routes: Vec<(RequestMatch, Route)>, + proxies: HashMap, +} + +// === impl NewProxyRouter === + +impl NewProxyRouter { + pub fn layer(new_proxy: M) -> impl layer::Layer + Clone { + layer::mk(move |new_service| Self { + new_service, + new_proxy: new_proxy.clone(), + }) + } +} + +impl NewService for NewProxyRouter +where + T: Param + Clone, + N: NewService + Clone, + M: NewService<(Route, T)> + Clone, +{ + type Service = ProxyRouter; + + fn new_service(&self, target: T) -> Self::Service { + let rx = target.param(); + let inner = self.new_service.new_service(target.clone()); + ProxyRouter { + inner, + target, + rx: rx.into(), + http_routes: Vec::new(), + proxies: HashMap::new(), + new_proxy: self.new_proxy.clone(), + } + } +} + +// === impl ProxyRouter === + +type ProxyResponseFuture = + future::Either Error>, future::MapErr Error>>; + +impl Service> for ProxyRouter +where + T: Clone, + N: NewService<(Route, T), Service = P> + Clone, + P: Proxy, S, Request = http::Request, Response = Rsp>, + S: Service, Response = Rsp>, + S::Error: Into, +{ + type Response = Rsp; + type Error = Error; + type Future = ProxyResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Poll the inner service first so we don't bother updating routes unless we can actually + // use them. + futures::ready!(self.inner.poll_ready(cx).map_err(Into::into))?; + + // If the routes have been updated, update the cache. + if let Poll::Ready(Some(Profile { http_routes, .. })) = self.rx.poll_next_unpin(cx) { + debug!(routes = %http_routes.len(), "Updating HTTP routes"); + let routes = http_routes + .iter() + .map(|(_, r)| r.clone()) + .collect::>(); + self.http_routes = http_routes; + + // Clear out defunct routes before building any missing routes. + self.proxies.retain(|r, _| routes.contains(r)); + for route in routes.into_iter() { + if let hash_map::Entry::Vacant(ent) = self.proxies.entry(route) { + let proxy = self + .new_proxy + .new_service((ent.key().clone(), self.target.clone())); + ent.insert(proxy); + } + } + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + match super::route_for_request(&self.http_routes, &req) { + None => future::Either::Left({ + // Use the inner service directly if no route matches the + // request. + trace!("No routes matched"); + self.inner.call(req).map_err(Into::into) + }), + Some(route) => future::Either::Right({ + // Otherwise, wrap the inner service with the route-specific + // proxy. + trace!(?route, "Using route proxy"); + self.proxies + .get(route) + .expect("route must exist") + .proxy(&mut self.inner, req) + .map_err(Into::into) + }), + } + } +} diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs deleted file mode 100644 index 498708716a..0000000000 --- a/linkerd/service-profiles/src/http/route_request.rs +++ /dev/null @@ -1,134 +0,0 @@ -use super::{RequestMatch, Route}; -use crate::{Profile, Receiver, ReceiverStream}; -use futures::{future, prelude::*, ready}; -use linkerd_error::Error; -use linkerd_http_box::BoxBody; -use linkerd_stack::{layer, NewService, Param, Proxy}; -use std::{ - collections::HashMap, - marker::PhantomData, - task::{Context, Poll}, -}; -use tracing::{debug, trace}; - -pub fn layer( - new_route: N, -) -> impl layer::Layer> { - // This is saved so that the same `Arc`s are used and cloned instead of - // calling `Route::default()` every time. - layer::mk(move |inner| NewRouteRequest { - inner, - new_route: new_route.clone(), - _route: PhantomData, - }) -} - -pub struct NewRouteRequest { - inner: M, - new_route: N, - _route: PhantomData, -} - -pub struct RouteRequest { - target: T, - rx: ReceiverStream, - inner: S, - new_route: N, - http_routes: Vec<(RequestMatch, Route)>, - proxies: HashMap, -} - -impl Clone for NewRouteRequest { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - new_route: self.new_route.clone(), - _route: self._route, - } - } -} - -impl NewService for NewRouteRequest -where - T: Clone + Param, - M: NewService, - N: NewService<(Route, T)> + Clone, -{ - type Service = RouteRequest; - - fn new_service(&self, target: T) -> Self::Service { - let rx = target.param(); - let inner = self.inner.new_service(target.clone()); - RouteRequest { - rx: rx.into(), - target, - inner, - new_route: self.new_route.clone(), - http_routes: Vec::new(), - proxies: HashMap::new(), - } - } -} - -impl tower::Service> for RouteRequest -where - T: Clone, - N: NewService<(Route, T), Service = R> + Clone, - R: Proxy< - http::Request, - S, - Request = http::Request, - Response = http::Response, - >, - S: tower::Service, Response = http::Response>, - S::Error: Into, -{ - type Response = http::Response; - type Error = Error; - type Future = - future::Either, future::ErrInto>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut update = None; - while let Poll::Ready(Some(up)) = self.rx.poll_next_unpin(cx) { - tracing::trace!(update = ?up, "updated profile"); - update = Some(up); - } - - // Every time the profile updates, rebuild the distribution, reusing - // services that existed in the prior state. - if let Some(Profile { http_routes, .. }) = update { - debug!(routes = %http_routes.len(), "Updating HTTP routes"); - let mut proxies = HashMap::with_capacity(http_routes.len()); - for (_, ref route) in &http_routes { - // Reuse the prior services whenever possible. - let proxy = self.proxies.remove(route).unwrap_or_else(|| { - debug!(?route, "Creating HTTP route"); - self.new_route - .new_service((route.clone(), self.target.clone())) - }); - proxies.insert(route.clone(), proxy); - } - self.http_routes = http_routes; - self.proxies = proxies; - } - - Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into)) - } - - fn call(&mut self, req: http::Request) -> Self::Future { - for (ref condition, ref route) in &self.http_routes { - if condition.is_match(&req) { - trace!(?condition, "Using configured route"); - return future::Either::Left( - self.proxies[route] - .proxy(&mut self.inner, req) - .err_into::(), - ); - } - } - - trace!("No routes matched"); - future::Either::Right(self.inner.call(req).err_into::()) - } -} diff --git a/linkerd/service-profiles/src/http/service.rs b/linkerd/service-profiles/src/http/service.rs new file mode 100644 index 0000000000..f95cc7f813 --- /dev/null +++ b/linkerd/service-profiles/src/http/service.rs @@ -0,0 +1,116 @@ +use super::{RequestMatch, Route}; +use crate::{Profile, Receiver, ReceiverStream}; +use futures::prelude::*; +use linkerd_stack::{layer, NewService, Oneshot, Param, Service, ServiceExt}; +use std::{ + collections::{hash_map, HashMap, HashSet}, + task::{Context, Poll}, +}; +use tracing::{debug, trace}; + +/// A router that uses a per-route `Service` (with a fallback service when no +/// route is matched). +/// +/// This router is similar to `linkerd_stack::NewRouter` and +/// `linkerd_cache::Cache` with a few differences: +/// +/// * Routes are constructed eagerly as the profile updates; +/// * Routes are removed eagerly as the profile updates (i.e. there's no +/// idle-oriented eviction). +#[derive(Clone, Debug)] +pub struct NewServiceRouter(N); + +#[derive(Debug)] +pub struct ServiceRouter { + new_route: N, + target: T, + rx: ReceiverStream, + http_routes: Vec<(RequestMatch, Route)>, + services: HashMap, + default: S, +} + +// === impl NewServiceRouter === + +impl NewServiceRouter { + pub fn layer() -> impl layer::Layer + Clone { + layer::mk(Self) + } +} + +impl NewService for NewServiceRouter +where + T: Param + Clone, + N: NewService<(Option, T)> + Clone, +{ + type Service = ServiceRouter; + + fn new_service(&self, target: T) -> Self::Service { + let rx = target.param(); + let default = self.0.new_service((None, target.clone())); + ServiceRouter { + default, + target, + rx: rx.into(), + http_routes: Vec::new(), + services: HashMap::new(), + new_route: self.0.clone(), + } + } +} + +// === impl ServiceRouter === + +impl Service> for ServiceRouter +where + T: Clone, + N: NewService<(Option, T), Service = S> + Clone, + S: Service> + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Oneshot>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If the routes have been updated, update the cache. + if let Poll::Ready(Some(Profile { http_routes, .. })) = self.rx.poll_next_unpin(cx) { + debug!(routes = %http_routes.len(), "Updating HTTP routes"); + let routes = http_routes + .iter() + .map(|(_, r)| r.clone()) + .collect::>(); + self.http_routes = http_routes; + + // Clear out defunct routes before building any missing routes. + self.services.retain(|r, _| routes.contains(r)); + for route in routes.into_iter() { + if let hash_map::Entry::Vacant(ent) = self.services.entry(route) { + let route = ent.key().clone(); + let svc = self + .new_route + .new_service((Some(route), self.target.clone())); + ent.insert(svc); + } + } + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = match super::route_for_request(&self.http_routes, &req) { + Some(route) => { + // If the request matches a route, use the route's service. + trace!(?route, "Using route service"); + self.services.get(route).expect("route must exist").clone() + } + None => { + // Otherwise, use the default service. + trace!("No routes matched"); + self.default.clone() + } + }; + + inner.oneshot(req) + } +} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index ba939d6678..011d7456fc 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -40,7 +40,7 @@ pub use self::{ monitor::{Monitor, MonitorError, MonitorNewService, MonitorService, NewMonitor}, new_service::NewService, on_service::{OnService, OnServiceLayer}, - proxy::{Proxy, ProxyService}, + proxy::Proxy, result::ResultService, router::{NewRouter, RecognizeRoute}, switch_ready::{NewSwitchReady, SwitchReady}, @@ -49,7 +49,7 @@ pub use self::{ }; pub use tower::{ service_fn, - util::{future_service, FutureService, Oneshot, ServiceExt}, + util::{future_service, BoxCloneService, FutureService, Oneshot, ServiceExt}, Service, }; diff --git a/linkerd/stack/src/proxy.rs b/linkerd/stack/src/proxy.rs index 351ccb2fbc..8e030ffc47 100644 --- a/linkerd/stack/src/proxy.rs +++ b/linkerd/stack/src/proxy.rs @@ -1,7 +1,5 @@ -use futures::{future, TryFutureExt}; use linkerd_error::Error; use std::future::Future; -use std::task::{Context, Poll}; /// A middleware type that cannot exert backpressure. /// @@ -21,22 +19,6 @@ pub trait Proxy> { /// Usually invokes `S::call`, potentially modifying requests or responses. fn proxy(&self, inner: &mut S, req: Req) -> Self::Future; - - /// Wraps an `S` typed service with the proxy. - fn wrap_service(self, inner: S) -> ProxyService - where - Self: Sized, - S: tower::Service, - { - ProxyService::new(self, inner) - } -} - -/// Wraps an `S`-typed `Service` with a `P`-typed `Proxy`. -#[derive(Clone, Debug)] -pub struct ProxyService { - proxy: P, - service: S, } // === impl Proxy === @@ -52,38 +34,8 @@ where type Error = S::Error; type Future = S::Future; + #[inline] fn proxy(&self, inner: &mut S, req: Req) -> Self::Future { inner.call(req) } } - -// === impl ProxyService === - -impl ProxyService { - pub fn new(proxy: P, service: S) -> Self { - Self { proxy, service } - } - - pub fn into_parts(self) -> (P, S) { - (self.proxy, self.service) - } -} - -impl tower::Service for ProxyService -where - P: Proxy, - S: tower::Service, - S::Error: Into, -{ - type Response = P::Response; - type Error = Error; - type Future = future::MapErr Error>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: Req) -> Self::Future { - self.proxy.proxy(&mut self.service, req).map_err(Into::into) - } -} diff --git a/linkerd/stack/src/router.rs b/linkerd/stack/src/router.rs index 2db0ae1df4..473e837459 100644 --- a/linkerd/stack/src/router.rs +++ b/linkerd/stack/src/router.rs @@ -31,7 +31,7 @@ impl NewRouter { } } - /// Creates a layer that creates that produces Routers. + /// Creates a layer that produces Routers. /// /// The provided `new_recognize` is expected to implement a `NewService` that /// produces `Recognize` implementations.