Skip to content

Commit

Permalink
outbound: Use per-route services in routing stack (#1380)
Browse files Browse the repository at this point in the history
The outbound proxy currently maintains a `Proxy` instance for each
profile-defined `Route`. This allows the router to use a single
underlying logical service, but this prevents per-route logic from
influencing the logical target to which the request is dispatched.

This change modifies the outbound stack to use an alternate profile
router that dispatches over per-route `Service`s instead of per-route
`Proxy`s. This is possible because the outbound logical stack is
buffered (and implements `Clone`), while the inbound stack is not.

To support this, the following changes have been made:

* The `linkerd_app_core::dst` module has been eliminated in favor of
  inbound- and outbound-specific Route target types;
* `linkerd_http_classify::Classify` now implements `Service` in addition
  to `Proxy`;
* `linkerd_retry::Retry` now implements `Service` instead of `Proxy`;
* `linkerd_service_profiles`now implements two caching routers:
  `NewServiceRouter` for `Service`s and `NewProxyRouter` for `Proxy`s;
* The `linkerd_stack::ProxyService` helper has been removed, as it's not
  used; and
* Various unneeded `http::BoxResponse` layers have been removed from the
  outound stack.
  • Loading branch information
olix0r authored Nov 18, 2021
1 parent 2f19d9c commit 5a97236
Show file tree
Hide file tree
Showing 19 changed files with 498 additions and 385 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1478,9 +1478,9 @@ dependencies = [
name = "linkerd-retry"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"linkerd-stack",
"pin-project",
"tower",
"tracing",
]
Expand Down
28 changes: 0 additions & 28 deletions linkerd/app/core/src/dst.rs

This file was deleted.

1 change: 0 additions & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub mod classify;
pub mod config;
pub mod control;
pub mod dns;
pub mod dst;
pub mod errors;
pub mod http_tracing;
pub mod metrics;
Expand Down
24 changes: 17 additions & 7 deletions linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use crate::transport::labels::{TargetAddr, TlsAccept};
use crate::{
classify::{Class, SuccessOrFailure},
control, dst, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics,
control, http_metrics, http_metrics as metrics, opencensus, profiles, stack_metrics,
svc::Param,
telemetry, tls,
transport::{self, labels::TlsConnect},
Expand Down Expand Up @@ -216,12 +216,22 @@ impl FmtLabels for ControlLabels {

// === impl RouteLabels ===

impl Param<RouteLabels> for dst::Route {
fn param(&self) -> RouteLabels {
RouteLabels {
addr: self.addr.clone(),
direction: self.direction,
labels: prefix_labels("rt", self.route.labels().iter()),
impl RouteLabels {
pub fn inbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self {
let labels = prefix_labels("rt", route.labels().iter());
Self {
addr,
labels,
direction: Direction::In,
}
}

pub fn outbound(addr: profiles::LogicalAddr, route: &profiles::http::Route) -> Self {
let labels = prefix_labels("rt", route.labels().iter());
Self {
addr,
labels,
direction: Direction::Out,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use linkerd_error::Recover;
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
pub use linkerd_reconnect::NewReconnect;
pub use linkerd_stack::{
self as stack, layer, ArcNewService, BoxService, BoxServiceLayer, Either, ExtractParam, Fail,
FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter, NewService, Param, Predicate,
UnwrapOr,
self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either,
ExtractParam, Fail, FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter,
NewService, Param, Predicate, UnwrapOr,
};
pub use linkerd_stack_tracing::{NewInstrument, NewInstrumentLayer};
use std::{
Expand Down
108 changes: 64 additions & 44 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{policy, stack_labels, Inbound};
use linkerd_app_core::{
classify, dst, errors, http_tracing, io, metrics,
classify, errors, http_tracing, io, metrics,
profiles::{self, DiscoveryRejected},
proxy::{http, tap},
svc::{self, ExtractParam, Param},
tls,
transport::{self, ClientAddr, Remote, ServerAddr},
Error, Infallible, NameAddr, Result,
};
use std::{borrow::Borrow, net::SocketAddr};
use std::net::SocketAddr;
use tracing::{debug, debug_span};

/// Describes an HTTP client target.
Expand Down Expand Up @@ -48,6 +48,12 @@ struct Profile {
profiles: profiles::Receiver,
}

#[derive(Clone, Debug)]
struct Route {
profile: Profile,
route: profiles::http::Route,
}

#[derive(Copy, Clone, Debug)]
struct ClientRescue;

Expand Down Expand Up @@ -112,49 +118,42 @@ impl<C> Inbound<C> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
super::trace_labels(),
))
.push_on_service(svc::layers()
.push(http::BoxResponse::layer())
// This box is needed to reduce compile times on recent (2021-10-17) nightlies,
// though this may be fixed by https://github.com/rust-lang/rust/pull/89831. It
// should be removed when possible.
.push(svc::BoxService::layer())
.push_on_service(
svc::layers()
.push(http_tracing::client(
rt.span_sink.clone(),
super::trace_labels(),
))
.push(http::BoxResponse::layer())
// This box is needed to reduce compile times on recent
// (2021-10-17) nightlies, though this may be fixed by
// https://github.com/rust-lang/rust/pull/89831. It should
// be removed when possible.
.push(svc::BoxService::layer())
);

// Attempts to discover a service profile for each logical target (as
// informed by the request's headers). The stack is cached until a
// request has not been received for `cache_max_idle_age`.
http.clone()
.check_new_service::<Logical, http::Request<http::BoxBody>>()
// The HTTP stack doesn't use the profile resolution, so drop it.
.push_map_target(Logical::from)
.push_on_service(http::BoxResponse::layer())
.push(profiles::http::route_request::layer(
.push_map_target(|p: Profile| p.logical)
.push(profiles::http::NewProxyRouter::layer(
// If the request matches a route, use a per-route proxy to
// wrap the inner service.
svc::proxies()
.push_map_target(|r: Route| r.profile.logical)
.push_on_service(http::BoxRequest::layer())
// Records per-route metrics.
.push(
rt.metrics.proxy
rt.metrics
.proxy
.http_route
.to_layer::<classify::Response, _, dst::Route>(),
.to_layer::<classify::Response, _, _>(),
)
// Sets the per-route response classifier as a request
// extension.
.push(classify::NewClassify::layer())
// Sets the route as a request extension so that it can be used
// by tap.
.push_http_insert_target::<dst::Route>()
.push_map_target(|(route, logical): (profiles::http::Route, Profile)| {
dst::Route {
route,
addr: logical.addr,
direction: metrics::Direction::In,
}
})
.push_on_service(http::BoxResponse::layer())
.push(classify::NewClassify::layer())
.push_http_insert_target::<profiles::http::Route>()
.push_map_target(|(route, profile)| Route { route, profile })
.into_inner(),
))
.push_switch(
Expand All @@ -163,7 +162,7 @@ impl<C> Inbound<C> {
// the underlying target stack directly.
|(rx, logical): (Option<profiles::Receiver>, Logical)| -> Result<_, Infallible> {
if let Some(rx) = rx {
if let Some(addr) = rx.borrow().logical_addr() {
if let Some(addr) = rx.logical_addr() {
return Ok(svc::Either::A(Profile {
addr,
logical,
Expand All @@ -174,7 +173,6 @@ impl<C> Inbound<C> {
Ok(svc::Either::B(logical))
},
http.clone()
.push_on_service(http::BoxResponse::layer())
.check_new_service::<Logical, http::Request<_>>()
.into_inner(),
)
Expand All @@ -196,19 +194,13 @@ impl<C> Inbound<C> {
Ok(profiles::LookupAddr(addr.into()))
}))
.instrument(|_: &Logical| debug_span!("profile"))
.push_on_service(
svc::layers()
.push(http::BoxResponse::layer())
.push(svc::layer::mk(svc::SpawnReady::new)),
)
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
// Skip the profile stack if it takes too long to become ready.
.push_when_unready(
config.profile_idle_timeout,
http.clone()
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
http.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.into_inner(),
)
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_on_service(
svc::layers()
.push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
Expand Down Expand Up @@ -311,6 +303,34 @@ impl<A> svc::stack::RecognizeRoute<http::Request<A>> for LogicalPerRequest {
}
}

// === impl Route ===

impl Param<profiles::http::Route> for Route {
fn param(&self) -> profiles::http::Route {
self.route.clone()
}
}

impl Param<metrics::RouteLabels> for Route {
fn param(&self) -> metrics::RouteLabels {
metrics::RouteLabels::inbound(self.profile.addr.clone(), &self.route)
}
}

impl classify::CanClassify for Route {
type Classify = classify::Request;

fn classify(&self) -> classify::Request {
self.route.response_classes().clone().into()
}
}

impl Param<http::ResponseTimeout> for Route {
fn param(&self) -> http::ResponseTimeout {
http::ResponseTimeout(self.route.timeout())
}
}

// === impl Profile ===

impl Param<profiles::Receiver> for Profile {
Expand Down Expand Up @@ -387,8 +407,8 @@ impl tap::Inspect for Logical {

fn route_labels<B>(&self, req: &http::Request<B>) -> Option<tap::Labels> {
req.extensions()
.get::<dst::Route>()
.map(|r| r.route.labels().clone())
.get::<profiles::http::Route>()
.map(|r| r.labels().clone())
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
Expand Down
51 changes: 37 additions & 14 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use self::{require_id_header::IdentityRequired, server::ServerRescue}
use crate::tcp;
pub use linkerd_app_core::proxy::http::*;
use linkerd_app_core::{
dst,
classify, metrics,
profiles::{self, LogicalAddr},
proxy::{api_resolve::ProtocolHint, tap},
svc::Param,
Expand All @@ -30,6 +30,12 @@ pub type Logical = crate::logical::Logical<Version>;
pub type Concrete = crate::logical::Concrete<Version>;
pub type Endpoint = crate::endpoint::Endpoint<Version>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Route {
logical: Logical,
route: profiles::http::Route,
}

#[derive(Clone, Debug)]
pub struct CanonicalDstHeader(pub Addr);

Expand Down Expand Up @@ -85,17 +91,6 @@ impl Param<Version> for Logical {
}
}

impl Logical {
pub fn mk_route((route, logical): (profiles::http::Route, Self)) -> dst::Route {
use linkerd_app_core::metrics::Direction;
dst::Route {
route,
addr: logical.logical_addr,
direction: Direction::Out,
}
}
}

impl Param<normalize_uri::DefaultAuthority> for Logical {
fn param(&self) -> normalize_uri::DefaultAuthority {
normalize_uri::DefaultAuthority(Some(
Expand Down Expand Up @@ -190,11 +185,39 @@ impl tap::Inspect for Endpoint {

fn route_labels<B>(&self, req: &Request<B>) -> Option<tap::Labels> {
req.extensions()
.get::<dst::Route>()
.map(|r| r.route.labels().clone())
.get::<profiles::http::Route>()
.map(|r| r.labels().clone())
}

fn is_outbound<B>(&self, _: &Request<B>) -> bool {
true
}
}

// === impl Route ===

impl Param<profiles::http::Route> for Route {
fn param(&self) -> profiles::http::Route {
self.route.clone()
}
}

impl Param<metrics::RouteLabels> for Route {
fn param(&self) -> metrics::RouteLabels {
metrics::RouteLabels::outbound(self.logical.logical_addr.clone(), &self.route)
}
}

impl Param<ResponseTimeout> for Route {
fn param(&self) -> ResponseTimeout {
ResponseTimeout(self.route.timeout())
}
}

impl classify::CanClassify for Route {
type Classify = classify::Request;

fn classify(&self) -> classify::Request {
self.route.response_classes().clone().into()
}
}
Loading

0 comments on commit 5a97236

Please sign in to comment.