Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions core/src/context/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#[cfg(feature = "futures")]
use std::future::Future;
use std::{
ffi::{CStr, CString},
fs,
mem::{self, MaybeUninit},
path::Path,
ptr::NonNull,
};

#[cfg(feature = "futures")]
use crate::AsyncContext;
use std::{future::Future, task::Waker};

use crate::{
atom::PredefinedAtom, cstr, markers::Invariant, qjs, runtime::raw::Opaque, Atom, Context,
Error, FromJs, Function, IntoJs, Object, Promise, Result, String, Value,
};
#[cfg(feature = "futures")]
use crate::{runtime::schedular::SchedularPoll, AsyncContext};

/// Eval options.
#[non_exhaustive]
Expand Down Expand Up @@ -401,6 +401,24 @@ impl<'js> Ctx<'js> {
unsafe { (*self.get_opaque()).spawner().push(future) }
}

/// Poll futures currently spawned within the rquickjs runtime executor.
///
/// Polling will not always be able to make progress, even when a future is technically ready
/// to be polled again.
#[cfg(feature = "futures")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[must_use]
pub fn poll_schedular(&self, ctx: &mut std::task::Context<'_>) -> SchedularPoll {
unsafe { &mut (*self.get_opaque()) }.spawner().poll(ctx)
}

/// Register a waker to be awoken once a new future is spawned.
#[cfg(feature = "futures")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
pub async fn listen_spawn(self, waker: Waker) {
unsafe { (*self.get_opaque()).spawner().listen(waker) }
}

/// Create a new `Ctx` from a pointer to the context and a invariant lifetime.
///
/// # Safety
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use context::AsyncContext;
pub use context::MultiWith;
#[cfg(feature = "futures")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
pub use runtime::AsyncRuntime;
pub use runtime::{schedular::SchedularPoll, AsyncRuntime};
#[cfg(feature = "array-buffer")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "array-buffer")))]
pub use value::{ArrayBuffer, TypedArray};
Expand Down
123 changes: 120 additions & 3 deletions core/src/runtime/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,14 @@ macro_rules! async_test_case {

#[cfg(test)]
mod test {
use std::time::Duration;
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};

use crate::*;

use self::context::EvalOptions;
use self::{context::EvalOptions, util::ManualPoll};

async_test_case!(basic => (_rt,ctx){
async_with!(&ctx => |ctx|{
Expand Down Expand Up @@ -516,7 +519,6 @@ mod test {
});

async_test_case!(recursive_spawn_from_script => (rt,ctx) {
use std::sync::atomic::{Ordering, AtomicUsize};
use crate::prelude::Func;

static COUNT: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -610,4 +612,119 @@ mod test {
std::mem::drop(assert_is_send(rt.execute_pending_job()));
std::mem::drop(assert_is_send(rt.drive()));
}

async_test_case!(poll_schedular => (rt,ctx) {
use std::{rc::Rc, cell::RefCell, task::{self,Waker, Poll}};

static COUNT: AtomicUsize = AtomicUsize::new(0);

async_with!(ctx => |ctx|{
let waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

let waker_clone = waker.clone();

ctx.spawn(async move {
ManualPoll::new(move |ctx: &mut task::Context|{
let mut r = (*waker_clone).borrow_mut();
if r.is_some(){
COUNT.store(2, Ordering::Relaxed);
return Poll::Ready(())
}

*r = Some(ctx.waker().clone());
COUNT.store(1, Ordering::Relaxed);
Poll::Pending
}).await
});

ManualPoll::new(move |task_ctx: &mut task::Context|{
assert_eq!(COUNT.load(Ordering::Relaxed),0);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::PendingProgress);
assert_eq!(COUNT.load(Ordering::Relaxed),1);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::Pending);

(*waker).borrow_mut().as_mut().unwrap().wake_by_ref();

assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::PendingProgress);
assert_eq!(COUNT.load(Ordering::Relaxed),2);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::Empty);

Poll::Ready(())
}).await
}).await
});

async_test_case!(poll_schedular_recursive => (rt,ctx) {
use std::{rc::Rc, cell::RefCell, task::{self,Waker, Poll}};

static COUNT: AtomicUsize = AtomicUsize::new(0);
static RUNNING: AtomicUsize = AtomicUsize::new(0);

async_with!(ctx => |ctx|{
let ctx_clone = ctx.clone();

ctx_clone.spawn(async move {
let waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

let waker_clone = waker.clone();

ctx.spawn(async move {
ManualPoll(move |ctx: &mut task::Context|{
let mut r = (*waker_clone).borrow_mut();
if r.is_some(){
COUNT.store(2, Ordering::Relaxed);
return Poll::Ready(())
}

*r = Some(ctx.waker().clone());
COUNT.store(1, Ordering::Relaxed);
Poll::Pending
}).await
});

ManualPoll(move |task_ctx: &mut task::Context|{
assert_eq!(RUNNING.load(Ordering::SeqCst),0);
//println!("called: {}", std::backtrace::Backtrace::force_capture());
task_ctx.waker().wake_by_ref();

if COUNT.load(Ordering::Relaxed) == 2 {
return Poll::Pending
}

assert_eq!(COUNT.load(Ordering::Relaxed),0);

RUNNING.store(1,Ordering::SeqCst);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::PendingProgress);
RUNNING.store(0,Ordering::SeqCst);

assert_eq!(COUNT.load(Ordering::Relaxed),1);
RUNNING.store(1,Ordering::SeqCst);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::Pending);
RUNNING.store(0,Ordering::SeqCst);

(*waker).borrow_mut().as_mut().unwrap().wake_by_ref();

RUNNING.store(1,Ordering::SeqCst);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::PendingProgress);
RUNNING.store(0,Ordering::SeqCst);

assert_eq!(COUNT.load(Ordering::Relaxed),2);

RUNNING.store(1,Ordering::SeqCst);
assert_eq!(ctx.poll_schedular(task_ctx), SchedularPoll::Pending);
RUNNING.store(0,Ordering::SeqCst);

Poll::Pending
}).await;


});

ManualPoll(move |task_ctx: &mut task::Context|{
assert_eq!(ctx_clone.poll_schedular(task_ctx), SchedularPoll::ShouldYield);
assert_eq!(ctx_clone.poll_schedular(task_ctx), SchedularPoll::ShouldYield);
Poll::Ready(())
}).await;
}).await
});
}
49 changes: 43 additions & 6 deletions core/src/runtime/schedular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,26 @@ use queue::Queue;

use self::task::ErasedTaskPtr;

#[derive(Debug)]
/// A value returned by polling the rquickjs schedular informing about the current state of the
/// schedular and what action it's caller should take to propely drive the pending futures.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash)]
pub enum SchedularPoll {
/// Returns that the schedular should yield back to the root schedular.
/// The schedular has determined that a future needs to yield back to the root executor.
/// If this value is returned by the schedular future calls to poll will likely also return
/// ShouldYield until the current task has yield to the root executor.
ShouldYield,
/// There was no work to be done.
/// There are no spawned futures so no work could be done.
Empty,
/// No work could be done.
/// All futures currently spawned in the schedular are pending and no progress could be made.
Pending,
/// Work was done, but we didn't finish.
/// There are still futures which are pending, but some futures were awoken and were polled
/// again, possibly finishing or possibly becoming pending again.
PendingProgress,
}

pub struct Schedular {
len: Cell<usize>,
reentrant: Cell<usize>,
should_poll: Arc<Queue>,
all_next: Cell<Option<ErasedTaskPtr>>,
all_prev: Cell<Option<ErasedTaskPtr>>,
Expand All @@ -49,6 +55,7 @@ impl Schedular {
}
Schedular {
len: Cell::new(0),
reentrant: Cell::new(0),
should_poll: queue,
all_prev: Cell::new(None),
all_next: Cell::new(None),
Expand Down Expand Up @@ -145,6 +152,7 @@ impl Schedular {

let mut iteration = 0;
let mut yielded = 0;
let mut popped_running = 0;

loop {
// Popped a task, ownership taken from the queue
Expand All @@ -171,6 +179,25 @@ impl Schedular {
continue;
}

// Check for recursive future polling.
if cur.body().running.get() {
popped_running += 1;
Pin::new_unchecked(&*self.should_poll)
.push(ErasedTask::into_ptr(cur).as_node_ptr());

// If we popped more running futures than the reentrant counter then we can be
// sure that we did a full round of all the popped futures.
if popped_running > self.reentrant.get() {
if iteration > 0 {
return SchedularPoll::PendingProgress;
} else {
return SchedularPoll::Pending;
}
}

continue;
}

let prev = cur.body().queued.swap(false, Ordering::AcqRel);
assert!(prev);

Expand All @@ -185,11 +212,21 @@ impl Schedular {

iteration += 1;

match cur_ptr.task_drive(&mut ctx) {
// Set reentrant counter, if we ever encounter a non-zero reentrant counter then this
// function is called recursively.
self.reentrant.set(self.reentrant.get() + 1);
cur_ptr.body().running.set(true);
let res = cur_ptr.task_drive(&mut ctx);
cur_ptr.body().running.set(false);
self.reentrant.set(self.reentrant.get() - 1);

match res {
Poll::Ready(_) => {
// Nothing todo the defer will remove the task from the list.
}
Poll::Pending => {
cur_ptr.body().running.set(false);

// don't remove task from the list.
remove.take();

Expand Down
2 changes: 2 additions & 0 deletions core/src/runtime/schedular/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl<F: Future<Output = ()>> Task<F> {
prev: Cell::new(None),
queued: AtomicBool::new(true),
done: Cell::new(false),
running: Cell::new(false),
},
future: UnsafeCell::new(ManuallyDrop::new(f)),
}
Expand All @@ -50,6 +51,7 @@ pub struct TaskBody {
// wether the task is currently in the queue to be re-polled.
pub(crate) queued: AtomicBool,
pub(crate) done: Cell<bool>,
pub(crate) running: Cell<bool>,
}

/// A raw pointer to a task with it's type erased.
Expand Down