From aaede1e47bc2396001106a82fc97fff1979407f1 Mon Sep 17 00:00:00 2001 From: Alex Rudy Date: Mon, 27 May 2024 04:17:49 +0000 Subject: [PATCH] Convert from hyper::Body to http_body::BoxedBody When appropriate, we replace `hyper::Body` with `http_body::BoxedBody`, a good general purpose replacement for `hyper::Body`. Hyper does provide `hyper::body::Incoming`, but we cannot construct that, so anywhere we might need a body that we can construct (even most Service trait impls) we must use something like `http_body::BoxedBody`. When a service accepts `BoxedBody` and not `Incoming`, this indicates that the service is designed to run in places where it is not adjacent to hyper, for example, after routing (which is managed by Axum) Additionally, http >= 1 requires that extension types are `Clone`, so this bound has been added where appropriate. Co-authored-by: Ivan Krivosheev Co-authored-by: Allan Zhang --- examples/src/interceptor/server.rs | 1 + examples/src/tower/client.rs | 3 +- examples/src/tower/server.rs | 10 +++-- interop/src/server.rs | 6 +-- tests/integration_tests/tests/extensions.rs | 9 +++-- tests/integration_tests/tests/origin.rs | 1 + tonic-web/src/layer.rs | 2 +- tonic-web/src/lib.rs | 10 ++--- tonic-web/src/service.rs | 39 +++++++++---------- tonic-web/tests/integration/tests/grpc_web.rs | 10 +++-- tonic/src/body.rs | 6 +-- tonic/src/extensions.rs | 2 +- tonic/src/request.rs | 2 +- tonic/src/transport/server/mod.rs | 11 ++++-- tonic/src/transport/service/connection.rs | 5 +-- 15 files changed, 62 insertions(+), 55 deletions(-) diff --git a/examples/src/interceptor/server.rs b/examples/src/interceptor/server.rs index 263348a6d..fd0cf462f 100644 --- a/examples/src/interceptor/server.rs +++ b/examples/src/interceptor/server.rs @@ -57,6 +57,7 @@ fn intercept(mut req: Request<()>) -> Result, Status> { Ok(req) } +#[derive(Clone)] struct MyExtension { some_piece_of_data: String, } diff --git a/examples/src/tower/client.rs b/examples/src/tower/client.rs index 0a33fffae..39fec5d47 100644 --- a/examples/src/tower/client.rs +++ b/examples/src/tower/client.rs @@ -44,7 +44,6 @@ mod service { use std::pin::Pin; use std::task::{Context, Poll}; use tonic::body::BoxBody; - use tonic::transport::Body; use tonic::transport::Channel; use tower::Service; @@ -59,7 +58,7 @@ mod service { } impl Service> for AuthSvc { - type Response = Response; + type Response = Response; type Error = Box; #[allow(clippy::type_complexity)] type Future = Pin> + Send>>; diff --git a/examples/src/tower/server.rs b/examples/src/tower/server.rs index cc85d62e5..b7066a1b6 100644 --- a/examples/src/tower/server.rs +++ b/examples/src/tower/server.rs @@ -1,4 +1,3 @@ -use hyper::Body; use std::{ pin::Pin, task::{Context, Poll}, @@ -84,9 +83,12 @@ struct MyMiddleware { type BoxFuture<'a, T> = Pin + Send + 'a>>; -impl Service> for MyMiddleware +impl Service> for MyMiddleware where - S: Service, Response = hyper::Response> + Clone + Send + 'static, + S: Service, Response = hyper::Response> + + Clone + + Send + + 'static, S::Future: Send + 'static, { type Response = S::Response; @@ -97,7 +99,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: hyper::Request) -> Self::Future { + fn call(&mut self, req: hyper::Request) -> Self::Future { // This is necessary because tonic internally uses `tower::buffer::Buffer`. // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 // for details on why this is necessary diff --git a/interop/src/server.rs b/interop/src/server.rs index b32468866..aef7b0d45 100644 --- a/interop/src/server.rs +++ b/interop/src/server.rs @@ -180,9 +180,9 @@ impl EchoHeadersSvc { } } -impl Service> for EchoHeadersSvc +impl Service> for EchoHeadersSvc where - S: Service, Response = http::Response> + Send, + S: Service, Response = http::Response> + Send, S::Future: Send + 'static, { type Response = S::Response; @@ -193,7 +193,7 @@ where Ok(()).into() } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned(); let echo_trailer = req diff --git a/tests/integration_tests/tests/extensions.rs b/tests/integration_tests/tests/extensions.rs index b112f8e66..b2380181d 100644 --- a/tests/integration_tests/tests/extensions.rs +++ b/tests/integration_tests/tests/extensions.rs @@ -1,4 +1,4 @@ -use hyper::{Body, Request as HyperRequest, Response as HyperResponse}; +use hyper::{Request as HyperRequest, Response as HyperResponse}; use integration_tests::{ pb::{test_client, test_server, Input, Output}, BoxFuture, @@ -16,6 +16,7 @@ use tonic::{ }; use tower_service::Service; +#[derive(Clone)] struct ExtensionValue(i32); #[tokio::test] @@ -112,9 +113,9 @@ struct InterceptedService { inner: S, } -impl Service> for InterceptedService +impl Service> for InterceptedService where - S: Service, Response = HyperResponse> + S: Service, Response = HyperResponse> + NamedService + Clone + Send @@ -129,7 +130,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, mut req: HyperRequest) -> Self::Future { + fn call(&mut self, mut req: HyperRequest) -> Self::Future { let clone = self.inner.clone(); let mut inner = std::mem::replace(&mut self.inner, clone); diff --git a/tests/integration_tests/tests/origin.rs b/tests/integration_tests/tests/origin.rs index f149dc68d..c8140c79f 100644 --- a/tests/integration_tests/tests/origin.rs +++ b/tests/integration_tests/tests/origin.rs @@ -7,6 +7,7 @@ use std::time::Duration; use tokio::sync::oneshot; use tonic::codegen::http::Request; use tonic::{ + body::BoxBody, transport::{Endpoint, Server}, Response, Status, }; diff --git a/tonic-web/src/layer.rs b/tonic-web/src/layer.rs index 77b03c77e..7834f1990 100644 --- a/tonic-web/src/layer.rs +++ b/tonic-web/src/layer.rs @@ -24,7 +24,7 @@ impl Default for GrpcWebLayer { impl Layer for GrpcWebLayer where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, S: Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, diff --git a/tonic-web/src/lib.rs b/tonic-web/src/lib.rs index 16e57e19d..50ed8c0a8 100644 --- a/tonic-web/src/lib.rs +++ b/tonic-web/src/lib.rs @@ -127,7 +127,7 @@ type BoxError = Box; /// You can customize the CORS configuration composing the [`GrpcWebLayer`] with the cors layer of your choice. pub fn enable(service: S) -> CorsGrpcWeb where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, S: Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, @@ -159,9 +159,9 @@ where #[derive(Debug, Clone)] pub struct CorsGrpcWeb(tower_http::cors::Cors>); -impl Service> for CorsGrpcWeb +impl Service> for CorsGrpcWeb where - S: Service, Response = http::Response>, + S: Service, Response = http::Response>, S: Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, @@ -169,7 +169,7 @@ where type Response = S::Response; type Error = S::Error; type Future = - > as Service>>::Future; + > as Service>>::Future; fn poll_ready( &mut self, @@ -178,7 +178,7 @@ where self.0.poll_ready(cx) } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { self.0.call(req) } } diff --git a/tonic-web/src/service.rs b/tonic-web/src/service.rs index af4c5276f..da65ba832 100644 --- a/tonic-web/src/service.rs +++ b/tonic-web/src/service.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{ready, Context, Poll}; use http::{header, HeaderMap, HeaderValue, Method, Request, Response, StatusCode, Version}; -use hyper::Body; +use http_body_util::BodyExt; use pin_project::pin_project; use tonic::{ body::{empty_body, BoxBody}, @@ -50,7 +50,7 @@ impl GrpcWebService { impl GrpcWebService where - S: Service, Response = Response> + Send + 'static, + S: Service, Response = Response> + Send + 'static, { fn response(&self, status: StatusCode) -> ResponseFuture { ResponseFuture { @@ -66,9 +66,9 @@ where } } -impl Service> for GrpcWebService +impl Service> for GrpcWebService where - S: Service, Response = Response> + Send + 'static, + S: Service, Response = Response> + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, { @@ -80,7 +80,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { match RequestKind::new(req.headers(), req.method(), req.version()) { // A valid grpc-web request, regardless of HTTP version. // @@ -202,7 +202,7 @@ impl<'a> RequestKind<'a> { // Mutating request headers to conform to a gRPC request is not really // necessary for us at this point. We could remove most of these except // maybe for inserting `header::TE`, which tonic should check? -fn coerce_request(mut req: Request, encoding: Encoding) -> Request { +fn coerce_request(mut req: Request, encoding: Encoding) -> Request { req.headers_mut().remove(header::CONTENT_LENGTH); req.headers_mut() @@ -216,8 +216,7 @@ fn coerce_request(mut req: Request, encoding: Encoding) -> Request { HeaderValue::from_static("identity,deflate,gzip"), ); - req.map(|b| GrpcWebCall::request(b, encoding)) - .map(Body::wrap_stream) + req.map(|b| GrpcWebCall::request(b, encoding).boxed_unsync()) } fn coerce_response(res: Response, encoding: Encoding) -> Response { @@ -246,7 +245,7 @@ mod tests { #[derive(Debug, Clone)] struct Svc; - impl tower_service::Service> for Svc { + impl tower_service::Service> for Svc { type Response = Response; type Error = String; type Future = BoxFuture; @@ -255,7 +254,7 @@ mod tests { Poll::Ready(Ok(())) } - fn call(&mut self, _: Request) -> Self::Future { + fn call(&mut self, _: Request) -> Self::Future { Box::pin(async { Ok(Response::new(empty_body())) }) } } @@ -266,15 +265,14 @@ mod tests { mod grpc_web { use super::*; - use http::HeaderValue; use tower_layer::Layer; - fn request() -> Request { + fn request() -> Request { Request::builder() .method(Method::POST) .header(CONTENT_TYPE, GRPC_WEB) .header(ORIGIN, "http://example.com") - .body(Body::empty()) + .body(empty_body()) .unwrap() } @@ -350,13 +348,13 @@ mod tests { mod options { use super::*; - fn request() -> Request { + fn request() -> Request { Request::builder() .method(Method::OPTIONS) .header(ORIGIN, "http://example.com") .header(ACCESS_CONTROL_REQUEST_HEADERS, "x-grpc-web") .header(ACCESS_CONTROL_REQUEST_METHOD, "POST") - .body(Body::empty()) + .body(empty_body()) .unwrap() } @@ -371,13 +369,12 @@ mod tests { mod grpc { use super::*; - use http::HeaderValue; - fn request() -> Request { + fn request() -> Request { Request::builder() .version(Version::HTTP_2) .header(CONTENT_TYPE, GRPC) - .body(Body::empty()) + .body(empty_body()) .unwrap() } @@ -397,7 +394,7 @@ mod tests { let req = Request::builder() .header(CONTENT_TYPE, GRPC) - .body(Body::empty()) + .body(empty_body()) .unwrap(); let res = svc.call(req).await.unwrap(); @@ -425,10 +422,10 @@ mod tests { mod other { use super::*; - fn request() -> Request { + fn request() -> Request { Request::builder() .header(CONTENT_TYPE, "application/text") - .body(Body::empty()) + .body(empty_body()) .unwrap() } diff --git a/tonic-web/tests/integration/tests/grpc_web.rs b/tonic-web/tests/integration/tests/grpc_web.rs index 3343d754c..037ff8dad 100644 --- a/tonic-web/tests/integration/tests/grpc_web.rs +++ b/tonic-web/tests/integration/tests/grpc_web.rs @@ -2,11 +2,13 @@ use std::net::SocketAddr; use base64::Engine as _; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use http_body_util::{BodyExt as _, Full}; use hyper::http::{header, StatusCode}; -use hyper::{Body, Client, Method, Request, Uri}; +use hyper::{Method, Request, Uri}; use prost::Message; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; +use tonic::body::BoxBody; use tonic::transport::Server; use integration::pb::{test_server::TestServer, Input, Output}; @@ -102,7 +104,7 @@ fn encode_body() -> Bytes { buf.split_to(len + 5).freeze() } -fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request { +fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request { use header::{ACCEPT, CONTENT_TYPE, ORIGIN}; let request_uri = format!("{}/{}/{}", base_uri, "test.Test", "UnaryCall") @@ -123,7 +125,9 @@ fn build_request(base_uri: String, content_type: &str, accept: &str) -> Request< .header(ORIGIN, "http://example.com") .header(ACCEPT, format!("application/{}", accept)) .uri(request_uri) - .body(Body::from(bytes)) + .body(BoxBody::new( + Full::new(bytes).map_err(|err| Status::internal(err.to_string())), + )) .unwrap() } diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ef95eec47..428c0dade 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,9 +1,9 @@ //! HTTP specific body utilities. -use http_body::Body; +use http_body_util::BodyExt; /// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body::combinators::UnsyncBoxBody; +pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; /// Convert a [`http_body::Body`] into a [`BoxBody`]. pub(crate) fn boxed(body: B) -> BoxBody @@ -16,7 +16,7 @@ where /// Create an empty `BoxBody` pub fn empty_body() -> BoxBody { - http_body::Empty::new() + http_body_util::Empty::new() .map_err(|err| match err {}) .boxed_unsync() } diff --git a/tonic/src/extensions.rs b/tonic/src/extensions.rs index 37d84b87b..32b9ad021 100644 --- a/tonic/src/extensions.rs +++ b/tonic/src/extensions.rs @@ -24,7 +24,7 @@ impl Extensions { /// If a extension of this type already existed, it will /// be returned. #[inline] - pub fn insert(&mut self, val: T) -> Option { + pub fn insert(&mut self, val: T) -> Option { self.inner.insert(val) } diff --git a/tonic/src/request.rs b/tonic/src/request.rs index a27a7070c..f2cca7c74 100644 --- a/tonic/src/request.rs +++ b/tonic/src/request.rs @@ -313,6 +313,7 @@ impl Request { /// ```no_run /// use tonic::{Request, service::interceptor}; /// + /// #[derive(Clone)] // Extensions must be Clone /// struct MyExtension { /// some_piece_of_data: String, /// } @@ -440,7 +441,6 @@ pub(crate) enum SanitizeHeaders { #[cfg(test)] mod tests { use super::*; - use crate::metadata::MetadataValue; use http::Uri; #[test] diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 7f2ffde2b..ad930c617 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -35,12 +35,13 @@ use crate::transport::Error; use self::recover_error::RecoverError; use super::service::{GrpcTimeout, ServerIo}; +use crate::body::boxed; use crate::body::BoxBody; use crate::server::NamedService; use bytes::Bytes; use http::{Request, Response}; -use http_body::Body as _; -use hyper::{server::accept, Body}; +use http_body_util::BodyExt; +use hyper::server::accept; use pin_project::pin_project; use std::{ convert::Infallible, @@ -63,9 +64,11 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body::combinators::UnsyncBoxBody; -type BoxService = tower::util::BoxService, Response, crate::Error>; type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; +type BoxHttpBody = crate::body::BoxBody; +type Body = hyper::body::Incoming; // Temporary type alias to ease transition +type BoxError = crate::Error; +type BoxService = tower::util::BoxCloneService, Response, crate::Error>; const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20; diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 46a88dda5..b3428aa2c 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,6 +1,6 @@ use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; use crate::{ - body::BoxBody, + body::{boxed, BoxBody}, transport::{BoxFuture, Endpoint}, }; use http::Uri; @@ -21,8 +21,7 @@ use tower::{ }; use tower_service::Service; -pub(crate) type Request = http::Request; -pub(crate) type Response = http::Response; +pub(crate) use crate::transport::{Request, Response}; pub(crate) struct Connection { inner: BoxService,