Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ __internal_happy_eyeballs_tests = []
[[example]]
name = "client"
required-features = ["client", "http1", "tcp", "runtime"]

[[example]]
name = "tower_server"
required-features = ["server", "http1", "tcp", "runtime"]
27 changes: 27 additions & 0 deletions examples/tower_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use bytes::Bytes;
use http::{Request, Response};
use http_body_util::Full;
use hyper::body::Incoming;
use hyper_util::{rt::TokioExecutor, server::conn::auto::Builder};
use tokio::net::TcpListener;
use tower::BoxError;

async fn handle(_request: Request<Incoming>) -> Result<Response<Full<Bytes>>, BoxError> {
Ok(Response::new(Full::from("Hello, World!")))
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
let tower_service = tower::service_fn(handle);

let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
loop {
let (tcp_stream, _remote_addr) = listener.accept().await.unwrap();
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades_tower(tcp_stream, tower_service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
}
}
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ pub mod client;
mod common;
pub mod rt;
pub mod server;
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
Comment on lines +11 to +14
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps hyper should have a service feature so we don't need to do stuff like this.

pub mod service;

mod error;
75 changes: 75 additions & 0 deletions src/server/conn/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ impl<E> Builder<E> {
Ok(())
}

/// Bind a connection together with a [`tower_service::Service`].
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub async fn serve_connection_tower<I, S, B>(&self, io: I, service: S) -> Result<()>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Been thinking that axum also needs a version that uses MakeService for https://docs.rs/axum/latest/axum/struct.Router.html#method.into_make_service_with_connect_info

where
S: tower_service::Service<Request<Incoming>, Response = Response<B>> + Clone + Send,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static,
E: Http2ConnExec<crate::service::TowerToHyperServiceFuture<S, Request<Incoming>>, B>,
{
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => {
self.http1
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
Version::H2 => {
self.http2
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
}

Ok(())
}

/// Bind a connection together with a [`Service`], with the ability to
/// handle HTTP upgrades. This requires that the IO object implements
/// `Send`.
Expand Down Expand Up @@ -115,6 +149,47 @@ impl<E> Builder<E> {

Ok(())
}

/// Bind a connection together with a [`tower_service::Service`], with the ability to
/// handle HTTP upgrades. This requires that the IO object implements
/// `Send`.
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub async fn serve_connection_with_upgrades_tower<I, S, B>(
&self,
io: I,
service: S,
) -> Result<()>
where
S: tower_service::Service<Request<Incoming>, Response = Response<B>> + Clone + Send,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
E: Http2ConnExec<crate::service::TowerToHyperServiceFuture<S, Request<Incoming>>, B>,
{
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => {
self.http1
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.with_upgrades()
.await?
}
Version::H2 => {
self.http2
.serve_connection(io, crate::service::TowerToHyperService::new(service))
.await?
}
}

Ok(())
}
}
#[derive(Copy, Clone)]
enum Version {
Expand Down
70 changes: 70 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//! Service utilities.

use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::{util::Oneshot, ServiceExt};

/// A tower service converted into a hyper service.
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
#[derive(Debug, Copy, Clone)]
pub struct TowerToHyperService<S> {
service: S,
}

impl<S> TowerToHyperService<S> {
/// Create a new `TowerToHyperService` from a tower service.
pub fn new(tower_service: S) -> Self {
Self {
service: tower_service,
}
}
}

impl<S, R> hyper::service::Service<R> for TowerToHyperService<S>
where
S: tower_service::Service<R> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = TowerToHyperServiceFuture<S, R>;

fn call(&self, req: R) -> Self::Future {
TowerToHyperServiceFuture {
future: self.service.clone().oneshot(req),
}
}
}

pin_project! {
/// Response future for [`TowerToHyperService`].
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "server", feature = "client")
))]
pub struct TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
{
#[pin]
future: Oneshot<S, R>,
}
}

impl<S, R> Future for TowerToHyperServiceFuture<S, R>
where
S: tower_service::Service<R>,
{
type Output = Result<S::Response, S::Error>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().future.poll(cx)
}
}