Skip to content

feat: implement client for async-h1 #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ documentation = "https://docs.rs/http-client"
description = "Types and traits for http clients."
keywords = ["http", "service", "client", "futures", "async"]
categories = ["asynchronous", "web-programming", "web-programming::http-client", "web-programming::websocket"]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>"]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>", "dignifiedquire <me@dignifiedquire.com>"]
readme = "README.md"
edition = "2018"

Expand All @@ -16,18 +16,26 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]

[features]
docs = ["native_client"]
default = ["h1_client"]
docs = ["h1_client"]
h1_client = ["async-h1", "async-std", "async-native-tls"]
native_client = ["curl_client", "wasm_client"]
curl_client = ["isahc"]
curl_client = ["isahc", "async-std"]
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures"]

[dependencies]
futures = { version = "0.3.1", features = ["compat", "io-compat"] }
http = "0.1.19"
http-types = { version = "1.0.1", features = ["hyperium_http"] }
log = "0.4.7"

# h1-client
async-h1 = { version = "1.0.0", optional = true }
async-std = { version = "1.4.0", default-features = false, optional = true }
async-native-tls = { version = "0.3.1", optional = true }

# isahc-client
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
isahc = { version = "0.8", optional = true, default-features = false, features = ["http2"] }
isahc = { version = "0.9", optional = true, default-features = false, features = ["http2"] }

# wasm-client
[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
80 changes: 80 additions & 0 deletions src/h1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! http-client implementation for async-h1.

use super::{HttpClient, Request, Response};

use async_h1::client;
use futures::future::BoxFuture;
use http_types::{Error, StatusCode};

/// Async-h1 based HTTP Client.
#[derive(Debug)]
pub struct H1Client {}

impl Default for H1Client {
fn default() -> Self {
Self::new()
}
}

impl H1Client {
/// Create a new instance.
pub fn new() -> Self {
Self {}
}
}

impl Clone for H1Client {
fn clone(&self) -> Self {
Self {}
}
}

impl HttpClient for H1Client {
type Error = Error;

fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>> {
Box::pin(async move {
// Insert host
let host = req
.url()
.host_str()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing hostname"))?;

let scheme = req.url().scheme();
if scheme != "http" && scheme != "https" {
return Err(Error::from_str(
StatusCode::BadRequest,
format!("invalid url scheme '{}'", scheme),
));
}

let addr = req
.url()
.socket_addrs(|| match req.url().scheme() {
"http" => Some(80),
"https" => Some(443),
_ => None,
})?
.into_iter()
.next()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?;

log::trace!("> Scheme: {}", scheme);

match scheme {
"http" => {
let stream = async_std::net::TcpStream::connect(addr).await?;
client::connect(stream, req).await
}
"https" => {
let raw_stream = async_std::net::TcpStream::connect(addr).await?;

let stream = async_native_tls::connect(host, raw_stream).await?;

client::connect(stream, req).await
}
_ => unreachable!(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should return an error here instead.

Suggested change
_ => unreachable!(),
_ => http_types::bail!("Unsupported protocol prefix"),

}
})
}
}
23 changes: 11 additions & 12 deletions src/isahc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use super::{Body, HttpClient, Request, Response};

use async_std::io::BufReader;
use futures::future::BoxFuture;
use isahc::http;

use std::sync::Arc;

Expand Down Expand Up @@ -46,25 +48,22 @@ impl HttpClient for IsahcClient {
fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>> {
let client = self.client.clone();
Box::pin(async move {
let (parts, body) = req.into_parts();

let body = if body.is_empty() {
isahc::Body::empty()
} else {
match body.len {
Some(len) => isahc::Body::reader_sized(body, len),
None => isahc::Body::reader(body),
}
let req_hyperium: http::Request<http_types::Body> = req.into();
let (parts, body) = req_hyperium.into_parts();
let body = match body.len() {
Some(len) => isahc::Body::from_reader_sized(body, len as u64),
None => isahc::Body::from_reader(body),
};

let req: http::Request<isahc::Body> = http::Request::from_parts(parts, body);

let res = client.send_async(req).await?;

let (parts, body) = res.into_parts();
let body = Body::from_reader(body);

let len = body.len().map(|len| len as usize);
let body = Body::from_reader(BufReader::new(body), len);
let res = http::Response::from_parts(parts, body);
Ok(res)
Ok(res.into())
})
}
}
99 changes: 11 additions & 88 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@
)]

use futures::future::BoxFuture;
use futures::io::{AsyncRead, Cursor};

use std::error::Error;
use std::fmt::{self, Debug};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg_attr(feature = "docs", doc(cfg(curl_client)))]
#[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))]
Expand All @@ -35,11 +28,15 @@ pub mod wasm;
#[cfg(feature = "native_client")]
pub mod native;

#[cfg_attr(feature = "docs", doc(cfg(h1_client)))]
#[cfg(feature = "h1_client")]
pub mod h1;

/// An HTTP Request type with a streaming body.
pub type Request = http::Request<Body>;
pub type Request = http_types::Request;

/// An HTTP Response type with a streaming body.
pub type Response = http::Response<Body>;
pub type Response = http_types::Response;

/// An abstract HTTP client.
///
Expand All @@ -55,90 +52,16 @@ pub type Response = http::Response<Body>;
///
/// How `Clone` is implemented is up to the implementors, but in an ideal scenario combining this
/// with the `Client` builder will allow for high connection reuse, improving latency.
pub trait HttpClient: Debug + Unpin + Send + Sync + Clone + 'static {
pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + Clone + 'static {
/// The associated error type.
type Error: Error + Send + Sync;
type Error: Send + Sync + Into<Error>;

/// Perform a request.
fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>>;
}

/// The raw body of an http request or response.
///
/// A body is a stream of `Bytes` values, which are shared handles to byte buffers.
/// Both `Body` and `Bytes` values can be easily created from standard owned byte buffer types
/// like `Vec<u8>` or `String`, using the `From` trait.
pub struct Body {
reader: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
/// Intentionally use `u64` over `usize` here.
/// `usize` won't work if you try to send 10GB file from 32bit host.
#[allow(dead_code)] // not all backends make use of this
len: Option<u64>,
}

impl Body {
/// Create a new empty body.
pub fn empty() -> Self {
Self {
reader: None,
len: Some(0),
}
}

/// Create a new instance from a reader.
pub fn from_reader(reader: impl AsyncRead + Unpin + Send + 'static) -> Self {
Self {
reader: Some(Box::new(reader)),
len: None,
}
}

/// Validate that the body was created with `Body::empty()`.
pub fn is_empty(&self) -> bool {
self.reader.is_none()
}
}
pub type Body = http_types::Body;

impl AsyncRead for Body {
#[allow(missing_doc_code_examples)]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.reader.as_mut() {
Some(reader) => Pin::new(reader).poll_read(cx, buf),
None => Poll::Ready(Ok(0)),
}
}
}

impl fmt::Debug for Body {
#[allow(missing_doc_code_examples)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Body").field("reader", &"<hidden>").finish()
}
}

impl From<Vec<u8>> for Body {
#[allow(missing_doc_code_examples)]
#[inline]
fn from(vec: Vec<u8>) -> Body {
let len = vec.len() as u64;
Self {
reader: Some(Box::new(Cursor::new(vec))),
len: Some(len),
}
}
}

impl<R: AsyncRead + Unpin + Send + 'static> From<Box<R>> for Body {
/// Converts an `AsyncRead` into a Body.
#[allow(missing_doc_code_examples)]
fn from(reader: Box<R>) -> Self {
Self {
reader: Some(reader),
len: None,
}
}
}
/// Error type.
pub type Error = http_types::Error;