
[SPARK-30027][SQL] Support codegen for aggregate filters in HashAggregateExec #27019


Closed

maropu
Member

@maropu maropu commented Dec 26, 2019

What changes were proposed in this pull request?

This PR adds code generation support for HashAggregateExec when aggregate expressions have FILTER clauses.

Quick benchmark results:

$ ./bin/spark-shell --master=local[1] --conf spark.driver.memory=8g --conf spark.sql.shuffle.partitions=1 -v

scala> spark.range(100000000).selectExpr("id % 3 as k1", "id % 5 as k2", "rand() as v1", "rand() as v2").write.saveAsTable("t")
scala> sql("SELECT k1, k2, AVG(v1) FILTER (WHERE v2 > 0.5) FROM t GROUP BY k1, k2").write.format("noop").mode("overwrite").save()

>> Before this PR
Elapsed time: 16.170697619s 

>> After this PR
Elapsed time: 6.7825313s  

The query above is compiled into the code below:

...
/* 285 */   private void agg_doAggregate_avg_0(boolean agg_exprIsNull_2_0, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, double agg_expr_2_0) throws java.io.IOException {
/* 286 */     // evaluate aggregate function for avg
/* 287 */     boolean agg_isNull_10 = true;
/* 288 */     double agg_value_12 = -1.0;
/* 289 */     boolean agg_isNull_11 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 290 */     double agg_value_13 = agg_isNull_11 ?
/* 291 */     -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 292 */     if (!agg_isNull_11) {
/* 293 */       agg_agg_isNull_12_0 = true;
/* 294 */       double agg_value_14 = -1.0;
/* 295 */       do {
/* 296 */         if (!agg_exprIsNull_2_0) {
/* 297 */           agg_agg_isNull_12_0 = false;
/* 298 */           agg_value_14 = agg_expr_2_0;
/* 299 */           continue;
/* 300 */         }
/* 301 */
/* 302 */         if (!false) {
/* 303 */           agg_agg_isNull_12_0 = false;
/* 304 */           agg_value_14 = 0.0D;
/* 305 */           continue;
/* 306 */         }
/* 307 */
/* 308 */       } while (false);
/* 309 */
/* 310 */       agg_isNull_10 = false; // resultCode could change nullability.
/* 311 */
/* 312 */       agg_value_12 = agg_value_13 + agg_value_14;
/* 313 */
/* 314 */     }
/* 315 */     boolean agg_isNull_15 = false;
/* 316 */     long agg_value_17 = -1L;
/* 317 */     if (!false && agg_exprIsNull_2_0) {
/* 318 */       boolean agg_isNull_18 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 319 */       long agg_value_20 = agg_isNull_18 ?
/* 320 */       -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 321 */       agg_isNull_15 = agg_isNull_18;
/* 322 */       agg_value_17 = agg_value_20;
/* 323 */     } else {
/* 324 */       boolean agg_isNull_19 = true;
/* 325 */       long agg_value_21 = -1L;
/* 326 */       boolean agg_isNull_20 = agg_unsafeRowAggBuffer_0.isNullAt(1);
/* 327 */       long agg_value_22 = agg_isNull_20 ?
/* 328 */       -1L : (agg_unsafeRowAggBuffer_0.getLong(1));
/* 329 */       if (!agg_isNull_20) {
/* 330 */         agg_isNull_19 = false; // resultCode could change nullability.
/* 331 */
/* 332 */         agg_value_21 = agg_value_22 + 1L;
/* 333 */
/* 334 */       }
/* 335 */       agg_isNull_15 = agg_isNull_19;
/* 336 */       agg_value_17 = agg_value_21;
/* 337 */     }
/* 338 */     // update unsafe row buffer
/* 339 */     if (!agg_isNull_10) {
/* 340 */       agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_12);
/* 341 */     } else {
/* 342 */       agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 343 */     }
/* 344 */
/* 345 */     if (!agg_isNull_15) {
/* 346 */       agg_unsafeRowAggBuffer_0.setLong(1, agg_value_17);
/* 347 */     } else {
/* 348 */       agg_unsafeRowAggBuffer_0.setNullAt(1);
/* 349 */     }
/* 350 */   }
...
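
The update logic in the listing above, combined with a filter guard, can be sketched in plain Java. This is only an illustration under stated assumptions, not the code Spark emits: the class `FilteredAvgSketch`, the `AvgBuffer` type, and the `update` method are hypothetical names, and the predicate `v2 > 0.5` is hard-coded from the benchmark query. It assumes the generated predicate code leaves the `do { ... } while (false)` block with `continue` when the filter fails, so the row never reaches the aggregate update.

```java
// Hypothetical sketch of AVG(v1) FILTER (WHERE v2 > 0.5); not Spark's generated code.
class FilteredAvgSketch {
    // Stand-in for the aggregation buffer: slot 0 holds the sum, slot 1 the count.
    static final class AvgBuffer {
        double sum = 0.0;
        long count = 0L;

        // Returns null when no row contributed, matching SQL AVG over no input.
        Double result() {
            return count == 0L ? null : Double.valueOf(sum / count);
        }
    }

    // v1 is the nullable aggregated value; v2 feeds the filter predicate.
    static void update(AvgBuffer buf, Double v1, double v2) {
        do {
            // Filter guard: "continue" in a do/while(false) jumps to the loop
            // condition, which is false, so the update below is skipped.
            if (!(v2 > 0.5)) continue;

            // Aggregate update, mirroring the listing: sum += coalesce(v1, 0.0),
            // and the count is incremented only when v1 is non-null.
            buf.sum += (v1 != null) ? v1 : 0.0;
            if (v1 != null) buf.count += 1L;
        } while (false);
    }

    public static void main(String[] args) {
        AvgBuffer buf = new AvgBuffer();
        update(buf, 2.0, 0.9);   // passes the filter
        update(buf, 4.0, 0.1);   // filtered out
        update(buf, null, 0.7);  // passes the filter, but a null v1 is not counted
        update(buf, 6.0, 0.6);   // passes the filter
        System.out.println(buf.result()); // prints 4.0
    }
}
```

Wrapping the body in `do { ... } while (false)` lets a predicate snippet written for a loop body (where `continue` means "skip this row") be reused unchanged inside a per-row update method.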

Why are the changes needed?

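For better performance: aggregate filters currently disable code generation in HashAggregateExec, so queries that use them fall back to the slower non-codegen path.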

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115814 has finished for PR 27019 at commit 0f75cab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@@ -153,9 +153,7 @@ case class HashAggregateExec(

override def supportCodegen: Boolean = {
// ImperativeAggregate and filter predicate are not supported right now
Contributor

nit: Let's also update the comment here as well.

Member Author

ok, thanks.

@SparkQA

SparkQA commented Dec 28, 2019

Test build #115878 has finished for PR 27019 at commit e449d3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cc @rednaxelafx as well

@@ -91,60 +91,39 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
}
}


/** Physical plan for Filter. */
case class FilterExec(condition: Expression, child: SparkPlan)
Contributor

@maropu I don't understand why we need to change FilterExec?

Member Author

Just to share the predicate-processing code between aggregates and filters.

@maropu
Member Author

maropu commented Jan 10, 2020

retest this please

@SparkQA

SparkQA commented Jan 10, 2020

Test build #116458 has finished for PR 27019 at commit e449d3c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 10, 2020

retest this please

@SparkQA

SparkQA commented Jan 10, 2020

Test build #116467 has finished for PR 27019 at commit e449d3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 15, 2020

retest this please

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116759 has finished for PR 27019 at commit e449d3c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 15, 2020

retest this please

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116780 has finished for PR 27019 at commit e449d3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented Jan 19, 2020

Would it be possible to add a benchmark result?

| ${generatePredicateCode(ctx, condition, inputAttrs, input)}
| $aggCode
|} while(false);
""".stripMargin
Member

nit: indentation problem?

| ${generatePredicateCode(ctx, condition, inputAttrs, input)}
| $aggCode
|} while(false);
""".stripMargin
Member

ditto

| ${generatePredicateCode(ctx, condition, inputAttrs, input)}
| $aggCode
|} while(false);
""".stripMargin
Member

ditto

@maropu
Member Author

maropu commented Jan 20, 2020

@kiszk I added the performance numbers to the PR description. I think codegen for hash aggregates yields performance gains in most queries. However, aggregate filters (recently merged into master) forcibly disable codegen, so I think this fix should improve performance.

@SparkQA

SparkQA commented Jan 20, 2020

Test build #117048 has finished for PR 27019 at commit c064d58.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -375,16 +373,30 @@ case class HashAggregateExec(
""".stripMargin
}

val codeToEvalAggFunc = if (conf.codegenSplitAggregateFunc &&
val codeToEvalAggFunc = {
Member

Can we refactor this and that to use one common function?

Member Author

Yea, I'll update.

@maropu maropu force-pushed the AggregateFilterCodegen branch from 5a2114f to ba48342 Compare January 21, 2020 03:25
@SparkQA

SparkQA commented Jan 21, 2020

Test build #117150 has finished for PR 27019 at commit ba48342.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 21, 2020

retest this please

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117144 has finished for PR 27019 at commit 5a2114f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented Jan 21, 2020

cc @rednaxelafx @cloud-fan

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117154 has finished for PR 27019 at commit ba48342.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented Jan 21, 2020

retest this please

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117174 has finished for PR 27019 at commit ba48342.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu force-pushed the AggregateFilterCodegen branch from ba48342 to 2796407 Compare January 26, 2020 11:30
@maropu
Member Author

maropu commented Aug 1, 2020

retest this please

@SparkQA

SparkQA commented Aug 1, 2020

Test build #126907 has finished for PR 27019 at commit 464fbaa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Aug 26, 2020

Test build #127912 has finished for PR 27019 at commit 464fbaa.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@maropu maropu force-pushed the AggregateFilterCodegen branch from 464fbaa to a43aa25 Compare August 26, 2020 23:21
@SparkQA

SparkQA commented Aug 27, 2020

Test build #127941 has finished for PR 27019 at commit a43aa25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@maropu maropu force-pushed the AggregateFilterCodegen branch 3 times, most recently from 9b1aea0 to 26ce9b2 Compare September 10, 2020 12:01
@SparkQA

SparkQA commented Sep 10, 2020

Test build #128517 has finished for PR 27019 at commit 9b1aea0.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128519 has finished for PR 27019 at commit 26ce9b2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@maropu maropu force-pushed the AggregateFilterCodegen branch from 26ce9b2 to 009fe4a Compare September 10, 2020 13:18
@SparkQA

SparkQA commented Sep 10, 2020

Test build #128525 has finished for PR 27019 at commit 009fe4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@HeartSaVioR
Contributor

@maropu It looks like this PR is not being properly tracked. I've removed the Stale tag, assuming you want to move forward, but I wanted to check again and am curious how you'd like to handle this.

@maropu
Member Author

maropu commented Dec 21, 2020

Thanks for taking care of it, @HeartSaVioR. If no reviewer objects to this PR and there are no further comments, I will merge it for the v3.2.0 release in a few weeks. cc: @cloud-fan @viirya @dongjoon-hyun

@maropu maropu force-pushed the AggregateFilterCodegen branch from 009fe4a to 86d89ba Compare December 21, 2020 01:40
@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37719/

@SparkQA

SparkQA commented Dec 21, 2020

Test build #133120 has finished for PR 27019 at commit 86d89ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

@SparkQA

SparkQA commented Dec 21, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37719/

@maropu
Member Author

maropu commented Dec 22, 2020

retest this please

@SparkQA

SparkQA commented Dec 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37786/

@SparkQA

SparkQA commented Dec 22, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37786/

@SparkQA

SparkQA commented Dec 22, 2020

Test build #133188 has finished for PR 27019 at commit 86d89ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait GeneratePredicateHelper extends PredicateHelper
  • case class FilterExec(condition: Expression, child: SparkPlan)

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Sorry for the delay and thank you, @maropu .
Merged to master for Apache Spark 3.2.0.

Merry Christmas and Happy New Year!

@maropu
Member Author

maropu commented Dec 25, 2020

Thanks for your review, Dongjoon ! Yea, you, too. Happy Merry Christmas!
