Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-smithy-types): Impl http-body-1.0 Body for SdkBody #3380

Merged
merged 7 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ message = "The MSRV has been increase to 1.74.1"
references = ["smithy-rs#3410"]
meta = { "bug" = false, "breaking" = false, tada = false }
author = "rcoh"

[[smithy-rs]]
message = "[`SdkBody`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/body/struct.SdkBody.html) now implements the 1.0 version of the `http_body` trait."
rcoh marked this conversation as resolved.
Show resolved Hide resolved
references = ["smithy-rs#3365", "aws-sdk-rust#1046"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" }
author = "cayman-amzn"
rcoh marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
byte-stream-poll-next = []
http-body-0-4-x = ["dep:http-body-0-4"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x", "dep:hyper-1-0"]
hyper-0-14-x = ["dep:hyper-0-14"]
rt-tokio = [
"dep:http-body-0-4",
Expand All @@ -38,6 +38,7 @@ http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true }
http-body-1-0 = { package = "http-body", version = "1", optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
hyper-1-0 = { package = "hyper", version = "1", optional = true }
itoa = "1.0.0"
num-integer = "0.1.44"
pin-project-lite = "0.2.9"
Expand Down
6 changes: 2 additions & 4 deletions rust-runtime/aws-smithy-types/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ pin_project! {
/// For handling responses, the type of the body will be controlled
/// by the HTTP stack.
///
// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
// between hyper::Body and our Body
pub struct SdkBody {
#[pin]
inner: Inner,
Expand Down Expand Up @@ -191,10 +189,10 @@ impl SdkBody {
}
}

#[cfg(feature = "http-body-0-4-x")]
#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
pub(crate) fn poll_next_trailers(
self: Pin<&mut Self>,
#[allow(unused)] cx: &mut Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Error>> {
let this = self.project();
match this.inner.project() {
Expand Down
124 changes: 119 additions & 5 deletions rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,53 @@ impl SdkBody {
{
SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
}

pub(crate) fn poll_data_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body_1_0::Frame<Bytes>, Error>>> {
match ready!(self.as_mut().poll_next(cx)) {
// if there's no more data, try to return trailers
None => match ready!(self.poll_next_trailers(cx)) {
Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(
convert_headers_0x_1x(trailers),
)))),
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
},
Some(result) => match result {
Err(err) => Poll::Ready(Some(Err(err))),
Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))),
},
}
}
}

#[cfg(feature = "http-body-1-x")]
impl http_body_1_0::Body for SdkBody {
type Data = Bytes;
type Error = Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body_1_0::Frame<Self::Data>, Self::Error>>> {
self.poll_data_frame(cx)
}

fn is_end_stream(&self) -> bool {
self.is_end_stream()
}

fn size_hint(&self) -> http_body_1_0::SizeHint {
let mut hint = http_body_1_0::SizeHint::default();
let (lower, upper) = self.bounds_on_remaining_length();
hint.set_lower(lower);
if let Some(upper) = upper {
hint.set_upper(upper);
}
hint
}
}

pin_project! {
Expand Down Expand Up @@ -83,7 +130,7 @@ where
// already read everything
let this = self.project();
match this.trailers.take() {
Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))),
Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
None => Poll::Ready(Ok(None)),
}
}
Expand All @@ -107,7 +154,7 @@ where
}
}

fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
let mut map = http::HeaderMap::with_capacity(input.capacity());
let mut mem: Option<http_1x::HeaderName> = None;
for (k, v) in input.into_iter() {
Expand All @@ -121,6 +168,20 @@ fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
map
}

fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
let mut mem: Option<http::HeaderName> = None;
for (k, v) in input.into_iter() {
let name = k.or_else(|| mem.clone()).unwrap();
map.append(
http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
);
mem = Some(name);
}
map
}

#[cfg(test)]
mod test {
use std::collections::VecDeque;
Expand All @@ -132,8 +193,9 @@ mod test {
use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
use http_1x::{HeaderMap, HeaderName, HeaderValue};
use http_body_1_0::Frame;
use http_body_util::BodyExt;

use crate::body::http_body_1_x::convert_header_map;
use crate::body::http_body_1_x::{convert_headers_1x_0x, Http1toHttp04};
use crate::body::{Error, SdkBody};
use crate::byte_stream::ByteStream;

Expand Down Expand Up @@ -215,10 +277,46 @@ mod test {
while let Some(_data) = http_body_0_4::Body::data(&mut body).await {}
assert_eq!(
http_body_0_4::Body::trailers(&mut body).await.unwrap(),
Some(convert_header_map(trailers()))
Some(convert_headers_1x_0x(trailers()))
);
}

#[tokio::test]
async fn test_read_trailers_as_1x() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let body = SdkBody::from_body_1_x(body);

let collected = BodyExt::collect(body).await.expect("should succeed");
assert_eq!(collected.trailers(), Some(&trailers()));
assert_eq!(collected.to_bytes().as_ref(), b"123456789");
}

#[tokio::test]
async fn test_trailers_04x_to_1x() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let body = SdkBody::from_body_0_4(Http1toHttp04::new(body));

let collected = BodyExt::collect(body).await.expect("should succeed");
assert_eq!(collected.trailers(), Some(&trailers()));
assert_eq!(collected.to_bytes().as_ref(), b"123456789");
}

#[tokio::test]
async fn test_errors() {
let body = TestBody {
Expand All @@ -235,6 +333,7 @@ mod test {
let body = ByteStream::new(body);
body.collect().await.expect_err("body returned an error");
}

#[tokio::test]
async fn test_no_trailers() {
let body = TestBody {
Expand Down Expand Up @@ -262,6 +361,21 @@ mod test {

expect.insert(CL0, http::HeaderValue::from_static("1234"));

assert_eq!(convert_header_map(http1_headermap), expect);
assert_eq!(convert_headers_1x_0x(http1_headermap), expect);
}

#[test]
fn sdkbody_debug_dyn() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let body = SdkBody::from_body_1_x(body);
assert!(format!("{:?}", body).contains("BoxBody"));
}
}
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-types/src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ pin_project! {
/// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly
/// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
/// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
/// ```no_run
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might have been something about the host I pulled the repo down on, but it was still compiling for some reason. This was the only way I could get it to pass the initial build (locally)

/// ```ignore
/// # use hyper_0_14 as hyper;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
Expand Down
Loading