Skip to content

Commit 421a7b0

Browse files
authored
rt: do not track time-based metrics on wasm32-unknown-unknown (#7322)
1 parent b1bdb3c commit 421a7b0

File tree

4 files changed

+103
-18
lines changed

4 files changed

+103
-18
lines changed

tokio/src/net/tcp/stream.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1112,8 +1112,16 @@ impl TcpStream {
11121112
/// This function will cause all pending and future I/O on the specified
11131113
/// portions to return immediately with an appropriate value (see the
11141114
/// documentation of `Shutdown`).
1115+
///
1116+
/// Remark: this function transforms `Err(std::io::ErrorKind::NotConnected)` to `Ok(())`.
1117+
/// It does this to abstract away OS specific logic and to prevent a race condition between
1118+
/// this function call and the OS closing this socket because of external events (e.g. TCP reset).
1119+
/// See <https://github.com/tokio-rs/tokio/issues/4665> for more information.
11151120
pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1116-
self.io.shutdown(how)
1121+
match self.io.shutdown(how) {
1122+
Err(err) if err.kind() == std::io::ErrorKind::NotConnected => Ok(()),
1123+
result => result,
1124+
}
11171125
}
11181126

11191127
/// Gets the value of the `TCP_NODELAY` option on this socket.

tokio/src/runtime/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,7 @@ impl Builder {
770770
/// # }
771771
/// ```
772772
#[cfg(tokio_unstable)]
773+
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
773774
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
774775
where
775776
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
@@ -813,6 +814,7 @@ impl Builder {
813814
/// # }
814815
/// ```
815816
#[cfg(tokio_unstable)]
817+
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816818
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
817819
where
818820
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,

tokio/src/runtime/metrics/batch.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub(crate) struct MetricsBatch {
1212
busy_duration_total: u64,
1313

1414
/// Instant at which work last resumed (continued after park).
15-
processing_scheduled_tasks_started_at: Instant,
15+
processing_scheduled_tasks_started_at: Option<Instant>,
1616

1717
/// Number of times the worker parked.
1818
park_count: u64,
@@ -67,25 +67,34 @@ cfg_unstable_metrics! {
6767

6868
impl MetricsBatch {
6969
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
70-
let now = Instant::now();
71-
Self::new_unstable(worker_metrics, now)
70+
let maybe_now = now();
71+
Self::new_unstable(worker_metrics, maybe_now)
7272
}
7373

7474
cfg_metrics_variant! {
7575
stable: {
7676
#[inline(always)]
77-
fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
77+
fn new_unstable(_worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
7878
MetricsBatch {
7979
busy_duration_total: 0,
80-
processing_scheduled_tasks_started_at: now,
80+
processing_scheduled_tasks_started_at: maybe_now,
8181
park_count: 0,
8282
park_unpark_count: 0,
8383
}
8484
}
8585
},
8686
unstable: {
8787
#[inline(always)]
88-
fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
88+
fn new_unstable(worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
89+
let poll_timer = maybe_now.and_then(|now| {
90+
worker_metrics
91+
.poll_count_histogram
92+
.as_ref()
93+
.map(|worker_poll_counts| PollTimer {
94+
poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
95+
poll_started_at: now,
96+
})
97+
});
8998
MetricsBatch {
9099
park_count: 0,
91100
park_unpark_count: 0,
@@ -97,13 +106,8 @@ impl MetricsBatch {
97106
local_schedule_count: 0,
98107
overflow_count: 0,
99108
busy_duration_total: 0,
100-
processing_scheduled_tasks_started_at: now,
101-
poll_timer: worker_metrics.poll_count_histogram.as_ref().map(
102-
|worker_poll_counts| PollTimer {
103-
poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
104-
poll_started_at: now,
105-
},
106-
),
109+
processing_scheduled_tasks_started_at: maybe_now,
110+
poll_timer,
107111
}
108112
}
109113
}
@@ -186,13 +190,17 @@ impl MetricsBatch {
186190

187191
/// Start processing a batch of tasks
188192
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
189-
self.processing_scheduled_tasks_started_at = Instant::now();
193+
self.processing_scheduled_tasks_started_at = now();
190194
}
191195

192196
/// Stop processing a batch of tasks
193197
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
194-
let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
195-
self.busy_duration_total += duration_as_u64(busy_duration);
198+
if let Some(processing_scheduled_tasks_started_at) =
199+
self.processing_scheduled_tasks_started_at
200+
{
201+
let busy_duration = processing_scheduled_tasks_started_at.elapsed();
202+
self.busy_duration_total += duration_as_u64(busy_duration);
203+
}
196204
}
197205

198206
cfg_metrics_variant! {
@@ -279,3 +287,17 @@ cfg_rt_multi_thread! {
279287
pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
280288
u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
281289
}
290+
291+
/// Gate unsupported time metrics for `wasm32-unknown-unknown`
292+
/// <https://github.com/tokio-rs/tokio/issues/7319>
293+
fn now() -> Option<Instant> {
294+
if cfg!(all(
295+
target_arch = "wasm32",
296+
target_os = "unknown",
297+
target_vendor = "unknown"
298+
)) {
299+
None
300+
} else {
301+
Some(Instant::now())
302+
}
303+
}

tokio/tests/tcp_shutdown.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22
#![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))] // Wasi doesn't support bind
33
// No `socket` on miri.
44

5+
use std::time::Duration;
56
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
67
use tokio::net::{TcpListener, TcpStream};
8+
use tokio::sync::oneshot::channel;
79
use tokio_test::assert_ok;
810

911
#[tokio::test]
1012
async fn shutdown() {
1113
let srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1214
let addr = assert_ok!(srv.local_addr());
1315

14-
tokio::spawn(async move {
16+
let handle = tokio::spawn(async move {
1517
let mut stream = assert_ok!(TcpStream::connect(&addr).await);
1618

1719
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
@@ -26,4 +28,55 @@ async fn shutdown() {
2628

2729
let n = assert_ok!(io::copy(&mut rd, &mut wr).await);
2830
assert_eq!(n, 0);
31+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
32+
handle.await.unwrap()
33+
}
34+
35+
#[tokio::test]
36+
async fn shutdown_after_tcp_reset() {
37+
let srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
38+
let addr = assert_ok!(srv.local_addr());
39+
40+
let (connected_tx, connected_rx) = channel();
41+
let (dropped_tx, dropped_rx) = channel();
42+
43+
let handle = tokio::spawn(async move {
44+
let mut stream = assert_ok!(TcpStream::connect(&addr).await);
45+
connected_tx.send(()).unwrap();
46+
47+
dropped_rx.await.unwrap();
48+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
49+
});
50+
51+
let (stream, _) = assert_ok!(srv.accept().await);
52+
// By setting linger to 0 we will trigger a TCP reset
53+
stream.set_linger(Some(Duration::new(0, 0))).unwrap();
54+
connected_rx.await.unwrap();
55+
56+
drop(stream);
57+
dropped_tx.send(()).unwrap();
58+
59+
handle.await.unwrap();
60+
}
61+
62+
#[tokio::test]
63+
async fn shutdown_multiple_calls() {
64+
let srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
65+
let addr = assert_ok!(srv.local_addr());
66+
67+
let (connected_tx, connected_rx) = channel();
68+
69+
let handle = tokio::spawn(async move {
70+
let mut stream = assert_ok!(TcpStream::connect(&addr).await);
71+
connected_tx.send(()).unwrap();
72+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
73+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
74+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
75+
});
76+
77+
let (mut stream, _) = assert_ok!(srv.accept().await);
78+
connected_rx.await.unwrap();
79+
80+
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
81+
handle.await.unwrap();
2982
}

0 commit comments

Comments
 (0)