Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,19 @@ macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/os-util",
"mio/tcp",
"mio/udp",
"mio/uds",
"socket2/all",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too familiar with the features of socket2, but can we be more precise here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socket2's all feature means enabling APIs that are not available in some OSes, not means enabling all feature flags.

https://github.com/rust-lang/socket2/blob/6601ed132b37d6e9d178b34918bfb0b236800232/Cargo.toml#L42-L43

# Enable all API, even ones not available on all OSs.
all = []

In our case, this feature is needed to use reuse_port/set_reuse_port, which is not available in Solaris & Illumos.

"winapi/namedpipeapi",
]
process = [
"bytes",
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/os-util",
"mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
Expand All @@ -77,9 +76,9 @@ rt-multi-thread = [
signal = [
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/uds",
"mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
Expand All @@ -96,9 +95,10 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.7.6", optional = true }
mio = { version = "0.8.0", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true }
socket2 = { version = "0.4.2", optional = true }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
Expand Down Expand Up @@ -130,7 +130,6 @@ proptest = "1"
rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { version = "0.6.0", features = ["tokio"] }
Expand Down
64 changes: 42 additions & 22 deletions tokio/src/net/tcp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::net::{TcpListener, TcpStream};

use std::convert::TryInto;
use std::fmt;
use std::io;
use std::net::SocketAddr;
Expand Down Expand Up @@ -83,7 +84,7 @@ cfg_net! {
/// [`socket2`]: https://docs.rs/socket2/
#[cfg_attr(docsrs, doc(alias = "connect_std"))]
pub struct TcpSocket {
inner: mio::net::TcpSocket,
inner: socket2::Socket,
}
}

Expand Down Expand Up @@ -118,7 +119,11 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v4() -> io::Result<TcpSocket> {
let inner = mio::net::TcpSocket::new_v4()?;
let inner = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -152,7 +157,11 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v6() -> io::Result<TcpSocket> {
let inner = mio::net::TcpSocket::new_v6()?;
let inner = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -183,7 +192,7 @@ impl TcpSocket {
/// }
/// ```
pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
self.inner.set_reuseaddr(reuseaddr)
self.inner.set_reuse_address(reuseaddr)
}

/// Retrieves the value set for `SO_REUSEADDR` on this socket.
Expand All @@ -209,7 +218,7 @@ impl TcpSocket {
/// }
/// ```
pub fn reuseaddr(&self) -> io::Result<bool> {
self.inner.get_reuseaddr()
self.inner.reuse_address()
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -243,7 +252,7 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
self.inner.set_reuseport(reuseport)
self.inner.set_reuse_port(reuseport)
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -278,14 +287,14 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn reuseport(&self) -> io::Result<bool> {
self.inner.get_reuseport()
self.inner.reuse_port()
}

/// Sets the size of the TCP send buffer on this socket.
///
/// On most operating systems, this sets the `SO_SNDBUF` socket option.
pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_send_buffer_size(size)
self.inner.set_send_buffer_size(size as usize)
}

/// Returns the size of the TCP send buffer for this socket.
Expand All @@ -312,14 +321,14 @@ impl TcpSocket {
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
pub fn send_buffer_size(&self) -> io::Result<u32> {
self.inner.get_send_buffer_size()
self.inner.send_buffer_size().map(|n| n as u32)
Comment on lines -315 to +324
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several casts in this PR. Do we risk them truncating any data here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 5 casts:

  • set_send_buffer_size,set_recv_buffer_size (cast u32 as usize): on tokio, usize is always at least 32 bits.
  • send_buffer_size,recv_buffer_size (cast usize as u32): socket2 gets these values as c_int (always i32) and then cast them to usize (1, 2).
  • listen (cast u32 as i32): It seems using cast is wrong, I'll fix this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 5326df1

}

/// Sets the size of the TCP receive buffer on this socket.
///
/// On most operating systems, this sets the `SO_RCVBUF` socket option.
pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_recv_buffer_size(size)
self.inner.set_recv_buffer_size(size as usize)
}

/// Returns the size of the TCP receive buffer for this socket.
Expand All @@ -346,7 +355,7 @@ impl TcpSocket {
///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<u32> {
self.inner.get_recv_buffer_size()
self.inner.recv_buffer_size().map(|n| n as u32)
}

/// Gets the local address of this socket.
Expand All @@ -372,7 +381,9 @@ impl TcpSocket {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_localaddr()
self.inner
.local_addr()
.map(|addr| addr.as_socket().unwrap())
}

/// Binds the socket to the given address.
Expand Down Expand Up @@ -404,7 +415,7 @@ impl TcpSocket {
/// }
/// ```
pub fn bind(&self, addr: SocketAddr) -> io::Result<()> {
self.inner.bind(addr)
self.inner.bind(&addr.into())
}

/// Establishes a TCP connection with a peer at the specified socket address.
Expand Down Expand Up @@ -440,7 +451,13 @@ impl TcpSocket {
/// }
/// ```
pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
let mio = self.inner.connect(addr)?;
self.inner.connect(&addr.into())?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpStream::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpStream::from_raw_fd(self.inner.into_raw_fd()) };

TcpStream::connect_mio(mio).await
}

Expand Down Expand Up @@ -480,7 +497,14 @@ impl TcpSocket {
/// }
/// ```
pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
let mio = self.inner.listen(backlog)?;
let backlog = backlog.try_into().unwrap_or(i32::MAX);
self.inner.listen(backlog)?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpListener::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpListener::from_raw_fd(self.inner.into_raw_fd()) };

TcpListener::new(mio)
}

Expand All @@ -500,7 +524,7 @@ impl TcpSocket {
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///
///
/// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
///
/// let socket = TcpSocket::from_std_stream(socket2_socket.into());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about impl From<socket2::Socket> for TcpSocket?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to make a crate with version 0.x a public dependency of a crate with version 1.x.

See also api-guidelines: https://rust-lang.github.io/api-guidelines/necessities.html#public-dependencies-of-a-stable-crate-are-stable-c-stable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, once this PR is merged, adding the options that socket2 has to the TcpSocket will be easy. So the cases where code like this example is needed should be greatly reduced in the near future.

Expand All @@ -511,16 +535,12 @@ impl TcpSocket {
pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket {
#[cfg(unix)]
{
use std::os::unix::io::{FromRawFd, IntoRawFd};

let raw_fd = std_stream.into_raw_fd();
unsafe { TcpSocket::from_raw_fd(raw_fd) }
}

#[cfg(windows)]
{
use std::os::windows::io::{FromRawSocket, IntoRawSocket};

let raw_socket = std_stream.into_raw_socket();
unsafe { TcpSocket::from_raw_socket(raw_socket) }
}
Expand Down Expand Up @@ -549,7 +569,7 @@ impl FromRawFd for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
let inner = mio::net::TcpSocket::from_raw_fd(fd);
let inner = socket2::Socket::from_raw_fd(fd);
TcpSocket { inner }
}
}
Expand Down Expand Up @@ -584,7 +604,7 @@ impl FromRawSocket for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
let inner = mio::net::TcpSocket::from_raw_socket(socket);
let inner = socket2::Socket::from_raw_socket(socket);
TcpSocket { inner }
}
}
26 changes: 7 additions & 19 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
Expand Down Expand Up @@ -1090,9 +1090,8 @@ impl TcpStream {
/// # }
/// ```
pub fn linger(&self) -> io::Result<Option<Duration>> {
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.get_linger()
let socket = self.to_socket();
socket.linger()
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand All @@ -1117,23 +1116,12 @@ impl TcpStream {
/// # }
/// ```
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.set_linger(dur)
let socket = self.to_socket();
socket.set_linger(dur)
}

fn to_mio(&self) -> mio::net::TcpSocket {
#[cfg(windows)]
{
use std::os::windows::io::{AsRawSocket, FromRawSocket};
unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
}

#[cfg(unix)]
{
use std::os::unix::io::{AsRawFd, FromRawFd};
unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
}
fn to_socket(&self) -> socket2::SockRef<'_> {
socket2::SockRef::from(self)
}

/// Gets the value of the `IP_TTL` option for this socket.
Expand Down