Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ where
// Set request extensions based on the route configuration
// AND/OR headers
.push(extensions::NewSetExtensions::layer())
.push(metrics::layer(&metrics.requests, &metrics.body_data))
.push(metrics::layer(
&metrics.requests,
&metrics.statuses,
&metrics.body_data,
))
.push_on_service(crate::http::BoxResponse::layer())
.check_new::<Self>()
.check_new_service::<Self, http::Request<http::BoxBody>>()
// Configure a classifier to use in the endpoint stack.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
body_data::response::{BodyDataMetrics, ResponseBodyFamilies},
count_reqs::{RequestCount, RequestCountFamilies},
record_response,
record_response, status,
stream_label::{LabelSet, StreamLabel},
};

Expand All @@ -22,15 +22,14 @@ where
{
requests: RequestCountFamilies<labels::RouteBackend>,
responses: ResponseMetrics<L>,
statuses: status::StatusMetrics<L::StatusLabels>,
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
<L as StreamLabel>::DurationLabels,
<L as StreamLabel>::StatusLabels,
>;
type ResponseMetrics<L> = record_response::ResponseMetrics<<L as StreamLabel>::DurationLabels>;

type Instrumented<T, N> = NewRecordBodyData<NewCountRequests<NewResponseDuration<T, N>>>;
type Instrumented<T, N> =
NewRecordBodyData<NewCountRequests<NewRecordStatusCode<T, NewResponseDuration<T, N>>>>;
type NewRecordBodyData<N> =
linkerd_http_prom::body_data::response::NewRecordBodyData<ExtractRecordBodyDataParams, N>;
type NewCountRequests<N> = linkerd_http_prom::count_reqs::NewCountRequests<ExtractRequestCount, N>;
Expand All @@ -52,6 +51,7 @@ where
let RouteBackendMetrics {
requests,
responses,
statuses,
body_metrics,
} = metrics.clone();

Expand All @@ -60,10 +60,11 @@ where

let record = NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()));
let count = NewCountRequests::layer_via(ExtractRequestCount(requests.clone()));
let status = NewRecordStatusCode::layer_via(ExtractStatusCodeParams::new(statuses.clone()));
let body_data =
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone()));

body_data.layer(count.layer(record.layer(inner)))
body_data.layer(count.layer(status.layer(record.layer(inner))))
})
}

Expand All @@ -84,10 +85,15 @@ where
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
let requests = RequestCountFamilies::register(reg);
let responses = record_response::ResponseMetrics::register(reg, histo);
let statuses = status::StatusMetrics::register(
reg.sub_registry_with_prefix("response"),
"Completed responses",
);
let body_metrics = ResponseBodyFamilies::register(reg);
Self {
requests,
responses,
statuses,
body_metrics,
}
}
Expand All @@ -104,7 +110,7 @@ where

#[cfg(test)]
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
self.responses.get_statuses(l)
self.statuses.metric(l)
}

#[cfg(test)]
Expand All @@ -126,6 +132,7 @@ where
Self {
requests: Default::default(),
responses: Default::default(),
statuses: Default::default(),
body_metrics: Default::default(),
}
}
Expand All @@ -141,6 +148,7 @@ where
Self {
requests: self.requests.clone(),
responses: self.responses.clone(),
statuses: self.statuses.clone(),
body_metrics: self.body_metrics.clone(),
}
}
Expand Down
65 changes: 58 additions & 7 deletions linkerd/app/outbound/src/http/logical/policy/route/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use linkerd_app_core::{
};
use linkerd_http_prom::{
body_data::request::{BodyDataMetrics, NewRecordBodyData, RequestBodyFamilies},
record_response,
record_response, status,
stream_label::{
error::LabelError,
status::{LabelGrpcStatus, LabelHttpStatus},
Expand All @@ -22,10 +22,7 @@ pub(super) mod test_util;
#[cfg(test)]
mod tests;

pub type RequestMetrics<R> = record_response::RequestMetrics<
<R as StreamLabel>::DurationLabels,
<R as StreamLabel>::StatusLabels,
>;
pub type RequestMetrics<R> = record_response::RequestMetrics<<R as StreamLabel>::DurationLabels>;

#[derive(Debug)]
pub struct RouteMetrics<R, B>
Expand All @@ -39,6 +36,7 @@ where
{
pub(super) retry: retry::RouteRetryMetrics,
pub(super) requests: RequestMetrics<R>,
pub(super) statuses: status::StatusMetrics<R::StatusLabels>,
pub(super) backend: backend::RouteBackendMetrics<B>,
pub(super) body_data: RequestBodyFamilies<labels::Route>,
}
Expand Down Expand Up @@ -90,27 +88,46 @@ pub struct ExtractBodyDataMetrics<X> {
extract: X,
}

/// An `N`-typed [`NewService<T>`] with status code metrics.
pub type NewRecordStatusCode<T, N> = status::NewRecordStatusCode<
N,
ExtractStatusCodeParams<<T as MkStreamLabel>::StatusLabels>,
T,
<T as MkStreamLabel>::StatusLabels,
>;

/// [`NewRecordStatusCode<T, N>`] parameters.
type StatusCodeParams<T> = status::Params<T, <T as MkStreamLabel>::StatusLabels>;

/// Extracts parameters for request status code metrics.
#[derive(Clone, Debug)]
pub struct ExtractStatusCodeParams<L>(status::StatusMetrics<L>);

pub fn layer<T, N>(
metrics: &RequestMetrics<T::StreamLabel>,
statuses: &status::StatusMetrics<T::StatusLabels>,
body_data: &RequestBodyFamilies<labels::Route>,
) -> impl svc::Layer<
N,
Service = NewRecordBodyData<
NewRecordDuration<T, RequestMetrics<T::StreamLabel>, N>,
NewRecordStatusCode<T, NewRecordDuration<T, RequestMetrics<T::StreamLabel>, N>>,
ExtractBodyDataParams,
ExtractBodyDataMetrics<T>,
>,
>
where
N: svc::NewService<T>,
T: Clone + MkStreamLabel,
T: svc::ExtractParam<labels::Route, http::Request<http::BoxBody>>,
T::StatusLabels: Clone,
{
let record = NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone()));
let status = NewRecordStatusCode::layer_via(ExtractStatusCodeParams(statuses.clone()));
let body_data = NewRecordBodyData::layer_via(ExtractBodyDataParams(body_data.clone()));

svc::layer::mk(move |inner| {
use svc::Layer;
body_data.layer(record.layer(inner))
body_data.layer(status.layer(record.layer(inner)))
})
}

Expand Down Expand Up @@ -153,6 +170,7 @@ where
Self {
requests: Default::default(),
backend: Default::default(),
statuses: Default::default(),
retry: Default::default(),
body_data: Default::default(),
}
Expand All @@ -172,6 +190,7 @@ where
Self {
requests: self.requests.clone(),
backend: self.backend.clone(),
statuses: self.statuses.clone(),
retry: self.retry.clone(),
body_data: self.body_data.clone(),
}
Expand All @@ -195,11 +214,16 @@ where
Self::RESPONSE_BUCKETS.iter().copied(),
);

let statuses = status::StatusMetrics::register(
reg.sub_registry_with_prefix("request"),
"Completed request-response streams",
);
let retry = retry::RouteRetryMetrics::register(reg.sub_registry_with_prefix("retry"));
let body_data = RequestBodyFamilies::register(reg);

Self {
requests,
statuses,
backend,
retry,
body_data,
Expand Down Expand Up @@ -257,6 +281,33 @@ where
}
}

// === impl ExtractStatusCodeParams ===

impl<L> ExtractStatusCodeParams<L> {
pub fn new(metrics: status::StatusMetrics<L>) -> Self {
Self(metrics)
}
}

impl<L, T> svc::ExtractParam<StatusCodeParams<T>, T> for ExtractStatusCodeParams<L>
where
T: Clone,
T: MkStreamLabel<StatusLabels = L>,
L: Clone,
{
fn extract_param(&self, target: &T) -> StatusCodeParams<T> {
let Self(metrics) = self;

let mk_stream_label = target.clone();
let metrics = metrics.clone();

StatusCodeParams {
mk_stream_label,
metrics,
}
}
}

// === impl LabelHttpRsp ===

impl<P> From<P> for LabelHttpRsp<P> {
Expand Down
Loading
Loading