Skip to content

Commit fd763a5

Browse files
committed
native: clone/close_accept for win32 pipes
This commits takes a similar strategy to the previous commit to implement close_accept and clone for the native win32 pipes implementation. Closes #15595
1 parent c301db2 commit fd763a5

File tree

10 files changed

+117
-48
lines changed

10 files changed

+117
-48
lines changed

src/libnative/io/c_windows.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ extern "system" {
115115
optval: *mut libc::c_char,
116116
optlen: *mut libc::c_int) -> libc::c_int;
117117

118+
pub fn SetEvent(hEvent: libc::HANDLE) -> libc::BOOL;
119+
pub fn WaitForMultipleObjects(nCount: libc::DWORD,
120+
lpHandles: *const libc::HANDLE,
121+
bWaitAll: libc::BOOL,
122+
dwMilliseconds: libc::DWORD) -> libc::DWORD;
123+
118124
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
119125
pub fn CancelIoEx(hFile: libc::HANDLE,
120126
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;

src/libnative/io/net.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::ptr;
1515
use std::rt::mutex;
1616
use std::rt::rtio;
1717
use std::rt::rtio::{IoResult, IoError};
18-
use std::sync::atomics;
18+
use std::sync::atomic;
1919

2020
use super::{retry, keep_going};
2121
use super::c;
@@ -456,7 +456,7 @@ impl TcpListener {
456456
listener: self,
457457
reader: reader,
458458
writer: writer,
459-
closed: atomics::AtomicBool::new(false),
459+
closed: atomic::AtomicBool::new(false),
460460
}),
461461
deadline: 0,
462462
})
@@ -476,7 +476,7 @@ impl TcpListener {
476476
listener: self,
477477
abort: try!(os::Event::new()),
478478
accept: accept,
479-
closed: atomics::AtomicBool::new(false),
479+
closed: atomic::AtomicBool::new(false),
480480
}),
481481
deadline: 0,
482482
})
@@ -510,15 +510,15 @@ struct AcceptorInner {
510510
listener: TcpListener,
511511
reader: FileDesc,
512512
writer: FileDesc,
513-
closed: atomics::AtomicBool,
513+
closed: atomic::AtomicBool,
514514
}
515515

516516
#[cfg(windows)]
517517
struct AcceptorInner {
518518
listener: TcpListener,
519519
abort: os::Event,
520520
accept: os::Event,
521-
closed: atomics::AtomicBool,
521+
closed: atomic::AtomicBool,
522522
}
523523

524524
impl TcpAcceptor {
@@ -542,7 +542,7 @@ impl TcpAcceptor {
542542
// self-pipe is never written to unless close_accept() is called.
543543
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
544544

545-
while !self.inner.closed.load(atomics::SeqCst) {
545+
while !self.inner.closed.load(atomic::SeqCst) {
546546
match retry(|| unsafe {
547547
libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
548548
}) {
@@ -581,12 +581,12 @@ impl TcpAcceptor {
581581
// stolen, so we do all of this in a loop as well.
582582
let events = [self.inner.abort.handle(), self.inner.accept.handle()];
583583

584-
while !self.inner.closed.load(atomics::SeqCst) {
584+
while !self.inner.closed.load(atomic::SeqCst) {
585585
let ms = if self.deadline == 0 {
586586
c::WSA_INFINITE as u64
587587
} else {
588588
let now = ::io::timer::now();
589-
if self.deadline < now {0} else {now - self.deadline}
589+
if self.deadline < now {0} else {self.deadline - now}
590590
};
591591
let ret = unsafe {
592592
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
@@ -600,7 +600,6 @@ impl TcpAcceptor {
600600
c::WSA_WAIT_EVENT_0 => break,
601601
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
602602
}
603-
println!("woke up");
604603

605604
let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
606605
let ret = unsafe {
@@ -614,7 +613,19 @@ impl TcpAcceptor {
614613
} {
615614
-1 if util::wouldblock() => {}
616615
-1 => return Err(os::last_error()),
617-
fd => return Ok(TcpStream::new(Inner::new(fd))),
616+
617+
// Accepted sockets inherit the same properties as the caller,
618+
// so we need to deregister our event and switch the socket back
619+
// to blocking mode
620+
fd => {
621+
let stream = TcpStream::new(Inner::new(fd));
622+
let ret = unsafe {
623+
c::WSAEventSelect(fd, events[1], 0)
624+
};
625+
if ret != 0 { return Err(os::last_error()) }
626+
try!(util::set_nonblocking(fd, false));
627+
return Ok(stream)
628+
}
618629
}
619630
}
620631

@@ -648,7 +659,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
648659

649660
#[cfg(unix)]
650661
fn close_accept(&mut self) -> IoResult<()> {
651-
self.inner.closed.store(true, atomics::SeqCst);
662+
self.inner.closed.store(true, atomic::SeqCst);
652663
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
653664
match fd.inner_write([0]) {
654665
Ok(..) => Ok(()),
@@ -659,7 +670,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
659670

660671
#[cfg(windows)]
661672
fn close_accept(&mut self) -> IoResult<()> {
662-
self.inner.closed.store(true, atomics::SeqCst);
673+
self.inner.closed.store(true, atomic::SeqCst);
663674
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
664675
if ret == libc::TRUE {
665676
Ok(())

src/libnative/io/pipe_unix.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::mem;
1515
use std::rt::mutex;
1616
use std::rt::rtio;
1717
use std::rt::rtio::{IoResult, IoError};
18-
use std::sync::atomics;
18+
use std::sync::atomic;
1919

2020
use super::retry;
2121
use super::net;
@@ -239,7 +239,7 @@ impl UnixListener {
239239
listener: self,
240240
reader: reader,
241241
writer: writer,
242-
closed: atomics::AtomicBool::new(false),
242+
closed: atomic::AtomicBool::new(false),
243243
}),
244244
deadline: 0,
245245
})
@@ -267,7 +267,7 @@ struct AcceptorInner {
267267
listener: UnixListener,
268268
reader: FileDesc,
269269
writer: FileDesc,
270-
closed: atomics::AtomicBool,
270+
closed: atomic::AtomicBool,
271271
}
272272

273273
impl UnixAcceptor {
@@ -276,7 +276,7 @@ impl UnixAcceptor {
276276
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
277277
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
278278

279-
while !self.inner.closed.load(atomics::SeqCst) {
279+
while !self.inner.closed.load(atomic::SeqCst) {
280280
unsafe {
281281
let mut storage: libc::sockaddr_storage = mem::zeroed();
282282
let storagep = &mut storage as *mut libc::sockaddr_storage;
@@ -317,7 +317,7 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
317317

318318
#[cfg(unix)]
319319
fn close_accept(&mut self) -> IoResult<()> {
320-
self.inner.closed.store(true, atomics::SeqCst);
320+
self.inner.closed.store(true, atomic::SeqCst);
321321
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
322322
match fd.inner_write([0]) {
323323
Ok(..) => Ok(()),

src/libnative/io/pipe_windows.rs

Lines changed: 71 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -169,23 +169,30 @@ unsafe fn pipe(name: *const u16, init: bool) -> libc::HANDLE {
169169
}
170170

171171
pub fn await(handle: libc::HANDLE, deadline: u64,
172-
overlapped: &mut libc::OVERLAPPED) -> bool {
173-
if deadline == 0 { return true }
172+
events: &[libc::HANDLE]) -> IoResult<uint> {
173+
use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
174174

175175
// If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
176176
// to figure out if we should indeed get the result.
177-
let now = ::io::timer::now();
178-
let timeout = deadline < now || unsafe {
179-
let ms = (deadline - now) as libc::DWORD;
180-
let r = libc::WaitForSingleObject(overlapped.hEvent,
181-
ms);
182-
r != libc::WAIT_OBJECT_0
183-
};
184-
if timeout {
185-
unsafe { let _ = c::CancelIo(handle); }
186-
false
177+
let ms = if deadline == 0 {
178+
libc::INFINITE as u64
187179
} else {
188-
true
180+
let now = ::io::timer::now();
181+
if deadline < now {0} else {deadline - now}
182+
};
183+
let ret = unsafe {
184+
c::WaitForMultipleObjects(events.len() as libc::DWORD,
185+
events.as_ptr(),
186+
libc::FALSE,
187+
ms as libc::DWORD)
188+
};
189+
match ret {
190+
WAIT_FAILED => Err(super::last_error()),
191+
WAIT_TIMEOUT => unsafe {
192+
let _ = c::CancelIo(handle);
193+
Err(util::timeout("operation timed out"))
194+
},
195+
n => Ok((n - WAIT_OBJECT_0) as uint)
189196
}
190197
}
191198

@@ -390,8 +397,8 @@ impl rtio::RtioPipe for UnixStream {
390397
drop(guard);
391398
loop {
392399
// Process a timeout if one is pending
393-
let succeeded = await(self.handle(), self.read_deadline,
394-
&mut overlapped);
400+
let wait_succeeded = await(self.handle(), self.read_deadline,
401+
[overlapped.hEvent]);
395402

396403
let ret = unsafe {
397404
libc::GetOverlappedResult(self.handle(),
@@ -408,7 +415,7 @@ impl rtio::RtioPipe for UnixStream {
408415

409416
// If the reading half is now closed, then we're done. If we woke up
410417
// because the writing half was closed, keep trying.
411-
if !succeeded {
418+
if wait_succeeded.is_err() {
412419
return Err(util::timeout("read timed out"))
413420
}
414421
if self.read_closed() {
@@ -458,8 +465,8 @@ impl rtio::RtioPipe for UnixStream {
458465
})
459466
}
460467
// Process a timeout if one is pending
461-
let succeeded = await(self.handle(), self.write_deadline,
462-
&mut overlapped);
468+
let wait_succeeded = await(self.handle(), self.write_deadline,
469+
[overlapped.hEvent]);
463470
let ret = unsafe {
464471
libc::GetOverlappedResult(self.handle(),
465472
&mut overlapped,
@@ -473,7 +480,7 @@ impl rtio::RtioPipe for UnixStream {
473480
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
474481
return Err(super::last_error())
475482
}
476-
if !succeeded {
483+
if !wait_succeeded.is_ok() {
477484
let amt = offset + bytes_written as uint;
478485
return if amt > 0 {
479486
Err(IoError {
@@ -577,6 +584,10 @@ impl UnixListener {
577584
listener: self,
578585
event: try!(Event::new(true, false)),
579586
deadline: 0,
587+
inner: Arc::new(AcceptorState {
588+
abort: try!(Event::new(true, false)),
589+
closed: atomic::AtomicBool::new(false),
590+
}),
580591
})
581592
}
582593
}
@@ -597,11 +608,17 @@ impl rtio::RtioUnixListener for UnixListener {
597608
}
598609

599610
pub struct UnixAcceptor {
611+
inner: Arc<AcceptorState>,
600612
listener: UnixListener,
601613
event: Event,
602614
deadline: u64,
603615
}
604616

617+
struct AcceptorState {
618+
abort: Event,
619+
closed: atomic::AtomicBool,
620+
}
621+
605622
impl UnixAcceptor {
606623
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
607624
// This function has some funky implementation details when working with
@@ -638,6 +655,10 @@ impl UnixAcceptor {
638655
// using the original server pipe.
639656
let handle = self.listener.handle;
640657

658+
// If we've had an artifical call to close_accept, be sure to never
659+
// proceed in accepting new clients in the future
660+
if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
661+
641662
let name = try!(to_utf16(&self.listener.name));
642663

643664
// Once we've got a "server handle", we need to wait for a client to
@@ -652,7 +673,9 @@ impl UnixAcceptor {
652673

653674
if err == libc::ERROR_IO_PENDING as libc::DWORD {
654675
// Process a timeout if one is pending
655-
let _ = await(handle, self.deadline, &mut overlapped);
676+
let wait_succeeded = await(handle, self.deadline,
677+
[self.inner.abort.handle(),
678+
overlapped.hEvent]);
656679

657680
// This will block until the overlapped I/O is completed. The
658681
// timeout was previously handled, so this will either block in
@@ -665,7 +688,11 @@ impl UnixAcceptor {
665688
libc::TRUE)
666689
};
667690
if ret == 0 {
668-
err = unsafe { libc::GetLastError() };
691+
if wait_succeeded.is_ok() {
692+
err = unsafe { libc::GetLastError() };
693+
} else {
694+
return Err(util::timeout("accept timed out"))
695+
}
669696
} else {
670697
// we succeeded, bypass the check below
671698
err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
@@ -711,11 +738,32 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
711738
}
712739

713740
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
714-
fail!()
741+
let name = to_utf16(&self.listener.name).ok().unwrap();
742+
box UnixAcceptor {
743+
inner: self.inner.clone(),
744+
event: Event::new(true, false).ok().unwrap(),
745+
deadline: 0,
746+
listener: UnixListener {
747+
name: self.listener.name.clone(),
748+
handle: unsafe {
749+
let p = pipe(name.as_ptr(), false) ;
750+
assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
751+
p
752+
},
753+
},
754+
} as Box<rtio::RtioUnixAcceptor + Send>
715755
}
716756

717757
fn close_accept(&mut self) -> IoResult<()> {
718-
fail!()
758+
self.inner.closed.store(true, atomic::SeqCst);
759+
let ret = unsafe {
760+
c::SetEvent(self.inner.abort.handle())
761+
};
762+
if ret == 0 {
763+
Err(super::last_error())
764+
} else {
765+
Ok(())
766+
}
719767
}
720768
}
721769

src/libnative/io/util.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
175175
c::fd_set(&mut set, fd);
176176
max = cmp::max(max, fd + 1);
177177
}
178+
if cfg!(windows) {
179+
max = fds.len() as net::sock_t;
180+
}
178181

179182
let (read, write) = match status {
180183
Readable => (&mut set as *mut _, ptr::mut_null()),

src/librustuv/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ impl rtio::RtioSocket for TcpListener {
387387
}
388388

389389
impl rtio::RtioTcpListener for TcpListener {
390-
fn listen(self: Box<TcpListener>)
390+
fn listen(mut self: Box<TcpListener>)
391391
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
392392
let _m = self.fire_homing_missile();
393393

src/librustuv/pipe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl PipeListener {
245245
}
246246

247247
impl rtio::RtioUnixListener for PipeListener {
248-
fn listen(self: Box<PipeListener>)
248+
fn listen(mut self: Box<PipeListener>)
249249
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
250250
let _m = self.fire_homing_missile();
251251

src/libstd/io/net/tcp.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,7 @@ impl TcpAcceptor {
461461
///
462462
/// ```
463463
/// # #![allow(experimental)]
464-
/// use std::io::TcpListener;
465-
/// use std::io::{Listener, Acceptor, TimedOut};
464+
/// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
466465
///
467466
/// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
468467
/// let a2 = a.clone();

0 commit comments

Comments
 (0)