Skip to content

Commit cc34dbb

Browse files
committed
Expose whether event loops have active I/O
The green scheduler can optimize its runtime based on this by deciding to not go to sleep in epoll() if there is no active I/O and there is a task to be stolen. This is implemented for librustuv by keeping a count of the number of tasks which are currently homed. If a task is homed, and then performs a blocking I/O operation, the count will be nonzero while the task is blocked. The homing count is intentionally 0 when there are I/O handles, but no handles currently blocked. The reason for this is that epoll() would only be used to wake up the scheduler anyway. The crux of this change was to have a `HomingMissile` contain a mutable borrowed reference back to the `HomeHandle`. The rest of the change was just dealing with this fallout. This reference is used to decrement the homed handle count in a HomingMissile's destructor. Also note that the count maintained is not atomic because all of its increments/decrements/reads are all on the same I/O thread.
1 parent 1d5c52d commit cc34dbb

File tree

11 files changed

+60
-25
lines changed

11 files changed

+60
-25
lines changed

src/libgreen/basic.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ impl EventLoop for BasicLoop {
158158
}
159159

160160
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
161+
162+
fn has_active_io(&self) -> bool { false }
161163
}
162164

163165
struct BasicRemote {

src/librustuv/addrinfo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl GetAddrInfoRequest {
8686
req.defuse(); // uv callback now owns this request
8787
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
8888

89-
wait_until_woken_after(&mut cx.slot, || {
89+
wait_until_woken_after(&mut cx.slot, loop_, || {
9090
req.set_data(&cx);
9191
});
9292

src/librustuv/file.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
304304
0 => {
305305
req.fired = true;
306306
let mut slot = None;
307-
wait_until_woken_after(&mut slot, || {
307+
let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
308+
wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
308309
unsafe { uvll::set_data_for_req(req.req, &slot) }
309310
});
310311
match req.get_result() {

src/librustuv/lib.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ via `close` and `delete` methods.
4747
use std::cast;
4848
use std::io;
4949
use std::io::IoError;
50-
use std::libc::c_int;
50+
use std::libc::{c_int, c_void};
5151
use std::ptr::null;
5252
use std::ptr;
5353
use std::rt::local::Local;
@@ -95,6 +95,10 @@ pub mod stream;
9595
pub trait UvHandle<T> {
9696
fn uv_handle(&self) -> *T;
9797

98+
fn uv_loop(&self) -> Loop {
99+
Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
100+
}
101+
98102
// FIXME(#8888) dummy self
99103
fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
100104
unsafe {
@@ -136,7 +140,7 @@ pub trait UvHandle<T> {
136140
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
137141
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
138142

139-
wait_until_woken_after(&mut slot, || {
143+
wait_until_woken_after(&mut slot, &self.uv_loop(), || {
140144
uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
141145
})
142146
}
@@ -195,16 +199,20 @@ impl Drop for ForbidUnwind {
195199
}
196200
}
197201

198-
fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {
202+
fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
203+
loop_: &Loop,
204+
f: ||) {
199205
let _f = ForbidUnwind::new("wait_until_woken_after");
200206
unsafe {
201207
assert!((*slot).is_none());
202208
let task: ~Task = Local::take();
209+
loop_.modify_blockers(1);
203210
task.deschedule(1, |task| {
204211
*slot = Some(task);
205212
f();
206213
Ok(())
207214
});
215+
loop_.modify_blockers(-1);
208216
}
209217
}
210218

@@ -273,6 +281,7 @@ impl Loop {
273281
pub fn new() -> Loop {
274282
let handle = unsafe { uvll::loop_new() };
275283
assert!(handle.is_not_null());
284+
unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
276285
Loop::wrap(handle)
277286
}
278287

@@ -285,6 +294,19 @@ impl Loop {
285294
pub fn close(&mut self) {
286295
unsafe { uvll::uv_loop_delete(self.handle) };
287296
}
297+
298+
// The 'data' field of the uv_loop_t is used to count the number of tasks
299+
// that are currently blocked waiting for I/O to complete.
300+
fn modify_blockers(&self, amt: uint) {
301+
unsafe {
302+
let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
303+
uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
304+
}
305+
}
306+
307+
fn get_blockers(&self) -> uint {
308+
unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
309+
}
288310
}
289311

290312
// FIXME: Need to define the error constants like EOF so they can be

src/librustuv/net.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl TcpWatcher {
216216
0 => {
217217
req.defuse(); // uv callback now owns this request
218218
let mut cx = Ctx { status: 0, task: None };
219-
wait_until_woken_after(&mut cx.task, || {
219+
wait_until_woken_after(&mut cx.task, &io.loop_, || {
220220
req.set_data(&cx);
221221
});
222222
match cx.status {
@@ -498,6 +498,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
498498
buf: Option<Buf>,
499499
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
500500
}
501+
let loop_ = self.uv_loop();
501502
let m = self.fire_homing_missile();
502503
let _g = self.read_access.grant(m);
503504

@@ -511,7 +512,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
511512
result: None,
512513
};
513514
let handle = self.handle;
514-
wait_until_woken_after(&mut cx.task, || {
515+
wait_until_woken_after(&mut cx.task, &loop_, || {
515516
unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
516517
});
517518
match cx.result.take_unwrap() {
@@ -571,6 +572,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
571572
struct Ctx { task: Option<BlockedTask>, result: c_int }
572573

573574
let m = self.fire_homing_missile();
575+
let loop_ = self.uv_loop();
574576
let _g = self.write_access.grant(m);
575577

576578
let mut req = Request::new(uvll::UV_UDP_SEND);
@@ -586,7 +588,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
586588
0 => {
587589
req.defuse(); // uv callback now owns this request
588590
let mut cx = Ctx { task: None, result: 0 };
589-
wait_until_woken_after(&mut cx.task, || {
591+
wait_until_woken_after(&mut cx.task, &loop_, || {
590592
req.set_data(&cx);
591593
});
592594
match cx.result {

src/librustuv/pipe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl PipeWatcher {
9292
let mut req = Request::new(uvll::UV_CONNECT);
9393
let pipe = PipeWatcher::new(io, false);
9494

95-
wait_until_woken_after(&mut cx.task, || {
95+
wait_until_woken_after(&mut cx.task, &io.loop_, || {
9696
unsafe {
9797
uvll::uv_pipe_connect(req.handle,
9898
pipe.handle(),

src/librustuv/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl RtioProcess for Process {
211211
// If there's no exit code previously listed, then the
212212
// process's exit callback has yet to be invoked. We just
213213
// need to deschedule ourselves and wait to be reawoken.
214-
wait_until_woken_after(&mut self.to_wake, || {});
214+
wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
215215
assert!(self.exit_status.is_some());
216216
}
217217
}

src/librustuv/stream.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::libc::{c_int, size_t, ssize_t};
1313
use std::ptr;
1414
use std::rt::task::BlockedTask;
1515

16+
use Loop;
1617
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
1718
ForbidUnwind, wakeup};
1819
use uvll;
@@ -87,7 +88,8 @@ impl StreamWatcher {
8788
uvll::uv_read_start(self.handle, alloc_cb, read_cb)
8889
} {
8990
0 => {
90-
wait_until_woken_after(&mut rcx.task, || {});
91+
let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
92+
wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
9193
match rcx.result {
9294
n if n < 0 => Err(UvError(n as c_int)),
9395
n => Ok(n as uint),
@@ -121,7 +123,8 @@ impl StreamWatcher {
121123
let mut wcx = WriteContext { result: 0, task: None, };
122124
req.defuse(); // uv callback now owns this request
123125

124-
wait_until_woken_after(&mut wcx.task, || {
126+
let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
127+
wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
125128
req.set_data(&wcx);
126129
});
127130
self.last_write_req = Some(Request::wrap(req.handle));

src/librustuv/timer.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,25 @@
99
// except according to those terms.
1010

1111
use std::libc::c_int;
12-
use std::mem::replace;
13-
use std::rt::local::Local;
12+
use std::mem;
1413
use std::rt::rtio::RtioTimer;
15-
use std::rt::task::{BlockedTask, Task};
14+
use std::rt::task::BlockedTask;
1615

1716
use homing::{HomeHandle, HomingIO};
18-
use super::{UvHandle, ForbidUnwind, ForbidSwitch};
17+
use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
1918
use uvio::UvIoFactory;
2019
use uvll;
2120

2221
pub struct TimerWatcher {
2322
handle: *uvll::uv_timer_t,
2423
home: HomeHandle,
2524
action: Option<NextAction>,
25+
blocker: Option<BlockedTask>,
2626
id: uint, // see comments in timer_cb
2727
}
2828

2929
pub enum NextAction {
30-
WakeTask(BlockedTask),
30+
WakeTask,
3131
SendOnce(Chan<()>),
3232
SendMany(Chan<()>, uint),
3333
}
@@ -41,6 +41,7 @@ impl TimerWatcher {
4141
let me = ~TimerWatcher {
4242
handle: handle,
4343
action: None,
44+
blocker: None,
4445
home: io.make_handle(),
4546
id: 0,
4647
};
@@ -76,7 +77,7 @@ impl RtioTimer for TimerWatcher {
7677
let missile = self.fire_homing_missile();
7778
self.id += 1;
7879
self.stop();
79-
let _missile = match replace(&mut self.action, None) {
80+
let _missile = match mem::replace(&mut self.action, None) {
8081
None => missile, // no need to do a homing dance
8182
Some(action) => {
8283
drop(missile); // un-home ourself
@@ -89,11 +90,9 @@ impl RtioTimer for TimerWatcher {
8990
// started, then we need to call stop on the timer.
9091
let _f = ForbidUnwind::new("timer");
9192

92-
let task: ~Task = Local::take();
93-
task.deschedule(1, |task| {
94-
self.action = Some(WakeTask(task));
93+
self.action = Some(WakeTask);
94+
wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
9595
self.start(msecs, 0);
96-
Ok(())
9796
});
9897
self.stop();
9998
}
@@ -108,7 +107,7 @@ impl RtioTimer for TimerWatcher {
108107
self.id += 1;
109108
self.stop();
110109
self.start(msecs, 0);
111-
replace(&mut self.action, Some(SendOnce(chan)))
110+
mem::replace(&mut self.action, Some(SendOnce(chan)))
112111
};
113112

114113
return port;
@@ -124,7 +123,7 @@ impl RtioTimer for TimerWatcher {
124123
self.id += 1;
125124
self.stop();
126125
self.start(msecs, msecs);
127-
replace(&mut self.action, Some(SendMany(chan, self.id)))
126+
mem::replace(&mut self.action, Some(SendMany(chan, self.id)))
128127
};
129128

130129
return port;
@@ -137,7 +136,8 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
137136
let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
138137

139138
match timer.action.take_unwrap() {
140-
WakeTask(task) => {
139+
WakeTask => {
140+
let task = timer.blocker.take_unwrap();
141141
let _ = task.wake().map(|t| t.reawaken());
142142
}
143143
SendOnce(chan) => { let _ = chan.try_send(()); }

src/librustuv/uvio.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ impl rtio::EventLoop for UvEventLoop {
9999
let factory = &mut self.uvio as &mut rtio::IoFactory;
100100
Some(factory)
101101
}
102+
103+
fn has_active_io(&self) -> bool {
104+
self.uvio.loop_.get_blockers() > 0
105+
}
102106
}
103107

104108
#[cfg(not(test))]

src/libstd/rt/rtio.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub trait EventLoop {
4141

4242
/// The asynchronous I/O services. Not all event loops may provide one.
4343
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
44+
fn has_active_io(&self) -> bool;
4445
}
4546

4647
pub trait RemoteCallback {

0 commit comments

Comments
 (0)