Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 72 additions & 11 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::physical_plan::execution_plan::Boundedness;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::{DataFusionError, JoinType, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr_common::operator::Operator;
use datafusion_expr_common::operator::Operator::{Divide, Eq, Gt, Modulo};
use datafusion_functions_aggregate::min_max;
Expand All @@ -42,12 +42,14 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::union::InterleaveExec;
use futures::StreamExt;
use parking_lot::RwLock;
Expand Down Expand Up @@ -250,6 +252,58 @@ async fn agg_grouped_topk_yields(
query_yields(aggr, session_ctx.task_ctx()).await
}

#[rstest]
#[tokio::test]
// A test that mocks the behavior of `SpillManager::read_spill_as_stream` without file access
// to verify that a cooperative stream would properly yield in a spill file read scenario
async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
use datafusion_physical_plan::common::spawn_buffered;

// A mock stream that always returns `Poll::Ready(Some(...))` immediately
let always_ready =
make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;

// `make_cooperative` builds a consumer stream that resembles how the read stream for a spill file is constructed
let stream = make_cooperative(always_ready);

// Set large buffer so that buffer always has free space for the producer/sender
let buffer_capacity = 100_000;
let mut mock_stream = spawn_buffered(stream, buffer_capacity);
let schema = mock_stream.schema();

let consumer_stream = futures::stream::poll_fn(move |cx| {
let mut collected = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest having a cap on the number of iterations or a time limit instead of an infinite loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)

// To make sure that inner stream is polled multiple times, loop until the buffer is full
// Ideally, the stream will yield before the loop ends
for _ in 0..buffer_capacity {
Comment on lines +276 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to loop at most `buffer_capacity` times.

match mock_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
collected.push(batch);
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
break;
}
Poll::Pending => {
// Polling the inner stream can return Pending only once it has used up its budget, since
// we intentionally made ProducerStream always return Ready
return Poll::Pending;
}
}
}

// This should be unreachable since the stream is canceled
unreachable!("Expected the stream to be canceled, but it continued polling");
});
Comment on lines +297 to +299
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated (Clippy told me to use unreachable!), thanks @comphead


let consumer_record_batch_stream =
Box::pin(RecordBatchStreamAdapter::new(schema, consumer_stream));

stream_yields(consumer_record_batch_stream).await
}

#[rstest]
#[tokio::test]
async fn sort_yields(
Expand Down Expand Up @@ -698,17 +752,9 @@ enum Yielded {
Timeout,
}

async fn query_yields(
plan: Arc<dyn ExecutionPlan>,
task_ctx: Arc<TaskContext>,
async fn stream_yields(
mut stream: SendableRecordBatchStream,
) -> Result<(), Box<dyn Error>> {
// Run plan through EnsureCooperative
let optimized =
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;

// Get the stream
let mut stream = physical_plan::execute_stream(optimized, task_ctx)?;

// Create an independent executor pool
let child_runtime = Runtime::new()?;

Expand Down Expand Up @@ -753,3 +799,18 @@ async fn query_yields(
);
Ok(())
}

/// Optimizes `plan` with the [`EnsureCooperative`] rule, executes it, and checks that the
/// resulting stream yields.
///
/// Errors raised by the optimizer rule or by `physical_plan::execute_stream` are propagated
/// to the caller; otherwise the result of `stream_yields` is returned unchanged.
async fn query_yields(
    plan: Arc<dyn ExecutionPlan>,
    task_ctx: Arc<TaskContext>,
) -> Result<(), Box<dyn Error>> {
    // Apply the cooperative-scheduling rule using the session's configuration options
    let config_options = task_ctx.session_config().options();
    let cooperative_plan = EnsureCooperative::new().optimize(plan, config_options)?;

    // Execute the rewritten plan to obtain its record batch stream
    let batches = physical_plan::execute_stream(cooperative_plan, task_ctx)?;

    // Delegate to the shared check that polls the stream and verifies that it yields
    stream_yields(batches).await
}
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn build_file_list_recurse(

/// If running in a tokio context, spawns the execution of `input` on a separate task,
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub(crate) fn spawn_buffered(
pub fn spawn_buffered(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the visibility of spawn_buffered to pub to use it in the test.
If this isn't ideal, I'll follow any suggestions. (Perhaps duplicating the code might be better?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems fine to me

mut input: SendableRecordBatchStream,
buffer: usize,
) -> SendableRecordBatchStream {
Expand Down