Skip to content

Commit 9e848bf

Browse files
authored
Fix bug: reverse the ordering requirement when reversing FIRST_VALUE / LAST_VALUE (#7914)
1 parent 0b268b2 commit 9e848bf

File tree

3 files changed

+41
-11
lines changed

3 files changed

+41
-11
lines changed

datafusion/physical-expr/src/aggregate/first_last.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use std::sync::Arc;
2222

2323
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
2424
use crate::expressions::format_state_name;
25-
use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
25+
use crate::{
26+
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
27+
};
2628

2729
use arrow::array::ArrayRef;
2830
use arrow::compute;
@@ -126,7 +128,7 @@ impl AggregateExpr for FirstValue {
126128
self.expr.clone(),
127129
name,
128130
self.input_data_type.clone(),
129-
self.ordering_req.clone(),
131+
reverse_order_bys(&self.ordering_req),
130132
self.order_by_data_types.clone(),
131133
)))
132134
}
@@ -350,7 +352,7 @@ impl AggregateExpr for LastValue {
350352
self.expr.clone(),
351353
name,
352354
self.input_data_type.clone(),
353-
self.ordering_req.clone(),
355+
reverse_order_bys(&self.ordering_req),
354356
self.order_by_data_types.clone(),
355357
)))
356358
}

datafusion/physical-expr/src/aggregate/utils.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,10 @@ pub fn adjust_output_array(
177177
/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality
178178
/// between the trait objects.
179179
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
180-
if any.is::<Arc<dyn AggregateExpr>>() {
181-
any.downcast_ref::<Arc<dyn AggregateExpr>>()
182-
.unwrap()
183-
.as_any()
184-
} else if any.is::<Box<dyn AggregateExpr>>() {
185-
any.downcast_ref::<Box<dyn AggregateExpr>>()
186-
.unwrap()
187-
.as_any()
180+
if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
181+
obj.as_any()
182+
} else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
183+
obj.as_any()
188184
} else {
189185
any
190186
}

datafusion/sqllogictest/test_files/groupby.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3613,6 +3613,38 @@ AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
36133613
------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
36143614
--------MemoryExec: partitions=1, partition_sizes=[1]
36153615

3616+
# Since both ordering requirements are satisfied, there shouldn't be
3617+
# any SortExec in the final plan.
3618+
query TT
3619+
EXPLAIN SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
3620+
LAST_VALUE(c ORDER BY c DESC) as last_c
3621+
FROM multiple_ordered_table
3622+
GROUP BY d;
3623+
----
3624+
logical_plan
3625+
Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST] AS first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] AS last_c
3626+
--Aggregate: groupBy=[[multiple_ordered_table.d]], aggr=[[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]]]
3627+
----TableScan: multiple_ordered_table projection=[a, c, d]
3628+
physical_plan
3629+
ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
3630+
--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
3631+
----CoalesceBatchesExec: target_batch_size=2
3632+
------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
3633+
--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
3634+
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
3635+
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
3636+
3637+
query II rowsort
3638+
SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
3639+
LAST_VALUE(c ORDER BY c DESC) as last_c
3640+
FROM multiple_ordered_table
3641+
GROUP BY d;
3642+
----
3643+
0 0
3644+
0 1
3645+
0 15
3646+
0 4
3647+
0 9
36163648

36173649
query TT
36183650
EXPLAIN SELECT c

0 commit comments

Comments
 (0)