Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initial support for Window function #599

Merged
merged 6 commits into from
Jul 2, 2024
Merged

Conversation

huaxingao
Copy link
Contributor

@huaxingao huaxingao commented Jun 27, 2024

Co-authored-by: comphead comphead@ukr.net
Co-authored-by: Huaxin Gao huaxin_gao@apple.com

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

This PR has the initial changes for window function. Currently, only Count, Max and Min are supported. In the following PRs, we will add all the other aggregate functions support, built-in window function support, ignoreNulls support, ect.

How are these changes tested?

Co-authored-by: comphead <comphead@ukr.net>
Copy link
Contributor

@kazuyukitanimura kazuyukitanimura left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about all the corner cases, but looks good

WindowFrameBound::Preceding(ScalarValue::UInt64(None))
}
LowerFrameBoundStruct::Preceding(offset) => {
let offset_value = if offset.offset < 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can use abs()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed. Thanks!

@@ -541,6 +542,17 @@ class CometSparkSessionExtensions
withInfo(s, Seq(info1, info2).flatten.mkString(","))
s

case w: WindowExec =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add isCometOperatorEnabled() to enable/disable the feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I saw some of the isCometOperatorEnabled are checked in CometSparkSessionExtensions, but some of them are in QueryPlanSerde. I followed HashAggregateExec and added the check in QueryPlanSerde

Comment on lines 1439 to 1446
withTable("t1") {
val numRows = 10
spark
.range(numRows)
.selectExpr("id AS a", s"$numRows - id AS b") // Todo: Test Nulls
.repartition(3) // Force repartition to test data will come to single partition
.write
.saveAsTable("t1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified a little with withParquetTable()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to withParquetTable(). Thanks

sort_exprs,
window_frame.into(),
&input_schema,
false, // TODO: Ignore nulls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll create a follow up PR for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val aggregateFunctions =
List("COUNT(_1)", "MAX(_1)", "MIN(_1)") // TODO: Test all the aggregates

aggregateFunctions.foreach { function =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making tests generic

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @huaxingao

withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File a ticket to support test nulls for window functions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls
val aggregateFunctions =
List("COUNT(_1)", "MAX(_1)", "MIN(_1)") // TODO: Test all the aggregates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followup to test all the aggregates functions for window clause

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still have problems with other aggregates functions. I will look one by one.
For example:

sum: Error from DataFusion: Internal error: Builtin Sum will be removed
avg: org.apache.comet.CometNativeException: AvgAccumulator for (Int32 --> Float64)

@huaxingao
Copy link
Contributor Author

also cc @andygrove @viirya

Comment on lines 209 to 210
case agg: AggregateExpression =>
Some(agg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to fall back to Spark for aggregate expressions that we do not support, rather than fail during query execution.

Suggested change
case agg: AggregateExpression =>
Some(agg)
case agg: AggregateExpression => agg.aggregateFunction match {
case _: Min | _: Max | _: Count => Some(agg)
case _ =>
withInfo(windowExpr, "Unsupported aggregate", expr)
None
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks!

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @huaxingao

@andygrove andygrove merged commit 0d2fcbc into apache:main Jul 2, 2024
66 checks passed
@andygrove
Copy link
Member

@huaxingao We should also update the user guide to show that we now support window functions

@huaxingao
Copy link
Contributor Author

I will update the user doc to add window function support.

@huaxingao
Copy link
Contributor Author

Thanks everyone!

@huaxingao huaxingao deleted the window_1 branch July 2, 2024 15:28
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* feat: initial support for Window function

Co-authored-by: comphead <comphead@ukr.net>

* fix style

* fix style

* address comments

* abs()->unsigned_abs()

* address comments

---------

Co-authored-by: comphead <comphead@ukr.net>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants