Skip to content

Commit

Permalink
feat(ext/net): add reuseAddress option for UDP (#13849)
Browse files Browse the repository at this point in the history
This commit adds a `reuseAddress` option for UDP sockets. When this
option is enabled, one can listen on an address even though it is
already being listened on from a different process or thread. The new
socket will steal the address from the existing socket.

On Windows and Linux this uses the `SO_REUSEADDR` option, while on other
Unixes this is done with `SO_REUSEPORT`.

This behavior aligns with what libuv does.

TCP sockets still unconditionally set the `SO_REUSEADDR` flag - this
behavior matches Node.js and Go. This PR does not change this behaviour.

Co-authored-by: Luca Casonato <hello@lcas.dev>
  • Loading branch information
Trolloldem and lucacasonato authored Oct 24, 2022
1 parent 38213f1 commit 873a5ce
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 5 deletions.
13 changes: 12 additions & 1 deletion cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,17 @@ declare namespace Deno {
path: string;
}

/** **UNSTABLE**: New API, yet to be vetted.
*
* @category Network
*/
export interface UdpListenOptions extends ListenOptions {
/** When `true` the specified address will be reused, even if another
* process has already bound a socket on it. This effectively steals the
* socket from the listener. Defaults to `false`. */
reuseAddress?: boolean;
}

/** **UNSTABLE**: New API, yet to be vetted.
*
* Listen announces on the local transport address.
Expand Down Expand Up @@ -1110,7 +1121,7 @@ declare namespace Deno {
* @category Network
*/
export function listenDatagram(
options: ListenOptions & { transport: "udp" },
options: UdpListenOptions & { transport: "udp" },
): DatagramConn;

/** **UNSTABLE**: New API, yet to be vetted.
Expand Down
99 changes: 99 additions & 0 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -906,3 +906,102 @@ Deno.test({
);
listener.close();
});

Deno.test({ permissions: { net: true } }, async function netTcpReuseAddr() {
const listener1 = Deno.listen({
hostname: "127.0.0.1",
port: 3500,
});
listener1.accept().then(
(conn) => {
conn.close();
},
);

const conn1 = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
const buf1 = new Uint8Array(1024);
await conn1.read(buf1);
listener1.close();
conn1.close();

const listener2 = Deno.listen({
hostname: "127.0.0.1",
port: 3500,
});

listener2.accept().then(
(conn) => {
conn.close();
},
);

const conn2 = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
const buf2 = new Uint8Array(1024);
await conn2.read(buf2);

listener2.close();
conn2.close();
});

Deno.test(
{ permissions: { net: true } },
async function netUdpReuseAddr() {
const sender = Deno.listenDatagram({
port: 4002,
transport: "udp",
});
const listener1 = Deno.listenDatagram({
port: 4000,
transport: "udp",
reuseAddress: true,
});
const listener2 = Deno.listenDatagram({
port: 4000,
transport: "udp",
reuseAddress: true,
});

const sent = new Uint8Array([1, 2, 3]);
await sender.send(sent, listener1.addr);
await Promise.any([listener1.receive(), listener2.receive()]).then(
([recvd, remote]) => {
assert(remote.transport === "udp");
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
assertEquals(2, recvd[1]);
assertEquals(3, recvd[2]);
},
);
sender.close();
listener1.close();
listener2.close();
},
);

Deno.test(
{ permissions: { net: true } },
function netUdpNoReuseAddr() {
let listener1;
try {
listener1 = Deno.listenDatagram({
port: 4001,
transport: "udp",
reuseAddress: false,
});
} catch (err) {
assert(err);
assert(err instanceof Deno.errors.AddrInUse); // AddrInUse from previous test
}

assertThrows(() => {
Deno.listenDatagram({
port: 4001,
transport: "udp",
reuseAddress: false,
});
}, Deno.errors.AddrInUse);
if (typeof listener1 !== "undefined") {
listener1.close();
}
},
);
51 changes: 51 additions & 0 deletions cli/tests/unit/tls_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1376,3 +1376,54 @@ Deno.test(
await Promise.all([server(), startTlsClient()]);
},
);

Deno.test(
{ permissions: { read: false, net: true } },
async function listenTlsWithReuseAddr() {
const resolvable1 = deferred();
const hostname = "localhost";
const port = 3500;

const listener1 = Deno.listenTls({ hostname, port, cert, key });

const response1 = encoder.encode(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n",
);

listener1.accept().then(
async (conn) => {
await conn.write(response1);
setTimeout(() => {
conn.close();
resolvable1.resolve();
}, 0);
},
);

const conn1 = await Deno.connectTls({ hostname, port, caCerts });
conn1.close();
listener1.close();
await resolvable1;

const resolvable2 = deferred();
const listener2 = Deno.listenTls({ hostname, port, cert, key });
const response2 = encoder.encode(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n",
);

listener2.accept().then(
async (conn) => {
await conn.write(response2);
setTimeout(() => {
conn.close();
resolvable2.resolve();
}, 0);
},
);

const conn2 = await Deno.connectTls({ hostname, port, caCerts });
conn2.close();
listener2.close();
await resolvable2;
},
);
41 changes: 37 additions & 4 deletions ext/net/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use log::debug;
use serde::Deserialize;
use serde::Serialize;
use socket2::Domain;
use socket2::Protocol;
use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
Expand Down Expand Up @@ -417,9 +418,11 @@ impl Resource for UdpSocketResource {
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct IpListenArgs {
hostname: String,
port: u16,
reuse_address: Option<bool>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -468,11 +471,35 @@ fn listen_tcp(
fn listen_udp(
state: &mut OpState,
addr: SocketAddr,
reuse_address: Option<bool>,
) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
std_socket.set_nonblocking(true)?;
let domain = if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
};
let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if reuse_address.unwrap_or(false) {
// This logic is taken from libuv:
//
// On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
// refinements for programs that use multicast.
//
// Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
// are different from the BSDs: it _shares_ the port rather than steal it
// from the current listener. While useful, it's not something we can
// emulate on other platforms so we don't enable it.
#[cfg(any(target_os = "windows", target_os = "linux"))]
socket_tmp.set_reuse_address(true)?;
#[cfg(all(unix, not(target_os = "linux")))]
socket_tmp.set_reuse_port(true)?;
}
let socket_addr = socket2::SockAddr::from(addr);
socket_tmp.bind(&socket_addr)?;
socket_tmp.set_nonblocking(true)?;
// Enable messages to be sent to the broadcast address (255.255.255.255) by default
std_socket.set_broadcast(true)?;
socket_tmp.set_broadcast(true)?;
let std_socket: std::net::UdpSocket = socket_tmp.into();
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource {
Expand Down Expand Up @@ -510,9 +537,14 @@ where
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let (rid, local_addr) = if transport == "tcp" {
if args.reuse_address.is_some() {
return Err(generic_error(
"The reuseAddress option is not supported for TCP",
));
}
listen_tcp(state, addr)?
} else {
listen_udp(state, addr)?
listen_udp(state, addr, args.reuse_address)?
};
debug!(
"New listener {} {}:{}",
Expand Down Expand Up @@ -1099,6 +1131,7 @@ mod tests {
let ip_args = IpListenArgs {
hostname: String::from(server_addr[0]),
port: server_addr[1].parse().unwrap(),
reuse_address: None,
};
let connect_args = ConnectArgs {
transport: String::from("tcp"),
Expand Down

0 comments on commit 873a5ce

Please sign in to comment.