Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,20 @@ macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"socket2/all",
"mio/os-util",
"mio/tcp",
"mio/udp",
"mio/uds",
"winapi/namedpipeapi",
]
process = [
"bytes",
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/os-util",
"mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
Expand All @@ -74,9 +75,9 @@ rt-multi-thread = [
signal = [
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/uds",
"mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
Expand All @@ -93,10 +94,9 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.0", optional = true }
mio = { version = "0.7.6", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true }
socket2 = { version = "0.4.2", optional = true }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
Expand Down Expand Up @@ -128,6 +128,7 @@ proptest = "1"
rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { version = "0.6.0", features = ["tokio"] }
Expand Down
126 changes: 23 additions & 103 deletions tokio/src/net/tcp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::net::{TcpListener, TcpStream};

use std::convert::TryInto;
use std::fmt;
use std::io;
use std::net::SocketAddr;
Expand Down Expand Up @@ -85,7 +84,7 @@ cfg_net! {
/// [`socket2`]: https://docs.rs/socket2/
#[cfg_attr(docsrs, doc(alias = "connect_std"))]
pub struct TcpSocket {
inner: socket2::Socket,
inner: mio::net::TcpSocket,
}
}

Expand Down Expand Up @@ -120,11 +119,7 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v4() -> io::Result<TcpSocket> {
let inner = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
let inner = mio::net::TcpSocket::new_v4()?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -158,11 +153,7 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v6() -> io::Result<TcpSocket> {
let inner = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
let inner = mio::net::TcpSocket::new_v6()?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -193,7 +184,7 @@ impl TcpSocket {
/// }
/// ```
pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
self.inner.set_reuse_address(reuseaddr)
self.inner.set_reuseaddr(reuseaddr)
}

/// Retrieves the value set for `SO_REUSEADDR` on this socket.
Expand All @@ -219,7 +210,7 @@ impl TcpSocket {
/// }
/// ```
pub fn reuseaddr(&self) -> io::Result<bool> {
self.inner.reuse_address()
self.inner.get_reuseaddr()
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -253,7 +244,7 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
self.inner.set_reuse_port(reuseport)
self.inner.set_reuseport(reuseport)
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -288,14 +279,14 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn reuseport(&self) -> io::Result<bool> {
self.inner.reuse_port()
self.inner.get_reuseport()
}

/// Sets the size of the TCP send buffer on this socket.
///
/// On most operating systems, this sets the `SO_SNDBUF` socket option.
pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_send_buffer_size(size as usize)
self.inner.set_send_buffer_size(size)
}

/// Returns the size of the TCP send buffer for this socket.
Expand All @@ -322,14 +313,14 @@ impl TcpSocket {
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
pub fn send_buffer_size(&self) -> io::Result<u32> {
self.inner.send_buffer_size().map(|n| n as u32)
self.inner.get_send_buffer_size()
}

/// Sets the size of the TCP receive buffer on this socket.
///
/// On most operating systems, this sets the `SO_RCVBUF` socket option.
pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_recv_buffer_size(size as usize)
self.inner.set_recv_buffer_size(size)
}

/// Returns the size of the TCP receive buffer for this socket.
Expand All @@ -356,7 +347,7 @@ impl TcpSocket {
///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<u32> {
self.inner.recv_buffer_size().map(|n| n as u32)
self.inner.get_recv_buffer_size()
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand All @@ -378,62 +369,7 @@ impl TcpSocket {
///
/// [`set_linger`]: TcpSocket::set_linger
pub fn linger(&self) -> io::Result<Option<Duration>> {
self.inner.linger()
}

/// Gets the value of the `IP_TOS` option for this socket.
///
/// For more information about this option, see [`set_tos`].
///
/// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or
/// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
///
/// [`set_tos`]: Self::set_tos
// https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178
#[cfg(not(any(
target_os = "fuchsia",
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
)))]
#[cfg_attr(
docsrs,
doc(cfg(not(any(
target_os = "fuchsia",
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
))))
)]
pub fn tos(&self) -> io::Result<u32> {
self.inner.tos()
}

/// Sets the value for the `IP_TOS` option on this socket.
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
///
/// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or
/// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
// https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178
#[cfg(not(any(
target_os = "fuchsia",
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
)))]
#[cfg_attr(
docsrs,
doc(cfg(not(any(
target_os = "fuchsia",
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
))))
)]
pub fn set_tos(&self, tos: u32) -> io::Result<()> {
self.inner.set_tos(tos)
self.inner.get_linger()
}

/// Gets the local address of this socket.
Expand All @@ -459,14 +395,7 @@ impl TcpSocket {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner
.local_addr()
.map(|addr| addr.as_socket().unwrap())
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
self.inner.get_localaddr()
}

/// Binds the socket to the given address.
Expand Down Expand Up @@ -498,7 +427,7 @@ impl TcpSocket {
/// }
/// ```
pub fn bind(&self, addr: SocketAddr) -> io::Result<()> {
self.inner.bind(&addr.into())
self.inner.bind(addr)
}

/// Establishes a TCP connection with a peer at the specified socket address.
Expand Down Expand Up @@ -534,13 +463,7 @@ impl TcpSocket {
/// }
/// ```
pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
self.inner.connect(&addr.into())?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpStream::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpStream::from_raw_fd(self.inner.into_raw_fd()) };

let mio = self.inner.connect(addr)?;
TcpStream::connect_mio(mio).await
}

Expand Down Expand Up @@ -580,14 +503,7 @@ impl TcpSocket {
/// }
/// ```
pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
let backlog = backlog.try_into().unwrap_or(i32::MAX);
self.inner.listen(backlog)?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpListener::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpListener::from_raw_fd(self.inner.into_raw_fd()) };

let mio = self.inner.listen(backlog)?;
TcpListener::new(mio)
}

Expand All @@ -607,7 +523,7 @@ impl TcpSocket {
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///
///
/// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
///
/// let socket = TcpSocket::from_std_stream(socket2_socket.into());
Expand All @@ -618,12 +534,16 @@ impl TcpSocket {
pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket {
#[cfg(unix)]
{
use std::os::unix::io::{FromRawFd, IntoRawFd};

let raw_fd = std_stream.into_raw_fd();
unsafe { TcpSocket::from_raw_fd(raw_fd) }
}

#[cfg(windows)]
{
use std::os::windows::io::{FromRawSocket, IntoRawSocket};

let raw_socket = std_stream.into_raw_socket();
unsafe { TcpSocket::from_raw_socket(raw_socket) }
}
Expand Down Expand Up @@ -652,7 +572,7 @@ impl FromRawFd for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
let inner = socket2::Socket::from_raw_fd(fd);
let inner = mio::net::TcpSocket::from_raw_fd(fd);
TcpSocket { inner }
}
}
Expand Down Expand Up @@ -687,7 +607,7 @@ impl FromRawSocket for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
let inner = socket2::Socket::from_raw_socket(socket);
let inner = mio::net::TcpSocket::from_raw_socket(socket);
TcpSocket { inner }
}
}
26 changes: 19 additions & 7 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
Expand Down Expand Up @@ -1090,8 +1090,9 @@ impl TcpStream {
/// # }
/// ```
pub fn linger(&self) -> io::Result<Option<Duration>> {
let socket = self.as_socket();
socket.linger()
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.get_linger()
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand All @@ -1116,12 +1117,23 @@ impl TcpStream {
/// # }
/// ```
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
let socket = self.as_socket();
socket.set_linger(dur)
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.set_linger(dur)
}

fn as_socket(&self) -> socket2::SockRef<'_> {
socket2::SockRef::from(self)
fn to_mio(&self) -> mio::net::TcpSocket {
#[cfg(windows)]
{
use std::os::windows::io::{AsRawSocket, FromRawSocket};
unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
}

#[cfg(unix)]
{
use std::os::unix::io::{AsRawFd, FromRawFd};
unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
}
}

/// Gets the value of the `IP_TTL` option for this socket.
Expand Down
Loading