diff --git a/src/rust/demikernel/libos/mod.rs b/src/rust/demikernel/libos/mod.rs index feb543690..a3ea91bdd 100644 --- a/src/rust/demikernel/libos/mod.rs +++ b/src/rust/demikernel/libos/mod.rs @@ -86,7 +86,6 @@ impl LibOS { LibOSName::Catnap => Self::NetworkLibOS(NetworkLibOSWrapper::Catnap(SharedNetworkLibOS::< SharedCatnapTransport, >::new( - config.local_ipv4_addr()?, runtime.clone(), SharedCatnapTransport::new(&config, &mut runtime)?, ))), @@ -98,7 +97,7 @@ impl LibOS { let inetstack: SharedInetStack = SharedInetStack::new(&config, runtime.clone(), layer1_endpoint).unwrap(); Self::NetworkLibOS(NetworkLibOSWrapper::Catpowder( - SharedNetworkLibOS::::new(config.local_ipv4_addr()?, runtime, inetstack), + SharedNetworkLibOS::::new(runtime, inetstack), )) }, #[cfg(feature = "catnip-libos")] @@ -109,9 +108,7 @@ impl LibOS { SharedInetStack::new(&config, runtime.clone(), layer1_endpoint).unwrap(); Self::NetworkLibOS(NetworkLibOSWrapper::Catnip(SharedNetworkLibOS::::new( - config.local_ipv4_addr()?, - runtime, - inetstack, + runtime, inetstack, ))) }, _ => panic!("unsupported libos"), diff --git a/src/rust/demikernel/libos/network/libos.rs b/src/rust/demikernel/libos/network/libos.rs index e2a0b1cd3..c4b53e86b 100644 --- a/src/rust/demikernel/libos/network/libos.rs +++ b/src/rust/demikernel/libos/network/libos.rs @@ -42,7 +42,6 @@ use ::std::{ /// Catnap libOS. All state is kept in the [runtime] and [qtable]. /// TODO: Move [qtable] into [runtime] so all state is contained in the PosixRuntime. pub struct NetworkLibOS { - local_ipv4_addr: Ipv4Addr, runtime: SharedDemiRuntime, transport: T, } @@ -55,9 +54,8 @@ pub struct SharedNetworkLibOS(SharedObject> //====================================================================================================================== impl SharedNetworkLibOS { - pub fn new(local_ipv4_addr: Ipv4Addr, runtime: SharedDemiRuntime, transport: T) -> Self { + pub fn new(runtime: SharedDemiRuntime, transport: T) -> Self { Self(SharedObject::new(NetworkLibOS:: { - local_ipv4_addr, runtime: runtime.clone(), transport, })) @@ -99,7 +97,7 @@ impl SharedNetworkLibOS { } /// This function contains the LibOS-level functionality needed to bind a SharedNetworkQueue to a local address. - pub fn bind(&mut self, qd: QDesc, mut socket_addr: SocketAddr) -> Result<(), Fail> { + pub fn bind(&mut self, qd: QDesc, socket_addr: SocketAddr) -> Result<(), Fail> { trace!("bind() qd={:?}, local={:?}", qd, socket_addr); // We only support IPv4 addresses. @@ -107,37 +105,18 @@ impl SharedNetworkLibOS { // We only support the wildcard address for UDP sockets. // FIXME: https://github.com/demikernel/demikernel/issues/189 - match *socket_addrv4.ip() { - Ipv4Addr::UNSPECIFIED if self.get_shared_queue(&qd)?.get_qtype() == QType::UdpSocket => (), - Ipv4Addr::UNSPECIFIED => { - let cause: String = format!("cannot bind to wildcard address (qd={:?})", qd); - error!("bind(): {}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); - }, - addrv4 if addrv4 != self.local_ipv4_addr => { - let cause: String = format!("cannot bind to non-local address: {:?}", addrv4); - error!("bind(): {}", &cause); - return Err(Fail::new(libc::EADDRNOTAVAIL, &cause)); - }, - _ => (), - } - - if SharedDemiRuntime::is_private_ephemeral_port(socket_addr.port()) { - self.runtime.reserve_ephemeral_port(socket_addr.port())? + if *socket_addrv4.ip() == Ipv4Addr::UNSPECIFIED && self.get_shared_queue(&qd)?.get_qtype() != QType::UdpSocket { + let cause: String = format!("cannot bind to wildcard address (qd={:?})", qd); + error!("bind(): {}", cause); + return Err(Fail::new(libc::ENOTSUP, &cause)); } // We only support the wildcard address for UDP sockets. // FIXME: https://github.com/demikernel/demikernel/issues/582 - if socket_addr.port() == 0 { - if self.get_shared_queue(&qd)?.get_qtype() != QType::UdpSocket { - let cause: String = format!("cannot bind to port 0 (qd={:?})", qd); - error!("bind(): {}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); - } else { - // Allocate an ephemeral port. - let new_port: u16 = self.runtime.alloc_ephemeral_port()?; - socket_addr.set_port(new_port); - } + if socket_addr.port() == 0 && self.get_shared_queue(&qd)?.get_qtype() != QType::UdpSocket { + let cause: String = format!("cannot bind to port 0 (qd={:?})", qd); + error!("bind(): {}", cause); + return Err(Fail::new(libc::ENOTSUP, &cause)); } if self.runtime.is_addr_in_use(socket_addrv4) { @@ -145,20 +124,12 @@ impl SharedNetworkLibOS { error!("bind(): {}", &cause); return Err(Fail::new(libc::EADDRINUSE, &cause)); } + self.get_shared_queue(&qd)?.bind(socket_addr)?; + // Insert into address to queue descriptor table. + self.runtime + .insert_socket_id_to_qd(SocketId::Passive(socket_addrv4.clone()), qd); - if let Err(e) = self.get_shared_queue(&qd)?.bind(socket_addr) { - if SharedDemiRuntime::is_private_ephemeral_port(socket_addr.port()) { - if self.runtime.free_ephemeral_port(socket_addr.port()).is_err() { - warn!("bind(): leaking ephemeral port (port={})", socket_addr.port()); - } - } - Err(e) - } else { - // Insert into address to queue descriptor table. - self.runtime - .insert_socket_id_to_qd(SocketId::Passive(socket_addrv4.clone()), qd); - Ok(()) - } + Ok(()) } /// Sets a SharedNetworkQueue and its underlying socket as a passive one. This function contains the LibOS-level @@ -306,15 +277,6 @@ impl SharedNetworkLibOS { unwrap_socketaddr(local), "we only support IPv4" ))); - - // Check if this is an ephemeral port. - if SharedDemiRuntime::is_private_ephemeral_port(local.port()) { - // Allocate ephemeral port from the pool, to leave ephemeral port allocator in a consistent state. - if let Err(e) = self.runtime.free_ephemeral_port(local.port()) { - let cause: String = format!("close(): Could not free ephemeral port"); - warn!("{}: {:?}", cause, e); - } - } } // Remove the queue from the queue table. Expect is safe here because we looked up the queue to // schedule this coroutine and no other close coroutine should be able to run due to state machine diff --git a/src/rust/inetstack/protocols/layer3/mod.rs b/src/rust/inetstack/protocols/layer3/mod.rs index a10481ef4..8bc20ff6f 100644 --- a/src/rust/inetstack/protocols/layer3/mod.rs +++ b/src/rust/inetstack/protocols/layer3/mod.rs @@ -160,7 +160,6 @@ impl SharedLayer3Endpoint { self.layer2_endpoint.transmit_ipv4_packet(remote_link_addr, pkt) } - #[cfg(test)] pub fn get_local_addr(&self) -> Ipv4Addr { self.local_ipv4_addr } diff --git a/src/rust/runtime/network/ephemeral.rs b/src/rust/inetstack/protocols/layer4/ephemeral.rs similarity index 98% rename from src/rust/runtime/network/ephemeral.rs rename to src/rust/inetstack/protocols/layer4/ephemeral.rs index a92da54c6..4d75d5049 100644 --- a/src/rust/runtime/network/ephemeral.rs +++ b/src/rust/inetstack/protocols/layer4/ephemeral.rs @@ -104,7 +104,9 @@ impl Default for EphemeralPorts { #[cfg(test)] mod test { - use crate::runtime::network::ephemeral::{EphemeralPorts, FIRST_PRIVATE_PORT_NUMBER, LAST_PRIVATE_PORT_NUMBER}; + use crate::inetstack::protocols::layer4::ephemeral::{ + EphemeralPorts, FIRST_PRIVATE_PORT_NUMBER, LAST_PRIVATE_PORT_NUMBER, + }; use ::anyhow::Result; #[test] diff --git a/src/rust/inetstack/protocols/layer4/mod.rs b/src/rust/inetstack/protocols/layer4/mod.rs index 56fc67918..b949ffc9b 100644 --- a/src/rust/inetstack/protocols/layer4/mod.rs +++ b/src/rust/inetstack/protocols/layer4/mod.rs @@ -5,6 +5,7 @@ // Exports //====================================================================================================================== +pub mod ephemeral; pub mod tcp; pub mod udp; @@ -21,6 +22,7 @@ use crate::{ inetstack::protocols::{ layer3::{ip::IpProtocol, SharedLayer3Endpoint}, layer4::{ + ephemeral::EphemeralPorts, tcp::{SharedTcpPeer, SharedTcpSocket}, udp::{SharedUdpPeer, SharedUdpSocket}, }, @@ -48,6 +50,7 @@ pub struct Peer { tcp: SharedTcpPeer, udp: SharedUdpPeer, layer3_endpoint: SharedLayer3Endpoint, + ephemeral_ports: EphemeralPorts, } /// Socket Representation. @@ -75,6 +78,7 @@ impl Peer { tcp, udp, layer3_endpoint, + ephemeral_ports: EphemeralPorts::default(), }) } @@ -161,14 +165,28 @@ impl Peer { /// Upon successful completion, `Ok(())` is returned. Upon failure, `Fail` is /// returned instead. /// - pub fn bind(&mut self, sd: &mut Socket, local: SocketAddr) -> Result<(), Fail> { + pub fn bind(&mut self, sd: &mut Socket, socket_addr: SocketAddr) -> Result<(), Fail> { // FIXME: add IPv6 support; https://github.com/microsoft/demikernel/issues/935 - let local: SocketAddrV4 = unwrap_socketaddr(local)?; + let socket_addr_v4: SocketAddrV4 = unwrap_socketaddr(socket_addr)?; + // Check if we are allowed to bind to this address. + if *socket_addr_v4.ip() != self.layer3_endpoint.get_local_addr() + && *socket_addr_v4.ip() != Ipv4Addr::UNSPECIFIED + { + let cause: String = format!("cannot bind to non-local address: {:?}", socket_addr_v4); + error!("bind(): {}", &cause); + return Err(Fail::new(libc::EADDRNOTAVAIL, &cause)); + } match sd { - Socket::Tcp(socket) => self.tcp.bind(socket, local), - Socket::Udp(socket) => self.udp.bind(socket, local), + Socket::Tcp(socket) => self.tcp.bind(socket, socket_addr_v4), + Socket::Udp(socket) => self.udp.bind(socket, socket_addr_v4), + }?; + + if EphemeralPorts::is_private(socket_addr_v4.port()) { + self.ephemeral_ports.reserve(socket_addr_v4.port())?; } + + Ok(()) } /// @@ -251,11 +269,18 @@ impl Peer { pub async fn connect(&mut self, sd: &mut Socket, remote: SocketAddr) -> Result<(), Fail> { trace!("connect(): remote={:?}", remote); - // FIXME: add IPv6 support; https://github.com/microsoft/demikernel/issues/935 - let remote: SocketAddrV4 = unwrap_socketaddr(remote)?; - match sd { - Socket::Tcp(socket) => self.tcp.connect(socket, remote).await, + Socket::Tcp(socket) => { + // FIXME: add IPv6 support; https://github.com/microsoft/demikernel/issues/935 + let remote: SocketAddrV4 = unwrap_socketaddr(remote)?; + // If not bound, allocate an ephemeral port. + let local: SocketAddrV4 = match socket.local() { + Some(local) => local, + None => SocketAddrV4::new(self.layer3_endpoint.get_local_addr(), self.ephemeral_ports.alloc()?), + }; + + self.tcp.connect(socket, local, remote).await + }, _ => Err(Fail::new(libc::EINVAL, "invalid queue type")), } } @@ -271,17 +296,55 @@ impl Peer { /// completes shutting down the connection. Upon failure, `Fail` is returned instead. /// pub async fn close(&mut self, sd: &mut Socket) -> Result<(), Fail> { - match sd { - Socket::Tcp(socket) => self.tcp.close(socket).await, - Socket::Udp(socket) => self.udp.close(socket).await, + let local_port: Option = match sd { + Socket::Tcp(socket) => { + let local_port: Option = match socket.local() { + Some(socket_addr_v4) => Some(socket_addr_v4.port()), + None => None, + }; + + self.tcp.close(socket).await?; + local_port + }, + Socket::Udp(socket) => { + let local_port: Option = match socket.local() { + Some(socket_addr_v4) => Some(socket_addr_v4.port()), + None => None, + }; + self.udp.close(socket).await?; + local_port + }, + }; + match local_port { + Some(port) if EphemeralPorts::is_private(port) => self.ephemeral_ports.free(port), + _ => Ok(()), } } /// Forcibly close a socket. This should only be used on clean up. pub fn hard_close(&mut self, sd: &mut Socket) -> Result<(), Fail> { - match sd { - Socket::Tcp(socket) => self.tcp.hard_close(socket), - Socket::Udp(socket) => self.udp.hard_close(socket), + let local_port: Option = match sd { + Socket::Tcp(socket) => { + let local_port: Option = match socket.local() { + Some(socket_addr_v4) => Some(socket_addr_v4.port()), + None => None, + }; + + self.tcp.hard_close(socket)?; + local_port + }, + Socket::Udp(socket) => { + let local_port: Option = match socket.local() { + Some(socket_addr_v4) => Some(socket_addr_v4.port()), + None => None, + }; + self.udp.hard_close(socket)?; + local_port + }, + }; + match local_port { + Some(port) if EphemeralPorts::is_private(port) => self.ephemeral_ports.free(port), + _ => Ok(()), } } diff --git a/src/rust/inetstack/protocols/layer4/tcp/peer.rs b/src/rust/inetstack/protocols/layer4/tcp/peer.rs index 7208ee268..61b0efce8 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/peer.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/peer.rs @@ -146,19 +146,14 @@ impl SharedTcpPeer { } /// Runs until the connect to remote is made or times out. - pub async fn connect(&mut self, socket: &mut SharedTcpSocket, remote: SocketAddrV4) -> Result<(), Fail> { - // Check whether we need to allocate an ephemeral port. - let local: SocketAddrV4 = match socket.local() { - Some(addr) => { - // If socket is already bound to a local address, use it but remove the old binding. - self.addresses.remove(&SocketId::Passive(addr)); - addr - }, - None => { - let local_port: u16 = self.runtime.alloc_ephemeral_port()?; - SocketAddrV4::new(self.local_ipv4_addr, local_port) - }, - }; + pub async fn connect( + &mut self, + socket: &mut SharedTcpSocket, + local: SocketAddrV4, + remote: SocketAddrV4, + ) -> Result<(), Fail> { + // If socket is already bound to a local address, use it but remove the old binding. + self.addresses.remove(&SocketId::Passive(local)); // Insert the connection to receive incoming packets for this address pair. // Should we remove the passive entry for the local address if the socket was previously bound? if self @@ -204,30 +199,12 @@ impl SharedTcpPeer { Ok((None, incoming)) } - /// Frees an ephemeral port (if any) allocated to a given socket. - fn free_ephemeral_port(&mut self, socket_id: &SocketId) { - let local: &SocketAddrV4 = match socket_id { - SocketId::Active(local, _) => local, - SocketId::Passive(local) => local, - }; - // Rollback ephemeral port allocation. - if SharedDemiRuntime::is_private_ephemeral_port(local.port()) { - if self.runtime.free_ephemeral_port(local.port()).is_err() { - // We fail if and only if we attempted to free a port that was not allocated. - // This is unexpected, but if it happens, issue a warning and keep going, - // otherwise we would leave the queue in a dangling state. - warn!("bind(): leaking ephemeral port (port={})", local.port()); - } - } - } - /// Closes a TCP socket. pub async fn close(&mut self, socket: &mut SharedTcpSocket) -> Result<(), Fail> { // Wait for close to complete. // Handle result: If unsuccessful, free the new queue descriptor. if let Some(socket_id) = socket.close().await? { self.addresses.remove(&socket_id); - self.free_ephemeral_port(&socket_id); } Ok(()) } @@ -235,7 +212,6 @@ impl SharedTcpPeer { pub fn hard_close(&mut self, socket: &mut SharedTcpSocket) -> Result<(), Fail> { if let Some(socket_id) = socket.hard_close()? { self.addresses.remove(&socket_id); - self.free_ephemeral_port(&socket_id); } Ok(()) } diff --git a/src/rust/inetstack/test_helpers/engine.rs b/src/rust/inetstack/test_helpers/engine.rs index 22d96978c..2c17719e5 100644 --- a/src/rust/inetstack/test_helpers/engine.rs +++ b/src/rust/inetstack/test_helpers/engine.rs @@ -40,7 +40,7 @@ impl SharedEngine { let transport: SharedInetStack = SharedInetStack::new_test(&config, runtime.clone(), layer1_endpoint.clone())?; Ok(Self(SharedObject::new(Engine { - libos: SharedNetworkLibOS::::new(config.local_ipv4_addr()?, runtime, transport), + libos: SharedNetworkLibOS::::new(runtime, transport), layer1_endpoint, }))) } diff --git a/src/rust/runtime/mod.rs b/src/rust/runtime/mod.rs index 93c37faf0..38dbf507b 100644 --- a/src/rust/runtime/mod.rs +++ b/src/rust/runtime/mod.rs @@ -30,8 +30,6 @@ pub use demikernel_xdp_bindings as libxdp; // Imports //====================================================================================================================== -use crate::runtime::network::{ephemeral::EphemeralPorts, socket::SocketId, SocketIdToQDescMap}; - #[cfg(feature = "profiler")] use crate::coroutine_timer; @@ -39,6 +37,8 @@ use crate::{ expect_some, runtime::{ fail::Fail, + network::socket::SocketId, + network::SocketIdToQDescMap, poll::PollFuture, queue::{IoQueue, IoQueueTable}, scheduler::{SharedScheduler, TaskWithResult}, @@ -73,7 +73,6 @@ const TIMER_FINER_RESOLUTION: usize = 2; pub struct DemiRuntime { qtable: IoQueueTable, scheduler: SharedScheduler, - ephemeral_ports: EphemeralPorts, socket_id_to_qdesc_map: SocketIdToQDescMap, /// Number of iterations that we have polled since advancing the clock. ts_iters: usize, @@ -111,7 +110,6 @@ impl SharedDemiRuntime { Self(SharedObject::::new(DemiRuntime { qtable: IoQueueTable::default(), scheduler: SharedScheduler::default(), - ephemeral_ports: EphemeralPorts::default(), socket_id_to_qdesc_map: SocketIdToQDescMap::default(), ts_iters: 0, completed_tasks: HashMap::::new(), @@ -397,53 +395,6 @@ impl SharedDemiRuntime { self.qtable.get_type(qd) } - /// Allocates a port from the shared ephemeral port allocator. - pub fn alloc_ephemeral_port(&mut self) -> Result { - match self.ephemeral_ports.alloc() { - Ok(port) => { - trace!("Allocating ephemeral port: {:?}", port); - Ok(port) - }, - Err(e) => { - warn!("Could not allocate ephemeral port: {:?}", e); - Err(e) - }, - } - } - - /// Reserves a specific port if it is free. - pub fn reserve_ephemeral_port(&mut self, port_number: u16) -> Result<(), Fail> { - match self.ephemeral_ports.reserve(port_number) { - Ok(()) => { - trace!("Reserving ephemeral port: {:?}", port_number); - Ok(()) - }, - Err(e) => { - warn!("Could not reserve ephemeral port: port={:?} error={:?}", port_number, e); - Err(e) - }, - } - } - - /// Frees an ephemeral port. - pub fn free_ephemeral_port(&mut self, port: u16) -> Result<(), Fail> { - match self.ephemeral_ports.free(port) { - Ok(()) => { - trace!("Freeing ephemeral port: {:?}", port); - Ok(()) - }, - Err(e) => { - warn!("Could not free ephemeral port: port={:?} error={:?}", port, e); - Err(e) - }, - } - } - - /// Checks if a port is private. - pub fn is_private_ephemeral_port(port: u16) -> bool { - EphemeralPorts::is_private(port) - } - /// Moves time forward deterministically. pub fn advance_clock(&mut self, now: Instant) { timer::global_advance_clock(now) @@ -562,7 +513,6 @@ impl Default for SharedDemiRuntime { Self(SharedObject::::new(DemiRuntime { qtable: IoQueueTable::default(), scheduler: SharedScheduler::default(), - ephemeral_ports: EphemeralPorts::default(), socket_id_to_qdesc_map: SocketIdToQDescMap::default(), ts_iters: 0, completed_tasks: HashMap::::new(), diff --git a/src/rust/runtime/network/mod.rs b/src/rust/runtime/network/mod.rs index 6275b969e..2a39e69a6 100644 --- a/src/rust/runtime/network/mod.rs +++ b/src/rust/runtime/network/mod.rs @@ -7,7 +7,6 @@ pub mod config; pub mod consts; -pub mod ephemeral; pub mod ring; pub mod socket; pub mod transport; diff --git a/tests/rust/common/libos.rs b/tests/rust/common/libos.rs index 421bda1b9..ed4f870f7 100644 --- a/tests/rust/common/libos.rs +++ b/tests/rust/common/libos.rs @@ -51,11 +51,7 @@ impl DummyLibOS { logging::initialize(); let transport = SharedInetStack::new_test(&config, runtime.clone(), network)?; - Ok(Self(SharedNetworkLibOS::::new( - config.local_ipv4_addr()?, - runtime, - transport, - ))) + Ok(Self(SharedNetworkLibOS::::new(runtime, transport))) } pub fn prepare_dummy_buffer(&self, size: usize) -> Result {