
[SPARK-17868][SQL] Do not use bitmasks during parsing and analysis of CUBE/ROLLUP/GROUPING SETS #15484

Closed
wants to merge 9 commits into from

Conversation

jiangxb1987
Contributor

What changes were proposed in this pull request?

We generate bitmasks for grouping sets during parsing, and use these during analysis. These bitmasks are difficult to work with in practice and have led to numerous bugs. This PR removes them and uses actual grouping sets instead; however, we still need to generate these offsets for grouping_id.

This PR makes the following changes:

  1. Replace bitmasks with actual grouping sets during the parsing/analysis stages of CUBE/ROLLUP/GROUPING SETS;
  2. Add a new test suite ResolveGroupingAnalyticsSuite to test the Analyzer.ResolveGroupingAnalytics rule directly;
  3. Fix a minor bug in ResolveGroupingAnalytics.

How was this patch tested?

By existing test cases, and by the new test suite ResolveGroupingAnalyticsSuite, which tests the rule directly.
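For context, here is a minimal sketch (illustrative names, plain strings standing in for Catalyst expressions) contrasting the old bitmask encoding of ROLLUP with the explicit grouping sets this PR moves to. Note the two representations enumerate the sets in different orders:

```scala
// Old style: ROLLUP over n columns encoded as bitmasks (2^0 - 1), (2^1 - 1), ...
// Each mask's set bits mark which columns participate in that grouping set.
def rollupBitmasks(numExprs: Int): Seq[Int] =
  Seq.tabulate(numExprs + 1)(idx => (1 << idx) - 1)

// New style: the same grouping sets written out explicitly. `inits` yields the
// full sequence, then each shorter prefix, down to the empty (grand-total) set.
def rollupExprs(exprs: Seq[String]): Seq[Seq[String]] = exprs.inits.toSeq

println(rollupBitmasks(2))          // List(0, 1, 3)
println(rollupExprs(Seq("a", "b"))) // List(List(a, b), List(a), List())
```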

// Find the index of the expression.
val e = typedVisit[Expression](eCtx)
val index = expressionMap.find(_._1.semanticEquals(e)).map(_._2).getOrElse(
throw new ParseException(
Contributor Author

@jiangxb1987 jiangxb1987 Oct 14, 2016

We don't check whether the expression is in the GROUP BY list here; that check has been moved to the analysis stage.

Contributor

That is fine.

@@ -335,13 +366,16 @@ class Analyzer(

Aggregate(groupingAttrs, aggregations, expand)

case f @ Filter(cond, child) if hasGroupingFunction(cond) =>
// We should make sure all expressions in condition have been resolved.
Contributor Author

Expressions in the condition could be unresolved; we should check for this and leave the operator unchanged until Filter.condition is resolved. A test case in ResolveGroupingAnalyticsSuite would fail without this change.

Seq(Seq(a, b, c, nulInt, nulStr, 3), Seq(a, b, c, a, nulStr, 1), Seq(a, b, c, a, b, 0)),
Seq(a, b, c, a, b, gid),
Project(Seq(a, b, c, a.as("a"), b.as("b")), r1)))))
checkAnalysis(originalPlan2, expected2)
Contributor Author

Will add negative cases for this.

@SparkQA

SparkQA commented Oct 14, 2016

Test build #66960 has finished for PR 15484 at commit 776d32e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 14, 2016

Test build #66967 has finished for PR 15484 at commit 4906cf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

@hvanhovell Could you look at this please? Thank you!

*/
def bitmasks(r: Rollup): Seq[Int] = {
Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
def selectGroupExprsRollup(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
Contributor

This is too complex. It is hard to grasp what is going on here. Let's make this imperative:

val buffer = mutable.Buffer.empty[Seq[Expression]]
var current = exprs
while (current.nonEmpty) {
  buffer += current
  current = current.init
}
buffer
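A runnable version of the loop above, over plain strings for illustration. As noted later in this thread, it is nearly equivalent to `exprs.inits`, except that `inits` also yields the trailing empty sequence (the grand-total grouping set), which this loop stops short of:

```scala
import scala.collection.mutable

// The imperative rollup expansion sketched above, over plain strings.
def rollupLoop(exprs: Seq[String]): Seq[Seq[String]] = {
  val buffer = mutable.Buffer.empty[Seq[String]]
  var current = exprs
  while (current.nonEmpty) {
    buffer += current
    current = current.init // drop the last element each iteration
  }
  buffer.toSeq
}

// `inits` additionally produces the empty (grand-total) grouping set.
println(rollupLoop(Seq("a", "b")))  // List(List(a, b), List(a))
println(Seq("a", "b").inits.toSeq)  // List(List(a, b), List(a), List())
```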

*/
def bitmasks(c: Cube): Seq[Int] = {
Seq.tabulate(1 << c.groupByExprs.length)(i => i)
def selectGroupExprsCube(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
Contributor

We could make this one more concise:

def cubeExprs(exprs: Seq[String]): Seq[Seq[String]] = exprs match {
  case x :: xs =>
    val initial = cubeExprs(xs)
    initial.map(x +: _) ++ initial
  case Nil =>
    Seq(Seq.empty)
}
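Run over plain strings (standing in for Catalyst expressions), the recursion above enumerates all 2^n column combinations while preserving the original column order. Note the `x :: xs` pattern only matches a List, which is why the final code converts with `exprs.toList` first:

```scala
// Recursive cube expansion: each column is either kept or dropped,
// giving 2^n grouping sets in a deterministic, order-preserving sequence.
def cubeExprs(exprs: List[String]): Seq[Seq[String]] = exprs match {
  case x :: xs =>
    val initial = cubeExprs(xs)
    initial.map(x +: _) ++ initial
  case Nil =>
    Seq(Seq.empty)
}

println(cubeExprs(List("a", "b")))
// List(List(a, b), List(a), List(b), List())
```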

if (exprs.length == 0) {
Seq(Seq.empty[Expression])
} else {
val expandExprsList = selectGroupExprsCube(exprs.drop(1))
Contributor

For future reference: exprs.drop(1) -> exprs.tail

} else {
val expandExprsList = selectGroupExprsCube(exprs.drop(1))
expandExprsList.map { expandExprs =>
exprs.take(1) ++ expandExprs
Contributor

For future reference: exprs.take(1) -> exprs.head

@@ -281,9 +294,11 @@ class Analyzer(
s"${VirtualColumn.hiveGroupingIdName} is deprecated; use grouping_id() instead")

case Aggregate(Seq(c @ Cube(groupByExprs)), aggregateExpressions, child) =>
GroupingSets(bitmasks(c), groupByExprs, child, aggregateExpressions)
GroupingSets(
Contributor

A more generic comment here would be to put the common code used by Cube, Rollup & Grouping Sets in a single method. It seems a bit wasteful to rewrite this into a GroupingSets plan, to pick that up later down the line (especially since determining things like nullability is trivial for Cube and Rollup).

// so we unset the bit in bitmap.
bitmap & ~(1 << (numExpressions - 1 - index))
val selectedGroupByExprs = ctx.groupingSet.asScala.map {
_.expression.asScala.foldLeft(Seq.empty[Expression]) {
Contributor

Just use a map here: _.expression().asScala.map(e => expression(e))

// Find the index of the expression.
val e = typedVisit[Expression](eCtx)
val index = expressionMap.find(_._1.semanticEquals(e)).map(_._2).getOrElse(
throw new ParseException(
Contributor

That is fine.

val numAttributes = attrMap.size
val mask = (1 << numAttributes) - 1

groupingSetAttrs.foldLeft(mask) {
Contributor

You only need fold left when you want to traverse the collection in a certain order. Folds are typically not the easiest to understand, so use them sparingly and prefer to use more imperative constructs.

Could you rewrite this using a more imperative approach?

Just for kicks & giggles:

groupingSetAttrs.map(attrMap).map(index => ~(1 << (numAttributes - 1 - index))).reduce(_ & _)
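A worked sketch of the grouping-id computation being discussed (illustrative names, Int indices in place of Catalyst attributes): start from a mask with every bit set (all columns "grouped away"), then clear the bit for each attribute present in the grouping set. Note the reduction must use bitwise `&`, not boolean `&&`:

```scala
// Compute the grouping id for one grouping set. Bit i (counting from the
// most significant of numAttributes bits) is 0 when attribute i appears
// in the grouping set, 1 when it has been grouped away.
def groupingId(groupingSetAttrs: Seq[Int], numAttributes: Int): Int = {
  val mask = (1 << numAttributes) - 1 // all bits set: nothing selected
  groupingSetAttrs.foldLeft(mask) { (bitmap, index) =>
    bitmap & ~(1 << (numAttributes - 1 - index)) // clear this attribute's bit
  }
}

// Three attributes a=0, b=1, c=2: GROUPING SETS ((a, b)) clears the top
// two bits of 0b111, leaving 0b001 = 1.
println(groupingId(Seq(0, 1), 3)) // 1
```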

@jiangxb1987
Contributor Author

@hvanhovell Thank you for your comments - They are awesome! I've made some changes according to your advice, I hope the code looks better now. Thanks a lot!

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67253 has finished for PR 15484 at commit 9502196.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 21, 2016

Test build #67325 has finished for PR 15484 at commit 925a5ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

attrMap: Map[Attribute, Int]): Int = {
val numAttributes = attrMap.size
val mask = (1 << numAttributes) - 1
(mask +: groupingSetAttrs.map(attrMap).map(index =>
Contributor

Could you add a little bit of documentation on the mask? It is non-trivial to understand.

It might also be a good idea to split this into two separate statements: one to calculate the attribute masks and one to reduce them.

// GROUPING SETS ((a,b), a), we do not need to change the nullability of a, but we
// should change the nullability of b to be TRUE.
// TODO: For Cube/Rollup just set nullability to be `true`.
val expandedAttributes = groupByAliases.zipWithIndex.map { case (a, idx) =>
Contributor

idx is not used?

Contributor

+1. Looking at it more, I feel zipWithIndex is not needed at all and the map would suffice.

aggregations: Seq[NamedExpression],
groupByAliases: Seq[Alias],
groupingAttrs: Seq[Expression],
gid: Attribute): Seq[NamedExpression] = aggregations.map { case expr =>
Contributor

don't need the "case" here.

@rxin
Contributor

rxin commented Oct 22, 2016

cc @davies too

def bitmasks(r: Rollup): Seq[Int] = {
Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
def rollupExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
val buffer = ArrayBuffer.empty[Seq[Expression]]
Contributor

Avoid using ArrayBuffer, as insertions would lead to expansion of the underlying array and copying of data to the new one. Since you know the size upfront, you could create an Array of the required size.

Contributor Author

The use of ArrayBuffer makes this piece of code more concise, and since the sequence of exprs is usually not very long, performance is probably not the major concern here. I'd prefer to keep this one, is that OK? @hvanhovell

Contributor

Is this just exprs.inits?

To be honest, this is the first time I've seen the use of init/inits on a trait.

Contributor Author

exprs.inits is much more concise.

val groupingSetsAttributes = selectedGroupByExprs.map { groupingSetExprs =>
groupingSetExprs.map { expr =>
val alias = groupByAliases.find(_.child.semanticEquals(expr)).getOrElse(
failAnalysis(s"$expr doesn't show up in the GROUP BY list"))
Contributor

Can you also display the GROUP BY list in the message?

@jiangxb1987
Contributor Author

@tejasapatil @rxin I've addressed most of your comments, thanks for reviewing this!

@SparkQA

SparkQA commented Oct 23, 2016

Test build #67405 has finished for PR 15484 at commit a47cc68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

@davies Would you please have a look at this PR? Thank you!

@davies
Contributor

davies commented Oct 25, 2016

@jiangxb1987 Could you say a little bit more about the "minor bug"? That will help us decide whether this patch should be backported or not.

@hvanhovell
Contributor

It is not really a bug. Bitmask manipulation has bitten us quite a few times in the past, so I would rather use expressions.

@davies
Contributor

davies commented Oct 25, 2016

I see, this PR just improves readability, so we don't need to backport it. Looks good to me overall.

@jiangxb1987
Contributor Author

What else should I update on this PR? Please don't hesitate to request any changes, thanks!

@jiangxb1987
Contributor Author

ping @hvanhovell

*/
def bitmasks(c: Cube): Seq[Int] = {
Seq.tabulate(1 << c.groupByExprs.length)(i => i)
def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
Contributor

I'd also write unit tests specifically for cubeExprs and rollupExprs

Contributor

Also I think you can just use subsets? e.g.

scala> Seq(1, 2, 3).toSet.subsets.foreach(println)
Set()
Set(1)
Set(2)
Set(3)
Set(1, 2)
Set(1, 3)
Set(2, 3)
Set(1, 2, 3)

Contributor Author

I'm afraid we can't just map the exprs to a set because we want to keep the original order.

@SparkQA

SparkQA commented Nov 6, 2016

Test build #68245 has finished for PR 15484 at commit f8c6a04.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 6, 2016

Test build #68246 has finished for PR 15484 at commit ef3a733.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

Does this version look good now?

@jiangxb1987
Contributor Author

ping @hvanhovell

@hvanhovell
Contributor

LGTM - merging to master. Thanks!

@asfgit asfgit closed this in 344dcad Nov 8, 2016
@jiangxb1987 jiangxb1987 deleted the group-set branch November 9, 2016 02:00
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… CUBE/ROLLUP/GROUPING SETS

## What changes were proposed in this pull request?

We generate bitmasks for grouping sets during parsing, and use these during analysis. These bitmasks are difficult to work with in practice and have led to numerous bugs. This PR removes them and uses actual grouping sets instead; however, we still need to generate these offsets for grouping_id.

This PR makes the following changes:
1. Replace bitmasks with actual grouping sets during the parsing/analysis stages of CUBE/ROLLUP/GROUPING SETS;
2. Add new testsuite `ResolveGroupingAnalyticsSuite` to test the `Analyzer.ResolveGroupingAnalytics` rule directly;
3. Fix a minor bug in `ResolveGroupingAnalytics`.
## How was this patch tested?

By existing test cases, and by the new test suite `ResolveGroupingAnalyticsSuite`, which tests the rule directly.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes apache#15484 from jiangxb1987/group-set.
asfgit pushed a commit that referenced this pull request Sep 20, 2017
## What changes were proposed in this pull request?

Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```

It can be traced back to #15484, which made `Expand.projections` a lazy `Stream` for group by cube.

In Scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan, which has some un-serializable parts.

This change is also good for the master branch, to reduce the serialized size of `Expand.projections`.

## How was this patch tested?

manually verified with Spark with Scala 2.10.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19289 from cloud-fan/bug.

(cherry picked from commit ce6a71e)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
ghost pushed a commit to dbtsai/spark that referenced this pull request Sep 20, 2017
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018