Skip to content

Commit

Permalink
auto merge of rust-lang#12103 : alexcrichton/rust/unix, r=brson
Browse files Browse the repository at this point in the history
There's a few parts to this PR

* Implement unix pipes in libnative for unix platforms (thanks @Geal!)
* Implement named pipes in libnative for windows (terrible, terrible code)
* Remove `#[cfg(unix)]` from `mod unix` in `std::io::net`. This is a terrible name for what it is, but that's the topic of rust-lang#12093.

The windows implementation was significantly more complicated than I thought it would be, but it seems to be passing all the tests. now.

Closes rust-lang#11201
  • Loading branch information
bors committed Feb 18, 2014
2 parents 03c5342 + a526aa1 commit 62d7d00
Show file tree
Hide file tree
Showing 6 changed files with 928 additions and 46 deletions.
21 changes: 17 additions & 4 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ pub mod timer;
#[path = "timer_win32.rs"]
pub mod timer;

#[cfg(unix)]
#[path = "pipe_unix.rs"]
pub mod pipe;

#[cfg(windows)]
#[path = "pipe_win32.rs"]
pub mod pipe;

mod timer_helper;

pub type IoResult<T> = Result<T, IoError>;
Expand All @@ -77,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) {
match errno {
libc::EOF => (io::EndOfFile, "end of file"),
libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"),
libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"),
libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"),
libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"),
libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"),
libc::WSAEACCES => (io::PermissionDenied, "permission denied"),
Expand All @@ -86,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"),
libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"),
libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"),

x => {
debug!("ignoring {}: {}", x, os::last_os_error());
Expand All @@ -108,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError {
libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"),
libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"),
libc::EADDRINUSE => (io::ConnectionRefused, "address in use"),
libc::ENOENT => (io::FileNotFound, "no such file or directory"),

// These two constants can have the same value on some systems, but
// different values on others, so we can't use a match clause
Expand Down Expand Up @@ -196,11 +209,11 @@ impl rtio::IoFactory for IoFactory {
fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> {
net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket)
}
fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> {
Err(unimpl())
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener> {
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener)
}
fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> {
Err(unimpl())
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe> {
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {
Expand Down
285 changes: 285 additions & 0 deletions src/libnative/io/pipe_unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::c_str::CString;
use std::cast;
use std::io;
use std::libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::intrinsics;

use super::{IoResult, retry};
use super::file::{keep_going, fd_t};

fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
-1 => Err(super::last_error()),
fd => Ok(fd)
}
}

fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
// the sun_path length is limited to SUN_LEN (with null)
assert!(mem::size_of::<libc::sockaddr_storage>() >=
mem::size_of::<libc::sockaddr_un>());
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) };

let len = addr.len();
if len > s.sun_path.len() - 1 {
return Err(io::IoError {
kind: io::InvalidInput,
desc: "path must be smaller than SUN_LEN",
detail: None,
})
}
s.sun_family = libc::AF_UNIX as libc::sa_family_t;
for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
*slot = value;
}

// count the null terminator
let len = mem::size_of::<libc::sa_family_t>() + len + 1;
return Ok((storage, len));
}

fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
len: uint) -> IoResult<CString> {
match storage.ss_family as libc::c_int {
libc::AF_UNIX => {
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
let storage: &libc::sockaddr_un = unsafe {
cast::transmute(storage)
};
unsafe {
Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
}
}
_ => Err(io::standard_error(io::InvalidInput))
}
}

struct Inner {
fd: fd_t,
}

impl Drop for Inner {
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
}

fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: if_ok!(unix_socket(ty)) };
let addrp = &addr as *libc::sockaddr_storage;
match retry(|| unsafe {
libc::connect(inner.fd, addrp as *libc::sockaddr,
len as libc::socklen_t)
}) {
-1 => Err(super::last_error()),
_ => Ok(inner)
}
}

fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
let (addr, len) = if_ok!(addr_to_sockaddr_un(addr));
let inner = Inner { fd: if_ok!(unix_socket(ty)) };
let addrp = &addr as *libc::sockaddr_storage;
match unsafe {
libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t)
} {
-1 => Err(super::last_error()),
_ => Ok(inner)
}
}

////////////////////////////////////////////////////////////////////////////////
// Unix Streams
////////////////////////////////////////////////////////////////////////////////

pub struct UnixStream {
priv inner: UnsafeArc<Inner>,
}

impl UnixStream {
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM).map(|inner| {
UnixStream { inner: UnsafeArc::new(inner) }
})
}

fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
}

impl rtio::RtioPipe for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let ret = retry(|| unsafe {
libc::recv(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as libc::size_t,
0) as libc::c_int
});
if ret == 0 {
Err(io::standard_error(io::EndOfFile))
} else if ret < 0 {
Err(super::last_error())
} else {
Ok(ret as uint)
}
}

fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let ret = keep_going(buf, |buf, len| unsafe {
libc::send(self.fd(),
buf as *mut libc::c_void,
len as libc::size_t,
0) as i64
});
if ret < 0 {
Err(super::last_error())
} else {
Ok(())
}
}

fn clone(&self) -> ~rtio::RtioPipe {
~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe
}
}

////////////////////////////////////////////////////////////////////////////////
// Unix Datagram
////////////////////////////////////////////////////////////////////////////////

pub struct UnixDatagram {
priv inner: UnsafeArc<Inner>,
}

impl UnixDatagram {
pub fn connect(addr: &CString) -> IoResult<UnixDatagram> {
connect(addr, libc::SOCK_DGRAM).map(|inner| {
UnixDatagram { inner: UnsafeArc::new(inner) }
})
}

pub fn bind(addr: &CString) -> IoResult<UnixDatagram> {
bind(addr, libc::SOCK_DGRAM).map(|inner| {
UnixDatagram { inner: UnsafeArc::new(inner) }
})
}

fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }

pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> {
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let ret = retry(|| unsafe {
libc::recvfrom(self.fd(),
buf.as_ptr() as *mut libc::c_void,
buf.len() as libc::size_t,
0,
storagep as *mut libc::sockaddr,
&mut addrlen) as libc::c_int
});
if ret < 0 { return Err(super::last_error()) }
sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| {
Ok((ret as uint, addr))
})
}

pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> {
let (dst, len) = if_ok!(addr_to_sockaddr_un(dst));
let dstp = &dst as *libc::sockaddr_storage;
let ret = retry(|| unsafe {
libc::sendto(self.fd(),
buf.as_ptr() as *libc::c_void,
buf.len() as libc::size_t,
0,
dstp as *libc::sockaddr,
len as libc::socklen_t) as libc::c_int
});
match ret {
-1 => Err(super::last_error()),
n if n as uint != buf.len() => {
Err(io::IoError {
kind: io::OtherIoError,
desc: "couldn't send entire packet at once",
detail: None,
})
}
_ => Ok(())
}
}

pub fn clone(&mut self) -> UnixDatagram {
UnixDatagram { inner: self.inner.clone() }
}
}

////////////////////////////////////////////////////////////////////////////////
// Unix Listener
////////////////////////////////////////////////////////////////////////////////

pub struct UnixListener {
priv inner: Inner,
}

impl UnixListener {
pub fn bind(addr: &CString) -> IoResult<UnixListener> {
bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd })
}

fn fd(&self) -> fd_t { self.inner.fd }

pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(super::last_error()),
_ => Ok(UnixAcceptor { listener: self })
}
}
}

impl rtio::RtioUnixListener for UnixListener {
fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> {
self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor)
}
}

pub struct UnixAcceptor {
priv listener: UnixListener,
}

impl UnixAcceptor {
fn fd(&self) -> fd_t { self.listener.fd() }

pub fn native_accept(&mut self) -> IoResult<UnixStream> {
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
let storagep = &mut storage as *mut libc::sockaddr_storage;
let size = mem::size_of::<libc::sockaddr_storage>();
let mut size = size as libc::socklen_t;
match retry(|| unsafe {
libc::accept(self.fd(),
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 => Err(super::last_error()),
fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) })
}
}
}

impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<~rtio::RtioPipe> {
self.native_accept().map(|s| ~s as ~rtio::RtioPipe)
}
}
Loading

0 comments on commit 62d7d00

Please sign in to comment.