Skip to content

Commit

Permalink
perf: eager poll async ops in Isolate (#3046)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju authored and ry committed Oct 14, 2019
1 parent 6056595 commit 4221b90
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 93 deletions.
12 changes: 9 additions & 3 deletions cli/js/dispatch_json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,17 @@ export async function sendAsync(
const promiseId = nextPromiseId();
args = Object.assign(args, { promiseId });
const promise = util.createResolvable<Ok>();
promiseTable.set(promiseId, promise);

const argsUi8 = encode(args);
const resUi8 = core.dispatch(opId, argsUi8, zeroCopy);
util.assert(resUi8 == null);
const buf = core.dispatch(opId, argsUi8, zeroCopy);
if (buf) {
// Sync result.
const res = decode(buf);
promise.resolve(res);
} else {
// Async result.
promiseTable.set(promiseId, promise);
}

const res = await promise;
return unwrapResponse(res);
Expand Down
16 changes: 14 additions & 2 deletions cli/js/dispatch_minimal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,20 @@ export function sendAsyncMinimal(
scratch32[1] = arg;
scratch32[2] = 0; // result
const promise = util.createResolvable<number>();
promiseTableMin.set(promiseId, promise);
core.dispatch(opId, scratchBytes, zeroCopy);
const buf = core.dispatch(opId, scratchBytes, zeroCopy);
if (buf) {
const buf32 = new Int32Array(
buf.buffer,
buf.byteOffset,
buf.byteLength / 4
);
const record = recordFromBufMinimal(opId, buf32);
// Sync result.
promise.resolve(record.result);
} else {
// Async result.
promiseTableMin.set(promiseId, promise);
}
return promise;
}

Expand Down
32 changes: 19 additions & 13 deletions cli/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,31 @@ fn eval_command(flags: DenoFlags, argv: Vec<String>) {
}

fn bundle_command(flags: DenoFlags, argv: Vec<String>) {
let (mut _worker, state) = create_worker_and_state(flags, argv);
let (worker, state) = create_worker_and_state(flags, argv);

let main_module = state.main_module().unwrap();
assert!(state.argv.len() >= 3);
let out_file = state.argv[2].clone();
debug!(">>>>> bundle_async START");
let bundle_future = state
.ts_compiler
.bundle_async(state.clone(), main_module.to_string(), out_file)
.map_err(|err| {
debug!("diagnostics returned, exiting!");
eprintln!("");
print_err_and_exit(err);
// NOTE: we need to poll `worker` otherwise TS compiler worker won't run properly
let main_future = lazy(move || {
worker.then(move |result| {
js_check(result);
state
.ts_compiler
.bundle_async(state.clone(), main_module.to_string(), out_file)
.map_err(|err| {
debug!("diagnostics returned, exiting!");
eprintln!("");
print_err_and_exit(err);
})
.and_then(move |_| {
debug!(">>>>> bundle_async END");
Ok(())
})
})
.and_then(move |_| {
debug!(">>>>> bundle_async END");
Ok(())
});
tokio_util::run(bundle_future);
});
tokio_util::run(main_future);
}

fn run_repl(flags: DenoFlags, argv: Vec<String>) {
Expand Down
29 changes: 27 additions & 2 deletions cli/tokio_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::Poll;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::ops::FnOnce;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime;
Expand Down Expand Up @@ -78,14 +79,15 @@ where

#[derive(Debug)]
enum AcceptState {
Eager(Resource),
Pending(Resource),
Empty,
}

/// Simply accepts a connection.
pub fn accept(r: Resource) -> Accept {
Accept {
state: AcceptState::Pending(r),
state: AcceptState::Eager(r),
}
}

Expand All @@ -107,6 +109,16 @@ impl Future for Accept {
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
AcceptState::Eager(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => t,
Ok(futures::prelude::Async::NotReady) => {
self.state = AcceptState::Pending(r.to_owned());
return Ok(futures::prelude::Async::NotReady);
}
Err(e) => {
return Err(e);
}
},
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
Expand All @@ -126,8 +138,8 @@ impl Future for Accept {
};

match mem::replace(&mut self.state, AcceptState::Empty) {
AcceptState::Pending(_) => Ok((stream, addr).into()),
AcceptState::Empty => panic!("invalid internal state"),
_ => Ok((stream, addr).into()),
}
}
}
Expand Down Expand Up @@ -166,3 +178,16 @@ where
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}

#[cfg(test)]
pub fn run_in_task<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let fut = futures::future::lazy(move || {
f();
futures::future::ok(())
});

run(fut)
}
8 changes: 4 additions & 4 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ mod tests {

#[test]
fn test_worker_messages() {
tokio_util::init(|| {
tokio_util::run_in_task(|| {
let mut worker = create_test_worker();
let source = r#"
onmessage = function(e) {
Expand Down Expand Up @@ -314,7 +314,7 @@ mod tests {

#[test]
fn removed_from_resource_table_on_close() {
tokio_util::init(|| {
tokio_util::run_in_task(|| {
let mut worker = create_test_worker();
worker
.execute("onmessage = () => { delete window.onmessage; }")
Expand Down Expand Up @@ -349,7 +349,7 @@ mod tests {

#[test]
fn execute_mod_resolve_error() {
tokio_util::init(|| {
tokio_util::run_in_task(|| {
// "foo" is not a valid module specifier so this should return an error.
let mut worker = create_test_worker();
let module_specifier =
Expand All @@ -361,7 +361,7 @@ mod tests {

#[test]
fn execute_mod_002_hello() {
tokio_util::init(|| {
tokio_util::run_in_task(|| {
// This assumes cwd is project root (an assumption made throughout the
// tests).
let mut worker = create_test_worker();
Expand Down
25 changes: 16 additions & 9 deletions core/examples/http_bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,25 @@ function send(promiseId, opId, arg, zeroCopy = null) {
function sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopy);
const buf = send(promiseId, opId, arg, zeroCopy);
if (buf) {
const record = recordFromBuf(buf);
// Sync result.
p.resolve(record.result);
} else {
// Async result.
promiseMap.set(promiseId, p);
}
return p;
}

/** Returns i32 number */
function sendSync(opId, arg) {
const buf = send(0, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}

function recordFromBuf(buf) {
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
Expand All @@ -58,13 +72,6 @@ function recordFromBuf(buf) {
};
}

/** Returns i32 number */
function sendSync(opId, arg) {
const buf = send(0, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}

function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
Expand Down
Loading

0 comments on commit 4221b90

Please sign in to comment.