Skip to content

std::task - Revamp TaskBuilder API #14989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fallout from TaskBuilder changes
This commit brings code downstream of libstd up to date with the new
TaskBuilder API.
  • Loading branch information
aturon committed Jun 18, 2014
commit f4e024279154440a09b10379253ecc8528249db2
17 changes: 7 additions & 10 deletions src/librustc/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,19 @@ fn monitor(f: proc():Send) {
#[cfg(not(rtopt))]
static STACK_SIZE: uint = 20000000; // 20MB

let mut task_builder = TaskBuilder::new().named("rustc");
let (tx, rx) = channel();
let w = io::ChanWriter::new(tx);
let mut r = io::ChanReader::new(rx);

let mut task = TaskBuilder::new().named("rustc").stderr(box w);

// FIXME: Hacks on hacks. If the env is trying to override the stack size
// then *don't* set it explicitly.
if os::getenv("RUST_MIN_STACK").is_none() {
task_builder.opts.stack_size = Some(STACK_SIZE);
task = task.stack_size(STACK_SIZE);
}

let (tx, rx) = channel();
let w = io::ChanWriter::new(tx);
let mut r = io::ChanReader::new(rx);

match task_builder.try(proc() {
io::stdio::set_stderr(box w);
f()
}) {
match task.try(f) {
Ok(()) => { /* fallthrough */ }
Err(value) => {
// Task failed without emitting a fatal diagnostic
Expand Down
10 changes: 4 additions & 6 deletions src/libsync/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
use std::prelude::*;
use std::comm::Empty;
use std::task;
use std::task::TaskBuilder;
use std::task::try_future;

use Arc;
use super::{Mutex, Barrier, RWLock};
Expand Down Expand Up @@ -629,17 +629,15 @@ mod tests {
let mut children = Vec::new();
for _ in range(0, 5) {
let arc3 = arc.clone();
let mut builder = TaskBuilder::new();
children.push(builder.future_result());
builder.spawn(proc() {
children.push(try_future(proc() {
let lock = arc3.read();
assert!(*lock >= 0);
});
}));
}

// Wait for children to pass their asserts
for r in children.mut_iter() {
assert!(r.recv().is_ok());
assert!(r.get_ref().is_ok());
}

// Wait for writer to finish
Expand Down
9 changes: 4 additions & 5 deletions src/libtest/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,14 +1049,13 @@ pub fn run_test(opts: &TestOpts,
if nocapture {
drop((stdout, stderr));
} else {
task.opts.stdout = Some(box stdout as Box<Writer + Send>);
task.opts.stderr = Some(box stderr as Box<Writer + Send>);
task = task.stdout(box stdout as Box<Writer + Send>);
task = task.stderr(box stderr as Box<Writer + Send>);
}
let result_future = task.future_result();
task.spawn(testfn);
let result_future = task.try_future(testfn);

let stdout = reader.read_to_end().unwrap().move_iter().collect();
let task_result = result_future.recv();
let task_result = result_future.unwrap();
let test_result = calc_result(&desc, task_result.is_ok());
monitor_ch.send((desc.clone(), test_result, stdout));
})
Expand Down
11 changes: 4 additions & 7 deletions src/test/bench/msgsend-pipes-shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ extern crate debug;
use std::comm;
use std::os;
use std::task;
use std::task::TaskBuilder;
use std::uint;

fn move_out<T>(_x: T) {}
Expand Down Expand Up @@ -64,22 +63,20 @@ fn run(args: &[String]) {
let mut worker_results = Vec::new();
for _ in range(0u, workers) {
let to_child = to_child.clone();
let mut builder = TaskBuilder::new();
worker_results.push(builder.future_result());
builder.spawn(proc() {
worker_results.push(task::try_future(proc() {
for _ in range(0u, size / workers) {
//println!("worker {:?}: sending {:?} bytes", i, num_bytes);
to_child.send(bytes(num_bytes));
}
//println!("worker {:?} exiting", i);
});
}));
}
task::spawn(proc() {
server(&from_parent, &to_parent);
});

for r in worker_results.iter() {
r.recv();
for r in worker_results.move_iter() {
r.unwrap();
}

//println!("sending stop message");
Expand Down
17 changes: 6 additions & 11 deletions src/test/bench/msgsend-pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ extern crate debug;

use std::os;
use std::task;
use std::task::TaskBuilder;
use std::uint;

fn move_out<T>(_x: T) {}
Expand Down Expand Up @@ -58,38 +57,34 @@ fn run(args: &[String]) {
let mut worker_results = Vec::new();
let from_parent = if workers == 1 {
let (to_child, from_parent) = channel();
let mut builder = TaskBuilder::new();
worker_results.push(builder.future_result());
builder.spawn(proc() {
worker_results.push(task::try_future(proc() {
for _ in range(0u, size / workers) {
//println!("worker {:?}: sending {:?} bytes", i, num_bytes);
to_child.send(bytes(num_bytes));
}
//println!("worker {:?} exiting", i);
});
}));
from_parent
} else {
let (to_child, from_parent) = channel();
for _ in range(0u, workers) {
let to_child = to_child.clone();
let mut builder = TaskBuilder::new();
worker_results.push(builder.future_result());
builder.spawn(proc() {
worker_results.push(task::try_future(proc() {
for _ in range(0u, size / workers) {
//println!("worker {:?}: sending {:?} bytes", i, num_bytes);
to_child.send(bytes(num_bytes));
}
//println!("worker {:?} exiting", i);
});
}));
}
from_parent
};
task::spawn(proc() {
server(&from_parent, &to_parent);
});

for r in worker_results.iter() {
r.recv();
for r in worker_results.move_iter() {
r.unwrap();
}

//println!("sending stop message");
Expand Down
11 changes: 4 additions & 7 deletions src/test/bench/shootout-pfib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ extern crate time;
use std::os;
use std::result::{Ok, Err};
use std::task;
use std::task::TaskBuilder;
use std::uint;

fn fib(n: int) -> int {
Expand Down Expand Up @@ -79,14 +78,12 @@ fn stress_task(id: int) {
fn stress(num_tasks: int) {
let mut results = Vec::new();
for i in range(0, num_tasks) {
let mut builder = TaskBuilder::new();
results.push(builder.future_result());
builder.spawn(proc() {
results.push(task::try_future(proc() {
stress_task(i);
});
}));
}
for r in results.iter() {
r.recv();
for r in results.move_iter() {
r.unwrap();
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/test/run-pass/issue-2190-1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::task::TaskBuilder;
static generations: uint = 1024+256+128+49;

fn spawn(f: proc():Send) {
let mut t = TaskBuilder::new();
t.opts.stack_size = Some(32 * 1024);
t.spawn(f);
TaskBuilder::new().stack_size(32 * 1024).spawn(f)
}

fn child_no(x: uint) -> proc():Send {
Expand Down
7 changes: 2 additions & 5 deletions src/test/run-pass/task-comm-12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@
// except according to those terms.

use std::task;
use std::task::TaskBuilder;

pub fn main() { test00(); }

fn start(_task_number: int) { println!("Started / Finished task."); }

fn test00() {
let i: int = 0;
let mut builder = TaskBuilder::new();
let mut result = builder.future_result();
builder.spawn(proc() {
let mut result = task::try_future(proc() {
start(i)
});

Expand All @@ -31,7 +28,7 @@ fn test00() {
}

// Try joining tasks that have already finished.
result.recv();
result.unwrap();

println!("Joined task.");
}
10 changes: 4 additions & 6 deletions src/test/run-pass/task-comm-3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

extern crate debug;

use std::task::TaskBuilder;
use std::task;

pub fn main() { println!("===== WITHOUT THREADS ====="); test00(); }

Expand Down Expand Up @@ -39,14 +39,12 @@ fn test00() {
let mut results = Vec::new();
while i < number_of_tasks {
let tx = tx.clone();
let mut builder = TaskBuilder::new();
results.push(builder.future_result());
builder.spawn({
results.push(task::try_future({
let i = i;
proc() {
test00_start(&tx, i, number_of_messages)
}
});
}));
i = i + 1;
}

Expand All @@ -62,7 +60,7 @@ fn test00() {
}

// Join spawned tasks...
for r in results.iter() { r.recv(); }
for r in results.mut_iter() { r.get_ref(); }

println!("Completed: Final number is: ");
println!("{:?}", sum);
Expand Down
8 changes: 3 additions & 5 deletions src/test/run-pass/task-comm-9.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

extern crate debug;

use std::task::TaskBuilder;
use std::task;

pub fn main() { test00(); }

Expand All @@ -25,9 +25,7 @@ fn test00() {
let (tx, rx) = channel();
let number_of_messages: int = 10;

let mut builder = TaskBuilder::new();
let result = builder.future_result();
builder.spawn(proc() {
let result = task::try_future(proc() {
test00_start(&tx, number_of_messages);
});

Expand All @@ -38,7 +36,7 @@ fn test00() {
i += 1;
}

result.recv();
result.unwrap();

assert_eq!(sum, number_of_messages * (number_of_messages - 1) / 2);
}
4 changes: 2 additions & 2 deletions src/test/run-pass/task-stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
// except according to those terms.

use std::io::{ChanReader, ChanWriter};
use std::task::build;
use std::task::TaskBuilder;

fn main() {
let (tx, rx) = channel();
let mut reader = ChanReader::new(rx);
let stderr = ChanWriter::new(tx);

let res = build().stderr(box stderr as Box<Writer + Send>).try(proc() -> () {
let res = TaskBuilder::new().stderr(box stderr as Box<Writer + Send>).try(proc() -> () {
fail!("Hello, world!")
});
assert!(res.is_err());
Expand Down
4 changes: 1 addition & 3 deletions src/test/run-pass/tcp-stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ fn main() {
let (tx, rx) = channel();
for _ in range(0, 1000) {
let tx = tx.clone();
let mut builder = TaskBuilder::new();
builder.opts.stack_size = Some(64 * 1024);
builder.spawn(proc() {
TaskBuilder::new().stack_size(64 * 1024).spawn(proc() {
let host = addr.ip.to_str();
let port = addr.port;
match TcpStream::connect(host.as_slice(), port) {
Expand Down
7 changes: 2 additions & 5 deletions src/test/run-pass/yield.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@
// except according to those terms.

use std::task;
use std::task::TaskBuilder;

pub fn main() {
let mut builder = TaskBuilder::new();
let mut result = builder.future_result();
builder.spawn(child);
let mut result = task::try_future(child);
println!("1");
task::deschedule();
println!("2");
task::deschedule();
println!("3");
result.recv();
result.unwrap();
}

fn child() {
Expand Down
7 changes: 2 additions & 5 deletions src/test/run-pass/yield1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@
// except according to those terms.

use std::task;
use std::task::TaskBuilder;

pub fn main() {
let mut builder = TaskBuilder::new();
let mut result = builder.future_result();
builder.spawn(child);
let mut result = task::try_future(child);
println!("1");
task::deschedule();
result.recv();
result.unwrap();
}

fn child() { println!("2"); }