diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index 3e16fb2d16..6ee80d07d7 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" rust-version = "1.65" [features] +default = ["hyper"] hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"] reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"] reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 79a72029b3..768c12507d 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -105,23 +105,51 @@ pub mod hyper { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; - use http_body_util::BodyExt; - use hyper::body::Body; + use http_body_util::{BodyExt, Full}; + use hyper::body::{Body as HttpBody, Frame}; use hyper_util::client::legacy::{connect::Connect, Client}; - use std::error::Error; + use std::convert::Infallible; use std::fmt::Debug; + use std::pin::Pin; + use std::task::{self, Poll}; use std::time::Duration; use tokio::time; + pub struct Body(Full); + + impl HttpBody for Body { + type Data = Bytes; + type Error = Infallible; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>> { + let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; + inner_body.poll_frame(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.0.size_hint() + } + } + #[derive(Debug, Clone)] - pub struct HyperClient { - inner: Client, + pub struct HyperClient { + inner: Client, timeout: Duration, authorization: Option, } - impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + impl HyperClient { + pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { Self { inner, timeout, @@ -130,7 +158,7 @@ pub mod hyper { } pub fn new_with_timeout_and_authorization_header( - inner: Client, + inner: Client, timeout: Duration, authorization: HeaderValue, ) -> Self { @@ -143,16 +171,13 @@ pub mod hyper { } #[async_trait] - impl HttpClient for HyperClient + impl HttpClient for HyperClient where C: Connect + Send + Sync + Clone + Debug + 'static, - B: From> + Body + Send + Sync + Debug + Unpin + 'static, - ::Data: Send, - ::Error: Into>, { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let mut request = Request::from_parts(parts, B::from(body)); + let mut request = Request::from_parts(parts, Body(Full::from(body))); if let Some(ref authorization) = self.authorization { request .headers_mut()