Skip to content

Commit 65b997b

Browse files
authored
[MINOR]: Parametrize sort-preservation tests to exercise all situations (unbounded/bounded sources and flag behavior) (#8575)
* Re-introduce unbounded tests with new executor * Remove unnecessary test
1 parent b5e94a6 commit 65b997b

File tree

3 files changed

+208
-122
lines changed

3 files changed

+208
-122
lines changed

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ mod tests {
769769
use crate::physical_plan::repartition::RepartitionExec;
770770
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
771771
use crate::prelude::{SessionConfig, SessionContext};
772-
use crate::test::csv_exec_sorted;
772+
use crate::test::{csv_exec_sorted, stream_exec_ordered};
773773

774774
use arrow::compute::SortOptions;
775775
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -2141,11 +2141,11 @@ mod tests {
21412141
}
21422142

21432143
#[tokio::test]
2144-
#[ignore]
21452144
async fn test_with_lost_ordering_unbounded() -> Result<()> {
21462145
let schema = create_test_schema3()?;
21472146
let sort_exprs = vec![sort_expr("a", &schema)];
2148-
let source = csv_exec_sorted(&schema, sort_exprs);
2147+
// create an unbounded source
2148+
let source = stream_exec_ordered(&schema, sort_exprs);
21492149
let repartition_rr = repartition_exec(source);
21502150
let repartition_hash = Arc::new(RepartitionExec::try_new(
21512151
repartition_rr,
@@ -2159,25 +2159,24 @@ mod tests {
21592159
" CoalescePartitionsExec",
21602160
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
21612161
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2162-
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
2162+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
21632163
];
21642164
let expected_optimized = [
21652165
"SortPreservingMergeExec: [a@0 ASC]",
21662166
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
21672167
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2168-
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
2168+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
21692169
];
21702170
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
21712171
Ok(())
21722172
}
21732173

21742174
#[tokio::test]
2175-
#[ignore]
21762175
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
21772176
let schema = create_test_schema3()?;
21782177
let sort_exprs = vec![sort_expr("a", &schema)];
2179-
// Make source unbounded
2180-
let source = csv_exec_sorted(&schema, sort_exprs);
2178+
// create an unbounded source
2179+
let source = stream_exec_ordered(&schema, sort_exprs);
21812180
let repartition_rr = repartition_exec(source);
21822181
let repartition_hash = Arc::new(RepartitionExec::try_new(
21832182
repartition_rr,
@@ -2190,13 +2189,13 @@ mod tests {
21902189
" CoalescePartitionsExec",
21912190
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
21922191
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2193-
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
2192+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
21942193
];
21952194
let expected_optimized = [
21962195
"SortPreservingMergeExec: [a@0 ASC]",
21972196
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
21982197
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
2199-
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
2198+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
22002199
];
22012200
assert_optimized!(expected_input, expected_optimized, physical_plan, false);
22022201
Ok(())

0 commit comments

Comments
 (0)