
[SPARK-17549][sql] Only collect table size stat in driver for cached relation. #15112

Status: Closed (wants to merge 3 commits)
File: CodeGenerator.scala

@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
@@ -910,14 +911,19 @@ object CodeGenerator extends Logging {
       codeAttrField.setAccessible(true)
       classes.foreach { case (_, classBytes) =>
         CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
-        val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        cf.methodInfos.asScala.foreach { method =>
-          method.getAttributes().foreach { a =>
-            if (a.getClass.getName == codeAttr.getName) {
-              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
-            }
-          }
-        }
+        try {
+          val cf = new ClassFile(new ByteArrayInputStream(classBytes))
+          cf.methodInfos.asScala.foreach { method =>
+            method.getAttributes().foreach { a =>
+              if (a.getClass.getName == codeAttr.getName) {
+                CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
+                  codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+              }
+            }
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Error calculating stats of compiled class.", e)
+        }
       }
     }
Contributor: When will this block fail?

Contributor (Author): See bug.

Contributor: OK, thanks. It seems like a separate issue; let's not change this part.

Contributor: Oops, I forgot to send the above comment...

Contributor (Author): But without this fix you can't read the table I described in the bug at all, because SQL just blows up. If you want a separate patch just for that, OK, but that seems like overkill to me.

Contributor: My worry is that we will just forget about this issue if we only make it log a warning. Removing this try/catch will not fail any existing tests, right? We can create a new JIRA to fix this issue for Spark 2.0.

Contributor (Author): How about my suggestion of adding the workaround and filing a bug? Then there's no worry about forgetting anything.

Because it's most probably a Janino bug, fixing it might not be as simple as just making a change in Spark.

Contributor: OK. This part seems to be used only to record some metrics, so I guess it is fine. But let me ping @ericl, who added this method, to double-check.

Contributor (Author): In any case, I filed SPARK-17565 to track the actual fix. This is just a workaround so Spark doesn't fail.

Contributor: This seems OK to me. We have a unit test for the metric here, so it isn't likely to break entirely without notice.

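A note on the pattern being debated above: scala.util.control.NonFatal matches only recoverable exceptions, so wrapping best-effort work such as metrics collection in a try/catch on NonFatal swallows parse failures while still letting fatal errors (OutOfMemoryError and friends) propagate. Below is a minimal, self-contained sketch of that guard; recordMethodSize and parseAndRecord are illustrative names, not Spark APIs.

```scala
import scala.util.control.NonFatal

object BestEffortMetrics {
  // Illustrative stand-in for a metrics sink (not Spark's CodegenMetrics).
  def recordMethodSize(bytes: Int): Unit =
    println(s"generated method bytecode size: $bytes")

  // Best-effort: analyzing the class bytes may throw (e.g. on a Janino
  // quirk), but that should only cost us the metric, never fail the caller.
  def parseAndRecord(classBytes: Array[Byte]): Unit = {
    try {
      recordMethodSize(classBytes.length)
    } catch {
      case NonFatal(e) =>
        // Fatal errors are deliberately not caught here.
        System.err.println(s"Error calculating stats of compiled class: $e")
    }
  }

  def main(args: Array[String]): Unit =
    parseAndRecord(Array[Byte](1, 2, 3))
}
```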
File: InMemoryRelation.scala

@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator
 
 
 object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }
Contributor: Can you double-check that we have a test to make sure the total size is correct?

Contributor (Author): Given that I changed the stat and all tests still passed locally, I doubt we have one... I'll take a look once I find some time to get back to this patch.


@@ -139,10 +127,10 @@ case class InMemoryRelation(
           rowCount += 1
         }
 
+        batchStats.add(totalSize)
+
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
-
-        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
           JavaUtils.bufferToArray(builder.build())
         }, stats)
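The shape of the InMemoryRelation change above: instead of shipping one InternalRow of per-column statistics per batch back to the driver and evaluating a size expression over all of them, each task now adds its batch's total byte size to a LongAccumulator, so only a running long ever reaches the driver. Here is a minimal sketch of that accumulator pattern, assuming a local SparkSession and an illustrative 4-bytes-per-row cost in place of the real column-builder sizes:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object BatchStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("batch-stats-sketch").getOrCreate()
    // One shared counter: tasks add to it, the driver reads the total.
    val batchStats: LongAccumulator = spark.sparkContext.longAccumulator("batchStats")

    spark.sparkContext.parallelize(1 to 10, 5).foreachPartition { iter =>
      // Stand-in cost model: pretend each row occupies 4 bytes,
      // roughly one INT column entry.
      batchStats.add(iter.size.toLong * 4L)
    }

    // Driver-side read: a single long, no per-batch rows collected.
    assert(batchStats.value.longValue == 40L)
    spark.stop()
  }
}
```

This is also the driver-memory win behind the PR title: the old CollectionAccumulator retained every batch's stats row on the driver, while a LongAccumulator keeps constant-size state no matter how many batches are cached.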
File: InMemoryColumnarQuerySuite.scala

@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
+
+  test("SPARK-17549: cached table size should be correctly calculated") {
+    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
+    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+
+    // Materialize the data.
+    val expectedAnswer = data.collect()
+    checkAnswer(cached, expectedAnswer)
+
+    // Check that the right size was calculated.
+    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
+  }
+
 }
Contributor: Thanks!


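For the curious, the expected value in the new test works out as plain arithmetic: 10 rows of a single integer column, at INT.defaultSize (4 bytes for Spark's columnar integer type) per value, gives batchStats.value = 10 * 4 = 40 bytes. The result is independent of the 5 partitions and the batch size of 5, since every task adds its raw buffer size into the same accumulator.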