Skip to content

Commit 9e7141f

Browse files
alambnuno-faria
andauthored
fix: Implement AggregateUDFImpl::reverse_expr for StringAgg (apache#17165) (apache#17473)
* fix: Implement AggregateUDFImpl::reverse_expr for StringAgg * Add a test with two invocations of aggregateion --------- Co-authored-by: Nuno Faria <nunofpfaria@gmail.com>
1 parent 9942ecd commit 9e7141f

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

datafusion/functions-aggregate/src/string_agg.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ impl AggregateUDFImpl for StringAgg {
178178
)))
179179
}
180180

181+
fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
182+
datafusion_expr::ReversedUDAF::Reversed(string_agg_udaf())
183+
}
184+
181185
fn documentation(&self) -> Option<&Documentation> {
182186
self.doc()
183187
}

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6203,6 +6203,58 @@ from t;
62036203
----
62046204
a,c,d,b
62056205

6206+
# Test explain / reverse_expr for string_agg
6207+
query TT
6208+
explain select string_agg(k, ',' order by v) from t;
6209+
----
6210+
logical_plan
6211+
01)Aggregate: groupBy=[[]], aggr=[[string_agg(t.k, Utf8(",")) ORDER BY [t.v ASC NULLS LAST]]]
6212+
02)--TableScan: t projection=[k, v]
6213+
physical_plan
6214+
01)AggregateExec: mode=Single, gby=[], aggr=[string_agg(t.k,Utf8(",")) ORDER BY [t.v ASC NULLS LAST]]
6215+
02)--SortExec: expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false]
6216+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
6217+
6218+
query T
6219+
select string_agg(k, ',' order by v) from t;
6220+
----
6221+
c,a,b,d
6222+
6223+
query TT
6224+
explain select string_agg(k, ',' order by v desc) from t;
6225+
----
6226+
logical_plan
6227+
01)Aggregate: groupBy=[[]], aggr=[[string_agg(t.k, Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]]]
6228+
02)--TableScan: t projection=[k, v]
6229+
physical_plan
6230+
01)AggregateExec: mode=Single, gby=[], aggr=[string_agg(t.k,Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]]
6231+
02)--SortExec: expr=[v@1 DESC], preserve_partitioning=[false]
6232+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
6233+
6234+
query T
6235+
select string_agg(k, ',' order by v desc) from t;
6236+
----
6237+
d,b,a,c
6238+
6239+
# Call string_agg with both ASC and DESC orderings, and expect only one sort
6240+
# (because the aggregate can handle reversed inputs)
6241+
query TT
6242+
explain select string_agg(k, ',' order by v asc), string_agg(k, ',' order by v desc) from t;
6243+
----
6244+
logical_plan
6245+
01)Aggregate: groupBy=[[]], aggr=[[string_agg(t.k, Utf8(",")) ORDER BY [t.v ASC NULLS LAST], string_agg(t.k, Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]]]
6246+
02)--TableScan: t projection=[k, v]
6247+
physical_plan
6248+
01)AggregateExec: mode=Single, gby=[], aggr=[string_agg(t.k,Utf8(",")) ORDER BY [t.v ASC NULLS LAST], string_agg(t.k,Utf8(",")) ORDER BY [t.v DESC NULLS FIRST]]
6249+
02)--SortExec: expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false]
6250+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
6251+
6252+
query TT
6253+
select string_agg(k, ',' order by v asc), string_agg(k, ',' order by v desc) from t;
6254+
----
6255+
c,a,b,d d,b,a,c
6256+
6257+
62066258
statement ok
62076259
drop table t;
62086260

@@ -7444,4 +7496,3 @@ NULL NULL
74447496

74457497
statement ok
74467498
drop table distinct_avg;
7447-

0 commit comments

Comments
 (0)