Skip to content

Commit 1f81055

Browse files
authored
runtime: drop basic scheduler tasks inside context (#4213)
1 parent 94ee305 commit 1f81055

File tree

8 files changed

+187
-31
lines changed

8 files changed

+187
-31
lines changed

tokio/src/runtime/basic_scheduler.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::future::poll_fn;
22
use crate::loom::sync::atomic::AtomicBool;
33
use crate::loom::sync::Mutex;
44
use crate::park::{Park, Unpark};
5+
use crate::runtime::context::EnterGuard;
56
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
67
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
78
use crate::runtime::Callback;
@@ -29,6 +30,12 @@ pub(crate) struct BasicScheduler<P: Park> {
2930

3031
/// Sendable task spawner
3132
spawner: Spawner,
33+
34+
/// This is usually None, but right before dropping the BasicScheduler, it
35+
/// is changed to `Some` with the context being the runtime's own context.
36+
/// This ensures that any tasks dropped in the `BasicScheduler`s destructor
37+
/// run in that runtime's context.
38+
context_guard: Option<EnterGuard>,
3239
}
3340

3441
/// The inner scheduler that owns the task queue and the main parker P.
@@ -160,6 +167,7 @@ impl<P: Park> BasicScheduler<P> {
160167
inner,
161168
notify: Notify::new(),
162169
spawner,
170+
context_guard: None,
163171
}
164172
}
165173

@@ -210,6 +218,10 @@ impl<P: Park> BasicScheduler<P> {
210218
basic_scheduler: self,
211219
})
212220
}
221+
222+
pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
223+
self.context_guard = Some(guard);
224+
}
213225
}
214226

215227
impl<P: Park> Inner<P> {

tokio/src/runtime/blocking/pool.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::runtime::builder::ThreadNameFn;
88
use crate::runtime::context;
99
use crate::runtime::task::{self, JoinHandle};
1010
use crate::runtime::{Builder, Callback, Handle};
11-
use crate::util::error::CONTEXT_MISSING_ERROR;
1211

1312
use std::collections::{HashMap, VecDeque};
1413
use std::fmt;
@@ -81,7 +80,7 @@ where
8180
F: FnOnce() -> R + Send + 'static,
8281
R: Send + 'static,
8382
{
84-
let rt = context::current().expect(CONTEXT_MISSING_ERROR);
83+
let rt = context::current();
8584
rt.spawn_blocking(func)
8685
}
8786

tokio/src/runtime/context.rs

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,102 @@
11
//! Thread local runtime context
2-
use crate::runtime::Handle;
2+
use crate::runtime::{Handle, TryCurrentError};
33

44
use std::cell::RefCell;
55

66
thread_local! {
77
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
88
}
99

10-
pub(crate) fn current() -> Option<Handle> {
11-
CONTEXT.with(|ctx| ctx.borrow().clone())
10+
pub(crate) fn try_current() -> Result<Handle, crate::runtime::TryCurrentError> {
11+
match CONTEXT.try_with(|ctx| ctx.borrow().clone()) {
12+
Ok(Some(handle)) => Ok(handle),
13+
Ok(None) => Err(TryCurrentError::new_no_context()),
14+
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
15+
}
16+
}
17+
18+
pub(crate) fn current() -> Handle {
19+
match try_current() {
20+
Ok(handle) => handle,
21+
Err(e) => panic!("{}", e),
22+
}
1223
}
1324

1425
cfg_io_driver! {
1526
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
16-
CONTEXT.with(|ctx| {
27+
match CONTEXT.try_with(|ctx| {
1728
let ctx = ctx.borrow();
1829
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone()
19-
})
30+
}) {
31+
Ok(io_handle) => io_handle,
32+
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
33+
}
2034
}
2135
}
2236

2337
cfg_signal_internal! {
2438
#[cfg(unix)]
2539
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
26-
CONTEXT.with(|ctx| {
40+
match CONTEXT.try_with(|ctx| {
2741
let ctx = ctx.borrow();
2842
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone()
29-
})
43+
}) {
44+
Ok(signal_handle) => signal_handle,
45+
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
46+
}
3047
}
3148
}
3249

3350
cfg_time! {
3451
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
35-
CONTEXT.with(|ctx| {
52+
match CONTEXT.try_with(|ctx| {
3653
let ctx = ctx.borrow();
3754
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone()
38-
})
55+
}) {
56+
Ok(time_handle) => time_handle,
57+
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
58+
}
3959
}
4060

4161
cfg_test_util! {
4262
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
43-
CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone()))
63+
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) {
64+
Ok(clock) => clock,
65+
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
66+
}
4467
}
4568
}
4669
}
4770

4871
cfg_rt! {
4972
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
50-
CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone()))
73+
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) {
74+
Ok(spawner) => spawner,
75+
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
76+
}
5177
}
5278
}
5379

5480
/// Sets this [`Handle`] as the current active [`Handle`].
5581
///
5682
/// [`Handle`]: Handle
5783
pub(crate) fn enter(new: Handle) -> EnterGuard {
58-
CONTEXT.with(|ctx| {
59-
let old = ctx.borrow_mut().replace(new);
60-
EnterGuard(old)
61-
})
84+
match try_enter(new) {
85+
Some(guard) => guard,
86+
None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
87+
}
88+
}
89+
90+
/// Sets this [`Handle`] as the current active [`Handle`].
91+
///
92+
/// [`Handle`]: Handle
93+
pub(crate) fn try_enter(new: Handle) -> Option<EnterGuard> {
94+
CONTEXT
95+
.try_with(|ctx| {
96+
let old = ctx.borrow_mut().replace(new);
97+
EnterGuard(old)
98+
})
99+
.ok()
62100
}
63101

64102
#[derive(Debug)]

tokio/src/runtime/handle.rs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::runtime::blocking::{BlockingTask, NoopSchedule};
22
use crate::runtime::task::{self, JoinHandle};
33
use crate::runtime::{blocking, context, driver, Spawner};
4-
use crate::util::error::CONTEXT_MISSING_ERROR;
4+
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
55

66
use std::future::Future;
77
use std::marker::PhantomData;
@@ -110,7 +110,7 @@ impl Handle {
110110
/// # }
111111
/// ```
112112
pub fn current() -> Self {
113-
context::current().expect(CONTEXT_MISSING_ERROR)
113+
context::current()
114114
}
115115

116116
/// Returns a Handle view over the currently running Runtime
@@ -119,7 +119,7 @@ impl Handle {
119119
///
120120
/// Contrary to `current`, this never panics
121121
pub fn try_current() -> Result<Self, TryCurrentError> {
122-
context::current().ok_or(TryCurrentError(()))
122+
context::try_current()
123123
}
124124

125125
cfg_stats! {
@@ -334,17 +334,60 @@ impl Handle {
334334
}
335335

336336
/// Error returned by `try_current` when no Runtime has been started
337-
pub struct TryCurrentError(());
337+
#[derive(Debug)]
338+
pub struct TryCurrentError {
339+
kind: TryCurrentErrorKind,
340+
}
341+
342+
impl TryCurrentError {
343+
pub(crate) fn new_no_context() -> Self {
344+
Self {
345+
kind: TryCurrentErrorKind::NoContext,
346+
}
347+
}
338348

339-
impl fmt::Debug for TryCurrentError {
349+
pub(crate) fn new_thread_local_destroyed() -> Self {
350+
Self {
351+
kind: TryCurrentErrorKind::ThreadLocalDestroyed,
352+
}
353+
}
354+
355+
/// Returns true if the call failed because there is currently no runtime in
356+
/// the Tokio context.
357+
pub fn is_missing_context(&self) -> bool {
358+
matches!(self.kind, TryCurrentErrorKind::NoContext)
359+
}
360+
361+
/// Returns true if the call failed because the Tokio context thread-local
362+
/// had been destroyed. This can usually only happen if in the destructor of
363+
/// other thread-locals.
364+
pub fn is_thread_local_destroyed(&self) -> bool {
365+
matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
366+
}
367+
}
368+
369+
enum TryCurrentErrorKind {
370+
NoContext,
371+
ThreadLocalDestroyed,
372+
}
373+
374+
impl fmt::Debug for TryCurrentErrorKind {
340375
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341-
f.debug_struct("TryCurrentError").finish()
376+
use TryCurrentErrorKind::*;
377+
match self {
378+
NoContext => f.write_str("NoContext"),
379+
ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
380+
}
342381
}
343382
}
344383

345384
impl fmt::Display for TryCurrentError {
346385
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347-
f.write_str(CONTEXT_MISSING_ERROR)
386+
use TryCurrentErrorKind::*;
387+
match self.kind {
388+
NoContext => f.write_str(CONTEXT_MISSING_ERROR),
389+
ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
390+
}
348391
}
349392
}
350393

tokio/src/runtime/mod.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ cfg_rt! {
205205
use self::enter::enter;
206206

207207
mod handle;
208-
pub use handle::{EnterGuard, Handle};
208+
pub use handle::{EnterGuard, Handle, TryCurrentError};
209209

210210
mod spawner;
211211
use self::spawner::Spawner;
@@ -537,7 +537,7 @@ cfg_rt! {
537537
/// ```
538538
pub fn shutdown_timeout(mut self, duration: Duration) {
539539
// Wakeup and shutdown all the worker threads
540-
self.handle.shutdown();
540+
self.handle.clone().shutdown();
541541
self.blocking_pool.shutdown(Some(duration));
542542
}
543543

@@ -571,4 +571,30 @@ cfg_rt! {
571571
self.shutdown_timeout(Duration::from_nanos(0))
572572
}
573573
}
574+
575+
#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
576+
impl Drop for Runtime {
577+
fn drop(&mut self) {
578+
match &mut self.kind {
579+
Kind::CurrentThread(basic) => {
580+
// This ensures that tasks spawned on the basic runtime are dropped inside the
581+
// runtime's context.
582+
match self::context::try_enter(self.handle.clone()) {
583+
Some(guard) => basic.set_context_guard(guard),
584+
None => {
585+
// The context thread-local has alread been destroyed.
586+
//
587+
// We don't set the guard in this case. Calls to tokio::spawn in task
588+
// destructors would fail regardless if this happens.
589+
},
590+
}
591+
},
592+
#[cfg(feature = "rt-multi-thread")]
593+
Kind::ThreadPool(_) => {
594+
// The threaded scheduler drops its tasks on its worker threads, which is
595+
// already in the runtime's context.
596+
},
597+
}
598+
}
599+
}
574600
}

tokio/src/task/builder.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#![allow(unreachable_pub)]
2-
use crate::util::error::CONTEXT_MISSING_ERROR;
32
use crate::{runtime::context, task::JoinHandle};
43
use std::future::Future;
54

@@ -98,8 +97,6 @@ impl<'a> Builder<'a> {
9897
Function: FnOnce() -> Output + Send + 'static,
9998
Output: Send + 'static,
10099
{
101-
context::current()
102-
.expect(CONTEXT_MISSING_ERROR)
103-
.spawn_blocking_inner(function, self.name)
100+
context::current().spawn_blocking_inner(function, self.name)
104101
}
105102
}

tokio/src/util/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,11 @@ pub(crate) const CONTEXT_MISSING_ERROR: &str =
77
/// Error string explaining that the Tokio context is shutting down and cannot drive timers.
88
pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str =
99
"A Tokio 1.x context was found, but it is being shutdown.";
10+
11+
// some combinations of features might not use this
12+
#[allow(dead_code)]
13+
/// Error string explaining that the Tokio context is not available because the
14+
/// thread-local storing it has been destroyed. This usually only happens during
15+
/// destructors of other thread-locals.
16+
pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str =
17+
"The Tokio context thread-local variable has been destroyed.";

tokio/tests/rt_basic.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
use tokio::runtime::Runtime;
55
use tokio::sync::oneshot;
6+
use tokio::time::{timeout, Duration};
67
use tokio_test::{assert_err, assert_ok};
78

9+
use std::future::Future;
10+
use std::pin::Pin;
11+
use std::sync::atomic::{AtomicBool, Ordering};
12+
use std::task::{Context, Poll};
813
use std::thread;
9-
use tokio::time::{timeout, Duration};
1014

1115
mod support {
1216
pub(crate) mod mpsc_stream;
@@ -135,6 +139,35 @@ fn acquire_mutex_in_drop() {
135139
drop(rt);
136140
}
137141

142+
#[test]
143+
fn drop_tasks_in_context() {
144+
static SUCCESS: AtomicBool = AtomicBool::new(false);
145+
146+
struct ContextOnDrop;
147+
148+
impl Future for ContextOnDrop {
149+
type Output = ();
150+
151+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
152+
Poll::Pending
153+
}
154+
}
155+
156+
impl Drop for ContextOnDrop {
157+
fn drop(&mut self) {
158+
if tokio::runtime::Handle::try_current().is_ok() {
159+
SUCCESS.store(true, Ordering::SeqCst);
160+
}
161+
}
162+
}
163+
164+
let rt = rt();
165+
rt.spawn(ContextOnDrop);
166+
drop(rt);
167+
168+
assert!(SUCCESS.load(Ordering::SeqCst));
169+
}
170+
138171
#[test]
139172
#[should_panic(
140173
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."

0 commit comments

Comments
 (0)