Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling and Task Safety Improvements #753

Closed
wants to merge 11 commits into from
4 changes: 4 additions & 0 deletions src/comp/back/upcall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type upcalls =
vec_append: ValueRef,
get_type_desc: ValueRef,
new_task: ValueRef,
take_task: ValueRef,
drop_task: ValueRef,
start_task: ValueRef,
ivec_resize: ValueRef,
ivec_spill: ValueRef,
Expand Down Expand Up @@ -129,6 +131,8 @@ fn declare_upcalls(tn: type_names, tydesc_type: TypeRef,
~[T_ptr(T_nil()), T_size_t(), T_size_t(), T_size_t(),
T_ptr(T_ptr(tydesc_type))], T_ptr(tydesc_type)),
new_task: d("new_task", ~[T_ptr(T_str())], taskptr_type),
take_task: dv("take_task", ~[taskptr_type]),
drop_task: dv("drop_task", ~[taskptr_type]),
start_task:
d("start_task", ~[taskptr_type, T_int(), T_int(), T_size_t()],
taskptr_type),
Expand Down
15 changes: 13 additions & 2 deletions src/comp/middle/trans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,13 @@ fn make_copy_glue(cx: &@block_ctxt, v: ValueRef, t: &ty::t) {
// NB: v is an *alias* of type t here, not a direct value.

let bcx;
if ty::type_is_boxed(bcx_tcx(cx), t) {

if ty::type_is_task(bcx_tcx(cx), t) {
let task_ptr = cx.build.Load(v);
cx.build.Call(bcx_ccx(cx).upcalls.take_task,
~[cx.fcx.lltaskptr, task_ptr]);
bcx = cx;
} else if ty::type_is_boxed(bcx_tcx(cx), t) {
bcx = incr_refcnt_of_boxed(cx, cx.build.Load(v)).bcx;
} else if (ty::type_is_structural(bcx_tcx(cx), t)) {
bcx = duplicate_heap_parts_if_necessary(cx, v, t).bcx;
Expand Down Expand Up @@ -1381,7 +1387,12 @@ fn make_drop_glue(cx: &@block_ctxt, v0: ValueRef, t: &ty::t) {
ty::ty_box(_) { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_port(_) { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_chan(_) { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_task. { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_task. {
let task_ptr = cx.build.Load(v0);
{bcx: cx,
val: cx.build.Call(bcx_ccx(cx).upcalls.drop_task,
~[cx.fcx.lltaskptr, task_ptr])}
}
ty::ty_obj(_) {
let box_cell =
cx.build.GEP(v0, ~[C_int(0), C_int(abi::obj_field_box)]);
Expand Down
5 changes: 5 additions & 0 deletions src/comp/middle/ty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export type_is_bot;
export type_is_box;
export type_is_boxed;
export type_is_chan;
export type_is_task;
export type_is_fp;
export type_is_integral;
export type_is_native;
Expand Down Expand Up @@ -839,6 +840,10 @@ fn type_is_chan(cx: &ctxt, ty: &t) -> bool {
alt struct(cx, ty) { ty_chan(_) { ret true; } _ { ret false; } }
}

fn type_is_task(cx: &ctxt, ty: &t) -> bool {
alt struct(cx, ty) { ty_task. { ret true; } _ { ret false; } }
}

fn type_is_structural(cx: &ctxt, ty: &t) -> bool {
alt struct(cx, ty) {
ty_rec(_) { ret true; }
Expand Down
6 changes: 6 additions & 0 deletions src/lib/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ native "rust" mod rustrt {
fn clone_chan(c: *rust_chan) -> *rust_chan;

type rust_chan;

fn set_min_stack(stack_size: uint);
}

/**
Expand Down Expand Up @@ -40,6 +42,10 @@ fn send[T](c: chan[T], v: &T) { c <| v; }

fn recv[T](p: port[T]) -> T { let v; p |> v; v }

fn set_min_stack(stack_size : uint) {
rustrt::set_min_stack(stack_size);
}

// Local Variables:
// mode: rust;
// fill-column: 78;
Expand Down
86 changes: 43 additions & 43 deletions src/rt/circular_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,33 @@
#include "rust_internal.h"

circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) :
sched(kernel->sched),
kernel(kernel),
unit_sz(unit_sz),
_buffer_sz(initial_size()),
_next(0),
_unread(0),
_buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) {

A(sched, unit_sz, "Unit size must be larger than zero.");
// A(sched, unit_sz, "Unit size must be larger than zero.");

DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
"-> circular_buffer=0x%" PRIxPTR,
_buffer_sz, _unread, this);
// DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
// "-> circular_buffer=0x%" PRIxPTR,
// _buffer_sz, _unread, this);

A(sched, _buffer, "Failed to allocate buffer.");
// A(sched, _buffer, "Failed to allocate buffer.");
}

circular_buffer::~circular_buffer() {
DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
I(sched, _buffer);
W(sched, _unread == 0,
"freeing circular_buffer with %d unread bytes", _unread);
// DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
// I(sched, _buffer);
// W(sched, _unread == 0,
// "freeing circular_buffer with %d unread bytes", _unread);
kernel->free(_buffer);
}

size_t
circular_buffer::initial_size() {
I(sched, unit_sz > 0);
// I(sched, unit_sz > 0);
return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz;
}

Expand All @@ -41,8 +40,8 @@ circular_buffer::initial_size() {
*/
void
circular_buffer::transfer(void *dst) {
I(sched, dst);
I(sched, _unread <= _buffer_sz);
// I(sched, dst);
// I(sched, _unread <= _buffer_sz);

uint8_t *ptr = (uint8_t *) dst;

Expand All @@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) {
} else {
head_sz = _buffer_sz - _next;
}
I(sched, _next + head_sz <= _buffer_sz);
// I(sched, _next + head_sz <= _buffer_sz);
memcpy(ptr, _buffer + _next, head_sz);

// Then copy any other items from the beginning of the buffer
I(sched, _unread >= head_sz);
// I(sched, _unread >= head_sz);
size_t tail_sz = _unread - head_sz;
I(sched, head_sz + tail_sz <= _buffer_sz);
// I(sched, head_sz + tail_sz <= _buffer_sz);
memcpy(ptr + head_sz, _buffer, tail_sz);
}

Expand All @@ -70,37 +69,37 @@ circular_buffer::transfer(void *dst) {
*/
void
circular_buffer::enqueue(void *src) {
I(sched, src);
I(sched, _unread <= _buffer_sz);
I(sched, _buffer);
// I(sched, src);
// I(sched, _unread <= _buffer_sz);
// I(sched, _buffer);

// Grow if necessary.
if (_unread == _buffer_sz) {
grow();
}

DLOG(sched, mem, "circular_buffer enqueue "
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
_unread, _next, _buffer_sz, unit_sz);
// DLOG(sched, mem, "circular_buffer enqueue "
// "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
// _unread, _next, _buffer_sz, unit_sz);

I(sched, _unread < _buffer_sz);
I(sched, _unread + unit_sz <= _buffer_sz);
// I(sched, _unread < _buffer_sz);
// I(sched, _unread + unit_sz <= _buffer_sz);

// Copy data
size_t dst_idx = _next + _unread;
I(sched, dst_idx >= _buffer_sz || dst_idx + unit_sz <= _buffer_sz);
if (dst_idx >= _buffer_sz) {
dst_idx -= _buffer_sz;

I(sched, _next >= unit_sz);
I(sched, dst_idx <= _next - unit_sz);
// I(sched, _next >= unit_sz);
// I(sched, dst_idx <= _next - unit_sz);
}

I(sched, dst_idx + unit_sz <= _buffer_sz);
// I(sched, dst_idx + unit_sz <= _buffer_sz);
memcpy(&_buffer[dst_idx], src, unit_sz);
_unread += unit_sz;

DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
// DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
}

/**
Expand All @@ -110,21 +109,21 @@ circular_buffer::enqueue(void *src) {
*/
void
circular_buffer::dequeue(void *dst) {
I(sched, unit_sz > 0);
I(sched, _unread >= unit_sz);
I(sched, _unread <= _buffer_sz);
I(sched, _buffer);
// I(sched, unit_sz > 0);
// I(sched, _unread >= unit_sz);
// I(sched, _unread <= _buffer_sz);
// I(sched, _buffer);

DLOG(sched, mem,
"circular_buffer dequeue "
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
_unread, _next, _buffer_sz, unit_sz);
// DLOG(sched, mem,
// "circular_buffer dequeue "
// "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
// _unread, _next, _buffer_sz, unit_sz);

I(sched, _next + unit_sz <= _buffer_sz);
// I(sched, _next + unit_sz <= _buffer_sz);
if (dst != NULL) {
memcpy(dst, &_buffer[_next], unit_sz);
}
DLOG(sched, mem, "shifted data from index %d", _next);
//DLOG(sched, mem, "shifted data from index %d", _next);
_unread -= unit_sz;
_next += unit_sz;
if (_next == _buffer_sz) {
Expand All @@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) {
void
circular_buffer::grow() {
size_t new_buffer_sz = _buffer_sz * 2;
I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz);
// I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
// DLOG(sched, mem, "circular_buffer is growing to %d bytes",
// new_buffer_sz);
void *new_buffer = kernel->malloc(new_buffer_sz,
"new circular_buffer (grow)");
transfer(new_buffer);
Expand All @@ -154,9 +154,9 @@ circular_buffer::grow() {
void
circular_buffer::shrink() {
size_t new_buffer_sz = _buffer_sz / 2;
I(sched, initial_size() <= new_buffer_sz);
DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
new_buffer_sz);
// I(sched, initial_size() <= new_buffer_sz);
// DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
// new_buffer_sz);
void *new_buffer = kernel->malloc(new_buffer_sz,
"new circular_buffer (shrink)");
transfer(new_buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/rt/memory_region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// NB: please do not commit code with this uncommented. It's
// hugely expensive and should only be used as a last resort.
//
// #define TRACK_ALLOCATIONS
#define TRACK_ALLOCATIONS

#define MAGIC 0xbadc0ffe

Expand Down
14 changes: 7 additions & 7 deletions src/rt/rust.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,28 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {

update_log_settings(crate_map, getenv("RUST_LOG"));
enable_claims(getenv("CHECK_CLAIMS"));
int num_threads = get_num_threads();

rust_srv *srv = new rust_srv();
rust_kernel *kernel = new rust_kernel(srv);
rust_kernel *kernel = new rust_kernel(srv, num_threads);
kernel->start();
rust_scheduler *sched = kernel->get_scheduler();
rust_task *root_task = kernel->create_task(NULL, "main");
rust_scheduler *sched = root_task->sched;
command_line_args *args
= new (kernel, "main command line args")
command_line_args(sched->root_task, argc, argv);
command_line_args(root_task, argc, argv);

DLOG(sched, dom, "startup: %d args in 0x%" PRIxPTR,
args->argc, (uintptr_t)args->args);
for (int i = 0; i < args->argc; i++) {
DLOG(sched, dom, "startup: arg[%d] = '%s'", i, args->argv[i]);
}

sched->root_task->start(main_fn, (uintptr_t)args->args);

int num_threads = get_num_threads();
root_task->start(main_fn, (uintptr_t)args->args);

DLOG(sched, dom, "Using %d worker threads.", num_threads);

int ret = kernel->start_task_threads(num_threads);
int ret = kernel->start_task_threads();
delete args;
delete kernel;
delete srv;
Expand Down
7 changes: 7 additions & 0 deletions src/rt/rust_builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,13 @@ clone_chan(rust_task *task, rust_chan *chan) {
return chan->clone(task);
}

// defined in rust_task.cpp
extern size_t g_min_stack_size;
extern "C" CDECL void
set_min_stack(rust_task *task, uintptr_t stack_size) {
g_min_stack_size = stack_size;
}

//
// Local Variables:
// mode: C++
Expand Down
Loading