Skip to content

Commit 3dc3aaa

Browse files
pepijnvealamb
andauthored
Use tokio::task::coop::poll_proceed by default in CooperativeStream (#16748)
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 5cdb7a2 commit 3dc3aaa

File tree

2 files changed

+10
-12
lines changed

2 files changed

+10
-12
lines changed

datafusion/common-runtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ log = { workspace = true }
4343
tokio = { workspace = true }
4444

4545
[dev-dependencies]
46-
tokio = { version = "1.47", features = ["rt", "rt-multi-thread", "time"] }
46+
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }

datafusion/physical-plan/src/coop.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,7 @@
6565
//! The optimizer rule currently checks the plan for exchange-like operators and leave operators
6666
//! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties).
6767
68-
#[cfg(any(
69-
datafusion_coop = "tokio_fallback",
70-
not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream"))
71-
))]
68+
#[cfg(datafusion_coop = "tokio_fallback")]
7269
use futures::Future;
7370
use std::any::Any;
7471
use std::pin::Pin;
@@ -133,10 +130,14 @@ where
133130
mut self: Pin<&mut Self>,
134131
cx: &mut Context<'_>,
135132
) -> Poll<Option<Self::Item>> {
136-
#[cfg(datafusion_coop = "tokio")]
133+
#[cfg(any(
134+
datafusion_coop = "tokio",
135+
not(any(
136+
datafusion_coop = "tokio_fallback",
137+
datafusion_coop = "per_stream"
138+
))
139+
))]
137140
{
138-
// TODO this should be the default implementation
139-
// Enable once https://github.com/tokio-rs/tokio/issues/7403 is merged and released
140141
let coop = std::task::ready!(tokio::task::coop::poll_proceed(cx));
141142
let value = self.inner.poll_next_unpin(cx);
142143
if value.is_ready() {
@@ -145,10 +146,7 @@ where
145146
value
146147
}
147148

148-
#[cfg(any(
149-
datafusion_coop = "tokio_fallback",
150-
not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream"))
151-
))]
149+
#[cfg(datafusion_coop = "tokio_fallback")]
152150
{
153151
// This is a temporary placeholder implementation that may have slightly
154152
// worse performance compared to `poll_proceed`

0 commit comments

Comments
 (0)