Skip to content

Commit 99bd22c

Browse files
committed
First stab at a test case to ensure sort does not start processing eagerly
1 parent 861d1c7 commit 99bd22c

File tree

2 files changed

+90
-3
lines changed

2 files changed

+90
-3
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,9 +1255,9 @@ mod tests {
12551255
use crate::execution_plan::Boundedness;
12561256
use crate::expressions::col;
12571257
use crate::test;
1258-
use crate::test::assert_is_pending;
12591258
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
12601259
use crate::test::TestMemoryExec;
1260+
use crate::test::{assert_is_pending, panic_exec};
12611261

12621262
use arrow::array::*;
12631263
use arrow::compute::SortOptions;
@@ -2005,4 +2005,23 @@ mod tests {
20052005
"#);
20062006
Ok(())
20072007
}
2008+
2009+
#[tokio::test]
2010+
async fn unpolled_sort_does_not_start_eagerly() -> Result<()> {
2011+
let task_ctx = Arc::new(TaskContext::default());
2012+
let source = panic_exec(1);
2013+
let schema = source.schema();
2014+
2015+
let sort_exec = Arc::new(SortExec::new(
2016+
[PhysicalSortExpr {
2017+
expr: col("i", &schema)?,
2018+
options: SortOptions::default(),
2019+
}]
2020+
.into(),
2021+
source,
2022+
));
2023+
2024+
let _ = sort_exec.execute(1, task_ctx);
2025+
Ok(())
2026+
}
20082027
}

datafusion/physical-plan/src/test.rs

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_physical_expr::expressions::Column;
4747
use datafusion_physical_expr::utils::collect_columns;
4848
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
4949

50-
use futures::{Future, FutureExt};
50+
use futures::{stream, Future, FutureExt};
5151

5252
pub mod exec;
5353

@@ -515,10 +515,78 @@ impl PartitionStream for TestPartitionStream {
515515
&self.schema
516516
}
517517
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
518-
let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
518+
let stream = stream::iter(self.batches.clone().into_iter().map(Ok));
519519
Box::pin(RecordBatchStreamAdapter::new(
520520
Arc::clone(&self.schema),
521521
stream,
522522
))
523523
}
524524
}
525+
526+
/// Returns an `ExecutionPlan` that returns a stream which panics if it is ever polled.
527+
/// This can be used to test that execution plan implementations do not eagerly start
528+
/// processing data when `ExecutionPlan::execute` is called.
529+
pub fn panic_exec(partitions: usize) -> Arc<dyn ExecutionPlan> {
530+
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
531+
Arc::new(PanicExec::new(schema, partitions))
532+
}
533+
534+
#[derive(Debug)]
535+
struct PanicExec {
536+
properties: PlanProperties,
537+
}
538+
539+
impl PanicExec {
540+
fn new(schema: SchemaRef, partitions: usize) -> Self {
541+
Self {
542+
properties: PlanProperties::new(
543+
EquivalenceProperties::new(schema.clone()),
544+
Partitioning::UnknownPartitioning(partitions),
545+
EmissionType::Incremental,
546+
Boundedness::Bounded,
547+
),
548+
}
549+
}
550+
}
551+
552+
impl DisplayAs for PanicExec {
553+
fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
554+
write!(f, "Panic")
555+
}
556+
}
557+
558+
impl ExecutionPlan for PanicExec {
559+
fn name(&self) -> &str {
560+
"PanicExec"
561+
}
562+
563+
fn as_any(&self) -> &dyn Any {
564+
self
565+
}
566+
567+
fn properties(&self) -> &PlanProperties {
568+
&self.properties
569+
}
570+
571+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
572+
vec![]
573+
}
574+
575+
fn with_new_children(
576+
self: Arc<Self>,
577+
_: Vec<Arc<dyn ExecutionPlan>>,
578+
) -> Result<Arc<dyn ExecutionPlan>> {
579+
Ok(self.clone())
580+
}
581+
582+
fn execute(
583+
&self,
584+
_: usize,
585+
_: Arc<TaskContext>,
586+
) -> Result<SendableRecordBatchStream> {
587+
Ok(Box::pin(RecordBatchStreamAdapter::new(
588+
self.schema(),
589+
stream::once(async { panic!() }),
590+
)))
591+
}
592+
}

0 commit comments

Comments
 (0)