Skip to content

Commit 9b7e94d

Browse files
committed
Fix reversing first_value, last_value
Upon reversing, a schema and field mismatch would happen.
1 parent 4cd992c commit 9b7e94d

File tree

3 files changed

+80
-89
lines changed

3 files changed

+80
-89
lines changed

datafusion/physical-expr/src/aggregate.rs

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ impl AggregateExprBuilder {
223223

224224
let return_field = fun.return_field(&input_exprs_fields)?;
225225
let is_nullable = fun.is_nullable();
226+
// TODO rename AggregateExprBuilder::alias to name
226227
let name = match alias {
227228
None => {
228229
return internal_err!(
@@ -575,18 +576,10 @@ impl AggregateFunctionExpr {
575576
ReversedUDAF::NotSupported => None,
576577
ReversedUDAF::Identical => Some(self.clone()),
577578
ReversedUDAF::Reversed(reverse_udf) => {
578-
let mut name = self.name().to_string();
579-
// If the function is changed, we need to reverse order_by clause as well
580-
// i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
581-
if self.fun().name() != reverse_udf.name() {
582-
replace_order_by_clause(&mut name);
583-
}
584-
replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
585-
586579
AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
587580
.order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
588581
.schema(Arc::new(self.schema.clone()))
589-
.alias(name)
582+
.alias(self.name())
590583
.with_ignore_nulls(self.ignore_nulls)
591584
.with_distinct(self.is_distinct)
592585
.with_reversed(!self.is_reversed)
@@ -684,32 +677,3 @@ impl PartialEq for AggregateFunctionExpr {
684677
.all(|(this_arg, other_arg)| this_arg.eq(other_arg))
685678
}
686679
}
687-
688-
fn replace_order_by_clause(order_by: &mut String) {
689-
let suffixes = [
690-
(" DESC NULLS FIRST]", " ASC NULLS LAST]"),
691-
(" ASC NULLS FIRST]", " DESC NULLS LAST]"),
692-
(" DESC NULLS LAST]", " ASC NULLS FIRST]"),
693-
(" ASC NULLS LAST]", " DESC NULLS FIRST]"),
694-
];
695-
696-
if let Some(start) = order_by.find("ORDER BY [") {
697-
if let Some(end) = order_by[start..].find(']') {
698-
let order_by_start = start + 9;
699-
let order_by_end = start + end;
700-
701-
let column_order = &order_by[order_by_start..=order_by_end];
702-
for (suffix, replacement) in suffixes {
703-
if column_order.ends_with(suffix) {
704-
let new_order = column_order.replace(suffix, replacement);
705-
order_by.replace_range(order_by_start..=order_by_end, &new_order);
706-
break;
707-
}
708-
}
709-
}
710-
}
711-
}
712-
713-
fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
714-
*aggr_name = aggr_name.replace(fn_name_old, fn_name_new);
715-
}

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6354,9 +6354,9 @@ logical_plan
63546354
01)Aggregate: groupBy=[[]], aggr=[[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
63556355
02)--TableScan: convert_first_last_table projection=[c1, c3]
63566356
physical_plan
6357-
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
6357+
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
63586358
02)--CoalescePartitionsExec
6359-
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
6359+
03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
63606360
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
63616361
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], file_type=csv, has_header=true
63626362

@@ -6368,9 +6368,9 @@ logical_plan
63686368
01)Aggregate: groupBy=[[]], aggr=[[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
63696369
02)--TableScan: convert_first_last_table projection=[c1, c2]
63706370
physical_plan
6371-
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
6371+
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
63726372
02)--CoalescePartitionsExec
6373-
03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
6373+
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
63746374
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
63756375
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], file_type=csv, has_header=true
63766376

0 commit comments

Comments
 (0)