Skip to content

Commit 9e3fb16

Browse files
author
Abutalib Aghayev
authored
rt: move CoreStage methods to Core (#5182)
1 parent 53cba02 commit 9e3fb16

File tree

2 files changed

+25
-36
lines changed

2 files changed

+25
-36
lines changed

tokio/src/runtime/task/core.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ impl<T: Future> CoreStage<T> {
155155
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
156156
self.stage.with_mut(f)
157157
}
158+
}
158159

160+
impl<T: Future, S: Schedule> Core<T, S> {
159161
/// Polls the future.
160162
///
161163
/// # Safety
@@ -171,7 +173,7 @@ impl<T: Future> CoreStage<T> {
171173
/// heap.
172174
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
173175
let res = {
174-
self.stage.with_mut(|ptr| {
176+
self.stage.stage.with_mut(|ptr| {
175177
// Safety: The caller ensures mutual exclusion to the field.
176178
let future = match unsafe { &mut *ptr } {
177179
Stage::Running(future) => future,
@@ -224,7 +226,7 @@ impl<T: Future> CoreStage<T> {
224226
pub(super) fn take_output(&self) -> super::Result<T::Output> {
225227
use std::mem;
226228

227-
self.stage.with_mut(|ptr| {
229+
self.stage.stage.with_mut(|ptr| {
228230
// Safety:: the caller ensures mutual exclusion to the field.
229231
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
230232
Stage::Finished(output) => output,
@@ -234,7 +236,7 @@ impl<T: Future> CoreStage<T> {
234236
}
235237

236238
unsafe fn set_stage(&self, stage: Stage<T>) {
237-
self.stage.with_mut(|ptr| *ptr = stage)
239+
self.stage.stage.with_mut(|ptr| *ptr = stage)
238240
}
239241
}
240242

tokio/src/runtime/task/harness.rs

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::future::Future;
2-
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
2+
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
33
use crate::runtime::task::state::{Snapshot, State};
44
use crate::runtime::task::waker::waker_ref;
55
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
@@ -104,13 +104,7 @@ where
104104
let header_ptr = self.header_ptr();
105105
let waker_ref = waker_ref::<T, S>(&header_ptr);
106106
let cx = Context::from_waker(&*waker_ref);
107-
let core = self.core();
108-
let res = poll_future(
109-
&core.stage,
110-
&self.core().scheduler,
111-
core.task_id.clone(),
112-
cx,
113-
);
107+
let res = poll_future(self.core(), cx);
114108

115109
if res == Poll::Ready(()) {
116110
// The future completed. Move on to complete the task.
@@ -124,15 +118,13 @@ where
124118
TransitionToIdle::Cancelled => {
125119
// The transition to idle failed because the task was
126120
// cancelled during the poll.
127-
let core = self.core();
128-
cancel_task(&core.stage, core.task_id.clone());
121+
cancel_task(self.core());
129122
PollFuture::Complete
130123
}
131124
}
132125
}
133126
TransitionToRunning::Cancelled => {
134-
let core = self.core();
135-
cancel_task(&core.stage, core.task_id.clone());
127+
cancel_task(self.core());
136128
PollFuture::Complete
137129
}
138130
TransitionToRunning::Failed => PollFuture::Done,
@@ -155,8 +147,7 @@ where
155147

156148
// By transitioning the lifecycle to `Running`, we have permission to
157149
// drop the future.
158-
let core = self.core();
159-
cancel_task(&core.stage, core.task_id.clone());
150+
cancel_task(self.core());
160151
self.complete();
161152
}
162153

@@ -190,7 +181,7 @@ where
190181
/// Read the task output into `dst`.
191182
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
192183
if can_read_output(self.header(), self.trailer(), waker) {
193-
*dst = Poll::Ready(self.core().stage.take_output());
184+
*dst = Poll::Ready(self.core().take_output());
194185
}
195186
}
196187

@@ -215,7 +206,7 @@ where
215206
// they are dropping the `JoinHandle`, we assume they are not
216207
// interested in the panic and swallow it.
217208
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
218-
self.core().stage.drop_future_or_output();
209+
self.core().drop_future_or_output();
219210
}));
220211
}
221212

@@ -325,7 +316,7 @@ where
325316
// The `JoinHandle` is not interested in the output of
326317
// this task. It is our responsibility to drop the
327318
// output.
328-
self.core().stage.drop_future_or_output();
319+
self.core().drop_future_or_output();
329320
} else if snapshot.has_join_waker() {
330321
// Notify the join handle. The previous transition obtains the
331322
// lock on the waker cell.
@@ -457,36 +448,32 @@ enum PollFuture {
457448
}
458449

459450
/// Cancels the task and store the appropriate error in the stage field.
460-
fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {
451+
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
461452
// Drop the future from a panic guard.
462453
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
463-
stage.drop_future_or_output();
454+
core.drop_future_or_output();
464455
}));
465456

457+
let id = core.task_id.clone();
466458
match res {
467459
Ok(()) => {
468-
stage.store_output(Err(JoinError::cancelled(id)));
460+
core.store_output(Err(JoinError::cancelled(id)));
469461
}
470462
Err(panic) => {
471-
stage.store_output(Err(JoinError::panic(id, panic)));
463+
core.store_output(Err(JoinError::panic(id, panic)));
472464
}
473465
}
474466
}
475467

476468
/// Polls the future. If the future completes, the output is written to the
477469
/// stage field.
478-
fn poll_future<T: Future, S: Schedule>(
479-
core: &CoreStage<T>,
480-
scheduler: &S,
481-
id: super::Id,
482-
cx: Context<'_>,
483-
) -> Poll<()> {
470+
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
484471
// Poll the future.
485472
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
486-
struct Guard<'a, T: Future> {
487-
core: &'a CoreStage<T>,
473+
struct Guard<'a, T: Future, S: Schedule> {
474+
core: &'a Core<T, S>,
488475
}
489-
impl<'a, T: Future> Drop for Guard<'a, T> {
476+
impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
490477
fn drop(&mut self) {
491478
// If the future panics on poll, we drop it inside the panic
492479
// guard.
@@ -504,8 +491,8 @@ fn poll_future<T: Future, S: Schedule>(
504491
Ok(Poll::Pending) => return Poll::Pending,
505492
Ok(Poll::Ready(output)) => Ok(output),
506493
Err(panic) => {
507-
scheduler.unhandled_panic();
508-
Err(JoinError::panic(id, panic))
494+
core.scheduler.unhandled_panic();
495+
Err(JoinError::panic(core.task_id.clone(), panic))
509496
}
510497
};
511498

@@ -515,7 +502,7 @@ fn poll_future<T: Future, S: Schedule>(
515502
}));
516503

517504
if res.is_err() {
518-
scheduler.unhandled_panic();
505+
core.scheduler.unhandled_panic();
519506
}
520507

521508
Poll::Ready(())

0 commit comments

Comments
 (0)