Skip to content

Commit

Permalink
Convert from hyper::Body to http_body::BoxedBody
Browse files Browse the repository at this point in the history
When appropriate, we replace `hyper::Body` with `http_body::BoxedBody`, a good general purpose replacement for `hyper::Body`.

Hyper does provide `hyper::body::Incoming`, but we cannot construct that, so anywhere we might need a body that we can construct (even most Service trait impls) we must use something like `http_body::BoxedBody`.

When a service accepts `BoxedBody` and not `Incoming`, this indicates that the service is designed to run in places where it is not adjacent to hyper, for example, after routing (which is managed by Axum)

Additionally, http >= 1 requires that extension types are `Clone`, so this bound has been added where appropriate.

Co-authored-by: Ivan Krivosheev <py.krivosheev@gmail.com>
Co-authored-by: Allan Zhang <allanzhang7@gmail.com>
  • Loading branch information
3 people committed Jun 12, 2024
1 parent 20369f5 commit aaede1e
Show file tree
Hide file tree
Showing 15 changed files with 62 additions and 55 deletions.
1 change: 1 addition & 0 deletions examples/src/interceptor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ fn intercept(mut req: Request<()>) -> Result<Request<()>, Status> {
Ok(req)
}

#[derive(Clone)]
struct MyExtension {
some_piece_of_data: String,
}
3 changes: 1 addition & 2 deletions examples/src/tower/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ mod service {
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::body::BoxBody;
use tonic::transport::Body;
use tonic::transport::Channel;
use tower::Service;

Expand All @@ -59,7 +58,7 @@ mod service {
}

impl Service<Request<BoxBody>> for AuthSvc {
type Response = Response<Body>;
type Response = Response<BoxBody>;
type Error = Box<dyn std::error::Error + Send + Sync>;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand Down
10 changes: 6 additions & 4 deletions examples/src/tower/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use hyper::Body;
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -84,9 +83,12 @@ struct MyMiddleware<S> {

type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

impl<S> Service<hyper::Request<Body>> for MyMiddleware<S>
impl<S> Service<hyper::Request<BoxBody>> for MyMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S: Service<hyper::Request<BoxBody>, Response = hyper::Response<BoxBody>>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
Expand All @@ -97,7 +99,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
Expand Down
6 changes: 3 additions & 3 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ impl<S> EchoHeadersSvc<S> {
}
}

impl<S> Service<http::Request<hyper::Body>> for EchoHeadersSvc<S>
impl<S> Service<http::Request<BoxBody>> for EchoHeadersSvc<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>> + Send,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>> + Send,
S::Future: Send + 'static,
{
type Response = S::Response;
Expand All @@ -193,7 +193,7 @@ where
Ok(()).into()
}

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned();

let echo_trailer = req
Expand Down
9 changes: 5 additions & 4 deletions tests/integration_tests/tests/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use hyper::{Body, Request as HyperRequest, Response as HyperResponse};
use hyper::{Request as HyperRequest, Response as HyperResponse};
use integration_tests::{
pb::{test_client, test_server, Input, Output},
BoxFuture,
Expand All @@ -16,6 +16,7 @@ use tonic::{
};
use tower_service::Service;

#[derive(Clone)]
struct ExtensionValue(i32);

#[tokio::test]
Expand Down Expand Up @@ -112,9 +113,9 @@ struct InterceptedService<S> {
inner: S,
}

impl<S> Service<HyperRequest<Body>> for InterceptedService<S>
impl<S> Service<HyperRequest<BoxBody>> for InterceptedService<S>
where
S: Service<HyperRequest<Body>, Response = HyperResponse<BoxBody>>
S: Service<HyperRequest<BoxBody>, Response = HyperResponse<BoxBody>>
+ NamedService
+ Clone
+ Send
Expand All @@ -129,7 +130,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: HyperRequest<Body>) -> Self::Future {
fn call(&mut self, mut req: HyperRequest<BoxBody>) -> Self::Future {
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/tests/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;
use tokio::sync::oneshot;
use tonic::codegen::http::Request;
use tonic::{
body::BoxBody,
transport::{Endpoint, Server},
Response, Status,
};
Expand Down
2 changes: 1 addition & 1 deletion tonic-web/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Default for GrpcWebLayer {

impl<S> Layer<S> for GrpcWebLayer
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>>,
S: Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
Expand Down
10 changes: 5 additions & 5 deletions tonic-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// You can customize the CORS configuration composing the [`GrpcWebLayer`] with the cors layer of your choice.
pub fn enable<S>(service: S) -> CorsGrpcWeb<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
Expand Down Expand Up @@ -159,17 +159,17 @@ where
#[derive(Debug, Clone)]
pub struct CorsGrpcWeb<S>(tower_http::cors::Cors<GrpcWebService<S>>);

impl<S> Service<http::Request<hyper::Body>> for CorsGrpcWeb<S>
impl<S> Service<http::Request<BoxBody>> for CorsGrpcWeb<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future =
<tower_http::cors::Cors<GrpcWebService<S>> as Service<http::Request<hyper::Body>>>::Future;
<tower_http::cors::Cors<GrpcWebService<S>> as Service<http::Request<BoxBody>>>::Future;

fn poll_ready(
&mut self,
Expand All @@ -178,7 +178,7 @@ where
self.0.poll_ready(cx)
}

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
self.0.call(req)
}
}
Expand Down
39 changes: 18 additions & 21 deletions tonic-web/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{ready, Context, Poll};

use http::{header, HeaderMap, HeaderValue, Method, Request, Response, StatusCode, Version};
use hyper::Body;
use http_body_util::BodyExt;
use pin_project::pin_project;
use tonic::{
body::{empty_body, BoxBody},
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<S> GrpcWebService<S> {

impl<S> GrpcWebService<S>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Send + 'static,
S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Send + 'static,
{
fn response(&self, status: StatusCode) -> ResponseFuture<S::Future> {
ResponseFuture {
Expand All @@ -66,9 +66,9 @@ where
}
}

impl<S> Service<Request<Body>> for GrpcWebService<S>
impl<S> Service<Request<BoxBody>> for GrpcWebService<S>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Send + 'static,
S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
Expand All @@ -80,7 +80,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
match RequestKind::new(req.headers(), req.method(), req.version()) {
// A valid grpc-web request, regardless of HTTP version.
//
Expand Down Expand Up @@ -202,7 +202,7 @@ impl<'a> RequestKind<'a> {
// Mutating request headers to conform to a gRPC request is not really
// necessary for us at this point. We could remove most of these except
// maybe for inserting `header::TE`, which tonic should check?
fn coerce_request(mut req: Request<Body>, encoding: Encoding) -> Request<Body> {
fn coerce_request(mut req: Request<BoxBody>, encoding: Encoding) -> Request<BoxBody> {
req.headers_mut().remove(header::CONTENT_LENGTH);

req.headers_mut()
Expand All @@ -216,8 +216,7 @@ fn coerce_request(mut req: Request<Body>, encoding: Encoding) -> Request<Body> {
HeaderValue::from_static("identity,deflate,gzip"),
);

req.map(|b| GrpcWebCall::request(b, encoding))
.map(Body::wrap_stream)
req.map(|b| GrpcWebCall::request(b, encoding).boxed_unsync())
}

fn coerce_response(res: Response<BoxBody>, encoding: Encoding) -> Response<BoxBody> {
Expand Down Expand Up @@ -246,7 +245,7 @@ mod tests {
#[derive(Debug, Clone)]
struct Svc;

impl tower_service::Service<Request<Body>> for Svc {
impl tower_service::Service<Request<BoxBody>> for Svc {
type Response = Response<BoxBody>;
type Error = String;
type Future = BoxFuture<Self::Response, Self::Error>;
Expand All @@ -255,7 +254,7 @@ mod tests {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: Request<Body>) -> Self::Future {
fn call(&mut self, _: Request<BoxBody>) -> Self::Future {
Box::pin(async { Ok(Response::new(empty_body())) })
}
}
Expand All @@ -266,15 +265,14 @@ mod tests {

mod grpc_web {
use super::*;
use http::HeaderValue;
use tower_layer::Layer;

fn request() -> Request<Body> {
fn request() -> Request<BoxBody> {
Request::builder()
.method(Method::POST)
.header(CONTENT_TYPE, GRPC_WEB)
.header(ORIGIN, "http://example.com")
.body(Body::empty())
.body(empty_body())
.unwrap()
}

Expand Down Expand Up @@ -350,13 +348,13 @@ mod tests {
mod options {
use super::*;

fn request() -> Request<Body> {
fn request() -> Request<BoxBody> {
Request::builder()
.method(Method::OPTIONS)
.header(ORIGIN, "http://example.com")
.header(ACCESS_CONTROL_REQUEST_HEADERS, "x-grpc-web")
.header(ACCESS_CONTROL_REQUEST_METHOD, "POST")
.body(Body::empty())
.body(empty_body())
.unwrap()
}

Expand All @@ -371,13 +369,12 @@ mod tests {

mod grpc {
use super::*;
use http::HeaderValue;

fn request() -> Request<Body> {
fn request() -> Request<BoxBody> {
Request::builder()
.version(Version::HTTP_2)
.header(CONTENT_TYPE, GRPC)
.body(Body::empty())
.body(empty_body())
.unwrap()
}

Expand All @@ -397,7 +394,7 @@ mod tests {

let req = Request::builder()
.header(CONTENT_TYPE, GRPC)
.body(Body::empty())
.body(empty_body())
.unwrap();

let res = svc.call(req).await.unwrap();
Expand Down Expand Up @@ -425,10 +422,10 @@ mod tests {
mod other {
use super::*;

fn request() -> Request<Body> {
fn request() -> Request<BoxBody> {
Request::builder()
.header(CONTENT_TYPE, "application/text")
.body(Body::empty())
.body(empty_body())
.unwrap()
}

Expand Down
10 changes: 7 additions & 3 deletions tonic-web/tests/integration/tests/grpc_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::net::SocketAddr;

use base64::Engine as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http_body_util::{BodyExt as _, Full};
use hyper::http::{header, StatusCode};
use hyper::{Body, Client, Method, Request, Uri};
use hyper::{Method, Request, Uri};
use prost::Message;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::body::BoxBody;
use tonic::transport::Server;

use integration::pb::{test_server::TestServer, Input, Output};
Expand Down Expand Up @@ -102,7 +104,7 @@ fn encode_body() -> Bytes {
buf.split_to(len + 5).freeze()
}

fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<Body> {
fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<BoxBody> {
use header::{ACCEPT, CONTENT_TYPE, ORIGIN};

let request_uri = format!("{}/{}/{}", base_uri, "test.Test", "UnaryCall")
Expand All @@ -123,7 +125,9 @@ fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request<
.header(ORIGIN, "http://example.com")
.header(ACCEPT, format!("application/{}", accept))
.uri(request_uri)
.body(Body::from(bytes))
.body(BoxBody::new(
Full::new(bytes).map_err(|err| Status::internal(err.to_string())),
))
.unwrap()
}

Expand Down
6 changes: 3 additions & 3 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! HTTP specific body utilities.

use http_body::Body;
use http_body_util::BodyExt;

/// A type erased HTTP body used for tonic services.
pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;
pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;

/// Convert a [`http_body::Body`] into a [`BoxBody`].
pub(crate) fn boxed<B>(body: B) -> BoxBody
Expand All @@ -16,7 +16,7 @@ where

/// Create an empty `BoxBody`
pub fn empty_body() -> BoxBody {
http_body::Empty::new()
http_body_util::Empty::new()
.map_err(|err| match err {})
.boxed_unsync()
}
2 changes: 1 addition & 1 deletion tonic/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Extensions {
/// If a extension of this type already existed, it will
/// be returned.
#[inline]
pub fn insert<T: Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
pub fn insert<T: Clone + Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
self.inner.insert(val)
}

Expand Down
2 changes: 1 addition & 1 deletion tonic/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl<T> Request<T> {
/// ```no_run
/// use tonic::{Request, service::interceptor};
///
/// #[derive(Clone)] // Extensions must be Clone
/// struct MyExtension {
/// some_piece_of_data: String,
/// }
Expand Down Expand Up @@ -440,7 +441,6 @@ pub(crate) enum SanitizeHeaders {
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::MetadataValue;
use http::Uri;

#[test]
Expand Down
Loading

0 comments on commit aaede1e

Please sign in to comment.