Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http/retry): add a unit test suite to PeekTrailersBody<B> #3556

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat(http/retry): unit test suite for PeekTrailersBody<B>
`PeekTrailersBody<B>` contains some subtle edge cases related to the
number of DATA frames yielded by the inner body, and how persistent it
will be about polling for TRAILERS frames.

for example, if it yields a DATA frame, it will not await trailers being
available, but it *will* do so if the inner body does not yield a DATA
frame. if a DATA frame is yielded, it will check for a TRAILERS frame,
but it must be immmediately available.

this is all subtle, and particularly subject to change with the upgrade
to http-body 1.0's frame-oriented `Body` interface.

so, this commit introduces a test suite for `PeekTrailersBody<B>`. it
includes assertions to confirm when the peek middleware can and cannot
observe the trailers.

some `TODO(kate)` comments are left where issues exist.

Signed-off-by: katelyn martin <kate@buoyant.io>
  • Loading branch information
cratelyn committed Jan 22, 2025
commit 9be9042748cb4048a1f24103b77e80d41997a1f2
182 changes: 182 additions & 0 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,185 @@ where
hint
}
}

#[cfg(test)]
mod tests {
use super::PeekTrailersBody;
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
use std::{
collections::VecDeque,
ops::Not,
pin::Pin,
task::{Context, Poll},
};

/// A "mock" body.
///
/// This type contains polling results for [`Body`].
#[derive(Default)]
struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
}

fn data() -> Option<Result<Bytes, Error>> {
let bytes = Bytes::from_static(b"hello");
Some(Ok(bytes))
}

fn trailers() -> Result<Option<http::HeaderMap>, Error> {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("shiny");
trls.insert("trailer", value);
Ok(Some(trls))
}

#[tokio::test]
async fn cannot_peek_empty() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let empty = MockBody::default();
let peek = PeekTrailersBody::read_body(empty).await;
assert!(peek.peek_trailers().is_none());
// TODO(kate): this will not return `true`.
// assert!(peek.is_end_stream());
}

#[tokio::test]
async fn peeks_only_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let only_trailers = MockBody::default().then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(only_trailers).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_one_frame_with_eventual_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Pending)
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn peeks_one_eventual_frame_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Pending)
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_some());
assert!(peek.is_end_stream().not());
}

#[tokio::test]
async fn cannot_peek_two_frames_with_immediate_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let body = MockBody::default()
.then_yield_data(Poll::Ready(data()))
.then_yield_data(Poll::Ready(data()))
.then_yield_trailer(Poll::Ready(trailers()));
let peek = PeekTrailersBody::read_body(body).await;
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

// === impl MockBody ===

impl MockBody {
/// Appends a poll outcome for [`Body::poll_data()`].
fn then_yield_data(mut self, poll: Poll<Option<Result<Bytes, Error>>>) -> Self {
self.data_polls.push_back(poll);
self
}

/// Appends a poll outcome for [`Body::poll_trailers()`].
fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
}

/// Schedules a task to be awoken.
fn schedule(cx: &Context<'_>) {
let waker = cx.waker().clone();
tokio::spawn(async move {
waker.wake();
});
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// If the data frames have not all been yielded, yield `Pending`.
//
// TODO(kate): this arm should panic. it indicates `PeekTrailersBody<B>` isn't
// respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
Poll::Pending
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}

poll
}

fn is_end_stream(&self) -> bool {
self.data_polls.is_empty() && self.trailer_polls.is_empty()
}
}
}
Loading