Skip to content

Commit

Permalink
record original future/function size when auto-boxing
Browse files Browse the repository at this point in the history
  • Loading branch information
hds committed Oct 1, 2024
1 parent 3ceb716 commit 648d906
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 72 deletions.
22 changes: 13 additions & 9 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
use crate::util::metric_atomics::MetricAtomicUsize;
use crate::util::trace::SpawnMeta;

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -299,10 +300,11 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt)
} else {
self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt)
self.spawn_blocking_inner(func, Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt)
};

match spawn_result {
Expand All @@ -326,18 +328,19 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(
Box::new(func),
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
} else {
self.spawn_blocking_inner(
func,
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
};
Expand All @@ -355,7 +358,7 @@ impl Spawner {
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
spawn_meta: SpawnMeta<'_>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
Expand All @@ -372,9 +375,10 @@ impl Spawner {
target: "tokio::task::blocking",
"runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
task.name = %spawn_meta.name.unwrap_or_default(),
task.id = id.as_u64(),
"fn" = %std::any::type_name::<F>(),
original_size.bytes = spawn_meta.original_size,
size.bytes = std::mem::size_of::<F>(),
loc.file = location.file(),
loc.line = location.line(),
Expand All @@ -384,7 +388,7 @@ impl Spawner {
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
let _ = spawn_meta;

let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

Expand Down
25 changes: 14 additions & 11 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ pub struct Handle {
use crate::runtime::task::JoinHandle;
use crate::runtime::BOX_FUTURE_THRESHOLD;
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
use crate::util::trace::SpawnMeta;

use std::future::Future;
use std::marker::PhantomData;
use std::{error, fmt};
use std::{error, fmt, mem};

/// Runtime context guard.
///
Expand Down Expand Up @@ -189,10 +190,11 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), None)
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.spawn_named(future, None)
self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down Expand Up @@ -296,15 +298,16 @@ impl Handle {
/// [`tokio::time`]: crate::time
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future))
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.block_on_inner(future)
self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
}
}

#[track_caller]
fn block_on_inner<F: Future>(&self, future: F) -> F::Output {
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
Expand All @@ -316,7 +319,7 @@ impl Handle {

#[cfg(all(tokio_unstable, feature = "tracing"))]
let future =
crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64());

// Enter the runtime context. This sets the current driver handles and
// prevents blocking an existing runtime.
Expand All @@ -326,7 +329,7 @@ impl Handle {
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
pub(crate) fn spawn_named<F>(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -341,7 +344,7 @@ impl Handle {
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn(future, id)
}

Expand Down
20 changes: 12 additions & 8 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{context, EnterGuard, Handle};
use crate::task::JoinHandle;
use crate::util::trace::SpawnMeta;

use std::future::Future;
use std::mem;
use std::time::Duration;

cfg_rt_multi_thread! {
Expand Down Expand Up @@ -241,10 +243,11 @@ impl Runtime {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.handle.spawn_named(Box::pin(future), None)
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.handle.spawn_named(future, None)
self.handle.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down Expand Up @@ -329,15 +332,16 @@ impl Runtime {
/// [handle]: fn@Handle::block_on
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future))
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.block_on_inner(future)
self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
}
}

#[track_caller]
fn block_on_inner<F: Future>(&self, future: F) -> F::Output {
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
Expand All @@ -351,7 +355,7 @@ impl Runtime {
let future = crate::util::trace::task(
future,
"block_on",
None,
_meta,
crate::runtime::task::Id::next().as_u64(),
);

Expand Down
41 changes: 26 additions & 15 deletions tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#![allow(unreachable_pub)]
use crate::{
runtime::{Handle, BOX_FUTURE_THRESHOLD},
task::{JoinHandle, LocalSet},
task::{JoinHandle, LocalSet}, util::trace::SpawnMeta,
};
use std::{future::Future, io};
use std::{future::Future, io, mem};

/// Factory which is used to configure the properties of a new task.
///
Expand Down Expand Up @@ -88,10 +88,11 @@ impl<'a> Builder<'a> {
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
super::spawn::spawn_inner(Box::pin(future), self.name)
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
super::spawn::spawn_inner(future, self.name)
super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -108,10 +109,11 @@ impl<'a> Builder<'a> {
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
handle.spawn_named(Box::pin(future), self.name)
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
handle.spawn_named(future, self.name)
handle.spawn_named(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -135,10 +137,12 @@ impl<'a> Builder<'a> {
Fut: Future + 'static,
Fut::Output: 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
super::local::spawn_local_inner(Box::pin(future), self.name)

let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
super::local::spawn_local_inner(future, self.name)
super::local::spawn_local_inner(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -159,7 +163,13 @@ impl<'a> Builder<'a> {
Fut: Future + 'static,
Fut::Output: 'static,
{
Ok(local_set.spawn_named(future, self.name))
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
local_set.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size))
})

}

/// Spawns blocking code on the blocking threadpool.
Expand Down Expand Up @@ -200,19 +210,20 @@ impl<'a> Builder<'a> {
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let (join_handle, spawn_result) = if std::mem::size_of::<Function>() > BOX_FUTURE_THRESHOLD
let fn_size = mem::size_of::<Function>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD
{
handle.inner.blocking_spawner().spawn_blocking_inner(
Box::new(function),
Mandatory::NonMandatory,
self.name,
SpawnMeta::new(self.name, fn_size),
handle,
)
} else {
handle.inner.blocking_spawner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
SpawnMeta::new(self.name, fn_size),
handle,
)
};
Expand Down
Loading

0 comments on commit 648d906

Please sign in to comment.