Skip to content

Commit 523eefd

Browse files
committed
Preserve input's equivalence properties in UnnestExec
1 parent b4e7147 commit 523eefd

File tree: 3 files changed (+69 −5 lines changed)

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ impl OrderingEquivalenceClass {
         result
     }

+    /// Returns the orderings in this ordering equivalence class.
+    pub fn orderings(&self) -> &[LexOrdering] {
+        &self.orderings
+    }
+
     /// Extend this ordering equivalence class with the given orderings.
     pub fn extend(&mut self, orderings: impl IntoIterator<Item = LexOrdering>) {
         self.orderings.extend(orderings);

datafusion/physical-plan/src/unnest.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ use datafusion_common::{
     exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
 };
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
 use futures::{Stream, StreamExt};
 use log::trace;

 /// Unnest the given columns (either with type struct or list)
-/// For list unnesting, each rows is vertically transformed into multiple rows
-/// For struct unnesting, each columns is horizontally transformed into multiple columns,
+/// For list unnesting, each row is vertically transformed into multiple rows
+/// For struct unnesting, each column is horizontally transformed into multiple columns,
 /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
@@ -83,7 +83,12 @@ impl UnnestExec {
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        );

         UnnestExec {
             input,
@@ -99,10 +104,34 @@ impl UnnestExec {
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
     ) -> PlanProperties {
+        let list_column_indices: Vec<usize> = list_column_indices
+            .iter()
+            .map(|list_unnest| list_unnest.index_in_input_schema)
+            .collect();
+        let non_unnested_indices: Vec<usize> = input
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(idx, _)| {
+                !list_column_indices.contains(idx) && !struct_column_indices.contains(idx)
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+
+        // Create the unnest equivalence properties by copying the input plan's equivalence properties
+        // for the unaffected columns
+        let input_eq_properties = input.equivalence_properties();
+        let projection_mapping =
+            ProjectionMapping::from_indices(&non_unnested_indices, &schema).unwrap();
+        let eq_properties =
+            input_eq_properties.project(&projection_mapping, schema.clone());
         PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            eq_properties,
             input.output_partitioning().to_owned(),
             input.pipeline_behavior(),
             input.boundedness(),

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,3 +941,33 @@ where min_height * width1 = (
 )
 ----
 4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ROW_NUMBER() OVER () AS generated_id,
+unnest(array[value]) as ar
+FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]], aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id, __unnest_placeholder(make_array(range().value),depth=1) AS UNNEST(make_array(range().value)) AS ar
+05)--------Unnest: lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS __unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

0 commit comments

Comments (0)