Skip to content

Commit

Permalink
Merge branch 'tokio-rs:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
sharma-shray authored Oct 15, 2024
2 parents 505ea06 + 512e9de commit 8665992
Show file tree
Hide file tree
Showing 99 changed files with 1,182 additions and 218 deletions.
14 changes: 9 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ env:
# Change to specific Rust release to pin
rust_stable: stable
rust_nightly: nightly-2024-05-05
# Pin a specific miri version
rust_miri_nightly: nightly-2024-09-19
rust_clippy: '1.77'
# When updating this, also update:
# - README.md
Expand Down Expand Up @@ -413,17 +415,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust ${{ env.rust_nightly }}
- name: Install Rust ${{ env.rust_miri_nightly }}
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_nightly }}
toolchain: ${{ env.rust_miri_nightly }}
components: miri
- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest
- uses: Swatinem/rust-cache@v2
- name: miri
# Many of tests in tokio/tests and doctests use #[tokio::test] or
# #[tokio::main] that calls epoll_create1 that Miri does not support.
run: |
cargo miri test --features full --lib --no-fail-fast
cargo miri nextest run --features full --lib --tests --no-fail-fast
working-directory: tokio
env:
MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-strict-provenance -Zmiri-retag-fields
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ run loom tests that test unstable features.

You can run miri tests with
```
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-tag-raw-pointers" \
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-strict-provenance -Zmiri-retag-fields" \
cargo +nightly miri test --features full --lib
```

Expand Down
1 change: 1 addition & 0 deletions tests-build/tests/macros.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#[test]
#[cfg_attr(miri, ignore)]
fn compile_fail_full() {
let t = trybuild::TestCases::new();

Expand Down
2 changes: 1 addition & 1 deletion tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))]
#![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))]

use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::join;
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
pin-project-lite = "0.2.11"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true }

[target.'cfg(tokio_unstable)'.dependencies]
hashbrown = { version = "0.14.0", default-features = false, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! This type must be used from within the context of the `Runtime`.

use futures_core::Future;
use std::future::Future;
use std::time::Duration;
use tokio::time::Timeout;

Expand Down
2 changes: 2 additions & 0 deletions tokio-util/tests/spawn_pinned.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
// Blocked on https://github.com/rust-lang/miri/issues/3911
#![cfg(not(miri))]

use std::rc::Rc;
use std::sync::Arc;
Expand Down
1 change: 1 addition & 0 deletions tokio-util/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ async fn single_short_delay() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // Too slow on miri.
async fn multi_delay_at_start() {
time::pause();

Expand Down
1 change: 1 addition & 0 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![warn(rust_2018_idioms)]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support UDP
#![cfg(not(miri))] // No `socket` in Miri.

use tokio::net::UdpSocket;
use tokio_stream::StreamExt;
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ socket2 = { version = "0.5.5", optional = true, features = [ "all" ] }
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
Expand Down
28 changes: 28 additions & 0 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ impl<T: AsRawFd> AsyncFd<T> {
/// concurrently with other methods on this struct. This method only
/// provides shared access to the inner IO resource when handling the
/// [`AsyncFdReadyGuard`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
#[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
self.ready(Interest::READABLE).await
Expand All @@ -713,6 +720,13 @@ impl<T: AsRawFd> AsyncFd<T> {
///
/// This method takes `&mut self`, so it is possible to access the inner IO
/// resource mutably when handling the [`AsyncFdReadyMutGuard`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
#[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
self.ready_mut(Interest::READABLE).await
Expand All @@ -726,6 +740,13 @@ impl<T: AsRawFd> AsyncFd<T> {
/// concurrently with other methods on this struct. This method only
/// provides shared access to the inner IO resource when handling the
/// [`AsyncFdReadyGuard`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
#[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
self.ready(Interest::WRITABLE).await
Expand All @@ -737,6 +758,13 @@ impl<T: AsRawFd> AsyncFd<T> {
///
/// This method takes `&mut self`, so it is possible to access the inner IO
/// resource mutably when handling the [`AsyncFdReadyMutGuard`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
#[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
self.ready_mut(Interest::WRITABLE).await
Expand Down
50 changes: 22 additions & 28 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
use crate::util::metric_atomics::MetricAtomicUsize;
use crate::util::trace::{blocking_task, SpawnMeta};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -299,10 +300,21 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(
Box::new(func),
Mandatory::NonMandatory,
SpawnMeta::new_unnamed(fn_size),
rt,
)
} else {
self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt)
self.spawn_blocking_inner(
func,
Mandatory::NonMandatory,
SpawnMeta::new_unnamed(fn_size),
rt,
)
};

match spawn_result {
Expand All @@ -326,18 +338,19 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(
Box::new(func),
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
} else {
self.spawn_blocking_inner(
func,
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
};
Expand All @@ -355,35 +368,16 @@ impl Spawner {
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
spawn_meta: SpawnMeta<'_>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);
let id = task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
use tracing::Instrument;
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "tokio::task::blocking",
"runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
task.id = id.as_u64(),
"fn" = %std::any::type_name::<F>(),
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
fut.instrument(span)
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
let fut =
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());

let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

Expand Down
81 changes: 72 additions & 9 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![cfg_attr(loom, allow(unused_imports))]

use crate::runtime::handle::Handle;
#[cfg(tokio_unstable)]
use crate::runtime::TaskMeta;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
#[cfg(tokio_unstable)]
use crate::runtime::{LocalOptions, LocalRuntime, TaskMeta};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use std::fmt;
use std::io;
use std::thread::ThreadId;
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
Expand Down Expand Up @@ -800,6 +803,37 @@ impl Builder {
}
}

/// Creates the configured `LocalRuntime`.
///
/// The returned `LocalRuntime` instance is ready to spawn tasks.
///
/// # Panics
/// This will panic if `current_thread` is not the selected runtime flavor.
/// All other runtime flavors are unsupported by [`LocalRuntime`].
///
/// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
#[allow(unused_variables, unreachable_patterns)]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_local_runtime(),
_ => panic!("Only current_thread is supported when building a local runtime"),
}
}

fn get_cfg(&self, workers: usize) -> driver::Cfg {
driver::Cfg {
enable_pause_time: match self.kind {
Expand Down Expand Up @@ -1191,8 +1225,40 @@ impl Builder {
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
use crate::runtime::runtime::Scheduler;

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(None)?;

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
use crate::runtime::local_runtime::LocalRuntimeScheduler;

let tid = std::thread::current().id();

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(Some(tid))?;

Ok(LocalRuntime::from_parts(
LocalRuntimeScheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

fn build_current_thread_runtime_components(
&mut self,
local_tid: Option<ThreadId>,
) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
use crate::runtime::scheduler;
use crate::runtime::Config;

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;

Expand Down Expand Up @@ -1227,17 +1293,14 @@ impl Builder {
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
local_tid,
);

let handle = Handle {
inner: scheduler::Handle::CurrentThread(handle),
};

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
Ok((scheduler, handle, blocking_pool))
}

fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
Expand Down
Loading

0 comments on commit 8665992

Please sign in to comment.