Skip to content

Commit

Permalink
Add opaque wrapper around Full body, use wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Jul 11, 2024
1 parent 19ddbd8 commit 33ed871
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
1 change: 1 addition & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
rust-version = "1.65"

[features]
default = ["hyper"]
hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]
Expand Down
51 changes: 38 additions & 13 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,51 @@ pub mod hyper {

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use http_body_util::BodyExt;
use hyper::body::Body;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use std::error::Error;
use std::convert::Infallible;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

pub struct Body(Full<Bytes>);

impl HttpBody for Body {
type Data = Bytes;
type Error = Infallible;

#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
inner_body.poll_frame(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
}
}

#[derive(Debug, Clone)]
pub struct HyperClient<C, B> {
inner: Client<C, B>,
pub struct HyperClient<C> {
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C, B> HyperClient<C, B> {
pub fn new_with_timeout(inner: Client<C, B>, timeout: Duration) -> Self {
impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
Self {
inner,
timeout,
Expand All @@ -130,7 +158,7 @@ pub mod hyper {
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C, B>,
inner: Client<C, Body>,
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Expand All @@ -143,16 +171,13 @@ pub mod hyper {
}

#[async_trait]
impl<C, B> HttpClient for HyperClient<C, B>
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
B: From<Vec<u8>> + Body + Send + Sync + Debug + Unpin + 'static,
<B as Body>::Data: Send,
<B as Body>::Error: Into<Box<dyn Error + Send + Sync>>,
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, B::from(body));
let mut request = Request::from_parts(parts, Body(Full::from(body)));
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
Expand Down

0 comments on commit 33ed871

Please sign in to comment.