[SPARK-17868][SQL] Do not use bitmasks during parsing and analysis of CUBE/ROLLUP/GROUPING SETS #15484
Conversation
```scala
// Find the index of the expression.
val e = typedVisit[Expression](eCtx)
val index = expressionMap.find(_._1.semanticEquals(e)).map(_._2).getOrElse(
  throw new ParseException(
```
We don't check whether the expression is in the GROUP BY list here; that check has been moved to the analysis stage.
That is fine.
```scala
@@ -335,13 +366,16 @@ class Analyzer(

    Aggregate(groupingAttrs, aggregations, expand)

    case f @ Filter(cond, child) if hasGroupingFunction(cond) =>
      // We should make sure all expressions in condition have been resolved.
```
An expression in the condition could be unresolved; we should check for this and leave the operator alone until `Filter.condition` is resolved. A test case in `ResolveGroupingAnalyticsSuite` would fail without this change.
```scala
    Seq(Seq(a, b, c, nulInt, nulStr, 3), Seq(a, b, c, a, nulStr, 1), Seq(a, b, c, a, b, 0)),
    Seq(a, b, c, a, b, gid),
    Project(Seq(a, b, c, a.as("a"), b.as("b")), r1)))))
checkAnalysis(originalPlan2, expected2)
```
Will add negative cases for this.
Test build #66960 has finished for PR 15484 at commit
Test build #66967 has finished for PR 15484 at commit
@hvanhovell Could you look at this please? Thank you!
```diff
  */
-def bitmasks(r: Rollup): Seq[Int] = {
-  Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
+def selectGroupExprsRollup(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
```
This is too complex; it is hard to grasp what is going on here. Let's make this imperative:
```scala
val buffer = mutable.Buffer.empty[Seq[Expression]]
var current = exprs
while (current.nonEmpty) {
  buffer += current
  current = current.init
}
buffer
```
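As a standalone sketch (strings are hypothetical stand-ins for `Expression`, and the empty grand-total set is appended explicitly here, which the final `inits`-based version yields for free), the imperative approach produces the ROLLUP grouping sets in order:

```scala
import scala.collection.mutable

// Enumerate ROLLUP grouping sets: every prefix of the GROUP BY list,
// longest first, down to the empty grand-total set.
def rollupExprs(exprs: Seq[String]): Seq[Seq[String]] = {
  val buffer = mutable.Buffer.empty[Seq[String]]
  var current = exprs
  while (current.nonEmpty) {
    buffer += current
    current = current.init // drop the last expression to get the next prefix
  }
  buffer += Seq.empty      // grand total: GROUP BY ()
  buffer.toSeq
}

// ROLLUP(a, b, c) -> (a, b, c), (a, b), (a), ()
println(rollupExprs(Seq("a", "b", "c")))
```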
```diff
  */
-def bitmasks(c: Cube): Seq[Int] = {
-  Seq.tabulate(1 << c.groupByExprs.length)(i => i)
+def selectGroupExprsCube(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
```
We could make this one more concise:
```scala
def cubeExprs(exprs: Seq[String]): Seq[Seq[String]] = exprs match {
  case x :: xs =>
    val initial = cubeExprs(xs)
    initial.map(x +: _) ++ initial
  case Nil =>
    Seq(Seq.empty)
}
```
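The recursive sketch above, made runnable over a `List[String]` (strings are hypothetical stand-ins for `Expression`; the `::` pattern requires a `List`):

```scala
// Enumerate CUBE grouping sets: all 2^n subsets of the GROUP BY list,
// preserving the original left-to-right order inside each subset.
def cubeExprs(exprs: List[String]): Seq[Seq[String]] = exprs match {
  case x :: xs =>
    val initial = cubeExprs(xs)     // subsets of the remaining expressions
    initial.map(x +: _) ++ initial  // each subset, first with x, then without
  case Nil =>
    Seq(Seq.empty)                  // grand total: GROUP BY ()
}

// CUBE(a, b) -> (a, b), (a), (b), ()
println(cubeExprs(List("a", "b")))
```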
```scala
if (exprs.length == 0) {
  Seq(Seq.empty[Expression])
} else {
  val expandExprsList = selectGroupExprsCube(exprs.drop(1))
```
For future reference: `exprs.drop(1)` -> `exprs.tail`
```scala
} else {
  val expandExprsList = selectGroupExprsCube(exprs.drop(1))
  expandExprsList.map { expandExprs =>
    exprs.take(1) ++ expandExprs
```
For future reference: `exprs.take(1)` -> `exprs.head`
```diff
@@ -281,9 +294,11 @@ class Analyzer(
     s"${VirtualColumn.hiveGroupingIdName} is deprecated; use grouping_id() instead")

     case Aggregate(Seq(c @ Cube(groupByExprs)), aggregateExpressions, child) =>
-      GroupingSets(bitmasks(c), groupByExprs, child, aggregateExpressions)
+      GroupingSets(
```
A more generic comment here: put the common code used by Cube, Rollup & GROUPING SETS into a single method. It seems a bit wasteful to rewrite this into a GroupingSets plan only to pick that up later down the line (especially since determining things like nullability is trivial for Cube and Rollup).
```diff
-// so we unset the bit in bitmap.
-bitmap & ~(1 << (numExpressions - 1 - index))
+val selectedGroupByExprs = ctx.groupingSet.asScala.map {
+  _.expression.asScala.foldLeft(Seq.empty[Expression]) {
```
Just use a map here: `_.expression().asScala.map(e => expression(e))`
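The point generalizes: a `foldLeft` that only appends a transformed element to its accumulator is just a `map`. A minimal illustration (plain integers, nothing Spark-specific):

```scala
// Accumulating with foldLeft...
val viaFold = Seq(1, 2, 3).foldLeft(Seq.empty[Int])((acc, e) => acc :+ (e * 2))
// ...is equivalent to, and clearer as, a map.
val viaMap = Seq(1, 2, 3).map(_ * 2)
println(viaFold == viaMap) // both are Seq(2, 4, 6)
```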
```scala
// Find the index of the expression.
val e = typedVisit[Expression](eCtx)
val index = expressionMap.find(_._1.semanticEquals(e)).map(_._2).getOrElse(
  throw new ParseException(
```
That is fine.
```scala
val numAttributes = attrMap.size
val mask = (1 << numAttributes) - 1

groupingSetAttrs.foldLeft(mask) {
```
You only need `foldLeft` when you want to traverse the collection in a certain order. Folds are typically not the easiest to understand, so use them sparingly and prefer more imperative constructs. Could you rewrite this using a more imperative approach?

Just for kicks and giggles:
```scala
groupingSetAttrs.map(attrMap).map(index => ~(1 << (numAttributes - 1 - index))).reduce(_ & _)
```
@hvanhovell Thank you for your comments, they are awesome! I've made some changes according to your advice; I hope the code looks better now. Thanks a lot!
Test build #67253 has finished for PR 15484 at commit
Test build #67325 has finished for PR 15484 at commit
```scala
    attrMap: Map[Attribute, Int]): Int = {
  val numAttributes = attrMap.size
  val mask = (1 << numAttributes) - 1
  (mask +: groupingSetAttrs.map(attrMap).map(index =>
```
Could you add a little documentation on the mask? It is non-trivial to understand. It might also be a good idea to split this into two separate statements: one to calculate the attribute masks and one to reduce them.
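A documented, two-statement sketch of that computation (strings stand in for `Attribute`; the method name and map are illustrative, not the PR's exact code):

```scala
// Compute the grouping_id bitmask for one grouping set over numAttributes
// GROUP BY attributes. Reading the low numAttributes bits from most to least
// significant, bit i is 1 iff the i-th attribute is NOT in this grouping set
// (i.e. it gets null-expanded).
def buildBitmask(groupingSetAttrs: Seq[String], attrMap: Map[String, Int]): Int = {
  val numAttributes = attrMap.size
  val mask = (1 << numAttributes) - 1 // all ones: nothing grouped (grand total)
  // One mask per attribute present: all ones except that attribute's bit.
  val attrMasks = groupingSetAttrs.map(attrMap)
    .map(index => ~(1 << (numAttributes - 1 - index)))
  // AND the masks together; seeding with `mask` keeps the empty set correct.
  (mask +: attrMasks).reduce(_ & _)
}

val attrMap = Map("a" -> 0, "b" -> 1, "c" -> 2)
println(buildBitmask(Seq("a", "b"), attrMap)) // 0b001 = 1: only c is expanded
println(buildBitmask(Seq.empty, attrMap))     // 0b111 = 7: grand total
```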
```scala
// GROUPING SETS ((a,b), a), we do not need to change the nullability of a, but we
// should change the nullability of b to be TRUE.
// TODO: For Cube/Rollup just set nullability to be `true`.
val expandedAttributes = groupByAliases.zipWithIndex.map { case (a, idx) =>
```
idx is not used?
+1. Looking at it more, I feel `zipWithIndex` is not needed at all and a plain `map` would suffice.
```scala
    aggregations: Seq[NamedExpression],
    groupByAliases: Seq[Alias],
    groupingAttrs: Seq[Expression],
    gid: Attribute): Seq[NamedExpression] = aggregations.map { case expr =>
```
You don't need the `case` here.
cc @davies too
```diff
-def bitmasks(r: Rollup): Seq[Int] = {
-  Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
+def rollupExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+  val buffer = ArrayBuffer.empty[Seq[Expression]]
```
Avoid using `ArrayBuffer`, as insertions would lead to expansion of the underlying array and copying of data to the new one. Since you know the size upfront, you could create an `Array` of the required size.
The use of `ArrayBuffer` makes this piece of code more concise, and since the sequence of `exprs` is usually not very long, performance is probably not the major concern here. I'd prefer to keep this one; is that OK? @hvanhovell
Is this just `exprs.inits`?
To be honest, this is the first time I've seen the use of `init`/`inits` on a trait.
`exprs.inits` is much more concise.
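For reference, `inits` in isolation (strings as stand-ins for expressions):

```scala
// inits yields the sequence and every shorter prefix, longest first,
// ending with the empty sequence: exactly the ROLLUP grouping sets.
val rollupSets = Seq("a", "b", "c").inits.toSeq
println(rollupSets) // (a, b, c), (a, b), (a), ()
```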
```scala
// GROUPING SETS ((a,b), a), we do not need to change the nullability of a, but we
// should change the nullability of b to be TRUE.
// TODO: For Cube/Rollup just set nullability to be `true`.
val expandedAttributes = groupByAliases.zipWithIndex.map { case (a, idx) =>
```
+1. Looking at it more, I feel `zipWithIndex` is not needed at all and a plain `map` would suffice.
```scala
val groupingSetsAttributes = selectedGroupByExprs.map { groupingSetExprs =>
  groupingSetExprs.map { expr =>
    val alias = groupByAliases.find(_.child.semanticEquals(expr)).getOrElse(
      failAnalysis(s"$expr doesn't show up in the GROUP BY list"))
```
Can you also display the GROUP BY list in the message?
@tejasapatil @rxin I've addressed most of your comments, thanks for reviewing this!
Test build #67405 has finished for PR 15484 at commit
@davies Would you please have a look at this PR? Thank you!
@jiangxb1987 Could you say a little bit more about the "minor bug"? That would help us decide whether this patch should be backported.
It is not really a bug. Bitmap manipulation has bitten us quite a few times in the past, so I would rather use expressions.
I see; this PR just improves readability, so we don't need to backport it. Looks good to me overall.
What else should I update on this PR? Please don't hesitate to request any change, thanks!
ping @hvanhovell
```diff
  */
-def bitmasks(c: Cube): Seq[Int] = {
-  Seq.tabulate(1 << c.groupByExprs.length)(i => i)
+def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
```
I'd also write unit tests specifically for `cubeExprs` and `rollupExprs`.
Also, I think you can just use `subsets`? e.g.
```scala
scala> Seq(1, 2, 3).toSet.subsets.foreach(println)
Set()
Set(1)
Set(2)
Set(3)
Set(1, 2)
Set(1, 3)
Set(2, 3)
Set(1, 2, 3)
```
I'm afraid we can't just map the `exprs` to a set, because we want to keep the original order.
Force-pushed from f8c6a04 to ef3a733.
Test build #68245 has finished for PR 15484 at commit
Test build #68246 has finished for PR 15484 at commit
Does this version look good now?
ping @hvanhovell
LGTM - merging to master. Thanks!
… CUBE/ROLLUP/GROUPING SETS

## What changes were proposed in this pull request?

We generate bitmasks for grouping sets during the parsing process, and use these during analysis. These bitmasks are difficult to work with in practice and have led to numerous bugs. This PR removes these and uses actual sets instead; however, we still need to generate these offsets for the grouping_id.

This PR does the following work:
1. Replace bitmasks by actual grouping sets during the Parsing/Analysis stage of CUBE/ROLLUP/GROUPING SETS;
2. Add a new test suite `ResolveGroupingAnalyticsSuite` to test the `Analyzer.ResolveGroupingAnalytics` rule directly;
3. Fix a minor bug in `ResolveGroupingAnalytics`.

## How was this patch tested?

By existing test cases, and the new test suite `ResolveGroupingAnalyticsSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes apache#15484 from jiangxb1987/group-set.
## What changes were proposed in this pull request?

Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to #15484, which made `Expand.projections` a lazy `Stream` for group by cube. In Scala 2.10, `Stream` captures a lot of stuff, and in this case it captures the entire query plan, which has some un-serializable parts.

This change is also good for the master branch, to reduce the serialized size of `Expand.projections`.

## How was this patch tested?

Manually verified with Spark with Scala 2.10.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19289 from cloud-fan/bug.

(cherry picked from commit ce6a71e)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>