Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::sync::Arc;

use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::expressions::format_state_name;
use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};

use arrow::array::ArrayRef;
use arrow::compute;
Expand Down Expand Up @@ -126,7 +128,7 @@ impl AggregateExpr for FirstValue {
self.expr.clone(),
name,
self.input_data_type.clone(),
self.ordering_req.clone(),
reverse_order_bys(&self.ordering_req),
self.order_by_data_types.clone(),
)))
}
Expand Down Expand Up @@ -350,7 +352,7 @@ impl AggregateExpr for LastValue {
self.expr.clone(),
name,
self.input_data_type.clone(),
self.ordering_req.clone(),
reverse_order_bys(&self.ordering_req),
self.order_by_data_types.clone(),
)))
}
Expand Down
12 changes: 4 additions & 8 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,10 @@ pub fn adjust_output_array(
/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality
/// between the trait objects.
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
if any.is::<Arc<dyn AggregateExpr>>() {
any.downcast_ref::<Arc<dyn AggregateExpr>>()
.unwrap()
.as_any()
} else if any.is::<Box<dyn AggregateExpr>>() {
any.downcast_ref::<Box<dyn AggregateExpr>>()
.unwrap()
.as_any()
if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
obj.as_any()
} else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
obj.as_any()
} else {
any
}
Expand Down
32 changes: 32 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3613,6 +3613,38 @@ AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]

# Since both ordering requirements are satisfied, there shouldn't be
# any SortExec in the final plan.
query TT
EXPLAIN SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
LAST_VALUE(c ORDER BY c DESC) as last_c
FROM multiple_ordered_table
GROUP BY d;
----
logical_plan
Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST] AS first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] AS last_c
--Aggregate: groupBy=[[multiple_ordered_table.d]], aggr=[[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]]]
----TableScan: multiple_ordered_table projection=[a, c, d]
physical_plan
ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true

query II rowsort
SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
LAST_VALUE(c ORDER BY c DESC) as last_c
FROM multiple_ordered_table
GROUP BY d;
----
0 0
0 1
0 15
0 4
0 9

query TT
EXPLAIN SELECT c
Expand Down