diff --git a/src/libstd/cell.rs b/src/libstd/cell.rs index 5a0c781fe9a87..372effad61d3c 100644 --- a/src/libstd/cell.rs +++ b/src/libstd/cell.rs @@ -13,6 +13,7 @@ #[missing_doc]; use cast::transmute_mut; +use unstable::finally::Finally; use prelude::*; /* @@ -65,18 +66,17 @@ impl Cell { /// Calls a closure with a reference to the value. pub fn with_ref(&self, op: &fn(v: &T) -> R) -> R { - let v = self.take(); - let r = op(&v); - self.put_back(v); - r + do self.with_mut_ref |ptr| { op(ptr) } } /// Calls a closure with a mutable reference to the value. pub fn with_mut_ref(&self, op: &fn(v: &mut T) -> R) -> R { - let mut v = self.take(); - let r = op(&mut v); - self.put_back(v); - r + let mut v = Some(self.take()); + do (|| { + op(v.get_mut_ref()) + }).finally { + self.put_back(v.take_unwrap()); + } } } diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 793e244bec7b9..42d59ccdf958e 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -18,7 +18,8 @@ use kinds::Send; use rt; use rt::sched::Scheduler; use rt::local::Local; -use rt::select::{Select, SelectPort}; +use rt::select::{SelectInner, SelectPortInner}; +use select::{Select, SelectPort}; use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; @@ -113,7 +114,9 @@ impl ChanOne { // 'do_resched' configures whether the scheduler immediately switches to // the receiving task, or leaves the sending task still running. fn try_send_inner(self, val: T, do_resched: bool) -> bool { - rtassert!(!rt::in_sched_context()); + if do_resched { + rtassert!(!rt::in_sched_context()); + } let mut this = self; let mut recvr_active = true; @@ -215,7 +218,7 @@ impl PortOne { } } -impl Select for PortOne { +impl SelectInner for PortOne { #[inline] #[cfg(not(test))] fn optimistic_check(&mut self) -> bool { unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } @@ -318,7 +321,9 @@ impl Select for PortOne { } } -impl SelectPort for PortOne { +impl Select for PortOne { } + +impl SelectPortInner for PortOne { fn recv_ready(self) -> Option { let mut this = self; let packet = this.packet(); @@ -349,6 +354,8 @@ impl SelectPort for PortOne { } } +impl SelectPort for PortOne { } + impl Peekable for PortOne { fn peek(&self) -> bool { unsafe { @@ -513,7 +520,7 @@ impl Peekable for Port { // of them, but a &Port should also be selectable so you can select2 on it // alongside a PortOne without passing the port by value in recv_ready. -impl<'self, T> Select for &'self Port { +impl<'self, T> SelectInner for &'self Port { #[inline] fn optimistic_check(&mut self) -> bool { do self.next.with_mut_ref |pone| { pone.optimistic_check() } @@ -531,7 +538,9 @@ impl<'self, T> Select for &'self Port { } } -impl Select for Port { +impl<'self, T> Select for &'self Port { } + +impl SelectInner for Port { #[inline] fn optimistic_check(&mut self) -> bool { (&*self).optimistic_check() @@ -548,7 +557,9 @@ impl Select for Port { } } -impl<'self, T> SelectPort for &'self Port { +impl Select for Port { } + +impl<'self, T> SelectPortInner for &'self Port { fn recv_ready(self) -> Option { match self.next.take().recv_ready() { Some(StreamPayload { val, next }) => { @@ -560,6 +571,8 @@ impl<'self, T> SelectPort for &'self Port { } } +impl<'self, T> SelectPort for &'self Port { } + pub struct SharedChan { // Just like Chan, but a shared AtomicOption instead of Cell priv next: UnsafeAtomicRcBox>> diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 07b4ea10b6a37..83bf34941dc70 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -488,8 +488,8 @@ impl Death { rtassert!(self.unkillable == 0); self.unkillable = 1; - // FIXME(#7544): See corresponding fixme at the callsite in task.rs. - // NB(#8192): Doesn't work with "let _ = ..." + // NB. See corresponding comment at the callsite in task.rs. + // FIXME(#8192): Doesn't work with "let _ = ..." { use util; util::ignore(group); } // Step 1. Decide if we need to collect child failures synchronously. diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 1b9f28b95fb3f..3a991e92b0b5f 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -142,8 +142,7 @@ pub mod tube; /// Simple reimplementation of core::comm pub mod comm; -/// Routines for select()ing on pipes. -pub mod select; +mod select; // FIXME #5248 shouldn't be pub /// The runtime needs to be able to put a pointer into thread-local storage. diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs index bde703af31580..19a4948af3c5a 100644 --- a/src/libstd/rt/select.rs +++ b/src/libstd/rt/select.rs @@ -8,14 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::*; -// use either::{Either, Left, Right}; +//! Module for private, abstraction-leaking select traits. Wrapped in std::select. + use rt::kill::BlockedTask; use rt::sched::Scheduler; -use rt::local::Local; +use option::Option; -/// Trait for message-passing primitives that can be select()ed on. -pub trait Select { +pub trait SelectInner { // Returns true if data was available. fn optimistic_check(&mut self) -> bool; // Returns true if data was available. If so, shall also wake() the task. @@ -24,305 +23,6 @@ pub trait Select { fn unblock_from(&mut self) -> bool; } -/// Trait for message-passing primitives that can use the select2() convenience wrapper. -// (This is separate from the above trait to enable heterogeneous lists of ports -// that implement Select on different types to use select().) -pub trait SelectPort : Select { +pub trait SelectPortInner { fn recv_ready(self) -> Option; } - -/// Receive a message from any one of many ports at once. -pub fn select(ports: &mut [A]) -> uint { - if ports.is_empty() { - fail!("can't select on an empty list"); - } - - for (index, port) in ports.mut_iter().enumerate() { - if port.optimistic_check() { - return index; - } - } - - // If one of the ports already contains data when we go to block on it, we - // don't bother enqueueing on the rest of them, so we shouldn't bother - // unblocking from it either. This is just for efficiency, not correctness. - // (If not, we need to unblock from all of them. Length is a placeholder.) - let mut ready_index = ports.len(); - - let sched = Local::take::(); - do sched.deschedule_running_task_and_then |sched, task| { - let task_handles = task.make_selectable(ports.len()); - - for (index, (port, task_handle)) in - ports.mut_iter().zip(task_handles.move_iter()).enumerate() { - // If one of the ports has data by now, it will wake the handle. - if port.block_on(sched, task_handle) { - ready_index = index; - break; - } - } - } - - // Task resumes. Now unblock ourselves from all the ports we blocked on. - // If the success index wasn't reset, 'take' will just take all of them. - // Iterate in reverse so the 'earliest' index that's ready gets returned. - for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() { - if port.unblock_from() { - ready_index = index; - } - } - - assert!(ready_index < ports.len()); - return ready_index; -} - -/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet. - -impl <'self> Select for &'self mut Select { - fn optimistic_check(&mut self) -> bool { self.optimistic_check() } - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - self.block_on(sched, task) - } - fn unblock_from(&mut self) -> bool { self.unblock_from() } -} - -pub fn select2, TB, B: SelectPort>(mut a: A, mut b: B) - -> Either<(Option, B), (A, Option)> { - let result = { - let mut ports = [&mut a as &mut Select, &mut b as &mut Select]; - select(ports) - }; - match result { - 0 => Left ((a.recv_ready(), b)), - 1 => Right((a, b.recv_ready())), - x => fail!("impossible case in select2: %?", x) - } -} - -*/ - -#[cfg(test)] -mod test { - use super::*; - use option::*; - use rt::comm::*; - use rt::test::*; - use vec::*; - use comm::GenericChan; - use task; - use cell::Cell; - use iterator::{Iterator, range}; - - #[test] #[ignore(cfg(windows))] #[should_fail] - fn select_doesnt_get_trolled() { - select::>([]); - } - - /* non-blocking select tests */ - - #[cfg(test)] - fn select_helper(num_ports: uint, send_on_chans: &[uint]) { - // Unfortunately this does not actually test the block_on early-break - // codepath in select -- racing between the sender and the receiver in - // separate tasks is necessary to get around the optimistic check. - let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - - // Same thing with streams instead. - // FIXME(#7971): This should be in a macro but borrowck isn't smart enough. - let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - } - - #[test] - fn select_one() { - do run_in_newsched_task { select_helper(1, [0]) } - } - - #[test] - fn select_two() { - // NB. I would like to have a test that tests the first one that is - // ready is the one that's returned, but that can't be reliably tested - // with the randomized behaviour of optimistic_check. - do run_in_newsched_task { select_helper(2, [1]) } - do run_in_newsched_task { select_helper(2, [0]) } - do run_in_newsched_task { select_helper(2, [1,0]) } - } - - #[test] - fn select_a_lot() { - do run_in_newsched_task { select_helper(12, [7,8,9]) } - } - - #[test] - fn select_stream() { - use util; - use comm::GenericChan; - use iter::Times; - - // Sends 10 buffered packets, and uses select to retrieve them all. - // Puts the port in a different spot in the vector each time. - do run_in_newsched_task { - let (ports, _) = unzip(from_fn(10, |_| stream())); - let (port, chan) = stream(); - do 10.times { chan.send(31337); } - let mut ports = ports; - let mut port = Some(port); - let order = [5u,0,4,3,2,6,9,8,7,1]; - for &index in order.iter() { - // put the port in the vector at any index - util::swap(port.get_mut_ref(), &mut ports[index]); - assert!(select(ports) == index); - // get it back out - util::swap(port.get_mut_ref(), &mut ports[index]); - // NB. Not recv(), because optimistic_check randomly fails. - assert!(port.get_ref().recv_ready().unwrap() == 31337); - } - } - } - - #[test] - fn select_unkillable() { - do run_in_newsched_task { - do task::unkillable { select_helper(2, [1]) } - } - } - - /* blocking select tests */ - - #[test] - fn select_blocking() { - select_blocking_helper(true); - select_blocking_helper(false); - - fn select_blocking_helper(killable: bool) { - do run_in_newsched_task { - let (p1,_c) = oneshot(); - let (p2,c2) = oneshot(); - let mut ports = [p1,p2]; - - let (p3,c3) = oneshot(); - let (p4,c4) = oneshot(); - - let x = Cell::new((c2, p3, c4)); - do task::spawn { - let (c2, p3, c4) = x.take(); - p3.recv(); // handshake parent - c4.send(()); // normal receive - task::yield(); - c2.send(()); // select receive - } - - // Try to block before child sends on c2. - c3.send(()); - p4.recv(); - if killable { - assert!(select(ports) == 1); - } else { - do task::unkillable { assert!(select(ports) == 1); } - } - } - } - } - - #[test] - fn select_racing_senders() { - static NUM_CHANS: uint = 10; - - select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]); - select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]); - select_racing_senders_helper(true, ~[0,1,2]); - select_racing_senders_helper(false, ~[0,1,2]); - select_racing_senders_helper(true, ~[3,4,5,6]); - select_racing_senders_helper(false, ~[3,4,5,6]); - select_racing_senders_helper(true, ~[7,8,9]); - select_racing_senders_helper(false, ~[7,8,9]); - - fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { - use rt::test::spawntask_random; - use iter::Times; - - do run_in_newsched_task { - // A bit of stress, since ordinarily this is just smoke and mirrors. - do 4.times { - let send_on_chans = send_on_chans.clone(); - do task::spawn { - let mut ports = ~[]; - for i in range(0u, NUM_CHANS) { - let (p,c) = oneshot(); - ports.push(p); - if send_on_chans.contains(&i) { - let c = Cell::new(c); - do spawntask_random { - task::yield(); - c.take().send(()); - } - } - } - // nondeterministic result, but should succeed - if killable { - select(ports); - } else { - do task::unkillable { select(ports); } - } - } - } - } - } - } - - #[test] #[ignore(cfg(windows))] - fn select_killed() { - do run_in_newsched_task { - let (success_p, success_c) = oneshot::(); - let success_c = Cell::new(success_c); - do task::try { - let success_c = Cell::new(success_c.take()); - do task::unkillable { - let (p,c) = oneshot(); - let c = Cell::new(c); - do task::spawn { - let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>())); - let mut ports = dead_ps; - select(ports); // should get killed; nothing should leak - c.take().send(()); // must not happen - // Make sure dead_cs doesn't get closed until after select. - let _ = dead_cs; - } - do task::spawn { - fail!(); // should kill sibling awake - } - - // wait for killed selector to close (NOT send on) its c. - // hope to send 'true'. - success_c.take().send(p.try_recv().is_none()); - } - }; - assert!(success_p.recv()); - } - } -} diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index b50e794cce0f3..c669f25d8b738 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -253,12 +253,10 @@ impl Task { } } - // FIXME(#7544): We pass the taskgroup into death so that it can be - // dropped while the unkillable counter is set. This should not be - // necessary except for an extraneous clone() in task/spawn.rs that - // causes a killhandle to get dropped, which mustn't receive a kill - // signal since we're outside of the unwinder's try() scope. - // { let _ = self.taskgroup.take(); } + // NB. We pass the taskgroup into death so that it can be dropped while + // the unkillable counter is set. This is necessary for when the + // taskgroup destruction code drops references on KillHandles, which + // might require using unkillable (to synchronize with an unwrapper). self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take()); self.destroyed = true; } diff --git a/src/libstd/select.rs b/src/libstd/select.rs new file mode 100644 index 0000000000000..a92339e256244 --- /dev/null +++ b/src/libstd/select.rs @@ -0,0 +1,344 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use cell::Cell; +use comm; +use container::Container; +use iterator::Iterator; +use option::*; +// use either::{Either, Left, Right}; +// use rt::kill::BlockedTask; +use rt::sched::Scheduler; +use rt::select::{SelectInner, SelectPortInner}; +use rt::local::Local; +use rt::rtio::EventLoop; +use task; +use vec::{OwnedVector, MutableVector}; + +/// Trait for message-passing primitives that can be select()ed on. +pub trait Select : SelectInner { } + +/// Trait for message-passing primitives that can use the select2() convenience wrapper. +// (This is separate from the above trait to enable heterogeneous lists of ports +// that implement Select on different types to use select().) +pub trait SelectPort : SelectPortInner { } + +/// Receive a message from any one of many ports at once. Returns the index of the +/// port whose data is ready. (If multiple are ready, returns the lowest index.) +pub fn select(ports: &mut [A]) -> uint { + if ports.is_empty() { + fail!("can't select on an empty list"); + } + + for (index, port) in ports.mut_iter().enumerate() { + if port.optimistic_check() { + return index; + } + } + + // If one of the ports already contains data when we go to block on it, we + // don't bother enqueueing on the rest of them, so we shouldn't bother + // unblocking from it either. This is just for efficiency, not correctness. + // (If not, we need to unblock from all of them. Length is a placeholder.) + let mut ready_index = ports.len(); + + // XXX: We're using deschedule...and_then in an unsafe way here (see #8132), + // in that we need to continue mutating the ready_index in the environment + // after letting the task get woken up. The and_then closure needs to delay + // the task from resuming until all ports have become blocked_on. + let (p,c) = comm::oneshot(); + let p = Cell::new(p); + let c = Cell::new(c); + + let sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { + let task_handles = task.make_selectable(ports.len()); + + for (index, (port, task_handle)) in + ports.mut_iter().zip(task_handles.move_iter()).enumerate() { + // If one of the ports has data by now, it will wake the handle. + if port.block_on(sched, task_handle) { + ready_index = index; + break; + } + } + + let c = Cell::new(c.take()); + do sched.event_loop.callback { c.take().send_deferred(()) } + } + + // Unkillable is necessary not because getting killed is dangerous here, + // but to force the recv not to use the same kill-flag that we used for + // selecting. Otherwise a user-sender could spuriously wakeup us here. + do task::unkillable { p.take().recv(); } + + // Task resumes. Now unblock ourselves from all the ports we blocked on. + // If the success index wasn't reset, 'take' will just take all of them. + // Iterate in reverse so the 'earliest' index that's ready gets returned. + for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() { + if port.unblock_from() { + ready_index = index; + } + } + + assert!(ready_index < ports.len()); + return ready_index; +} + +/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet. + +impl <'self> Select for &'self mut Select { + fn optimistic_check(&mut self) -> bool { self.optimistic_check() } + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + self.block_on(sched, task) + } + fn unblock_from(&mut self) -> bool { self.unblock_from() } +} + +pub fn select2, TB, B: SelectPort>(mut a: A, mut b: B) + -> Either<(Option, B), (A, Option)> { + let result = { + let mut ports = [&mut a as &mut Select, &mut b as &mut Select]; + select(ports) + }; + match result { + 0 => Left ((a.recv_ready(), b)), + 1 => Right((a, b.recv_ready())), + x => fail!("impossible case in select2: %?", x) + } +} + +*/ + +#[cfg(test)] +mod test { + use super::*; + use clone::Clone; + use iter::Times; + use option::*; + use rt::comm::*; + use rt::test::*; + use vec::*; + use comm::GenericChan; + use task; + use cell::Cell; + use iterator::{Iterator, range}; + + #[test] #[ignore(cfg(windows))] #[should_fail] + fn select_doesnt_get_trolled() { + select::>([]); + } + + /* non-blocking select tests */ + + #[cfg(test)] + fn select_helper(num_ports: uint, send_on_chans: &[uint]) { + // Unfortunately this does not actually test the block_on early-break + // codepath in select -- racing between the sender and the receiver in + // separate tasks is necessary to get around the optimistic check. + let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>())); + let mut dead_chans = ~[]; + let mut ports = ports; + for (i, chan) in chans.move_iter().enumerate() { + if send_on_chans.contains(&i) { + chan.send(()); + } else { + dead_chans.push(chan); + } + } + let ready_index = select(ports); + assert!(send_on_chans.contains(&ready_index)); + assert!(ports.swap_remove(ready_index).recv_ready().is_some()); + let _ = dead_chans; + + // Same thing with streams instead. + // FIXME(#7971): This should be in a macro but borrowck isn't smart enough. + let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>())); + let mut dead_chans = ~[]; + let mut ports = ports; + for (i, chan) in chans.move_iter().enumerate() { + if send_on_chans.contains(&i) { + chan.send(()); + } else { + dead_chans.push(chan); + } + } + let ready_index = select(ports); + assert!(send_on_chans.contains(&ready_index)); + assert!(ports.swap_remove(ready_index).recv_ready().is_some()); + let _ = dead_chans; + } + + #[test] + fn select_one() { + do run_in_newsched_task { select_helper(1, [0]) } + } + + #[test] + fn select_two() { + // NB. I would like to have a test that tests the first one that is + // ready is the one that's returned, but that can't be reliably tested + // with the randomized behaviour of optimistic_check. + do run_in_newsched_task { select_helper(2, [1]) } + do run_in_newsched_task { select_helper(2, [0]) } + do run_in_newsched_task { select_helper(2, [1,0]) } + } + + #[test] + fn select_a_lot() { + do run_in_newsched_task { select_helper(12, [7,8,9]) } + } + + #[test] + fn select_stream() { + use util; + use comm::GenericChan; + + // Sends 10 buffered packets, and uses select to retrieve them all. + // Puts the port in a different spot in the vector each time. + do run_in_newsched_task { + let (ports, _) = unzip(from_fn(10, |_| stream())); + let (port, chan) = stream(); + do 10.times { chan.send(31337); } + let mut ports = ports; + let mut port = Some(port); + let order = [5u,0,4,3,2,6,9,8,7,1]; + for &index in order.iter() { + // put the port in the vector at any index + util::swap(port.get_mut_ref(), &mut ports[index]); + assert!(select(ports) == index); + // get it back out + util::swap(port.get_mut_ref(), &mut ports[index]); + // NB. Not recv(), because optimistic_check randomly fails. + assert!(port.get_ref().recv_ready().unwrap() == 31337); + } + } + } + + #[test] + fn select_unkillable() { + do run_in_newsched_task { + do task::unkillable { select_helper(2, [1]) } + } + } + + /* blocking select tests */ + + #[test] + fn select_blocking() { + select_blocking_helper(true); + select_blocking_helper(false); + + fn select_blocking_helper(killable: bool) { + do run_in_newsched_task { + let (p1,_c) = oneshot(); + let (p2,c2) = oneshot(); + let mut ports = [p1,p2]; + + let (p3,c3) = oneshot(); + let (p4,c4) = oneshot(); + + let x = Cell::new((c2, p3, c4)); + do task::spawn { + let (c2, p3, c4) = x.take(); + p3.recv(); // handshake parent + c4.send(()); // normal receive + task::yield(); + c2.send(()); // select receive + } + + // Try to block before child sends on c2. + c3.send(()); + p4.recv(); + if killable { + assert!(select(ports) == 1); + } else { + do task::unkillable { assert!(select(ports) == 1); } + } + } + } + } + + #[test] + fn select_racing_senders() { + static NUM_CHANS: uint = 10; + + select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]); + select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]); + select_racing_senders_helper(true, ~[0,1,2]); + select_racing_senders_helper(false, ~[0,1,2]); + select_racing_senders_helper(true, ~[3,4,5,6]); + select_racing_senders_helper(false, ~[3,4,5,6]); + select_racing_senders_helper(true, ~[7,8,9]); + select_racing_senders_helper(false, ~[7,8,9]); + + fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { + use rt::test::spawntask_random; + + do run_in_newsched_task { + // A bit of stress, since ordinarily this is just smoke and mirrors. + do 4.times { + let send_on_chans = send_on_chans.clone(); + do task::spawn { + let mut ports = ~[]; + for i in range(0u, NUM_CHANS) { + let (p,c) = oneshot(); + ports.push(p); + if send_on_chans.contains(&i) { + let c = Cell::new(c); + do spawntask_random { + task::yield(); + c.take().send(()); + } + } + } + // nondeterministic result, but should succeed + if killable { + select(ports); + } else { + do task::unkillable { select(ports); } + } + } + } + } + } + } + + #[test] #[ignore(cfg(windows))] + fn select_killed() { + do run_in_newsched_task { + let (success_p, success_c) = oneshot::(); + let success_c = Cell::new(success_c); + do task::try { + let success_c = Cell::new(success_c.take()); + do task::unkillable { + let (p,c) = oneshot(); + let c = Cell::new(c); + do task::spawn { + let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>())); + let mut ports = dead_ps; + select(ports); // should get killed; nothing should leak + c.take().send(()); // must not happen + // Make sure dead_cs doesn't get closed until after select. + let _ = dead_cs; + } + do task::spawn { + fail!(); // should kill sibling awake + } + + // wait for killed selector to close (NOT send on) its c. + // hope to send 'true'. + success_c.take().send(p.try_recv().is_none()); + } + }; + assert!(success_p.recv()); + } + } +} diff --git a/src/libstd/std.rs b/src/libstd/std.rs index aa0bb905e9a68..c4bd0a6d04354 100644 --- a/src/libstd/std.rs +++ b/src/libstd/std.rs @@ -164,6 +164,7 @@ pub mod trie; pub mod task; pub mod comm; +pub mod select; pub mod local_data; diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 09431c05e2251..c38e6f233130b 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -38,7 +38,6 @@ use prelude::*; use cell::Cell; -use cmp::Eq; use comm::{stream, Chan, GenericChan, GenericPort, Port}; use result::Result; use result; diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 10bac9325ab43..e0efc14a8871f 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -22,10 +22,9 @@ * * A new one of these is created each spawn_linked or spawn_supervised. * - * (2) The "tcb" is a per-task control structure that tracks a task's spawn - * configuration. It contains a reference to its taskgroup_arc, a - * reference to its node in the ancestor list (below), a flag for - * whether it's part of the 'main'/'root' taskgroup, and an optionally + * (2) The "taskgroup" is a per-task control structure that tracks a task's + * spawn configuration. It contains a reference to its taskgroup_arc, a + * reference to its node in the ancestor list (below), and an optionally * configured notification port. These are stored in TLS. * * (3) The "ancestor_list" is a cons-style list of unsafe::exclusives which @@ -84,7 +83,6 @@ use local_data; use task::{Failure, SingleThreaded}; use task::{Success, TaskOpts, TaskResult}; use task::unkillable; -use to_bytes::IterBytes; use uint; use util; use unstable::sync::Exclusive; @@ -101,29 +99,7 @@ use rt::work_queue::WorkQueue; #[cfg(test)] use comm; #[cfg(test)] use task; -// Transitionary. -#[deriving(Eq)] -enum TaskHandle { - NewTask(KillHandle), -} - -impl Clone for TaskHandle { - fn clone(&self) -> TaskHandle { - match *self { - NewTask(ref x) => NewTask(x.clone()), - } - } -} - -impl IterBytes for TaskHandle { - fn iter_bytes(&self, lsb0: bool, f: &fn(buf: &[u8]) -> bool) -> bool { - match *self { - NewTask(ref x) => x.iter_bytes(lsb0, f), - } - } -} - -struct TaskSet(HashSet); +struct TaskSet(HashSet); impl TaskSet { #[inline] @@ -131,17 +107,17 @@ impl TaskSet { TaskSet(HashSet::new()) } #[inline] - fn insert(&mut self, task: TaskHandle) { + fn insert(&mut self, task: KillHandle) { let didnt_overwrite = (**self).insert(task); assert!(didnt_overwrite); } #[inline] - fn remove(&mut self, task: &TaskHandle) { + fn remove(&mut self, task: &KillHandle) { let was_present = (**self).remove(task); assert!(was_present); } #[inline] - fn move_iter(self) -> HashSetMoveIterator { + fn move_iter(self) -> HashSetMoveIterator { (*self).move_iter() } } @@ -291,7 +267,7 @@ fn each_ancestor(list: &mut AncestorList, None => nobe_is_dead }; // Call iterator block. (If the group is dead, it's - // safe to skip it. This will leave our TaskHandle + // safe to skip it. This will leave our KillHandle // hanging around in the group even after it's freed, // but that's ok because, by virtue of the group being // dead, nobody will ever kill-all (for) over it.) @@ -338,7 +314,6 @@ pub struct Taskgroup { tasks: TaskGroupArc, // 'none' means the group has failed. // Lists of tasks who will kill us if they fail, but whom we won't kill. ancestors: AncestorList, - is_main: bool, notifier: Option, } @@ -355,14 +330,18 @@ impl Drop for Taskgroup { for x in this.notifier.mut_iter() { x.failed = true; } - // Take everybody down with us. - do access_group(&self.tasks) |tg| { - kill_taskgroup(tg, &me, self.is_main); - } + // Take everybody down with us. After this point, every + // other task in the group will see 'tg' as none, which + // indicates the whole taskgroup is failing (and forbids + // new spawns from succeeding). + let tg = do access_group(&self.tasks) |tg| { tg.take() }; + // It's safe to send kill signals outside the lock, because + // we have a refcount on all kill-handles in the group. + kill_taskgroup(tg, me); } else { // Remove ourselves from the group(s). do access_group(&self.tasks) |tg| { - leave_taskgroup(tg, &me, true); + leave_taskgroup(tg, me, true); } } // It doesn't matter whether this happens before or after dealing @@ -370,7 +349,7 @@ impl Drop for Taskgroup { // We remove ourself from every ancestor we can, so no cleanup; no // break. do each_ancestor(&mut this.ancestors, |_| {}) |ancestor_group| { - leave_taskgroup(ancestor_group, &me, false); + leave_taskgroup(ancestor_group, me, false); true }; } @@ -380,7 +359,6 @@ impl Drop for Taskgroup { pub fn Taskgroup(tasks: TaskGroupArc, ancestors: AncestorList, - is_main: bool, mut notifier: Option) -> Taskgroup { for x in notifier.mut_iter() { x.failed = false; @@ -389,7 +367,6 @@ pub fn Taskgroup(tasks: TaskGroupArc, Taskgroup { tasks: tasks, ancestors: ancestors, - is_main: is_main, notifier: notifier } } @@ -413,7 +390,7 @@ fn AutoNotify(chan: Chan) -> AutoNotify { } } -fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle, +fn enlist_in_taskgroup(state: TaskGroupInner, me: KillHandle, is_member: bool) -> bool { let me = Cell::new(me); // :( // If 'None', the group was failing. Can't enlist. @@ -428,8 +405,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: TaskHandle, } // NB: Runs in destructor/post-exit context. Can't 'fail'. -fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle, - is_member: bool) { +fn leave_taskgroup(state: TaskGroupInner, me: &KillHandle, is_member: bool) { let me = Cell::new(me); // :( // If 'None', already failing and we've already gotten a kill signal. do state.map_mut |group| { @@ -442,43 +418,23 @@ fn leave_taskgroup(state: TaskGroupInner, me: &TaskHandle, } // NB: Runs in destructor/post-exit context. Can't 'fail'. -fn kill_taskgroup(state: TaskGroupInner, me: &TaskHandle, is_main: bool) { - unsafe { - // NB: We could do the killing iteration outside of the group arc, by - // having "let mut newstate" here, swapping inside, and iterating - // after. But that would let other exiting tasks fall-through and exit - // while we were trying to kill them, causing potential - // use-after-free. A task's presence in the arc guarantees it's alive - // only while we hold the lock, so if we're failing, all concurrently - // exiting tasks must wait for us. To do it differently, we'd have to - // use the runtime's task refcounting, but that could leave task - // structs around long after their task exited. - let newstate = util::replace(state, None); - // Might already be None, if Somebody is failing simultaneously. - // That's ok; only one task needs to do the dirty work. (Might also - // see 'None' if Somebody already failed and we got a kill signal.) - if newstate.is_some() { - let TaskGroupData { members: members, descendants: descendants } = - newstate.unwrap(); - for sibling in members.move_iter() { - // Skip self - killing ourself won't do much good. - if &sibling != me { - RuntimeGlue::kill_task(sibling); - } - } - for child in descendants.move_iter() { - assert!(&child != me); - RuntimeGlue::kill_task(child); +fn kill_taskgroup(state: Option, me: &KillHandle) { + // Might already be None, if somebody is failing simultaneously. + // That's ok; only one task needs to do the dirty work. (Might also + // see 'None' if somebody already failed and we got a kill signal.) + do state.map_move |TaskGroupData { members: members, descendants: descendants }| { + for sibling in members.move_iter() { + // Skip self - killing ourself won't do much good. + if &sibling != me { + RuntimeGlue::kill_task(sibling); } - // Only one task should ever do this. - if is_main { - RuntimeGlue::kill_all_tasks(me); - } - // Do NOT restore state to Some(..)! It stays None to indicate - // that the whole taskgroup is failing, to forbid new spawns. } - // (note: multiple tasks may reach this point) - } + for child in descendants.move_iter() { + assert!(&child != me); + RuntimeGlue::kill_task(child); + } + }; + // (note: multiple tasks may reach this point) } // FIXME (#2912): Work around core-vs-coretest function duplication. Can't use @@ -490,38 +446,23 @@ fn taskgroup_key() -> local_data::Key<@@mut Taskgroup> { // Transitionary. struct RuntimeGlue; impl RuntimeGlue { - unsafe fn kill_task(task: TaskHandle) { - match task { - NewTask(handle) => { - let mut handle = handle; - do handle.kill().map_move |killed_task| { - let killed_task = Cell::new(killed_task); - do Local::borrow:: |sched| { - sched.enqueue_task(killed_task.take()); - } - }; + fn kill_task(handle: KillHandle) { + let mut handle = handle; + do handle.kill().map_move |killed_task| { + let killed_task = Cell::new(killed_task); + do Local::borrow:: |sched| { + sched.enqueue_task(killed_task.take()); } - } - } - - unsafe fn kill_all_tasks(task: &TaskHandle) { - match *task { - // FIXME(#7544): Remove the kill_all feature entirely once the - // oldsched goes away. - NewTask(ref _handle) => rtabort!("can't kill_all in newsched"), - } + }; } - fn with_task_handle_and_failing(blk: &fn(TaskHandle, bool)) { + fn with_task_handle_and_failing(blk: &fn(&KillHandle, bool)) { if in_green_task_context() { unsafe { // Can't use safe borrow, because the taskgroup destructor needs to // access the scheduler again to send kill signals to other tasks. let me = Local::unsafe_borrow::(); - // FIXME(#7544): Get rid of this clone by passing by-ref. - // Will probably have to wait until the old rt is gone. - blk(NewTask((*me).death.kill_handle.get_ref().clone()), - (*me).unwinder.unwinding) + blk((*me).death.kill_handle.get_ref(), (*me).unwinder.unwinding) } } else { rtabort!("task dying in bad context") @@ -540,15 +481,12 @@ impl RuntimeGlue { // Lazily initialize. let mut members = TaskSet::new(); let my_handle = (*me).death.kill_handle.get_ref().clone(); - members.insert(NewTask(my_handle)); + members.insert(my_handle); let tasks = Exclusive::new(Some(TaskGroupData { members: members, descendants: TaskSet::new(), })); - // FIXME(#7544): Remove the is_main flag entirely once - // the newsched goes away. The main taskgroup has no special - // behaviour. - let group = Taskgroup(tasks, AncestorList(None), false, None); + let group = Taskgroup(tasks, AncestorList(None), None); (*me).taskgroup = Some(group); (*me).taskgroup.get_ref() } @@ -563,9 +501,7 @@ impl RuntimeGlue { // Returns 'None' in the case where the child's TG should be lazily initialized. fn gen_child_taskgroup(linked: bool, supervised: bool) - -> Option<(TaskGroupArc, AncestorList, bool)> { - // FIXME(#7544): Not safe to lazily initialize in the old runtime. Remove - // this context check once 'spawn_raw_oldsched' is gone. + -> Option<(TaskGroupArc, AncestorList)> { if linked || supervised { // with_my_taskgroup will lazily initialize the parent's taskgroup if // it doesn't yet exist. We don't want to call it in the unlinked case. @@ -574,8 +510,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) if linked { // Child is in the same group as spawner. // Child's ancestors are spawner's ancestors. - // Propagate main-ness. - Some((spawner_group.tasks.clone(), ancestors, spawner_group.is_main)) + Some((spawner_group.tasks.clone(), ancestors)) } else { // Child is in a separate group from spawner. let g = Exclusive::new(Some(TaskGroupData { @@ -596,7 +531,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) // Child has no ancestors. AncestorList(None) }; - Some((g, a, false)) + Some((g, a)) } } } else { @@ -607,7 +542,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) // Set up membership in taskgroup and descendantship in all ancestor // groups. If any enlistment fails, Some task was already failing, so // don't let the child task run, and undo every successful enlistment. -fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, +fn enlist_many(child: &KillHandle, child_arc: &TaskGroupArc, ancestors: &mut AncestorList) -> bool { // Join this taskgroup. let mut result = do access_group(child_arc) |child_tg| { @@ -615,7 +550,7 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, }; if result { // Unwinding function in case any ancestral enlisting fails - let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, &child, false) }; + let bail: &fn(TaskGroupInner) = |tg| { leave_taskgroup(tg, child, false) }; // Attempt to join every ancestor group. result = do each_ancestor(ancestors, bail) |ancestor_tg| { // Enlist as a descendant, not as an actual member. @@ -625,7 +560,7 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, // If any ancestor group fails, need to exit this group too. if !result { do access_group(child_arc) |child_tg| { - leave_taskgroup(child_tg, &child, true); // member + leave_taskgroup(child_tg, child, true); // member } } } @@ -653,15 +588,14 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { let enlist_success = do child_data.take().map_move_default(true) |child_data| { let child_data = Cell::new(child_data); // :( do Local::borrow:: |me| { - let (child_tg, ancestors, is_main) = child_data.take(); + let (child_tg, ancestors) = child_data.take(); let mut ancestors = ancestors; - // FIXME(#7544): Optimize out the xadd in this clone, somehow. - let handle = me.death.kill_handle.get_ref().clone(); + let handle = me.death.kill_handle.get_ref(); // Atomically try to get into all of our taskgroups. - if enlist_many(NewTask(handle), &child_tg, &mut ancestors) { + if enlist_many(handle, &child_tg, &mut ancestors) { // Got in. We can run the provided child body, and can also run // the taskgroup's exit-time-destructor afterward. - me.taskgroup = Some(Taskgroup(child_tg, ancestors, is_main, None)); + me.taskgroup = Some(Taskgroup(child_tg, ancestors, None)); true } else { false @@ -678,14 +612,14 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = unsafe { - if opts.sched.mode != SingleThreaded { - if opts.watched { - Task::build_child(opts.stack_size, child_wrapper) - } else { - Task::build_root(opts.stack_size, child_wrapper) - } + let mut task = if opts.sched.mode != SingleThreaded { + if opts.watched { + Task::build_child(opts.stack_size, child_wrapper) } else { + Task::build_root(opts.stack_size, child_wrapper) + } + } else { + unsafe { // Creating a 1:1 task:thread ... let sched = Local::unsafe_borrow::(); let sched_handle = (*sched).make_handle(); diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index a9dded41683a9..adbf9fc757819 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -229,20 +229,22 @@ impl Drop for UnsafeAtomicRcBox{ if self.data.is_null() { return; // Happens when destructing an unwrapper's handle. } - do task::unkillable { - let mut data: ~AtomicRcBoxData = cast::transmute(self.data); - // Must be acquire+release, not just release, to make sure this - // doesn't get reordered to after the unwrapper pointer load. - let old_count = data.count.fetch_sub(1, SeqCst); - assert!(old_count >= 1); - if old_count == 1 { - // Were we really last, or should we hand off to an - // unwrapper? It's safe to not xchg because the unwrapper - // will set the unwrap lock *before* dropping his/her - // reference. In effect, being here means we're the only - // *awake* task with the data. - match data.unwrapper.take(Acquire) { - Some(~(message,response)) => { + let mut data: ~AtomicRcBoxData = cast::transmute(self.data); + // Must be acquire+release, not just release, to make sure this + // doesn't get reordered to after the unwrapper pointer load. + let old_count = data.count.fetch_sub(1, SeqCst); + assert!(old_count >= 1); + if old_count == 1 { + // Were we really last, or should we hand off to an + // unwrapper? It's safe to not xchg because the unwrapper + // will set the unwrap lock *before* dropping his/her + // reference. In effect, being here means we're the only + // *awake* task with the data. + match data.unwrapper.take(Acquire) { + Some(~(message,response)) => { + let cell = Cell::new((message, response, data)); + do task::unkillable { + let (message, response, data) = cell.take(); // Send 'ready' and wait for a response. message.send(()); // Unkillable wait. Message guaranteed to come. @@ -253,13 +255,13 @@ impl Drop for UnsafeAtomicRcBox{ // Other task was killed. drop glue takes over. } } - None => { - // drop glue takes over. - } } - } else { - cast::forget(data); + None => { + // drop glue takes over. + } } + } else { + cast::forget(data); } } }