Skip to content

Commit 751a8ba

Browse files
committed
Preserve input's equivalence properties in UnnestExec
1 parent 853eee0 commit 751a8ba

File tree

3 files changed

+103
-3
lines changed

3 files changed

+103
-3
lines changed

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl OrderingEquivalenceClass {
7474
result
7575
}
7676

77+
/// Returns the orderings in this ordering equivalence class.
78+
pub fn orderings(&self) -> &[LexOrdering] {
79+
&self.orderings
80+
}
81+
7782
/// Extend this ordering equivalence class with the given orderings.
7883
pub fn extend(&mut self, orderings: impl IntoIterator<Item = LexOrdering>) {
7984
self.orderings.extend(orderings);

datafusion/physical-plan/src/unnest.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ use futures::{Stream, StreamExt};
5151
use log::trace;
5252

5353
/// Unnest the given columns (either with type struct or list)
54-
/// For list unnesting, each rows is vertically transformed into multiple rows
55-
/// For struct unnesting, each columns is horizontally transformed into multiple columns,
54+
/// For list unnesting, each row is vertically transformed into multiple rows
55+
/// For struct unnesting, each column is horizontally transformed into multiple columns,
5656
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
5757
///
5858
/// See [`UnnestOptions`] for more details and an example.
@@ -101,8 +101,22 @@ impl UnnestExec {
101101
input: &Arc<dyn ExecutionPlan>,
102102
schema: SchemaRef,
103103
) -> PlanProperties {
104+
// Extract equivalence properties from input plan
105+
let input_eq_properties = input.equivalence_properties();
106+
let orderings = input_eq_properties.oeq_class().orderings().to_vec();
107+
let eq_group = input_eq_properties.eq_group();
108+
let constraints = input_eq_properties.constraints();
109+
110+
// Create new equivalence properties for the unnest plan based on the input plan
111+
let mut eq_properties =
112+
EquivalenceProperties::new_with_orderings(schema, orderings)
113+
.with_constraints(constraints.to_owned());
114+
eq_properties
115+
.add_equivalence_group(eq_group.to_owned())
116+
.unwrap(); // We can unwrap this because we know this is a valid equivalence group
117+
104118
PlanProperties::new(
105-
EquivalenceProperties::new(schema),
119+
eq_properties,
106120
input.output_partitioning().to_owned(),
107121
input.pipeline_behavior(),
108122
input.boundedness(),

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,3 +941,84 @@ where min_height * width1 = (
941941
)
942942
----
943943
4 7 4 28
944+
945+
## Unnest with ordering on unrelated column is preserved
946+
query TT
947+
EXPLAIN WITH unnested AS (SELECT
948+
ROW_NUMBER() OVER () AS generated_id,
949+
unnest(array[value]) as ar
950+
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
951+
----
952+
logical_plan
953+
01)Projection: array_agg(unnested.ar)
954+
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
955+
03)----SubqueryAlias: unnested
956+
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
957+
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
958+
06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
959+
07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
960+
08)--------------TableScan: range() projection=[value]
961+
physical_plan
962+
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
963+
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
964+
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
965+
04)------CoalesceBatchesExec: target_batch_size=8192
966+
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
967+
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
968+
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
969+
08)--------------UnnestExec
970+
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
971+
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
972+
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
973+
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
974+
975+
## Unnest without ordering
976+
query TT
977+
EXPLAIN WITH unnested AS (SELECT
978+
random() AS generated_id,
979+
unnest(array[value]) as ar
980+
FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
981+
----
982+
logical_plan
983+
01)Projection: array_agg(unnested.ar)
984+
02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
985+
03)----SubqueryAlias: unnested
986+
04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
987+
05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
988+
06)----------Projection: random() AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
989+
07)------------TableScan: range() projection=[value]
990+
physical_plan
991+
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
992+
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
993+
03)----CoalesceBatchesExec: target_batch_size=8192
994+
04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
995+
05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
996+
06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
997+
07)------------UnnestExec
998+
08)--------------ProjectionExec: expr=[random() as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
999+
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
1000+
10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
1001+
1002+
## Unnest with a non-deterministic (random) grouping column: no input ordering to preserve
1003+
query TT
1004+
explain with temp as (select random() as r, [value, value + 1] as ordered from range(1, 5)), temp2 as (select r, unnest(ordered) as x from temp) select r, sum(x) from temp2 group by r;
1005+
----
1006+
logical_plan
1007+
01)Aggregate: groupBy=[[temp2.r]], aggr=[[sum(temp2.x)]]
1008+
02)--SubqueryAlias: temp2
1009+
03)----Projection: temp.r, __unnest_placeholder(temp.ordered,depth=1) AS UNNEST(temp.ordered) AS x
1010+
04)------Unnest: lists[__unnest_placeholder(temp.ordered)|depth=1] structs[]
1011+
05)--------Projection: temp.r, temp.ordered AS __unnest_placeholder(temp.ordered)
1012+
06)----------SubqueryAlias: temp
1013+
07)------------Projection: random() AS r, make_array(range().value, range().value + Int64(1)) AS ordered
1014+
08)--------------TableScan: range() projection=[value]
1015+
physical_plan
1016+
01)AggregateExec: mode=FinalPartitioned, gby=[r@0 as r], aggr=[sum(temp2.x)]
1017+
02)--CoalesceBatchesExec: target_batch_size=8192
1018+
03)----RepartitionExec: partitioning=Hash([r@0], 4), input_partitions=4
1019+
04)------AggregateExec: mode=Partial, gby=[r@0 as r], aggr=[sum(temp2.x)]
1020+
05)--------ProjectionExec: expr=[r@0 as r, __unnest_placeholder(temp.ordered,depth=1)@1 as x]
1021+
06)----------UnnestExec
1022+
07)------------ProjectionExec: expr=[random() as r, make_array(value@0, value@0 + 1) as __unnest_placeholder(temp.ordered)]
1023+
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
1024+
09)----------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

0 commit comments

Comments
 (0)