Skip to content

Commit 39e2bad

Browse files
Marcelo Vanzinyhuai
Marcelo Vanzin
authored andcommitted
[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
The existing code caches all stats for all columns for each partition in the driver; for a large relation, this causes extreme memory usage, which leads to gc hell and application failures. It seems that only the size in bytes of the data is actually used in the driver, so instead just colllect that. In executors, the full stats are still kept, but that's not a big problem; we expect the data to be distributed and thus not really incur in too much memory pressure in each individual executor. There are also potential improvements on the executor side, since the data being stored currently is very wasteful (e.g. storing boxed types vs. primitive types for stats). But that's a separate issue. On a mildly related change, I'm also adding code to catch exceptions in the code generator since Janino was breaking with the test data I tried this patch on. Tested with unit tests and by doing a count a very wide table (20k columns) with many partitions. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #15112 from vanzin/SPARK-17549.
1 parent b9323fc commit 39e2bad

File tree

3 files changed

+32
-24
lines changed

3 files changed

+32
-24
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
2323
import scala.collection.JavaConverters._
2424
import scala.collection.mutable
2525
import scala.collection.mutable.ArrayBuffer
26+
import scala.util.control.NonFatal
2627

2728
import com.google.common.cache.{CacheBuilder, CacheLoader}
2829
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
@@ -910,14 +911,19 @@ object CodeGenerator extends Logging {
910911
codeAttrField.setAccessible(true)
911912
classes.foreach { case (_, classBytes) =>
912913
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
913-
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
914-
cf.methodInfos.asScala.foreach { method =>
915-
method.getAttributes().foreach { a =>
916-
if (a.getClass.getName == codeAttr.getName) {
917-
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
918-
codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
914+
try {
915+
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
916+
cf.methodInfos.asScala.foreach { method =>
917+
method.getAttributes().foreach { a =>
918+
if (a.getClass.getName == codeAttr.getName) {
919+
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
920+
codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
921+
}
919922
}
920923
}
924+
} catch {
925+
case NonFatal(e) =>
926+
logWarning("Error calculating stats of compiled class.", e)
921927
}
922928
}
923929
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.execution.columnar
1919

20-
import scala.collection.JavaConverters._
21-
2220
import org.apache.commons.lang3.StringUtils
2321

2422
import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
3129
import org.apache.spark.sql.catalyst.plans.logical.Statistics
3230
import org.apache.spark.sql.execution.SparkPlan
3331
import org.apache.spark.storage.StorageLevel
34-
import org.apache.spark.util.CollectionAccumulator
32+
import org.apache.spark.util.LongAccumulator
3533

3634

3735
object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
6361
@transient child: SparkPlan,
6462
tableName: Option[String])(
6563
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
66-
val batchStats: CollectionAccumulator[InternalRow] =
67-
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
64+
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
6865
extends logical.LeafNode with MultiInstanceRelation {
6966

7067
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
7471
@transient val partitionStatistics = new PartitionStatistics(output)
7572

7673
override lazy val statistics: Statistics = {
77-
if (batchStats.value.isEmpty) {
74+
if (batchStats.value == 0L) {
7875
// Underlying columnar RDD hasn't been materialized, no useful statistics information
7976
// available, return the default statistics.
8077
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
8178
} else {
82-
// Underlying columnar RDD has been materialized, required information has also been
83-
// collected via the `batchStats` accumulator.
84-
val sizeOfRow: Expression =
85-
BindReferences.bindReference(
86-
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
87-
partitionStatistics.schema)
88-
89-
val sizeInBytes =
90-
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
91-
Statistics(sizeInBytes = sizeInBytes)
79+
Statistics(sizeInBytes = batchStats.value.longValue)
9280
}
9381
}
9482

@@ -139,10 +127,10 @@ case class InMemoryRelation(
139127
rowCount += 1
140128
}
141129

130+
batchStats.add(totalSize)
131+
142132
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
143133
.flatMap(_.values))
144-
145-
batchStats.add(stats)
146134
CachedBatch(rowCount, columnBuilders.map { builder =>
147135
JavaUtils.bufferToArray(builder.build())
148136
}, stats)

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
232232
val columnTypes2 = List.fill(length2)(IntegerType)
233233
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
234234
}
235+
236+
test("SPARK-17549: cached table size should be correctly calculated") {
237+
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
238+
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
239+
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
240+
241+
// Materialize the data.
242+
val expectedAnswer = data.collect()
243+
checkAnswer(cached, expectedAnswer)
244+
245+
// Check that the right size was calculated.
246+
assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
247+
}
248+
235249
}

0 commit comments

Comments
 (0)