Skip to content

Commit

Permalink
[SPARK-22076][SQL] Expand.projections should not be a Stream
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```

It can be traced back to #15484 , which made `Expand.projections` a lazy `Stream` for group by cube.

In scala 2.10 `Stream` captures a lot of stuff, and in this case it captures the entire query plan which has some un-serializable parts.

This change is also good for master branch, to reduce the serialized size of `Expand.projections`.

## How was this patch tested?

manually verified with Spark with Scala 2.10.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19289 from cloud-fan/bug.
  • Loading branch information
cloud-fan authored and gatorsmile committed Sep 20, 2017
1 parent e17901d commit ce6a71e
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,15 @@ class Analyzer(
* We need to get all of its subsets for a given GROUPBY expression, the subsets are
* represented as sequence of expressions.
*/
def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
// `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
// materialize it and avoid serialization problems later on.
cubeExprs0(exprs).toIndexedSeq
}

def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
case x :: xs =>
val initial = cubeExprs(xs)
val initial = cubeExprs0(xs)
initial.map(x +: _) ++ initial
case Nil =>
Seq(Seq.empty)
Expand Down

0 comments on commit ce6a71e

Please sign in to comment.