Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry: allow retrying requests without Content-Length headers #1341

Merged
merged 9 commits into from
Nov 1, 2021
11 changes: 11 additions & 0 deletions hyper-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ where

this.body.poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.body.size_hint()
}
}

// ==== PendingUntilEosBody ====
Expand All @@ -147,6 +152,7 @@ impl<T: Send + 'static, B: HttpBody> HttpBody for PendingUntilEosBody<T, B> {
type Data = B::Data;
type Error = B::Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
}
Expand Down Expand Up @@ -181,6 +187,11 @@ impl<T: Send + 'static, B: HttpBody> HttpBody for PendingUntilEosBody<T, B> {

Poll::Ready(ret)
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.body.size_hint()
}
}

#[cfg(test)]
Expand Down
91 changes: 38 additions & 53 deletions linkerd/app/core/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,55 +58,33 @@ impl retry::NewPolicy<Route> for NewRetryPolicy {

// === impl Retry ===

impl RetryPolicy {
fn can_retry<A: http_body::Body>(&self, req: &http::Request<A>) -> bool {
let content_length = |req: &http::Request<_>| {
req.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok()?.parse::<usize>().ok())
};

// Requests without bodies can always be retried, as we will not need to
// buffer the body. If the request *does* have a body, retry it if and
// only if the request contains a `content-length` header and the
// content length is >= 64 kb.
let has_body = !req.body().is_end_stream();
if has_body && content_length(req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES {
tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(req),
"not retryable",
);
return false;
}

tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(req),
"retryable",
);
true
}
}

impl<A, B, E> retry::Policy<http::Request<A>, http::Response<B>, E> for RetryPolicy
impl<A, B, E> retry::Policy<http::Request<ReplayBody<A>>, http::Response<B>, E> for RetryPolicy
where
A: http_body::Body + Clone,
A: http_body::Body + Unpin,
A::Error: Into<Error>,
{
type Future = future::Ready<Self>;

fn retry(
&self,
req: &http::Request<A>,
req: &http::Request<ReplayBody<A>>,
result: Result<&http::Response<B>, &E>,
) -> Option<Self::Future> {
let retryable = match result {
Err(_) => false,
Ok(rsp) => classify::Request::from(self.response_classes.clone())
.classify(req)
.start(rsp)
.eos(None)
.is_failure(),
Ok(rsp) => {
// is the request a failure?
let is_failure = classify::Request::from(self.response_classes.clone())
.classify(req)
.start(rsp)
.eos(None)
.is_failure();
// did the body exceed the maximum length limit?
let exceeded_max_len = req.body().is_capped();
let retryable = is_failure && !exceeded_max_len;
tracing::trace!(is_failure, exceeded_max_len, retryable);
retryable
}
};

if !retryable {
Expand All @@ -123,16 +101,12 @@ where
Some(future::ready(self.clone()))
}

fn clone_request(&self, req: &http::Request<A>) -> Option<http::Request<A>> {
let can_retry = self.can_retry(req);
debug_assert!(
can_retry,
"The retry policy attempted to clone an un-retryable request. This is unexpected."
);
if !can_retry {
return None;
}

fn clone_request(
&self,
req: &http::Request<ReplayBody<A>>,
) -> Option<http::Request<ReplayBody<A>>> {
// Since the body is already wrapped in a ReplayBody, it must not be obviously too large to
// buffer/clone.
let mut clone = http::Request::new(req.body().clone());
*clone.method_mut() = req.method().clone();
*clone.uri_mut() = req.uri().clone();
Expand Down Expand Up @@ -160,9 +134,20 @@ where
&self,
req: http::Request<A>,
) -> Either<Self::RetryRequest, http::Request<A>> {
if self.can_retry(&req) {
return Either::A(req.map(|body| ReplayBody::new(body, MAX_BUFFERED_BYTES)));
}
Either::B(req)
let (head, body) = req.into_parts();
let replay_body = match ReplayBody::try_new(body, MAX_BUFFERED_BYTES) {
Ok(body) => body,
Err(body) => {
tracing::debug!(
size = body.size_hint().lower(),
"Body is too large to buffer"
);
return Either::B(http::Request::from_parts(head, body));
}
};

// The body may still be too large to be buffered if the body's length was not known.
// `ReplayBody` handles this gracefully.
Either::A(http::Request::from_parts(head, replay_body))
}
}
52 changes: 48 additions & 4 deletions linkerd/app/integration/src/tests/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ macro_rules! profile_test {
async move {
// Read the entire body before responding, so that the
// client doesn't fail when writing it out.
let _body = hyper::body::aggregate(req.into_body()).await;
let _body = hyper::body::to_bytes(req.into_body()).await;
tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body");
Ok::<_, Error>(if fail {
Response::builder()
.status(533)
Expand Down Expand Up @@ -222,6 +223,36 @@ async fn retry_with_small_put_body() {
}
}

#[tokio::test]
async fn retry_without_content_length() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| async move {
let (mut tx, body) = hyper::body::Body::channel();
let req = client.request_builder("/0.5")
.method("POST")
.body(body)
.unwrap();
let res = tokio::spawn(async move { client.request_body(req).await });
tx.send_data(Bytes::from_static(b"hello"))
.await
.expect("the whole body should be read");
tx.send_data(Bytes::from_static(b"world"))
.await
.expect("the whole body should be read");
drop(tx);
let res = res.await.unwrap();
assert_eq!(res.status(), 200);
}
}
}

#[tokio::test]
async fn does_not_retry_if_request_does_not_match() {
profile_test! {
Expand Down Expand Up @@ -280,7 +311,9 @@ async fn does_not_retry_if_body_is_too_long() {
}

#[tokio::test]
async fn does_not_retry_if_body_lacks_known_length() {
async fn does_not_retry_if_streaming_body_exceeds_max_length() {
// TODO(eliza): if we make the max length limit configurable, update this
// test to test the configurable max length limit...
profile_test! {
routes: [
controller::route()
Expand All @@ -296,10 +329,21 @@ async fn does_not_retry_if_body_lacks_known_length() {
.body(body)
.unwrap();
let res = tokio::spawn(async move { client.request_body(req).await });
let _ = tx.send_data(Bytes::from_static(b"hello"));
let _ = tx.send_data(Bytes::from_static(b"world"));
// send a 32k chunk
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
// ...and another one...
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
// ...and a third one (exceeding the max length limit)
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
drop(tx);
let res = res.await.unwrap();

assert_eq!(res.status(), 533);
}
}
Expand Down
4 changes: 1 addition & 3 deletions linkerd/app/integration/src/tests/transparency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,9 +843,7 @@ macro_rules! http1_tests {

let srv = server::http1()
.route_fn("/", |req| {
let has_body_header = req.headers().contains_key("transfer-encoding")
|| req.headers().contains_key("content-length");
let status = if has_body_header {
let status = if req.headers().contains_key(http::header::TRANSFER_ENCODING) {
StatusCode::BAD_REQUEST
} else {
StatusCode::OK
Expand Down
7 changes: 7 additions & 0 deletions linkerd/http-box/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,28 @@ impl Body for BoxBody {
type Data = Data;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

#[inline]
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.as_mut().inner.as_mut().poll_data(cx)
}

#[inline]
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
self.as_mut().inner.as_mut().poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
Expand Down Expand Up @@ -95,6 +99,7 @@ where
type Data = Data;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
Expand All @@ -111,13 +116,15 @@ where
}))
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into))
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
}
Expand Down
6 changes: 6 additions & 0 deletions linkerd/http-metrics/src/requests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ where
self.project().inner.poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
Expand Down Expand Up @@ -438,6 +439,11 @@ where

Poll::Ready(Ok(trls))
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}

#[pinned_drop]
Expand Down
Loading