
Commit d1d5069

Wesley Tang authored and srowen committed
[SPARK-16664][SQL] Fix persist call on Data frames with more than 200…
## What changes were proposed in this pull request?

f12f11e introduced this bug: it used `map` where `foreach` was needed. Because `map` on an `Iterator` is lazy and its result was never consumed, the side-effecting bodies that register the generated accessor/extractor functions never ran.

## How was this patch tested?

Test added.

Author: Wesley Tang <tangmingjun@mininglamp.com>

Closes #14324 from breakdawn/master.
1 parent 274f3b9 · commit d1d5069
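The root cause is a Scala iterator-laziness pitfall: `Iterator.map` builds a lazy iterator and only evaluates the closure when that iterator is consumed, while `Iterator.foreach` runs it eagerly. A minimal, self-contained sketch of the difference (illustrative names only, not Spark code):

```scala
// Iterator.map is lazy: a side-effecting body never runs unless the
// resulting iterator is consumed. Iterator.foreach runs it eagerly.
object LazyMapDemo extends App {
  val registered = scala.collection.mutable.ArrayBuffer.empty[String]

  // Buggy pattern: the mapped iterator is discarded unconsumed,
  // so the closure never executes and nothing is registered.
  Iterator("accessors0", "accessors1").map(name => registered += name)
  println(registered.size) // 0

  // Fixed pattern: foreach evaluates the body for every element.
  Iterator("accessors0", "accessors1").foreach(name => registered += name)
  println(registered.size) // 2
}
```

In `GenerateColumnAccessor` this meant `groupedAccessorsLength` stayed 0 and no `accessors$i()`/`extractors$i()` functions were registered, so persisting a DataFrame with more than 200 columns produced incorrect results; the new test below guards against exactly that.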

File tree: 3 files changed, +12 −3 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -127,7 +127,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
     val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
     val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
     var groupedAccessorsLength = 0
-    groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+    groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
       groupedAccessorsLength += 1
       val funcName = s"accessors$i"
       val funcCode = s"""
@@ -137,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       """.stripMargin
       ctx.addNewFunction(funcName, funcCode)
     }
-    groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+    groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
       val funcName = s"extractors$i"
       val funcCode = s"""
         |private void $funcName() {
```
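For context on the hunk above: the codegen splits the per-column initializer and extractor statements into helper functions of at most `numberOfStatementsThreshold` statements each, keeping the generated Java methods under Janino/JVM size limits, and then emits calls to `accessors0()`, `accessors1()`, and so on. A hedged, self-contained sketch of that grouping pattern (the threshold value and the `println` stand-in are illustrative; the real code registers each function with `ctx.addNewFunction`):

```scala
// Sketch of the grouped-function codegen pattern.
// Assumption: a threshold of 200 statements per generated method;
// println stands in for ctx.addNewFunction(funcName, funcCode).
object GroupedCodegenSketch extends App {
  val numberOfStatementsThreshold = 200
  // Pretend there is one initializer statement per column, 201 columns.
  val initializeAccessors = (0 until 201).map(i => s"accessor$i.initialize();")

  var groupedAccessorsLength = 0
  initializeAccessors.grouped(numberOfStatementsThreshold).zipWithIndex.foreach {
    case (body, i) =>
      groupedAccessorsLength += 1
      val funcName = s"accessors$i"
      val funcCode =
        s"""
           |private void $funcName() {
           |  ${body.mkString("\n  ")}
           |}
         """.stripMargin
      println(s"registering $funcName with ${body.length} statements")
  }

  // The generated iterator then invokes every helper in order:
  println((0 until groupedAccessorsLength).map(i => s"accessors$i();").mkString(" "))
}
```

With the buggy `map`, `groupedAccessorsLength` would remain 0 here and the final line would emit no calls at all.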

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 8 additions & 0 deletions

```diff
@@ -1571,4 +1571,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(joined, Row("x", null, null))
     checkAnswer(joined.filter($"new".isNull), Row("x", null, null))
   }
+
+  test("SPARK-16664: persist with more than 200 columns") {
+    val size = 201L
+    val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size))))
+    val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
+    val df = spark.createDataFrame(rdd, StructType(schemas), false)
+    assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -227,7 +227,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes1 = List.fill(length1)(IntegerType)
     val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)

-    val length2 = 10000
+    // SPARK-16664: the limit of janino is 8117
+    val length2 = 8117
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
```
