Skip to content

Commit

Permalink
Fix hyper feature
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Jul 10, 2024
1 parent e52ba2c commit 6c75e7c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
3 changes: 3 additions & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ edition = "2021"
rust-version = "1.65"

[features]
hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, features = ["http2", "client"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
26 changes: 16 additions & 10 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,23 @@ pub mod hyper {

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use hyper::client::connect::Connect;
use hyper::Client;
use hyper::body::Body;
use hyper_util::client::legacy::{connect::Connect, Client};
use http_body_util::BodyExt;
use std::error::Error;
use std::fmt::Debug;
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
inner: Client<C>,
pub struct HyperClient<C, B> {
inner: Client<C, B>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
impl<C, B> HyperClient<C, B> {
pub fn new_with_timeout(inner: Client<C, B>, timeout: Duration) -> Self {
Self {
inner,
timeout,
Expand All @@ -128,7 +130,7 @@ pub mod hyper {
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C>,
inner: Client<C, B>,
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Expand All @@ -141,23 +143,27 @@ pub mod hyper {
}

#[async_trait]
impl<C> HttpClient for HyperClient<C>
impl<C, B> HttpClient for HyperClient<C, B>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
B: From<Vec<u8>> + Body + Send + Sync + Debug + Unpin + 'static,
<B as Body>::Data: Send,
<B as Body>::Error: Into<Box<dyn Error + Send + Sync>>,
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, body.into());
let mut request: Request<B> = Request::from_parts(parts, body.into());
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
let headers = std::mem::take(response.headers_mut());

let mut http_response = Response::builder()
.status(response.status())
.body(hyper::body::to_bytes(response.into_body()).await?)?;
.body(response.into_body().collect().await?.to_bytes())?;
*http_response.headers_mut() = headers;

Ok(http_response.error_for_status()?)
Expand Down

0 comments on commit 6c75e7c

Please sign in to comment.