Skip to content

Commit

Permalink
Group By All (#7622)
Browse files Browse the repository at this point in the history
* group by all is resolved

* restore the removed tests

* tests extended

* fix after merge
  • Loading branch information
berkaysynnada authored Sep 22, 2023
1 parent 9fa0207 commit 70b8620
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 29 deletions.
23 changes: 20 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,22 @@

[workspace]
exclude = ["datafusion-cli"]
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks",
members = [
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-plan",
"datafusion/proto",
"datafusion/proto/gen",
"datafusion/sql",
"datafusion/sqllogictest",
"datafusion/substrait",
"datafusion-examples",
"test-utils",
"benchmarks",
]
resolver = "2"

Expand All @@ -33,12 +48,14 @@ version = "31.0.0"

[workspace.dependencies]
arrow = { version = "46.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-array = { version = "46.0.0", default-features = false, features = ["chrono-tz"] }
arrow-array = { version = "46.0.0", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { version = "46.0.0", default-features = false }
arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "46.0.0", default-features = false }
parquet = { version = "46.0.0", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.37.0", features = ["visitor"] }
sqlparser = { version = "0.38.0", features = ["visitor"] }
chrono = { version = "0.4.31", default-features = false }


Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 45 additions & 24 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, ReplaceSelectItem, WildcardAdditionalOptions, WindowType,
Distinct, Expr as SQLExpr, GroupByExpr, ReplaceSelectItem, WildcardAdditionalOptions,
WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

Expand Down Expand Up @@ -136,29 +137,49 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let aggr_exprs = find_aggregate_exprs(&aggr_expr_haystack);

// All of the group by expressions
let group_by_exprs = select
.group_by
.into_iter()
.map(|e| {
let group_by_expr =
self.sql_expr_to_logical_expr(e, &combined_schema, planner_context)?;
// aliases from the projection can conflict with same-named expressions in the input
let mut alias_map = alias_map.clone();
for f in plan.schema().fields() {
alias_map.remove(f.name());
}
let group_by_expr = resolve_aliases_to_exprs(&group_by_expr, &alias_map)?;
let group_by_expr =
resolve_positions_to_exprs(&group_by_expr, &select_exprs)
.unwrap_or(group_by_expr);
let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
self.validate_schema_satisfies_exprs(
plan.schema(),
&[group_by_expr.clone()],
)?;
Ok(group_by_expr)
})
.collect::<Result<Vec<Expr>>>()?;
let group_by_exprs = if let GroupByExpr::Expressions(exprs) = select.group_by {
exprs
.into_iter()
.map(|e| {
let group_by_expr = self.sql_expr_to_logical_expr(
e,
&combined_schema,
planner_context,
)?;
// aliases from the projection can conflict with same-named expressions in the input
let mut alias_map = alias_map.clone();
for f in plan.schema().fields() {
alias_map.remove(f.name());
}
let group_by_expr =
resolve_aliases_to_exprs(&group_by_expr, &alias_map)?;
let group_by_expr =
resolve_positions_to_exprs(&group_by_expr, &select_exprs)
.unwrap_or(group_by_expr);
let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
self.validate_schema_satisfies_exprs(
plan.schema(),
&[group_by_expr.clone()],
)?;
Ok(group_by_expr)
})
.collect::<Result<Vec<Expr>>>()?
} else {
// 'group by all' groups wrt. all select expressions except 'AggregateFunction's.
// Filter and collect non-aggregate select expressions
select_exprs
.iter()
.filter(|select_expr| match select_expr {
Expr::AggregateFunction(_) | Expr::AggregateUDF(_) => false,
Expr::Alias(Alias { expr, name: _ }) => !matches!(
**expr,
Expr::AggregateFunction(_) | Expr::AggregateUDF(_)
),
_ => true,
})
.cloned()
.collect()
};

// process group by, aggregation or having
let (plan, mut select_exprs_post_aggr, having_expr_post_aggr) = if !group_by_exprs
Expand Down
57 changes: 57 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,63 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
59
61

# Group By All tests
statement ok
CREATE TABLE tab3(col0 INTEGER, col1 INTEGER, col2 INTEGER, col3 INTEGER)

statement ok
INSERT INTO tab3 VALUES(0,1,12,-1)

statement ok
INSERT INTO tab3 VALUES(0,2,13,-1)

statement ok
INSERT INTO tab3 VALUES(0,1,10,-2)

statement ok
INSERT INTO tab3 VALUES(0,2,15,-2)

statement ok
INSERT INTO tab3 VALUES(1, NULL, 10, -2)

query IRI rowsort
SELECT col1, AVG(col2), col0 FROM tab3 GROUP BY ALL
----
1 11 0
2 14 0
NULL 10 1

query IIR rowsort
SELECT sub.col1, sub.col0, AVG(sub.col2) AS avg_col2
FROM (
SELECT col1, col0, col2
FROM tab3
WHERe col3 = -1
GROUP BY ALL
) AS sub
GROUP BY ALL;
----
1 0 12
2 0 13

query IIR rowsort
SELECT sub.col1, sub.col0, sub."AVG(tab3.col2)" AS avg_col2
FROM (
SELECT col1, AVG(col2), col0 FROM tab3 GROUP BY ALL
) AS sub
GROUP BY ALL;
----
1 0 11
2 0 14
NULL 1 10

query IIII rowsort
SELECT col0, col1, COUNT(col2), SUM(col3) FROM tab3 GROUP BY ALL
----
0 1 2 -3
0 2 2 -3
1 NULL 1 -2

# query below should work in multi partition, successfully.
query II
SELECT l.col0, LAST_VALUE(r.col1 ORDER BY r.col0) as last_col1
Expand Down

0 comments on commit 70b8620

Please sign in to comment.