Skip to content

Commit

Permalink
opentelemetry-http: move client impls into separate modules
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Jan 6, 2021
1 parent dcae48f commit 54f4914
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 135 deletions.
1 change: 1 addition & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ isahc = { version = "0.9", default-features = false, optional = true }
opentelemetry = { version = "0.11.2", path = "../opentelemetry", features = ["trace"] }
reqwest = { version = "0.10", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
thiserror = "1"
239 changes: 104 additions & 135 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
#[cfg(feature = "reqwest")]
use std::convert::TryInto;
#[cfg(any(feature = "surf", feature = "reqwest"))]
use std::fmt::{Debug, Display, Formatter};
use std::fmt::Debug;

use async_trait::async_trait;
use http::Request;
use opentelemetry::propagation::{Extractor, Injector};
#[cfg(any(feature = "surf", feature = "reqwest"))]
use opentelemetry::sdk::export::ExportError;
use opentelemetry::trace::TraceError;

pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);
Expand Down Expand Up @@ -51,161 +46,135 @@ pub trait HttpClient: Debug + Send + Sync {
}

#[cfg(feature = "reqwest")]
#[derive(Debug)]
struct ReqwestError(reqwest::Error);

#[cfg(feature = "reqwest")]
impl Display for ReqwestError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
}
}

#[cfg(feature = "reqwest")]
impl std::error::Error for ReqwestError {}

#[cfg(feature = "reqwest")]
impl From<reqwest::Error> for ReqwestError {
fn from(err: reqwest::Error) -> Self {
ReqwestError(err)
}
}

#[cfg(feature = "reqwest")]
impl ExportError for ReqwestError {
fn exporter_name(&self) -> &'static str {
"reqwest"
mod reqwest {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use std::convert::TryInto;
use thiserror::Error;

#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let request = request.try_into().map_err(ReqwestError::from)?;
let _ = self
.execute(request)
.await
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
}
}
}

#[cfg(feature = "surf")]
impl ExportError for SurfError {
fn exporter_name(&self) -> &'static str {
"surf"
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let _ = request
.try_into()
.and_then(|req| self.execute(req))
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
}
}
}

#[cfg(feature = "surf")]
#[derive(Debug)]
struct SurfError(surf::Error);
#[derive(Debug, Error)]
#[error(transparent)]
struct ReqwestError(#[from] reqwest::Error);

#[cfg(feature = "surf")]
impl Display for SurfError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
impl ExportError for ReqwestError {
fn exporter_name(&self) -> &'static str {
"reqwest"
}
}
}

#[cfg(feature = "surf")]
impl std::error::Error for SurfError {}

#[cfg(feature = "surf")]
impl From<surf::Error> for SurfError {
fn from(err: surf::Error) -> Self {
SurfError(err)
}
}

#[cfg(feature = "reqwest")]
#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let request = request.try_into().map_err(ReqwestError::from)?;
let _ = self
.execute(request)
.await
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
mod surf {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use std::fmt::{Display, Formatter};

#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let (parts, body) = request.into_parts();
let uri = parts
.uri
.to_string()
.parse()
.map_err(|_err: surf::http::url::ParseError| TraceError::from("error parse url"))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await.map_err::<SurfError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
}
}
}
}

#[cfg(feature = "reqwest")]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let _ = request
.try_into()
.and_then(|req| self.execute(req))
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
}
}
#[derive(Debug)]
struct SurfError(surf::Error);

#[cfg(feature = "surf")]
#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let (parts, body) = request.into_parts();
let uri = parts
.uri
.to_string()
.parse()
.map_err(|_err: surf::http::url::ParseError| TraceError::from("error parse url"))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await.map_err::<SurfError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
impl ExportError for SurfError {
fn exporter_name(&self) -> &'static str {
"surf"
}
}
}

#[cfg(feature = "isahc")]
impl ExportError for IsahcError {
fn exporter_name(&self) -> &'static str {
"isahc"
impl From<surf::Error> for SurfError {
fn from(err: surf::Error) -> Self {
SurfError(err)
}
}
}

#[cfg(feature = "isahc")]
#[derive(Debug)]
struct IsahcError(isahc::Error);
impl std::error::Error for SurfError {}

#[cfg(feature = "isahc")]
impl Display for IsahcError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
impl Display for SurfError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
}
}
}

#[cfg(feature = "isahc")]
impl std::error::Error for IsahcError {}
mod isahc {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use thiserror::Error;

#[async_trait]
impl HttpClient for isahc::HttpClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let res = self.send_async(request).await.map_err(IsahcError::from)?;

if !res.status().is_success() {
return Err(TraceError::from(format!(
"Expected success response, got {:?}",
res.status()
)));
}

#[cfg(feature = "isahc")]
impl From<isahc::Error> for IsahcError {
fn from(err: isahc::Error) -> Self {
IsahcError(err)
Ok(())
}
}
}

#[derive(Debug, Error)]
#[error(transparent)]
struct IsahcError(#[from] isahc::Error);

#[cfg(feature = "isahc")]
#[async_trait]
impl HttpClient for isahc::HttpClient {
async fn send(&self, request: http::Request<Vec<u8>>) -> Result<(), TraceError> {
let res = self
.send_async(request)
.await
.map_err(IsahcError::from)?;

if !res.status().is_success() {
return Err(TraceError::from(format!(
"Expected success response, got {:?}",
res.status()
)));
impl ExportError for IsahcError {
fn exporter_name(&self) -> &'static str {
"isahc"
}

Ok(())
}
}

Expand Down

0 comments on commit 54f4914

Please sign in to comment.