Skip to content

Commit 07aae2d

Browse files
committed
rt: fix new yield_now behavior with block_in_place
PR #5223 changed the behavior of `yield_now()` to store yielded tasks and notify them *after* polling the resource drivers. This PR fixes a couple of bugs with this new behavior when combined with `block_in_place()`. First, we need to avoid freeing the deferred task queue when exiting a runtime if it is *not* the root runtime. Because `block_in_place()` allows a user to start a new runtime from within an existing task, this check is necessary. Second, when a worker core is stolen from a thread during a `block_in_place()` call, we need to ensure that deferred tasks are notified anyway.
1 parent 2286273 commit 07aae2d

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed

tokio/src/runtime/context.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,17 @@ cfg_rt! {
107107
/// Guard tracking that a caller has entered a runtime context.
108108
#[must_use]
109109
pub(crate) struct EnterRuntimeGuard {
110+
/// Tracks that the current thread has entered a blocking function call.
110111
pub(crate) blocking: BlockingRegionGuard,
112+
111113
#[allow(dead_code)] // Only tracking the guard.
112114
pub(crate) handle: SetCurrentGuard,
115+
116+
/// If true, then this is the root runtime guard. It is possible to nest
117+
/// runtime guards by using `block_in_place` between the calls. We need
118+
/// to track the root guard as this is the guard responsible for freeing
119+
/// the deferred task queue.
120+
is_root: bool,
113121
}
114122

115123
/// Guard tracking that a caller has entered a blocking region.
@@ -171,11 +179,19 @@ cfg_rt! {
171179
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });
172180

173181
// Initialize queue to track yielded tasks
174-
*c.defer.borrow_mut() = Some(Defer::new());
182+
let mut defer = c.defer.borrow_mut();
183+
184+
let is_root = if defer.is_none() {
185+
*defer = Some(Defer::new());
186+
true
187+
} else {
188+
false
189+
};
175190

176191
Some(EnterRuntimeGuard {
177192
blocking: BlockingRegionGuard::new(),
178193
handle: c.set_current(handle),
194+
is_root,
179195
})
180196
}
181197
})
@@ -217,7 +233,6 @@ cfg_rt! {
217233
pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
218234
CONTEXT.with(|c| {
219235
let mut defer = c.defer.borrow_mut();
220-
221236
defer.as_mut().map(f)
222237
})
223238
}
@@ -256,7 +271,10 @@ cfg_rt! {
256271
CONTEXT.with(|c| {
257272
assert!(c.runtime.get().is_entered());
258273
c.runtime.set(EnterRuntime::NotEntered);
259-
*c.defer.borrow_mut() = None;
274+
275+
if self.is_root {
276+
*c.defer.borrow_mut() = None;
277+
}
260278
});
261279
}
262280
}

tokio/src/runtime/scheduler/multi_thread/worker.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,22 @@ impl Launch {
368368
}
369369

370370
fn run(worker: Arc<Worker>) {
371+
struct AbortOnPanic;
372+
373+
impl Drop for AbortOnPanic {
374+
fn drop(&mut self) {
375+
if std::thread::panicking() {
376+
eprintln!("worker thread panicking; aborting process");
377+
std::process::abort();
378+
}
379+
}
380+
}
381+
382+
// Catching panics on worker threads in tests is quite tricky. Instead, when
383+
// debug assertions are enabled, we just abort the process.
384+
#[cfg(debug_assertions)]
385+
let _abort_on_panic = AbortOnPanic;
386+
371387
// Acquire a core. If this fails, then another thread is running this
372388
// worker and there is nothing further to do.
373389
let core = match worker.core.take() {
@@ -388,6 +404,11 @@ fn run(worker: Arc<Worker>) {
388404
// This should always be an error. It only returns a `Result` to support
389405
// using `?` to short circuit.
390406
assert!(cx.run(core).is_err());
407+
408+
// Check if there are any deferred tasks to notify. This can happen when
409+
// the worker core is lost due to `block_in_place()` being called from
410+
// within the task.
411+
wake_deferred_tasks();
391412
});
392413
}
393414

tokio/tests/rt_threaded.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,32 @@ fn coop_and_block_in_place() {
415415
});
416416
}
417417

418+
#[test]
419+
fn yield_after_block_in_place() {
420+
let rt = tokio::runtime::Builder::new_multi_thread()
421+
.worker_threads(1)
422+
.build()
423+
.unwrap();
424+
425+
rt.block_on(async {
426+
tokio::spawn(async move {
427+
// Block in place then enter a new runtime
428+
tokio::task::block_in_place(|| {
429+
let rt = tokio::runtime::Builder::new_current_thread()
430+
.build()
431+
.unwrap();
432+
433+
rt.block_on(async {});
434+
});
435+
436+
// Yield, then complete
437+
tokio::task::yield_now().await;
438+
})
439+
.await
440+
.unwrap()
441+
});
442+
}
443+
418444
// Testing this does not panic
419445
#[test]
420446
fn max_blocking_threads() {

0 commit comments

Comments
 (0)