Skip to content

Commit

Permalink
propagate errors in TcpProxy
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Feb 23, 2024
1 parent ba8b51a commit 629551f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 42 deletions.
7 changes: 7 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ use hex::FromHexError;
use mio::{net::TcpStream, Interest, Token};
use protocol::http::parser::Method;
use router::RouterError;
use socket::ServerBindError;
use time::{Duration, Instant};
use tls::CertificateResolverError;

Expand Down Expand Up @@ -699,6 +700,12 @@ pub enum ProxyError {
UnsupportedMessage,
#[error("failed to acquire the lock, {0}")]
Lock(String),
#[error("could not bind to socket {0:?}: {1}")]
BindToSocket(SocketAddr, ServerBindError),
#[error("error registering socket of listener: {0}")]
RegisterListener(std::io::Error),
#[error("the listener is not activated")]
UnactivatedListener,
}

use self::server::ListenToken;
Expand Down
19 changes: 11 additions & 8 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,13 +1246,14 @@ impl Server {

let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
match listener_token {
Some(token) => {
Ok(token) => {
self.accept(ListenToken(token.0), Protocol::TCPListen);
WorkerResponse::ok(req_id)
}
None => {
error!("Could not activate TCP listener");
WorkerResponse::error(req_id, "cannot activate TCP listener")
Err(e) => {
let error = format!("Could not activate TCP listener: {}", e);
error!("{}", error);
WorkerResponse::error(req_id, error)
}
}
}
Expand Down Expand Up @@ -1360,10 +1361,12 @@ impl Server {
Ok(ListenerType::Tcp) => {
let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
{
Some((token, listener)) => (token, listener),
None => {
let error =
format!("Couldn't deactivate TCP listener at address {:?}", address);
Ok((token, listener)) => (token, listener),
Err(e) => {
let error = format!(
"Could not deactivate TCP listener at address {:?}: {}",
address, e
);
error!("{}", error);
return WorkerResponse::error(req_id, error);
}
Expand Down
66 changes: 32 additions & 34 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,35 +1043,30 @@ impl TcpListener {
})
}

// TODO: return Result with context
pub fn activate(
&mut self,
registry: &Registry,
tcp_listener: Option<MioTcpListener>,
) -> Option<Token> {
) -> Result<Token, ProxyError> {
if self.active {
return Some(self.token);
return Ok(self.token);
}

let mut listener = tcp_listener.or_else(|| {
server_bind(self.config.address.clone().into())
.map_err(|e| {
error!("could not create listener {:?}: {}", self.config.address, e);
})
.ok()
});

if let Some(ref mut sock) = listener {
if let Err(e) = registry.register(sock, self.token, Interest::READABLE) {
error!("error registering socket({:?}): {:?}", sock, e);
let mut listener = match tcp_listener {
Some(listener) => listener,
None => {
let address = self.config.address.clone().into();
server_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
}
} else {
return None;
}
};

registry
.register(&mut listener, self.token, Interest::READABLE)
.map_err(|io_err| ProxyError::RegisterListener(io_err))?;

self.listener = listener;
self.listener = Some(listener);
self.active = true;
Some(self.token)
Ok(self.token)
}
}

Expand Down Expand Up @@ -1126,7 +1121,6 @@ impl TcpProxy {
}
}

// TODO: return Result with context
pub fn add_listener(
&mut self,
config: TcpListenerConfig,
Expand All @@ -1151,16 +1145,18 @@ impl TcpProxy {
self.listeners.len() < len
}

// TODO: return Result with context
pub fn activate_listener(
&self,
addr: &SocketAddr,
tcp_listener: Option<MioTcpListener>,
) -> Option<Token> {
self.listeners
) -> Result<Token, ProxyError> {
let listener = self
.listeners
.values()
.find(|listener| listener.borrow().address == *addr)
.and_then(|listener| listener.borrow_mut().activate(&self.registry, tcp_listener))
.ok_or(ProxyError::NoListenerFound(*addr))?;

listener.borrow_mut().activate(&self.registry, tcp_listener)
}

pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
Expand All @@ -1177,19 +1173,21 @@ impl TcpProxy {
.collect()
}

// TODO: return Result with context
pub fn give_back_listener(&mut self, address: SocketAddr) -> Option<(Token, MioTcpListener)> {
self.listeners
pub fn give_back_listener(
&mut self,
address: SocketAddr,
) -> Result<(Token, MioTcpListener), ProxyError> {
let listener = self
.listeners
.values()
.find(|listener| listener.borrow().address == address)
.and_then(|listener| {
let mut owned = listener.borrow_mut();
.ok_or(ProxyError::NoListenerFound(address.clone()))?;

owned
.listener
.take()
.map(|listener| (owned.token, listener))
})
let mut owned = listener.borrow_mut();

let taken_listener = owned.listener.take().ok_or(ProxyError::UnactivatedListener)?;

Ok((owned.token, taken_listener))
}

pub fn add_tcp_front(&mut self, front: RequestTcpFrontend) -> Result<(), ProxyError> {
Expand Down

0 comments on commit 629551f

Please sign in to comment.