Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

## [6.2.0] - 2022-06-25
### Added
- Use a trait for connection on CompositeService to allow users to define
their own supported connection types

## [6.1.1] - 2022-02-01
### Fixed
- Remove private, unused dependency on `chrono`
Expand Down Expand Up @@ -185,7 +190,8 @@ No changes. We now think we've got enough to declare this crate stable.
## [0.5.0] - 2017-09-18
- Start of changelog.

[Unreleased]: https://github.com/Metaswitch/swagger-rs/compare/6.1.1...HEAD
[Unreleased]: https://github.com/Metaswitch/swagger-rs/compare/6.2.0...HEAD
[6.2.0]: https://github.com/Metaswitch/swagger-rs/compare/6.1.1...6.2.0
[6.1.1]: https://github.com/Metaswitch/swagger-rs/compare/6.1.0...6.1.1
[6.1.0]: https://github.com/Metaswitch/swagger-rs/compare/6.0.0...6.1.0
[6.0.0]: https://github.com/Metaswitch/swagger-rs/compare/5.1.0...6.0.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "swagger"
version = "6.1.1"
version = "6.2.0"
authors = ["Metaswitch Networks Ltd"]
license = "Apache-2.0"
description = "A set of common utilities for Rust code generated by swagger-codegen"
Expand Down
128 changes: 32 additions & 96 deletions src/composites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use hyper::service::Service;
use hyper::{Request, Response, StatusCode};
use std::fmt;
use std::future::Future;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::task::{Context, Poll};

Expand All @@ -27,6 +28,32 @@ impl<B: Default> NotFound<B> for B {
}
}

/// Connection which has a remote address, which can thus be composited.
pub trait HasRemoteAddr {
/// Get the remote address for the connection to pass
/// to the composited service
fn remote_addr(&self) -> Option<SocketAddr>;
}

impl<'a> HasRemoteAddr for &'a Option<SocketAddr> {
fn remote_addr(&self) -> Option<SocketAddr> {
**self
}
}

impl<'a> HasRemoteAddr for &'a hyper::server::conn::AddrStream {
fn remote_addr(&self) -> Option<SocketAddr> {
Some(hyper::server::conn::AddrStream::remote_addr(self))
}
}

#[cfg(feature = "uds")]
impl HasRemoteAddr for &'a tokio::net::UnixStream {
fn remote_addr(&self) -> Option<SocketAddr> {
None
}
}

/// Trait implemented by services which can be composited.
///
/// Wraps tower_service::Service
Expand Down Expand Up @@ -151,98 +178,10 @@ where
}
}

use std::net::SocketAddr;

impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a Option<SocketAddr>>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Error: 'static,
{
type Error = MakeError;
type Response = CompositeService<ReqBody, ResBody, Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
for service in &mut self.0 {
match service.1.poll_ready(cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Poll::Ready(Ok(()))
}

fn call(&mut self, target: &'a Option<SocketAddr>) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(service.call(*target).map_ok(move |s| (path, s)));
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();

Ok(CompositeService(services?))
}))
}
}

impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a hyper::server::conn::AddrStream>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Error: 'static,
{
type Error = MakeError;
type Response = CompositeService<ReqBody, ResBody, Error>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
for service in &mut self.0 {
match service.1.poll_ready(cx) {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(e)) => {
return Poll::Ready(Err(e));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Poll::Ready(Ok(()))
}

fn call(&mut self, target: &'a hyper::server::conn::AddrStream) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(
service
.call(Some(target.remote_addr()))
.map_ok(move |s| (path, s)),
);
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();

Ok(CompositeService(services?))
}))
}
}

#[cfg(feature = "uds")]
impl<'a, ReqBody, ResBody, Error, MakeError> Service<&'a tokio::net::UnixStream>
impl<ReqBody, ResBody, Error, MakeError, Connection> Service<Connection>
for CompositeMakeService<Option<SocketAddr>, ReqBody, ResBody, Error, MakeError>
where
Connection: HasRemoteAddr,
ReqBody: 'static,
ResBody: NotFound<ResBody> + 'static,
MakeError: Send + 'static,
Expand All @@ -267,15 +206,12 @@ where
Poll::Ready(Ok(()))
}

fn call(&mut self, _target: &'a tokio::net::UnixStream) -> Self::Future {
fn call(&mut self, target: Connection) -> Self::Future {
let mut services = Vec::with_capacity(self.0.len());
let addr = target.remote_addr();
for (path, service) in &mut self.0 {
let path: &'static str = path;
services.push(
service
.call(None)
.map_ok(move |s| (path, s)),
);
services.push(service.call(addr).map_ok(move |s| (path, s)));
}
Box::pin(futures::future::join_all(services).map(|results| {
let services: Result<Vec<_>, MakeError> = results.into_iter().collect();
Expand Down