[SPARK-17549][sql] Only collect table size stat in driver for cached relation. #15112
Changes from all commits
@@ -17,8 +17,6 @@
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils

@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator
 
 
 object InMemoryRelation {

@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
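A note on the signature above: batchStats sits in the relation's second parameter list, so changing its type from CollectionAccumulator[InternalRow] to LongAccumulator does not take part in the case class's generated equals/hashCode or pattern matching. The following is a minimal, hypothetical sketch of that Scala pattern; CachedThing and stats are made-up names for illustration, not the real InMemoryRelation definition.

```scala
import org.apache.spark.util.LongAccumulator

// Illustrative Scala pattern only (not the real InMemoryRelation definition):
// a case class with a second parameter list. Only the first list takes part in
// the generated equals/hashCode and pattern matching, so per-instance state
// such as an accumulator can carry a default there.
object CurriedCaseClassSketch {
  case class CachedThing(name: String)(val stats: LongAccumulator = new LongAccumulator)

  def main(args: Array[String]): Unit = {
    val a = CachedThing("t")()
    val b = CachedThing("t")()
    a.stats.add(10L)
    println(a == b)        // true: the second parameter list is ignored by equals
    println(a.stats.value) // 10
    println(b.stats.value) // 0
  }
}
```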
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }

Review comments on this change:

- Can you double-check that we have a test to make sure the total size is correct?
- Given that I changed the stat and all tests still passed locally, I doubt we have one... I'll take a look once I find some time to get back to this patch.
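To make the new control flow concrete, here is a small, self-contained sketch that mirrors it outside of Spark SQL. SizeEstimateSketch, estimatedSizeInBytes, and the 1024-byte default are hypothetical names and values, not code from the patch.

```scala
import org.apache.spark.util.LongAccumulator

// Stand-alone mirror of the new fallback logic (illustrative names only;
// this is not the InMemoryRelation source).
object SizeEstimateSketch {
  def estimatedSizeInBytes(batchStats: LongAccumulator, defaultSizeInBytes: Long): Long =
    if (batchStats.value == 0L) {
      defaultSizeInBytes           // nothing materialized yet: fall back to the default
    } else {
      batchStats.value.longValue   // sum of the per-batch sizes added by executors
    }

  def main(args: Array[String]): Unit = {
    val acc = new LongAccumulator             // normally obtained via sc.longAccumulator
    println(estimatedSizeInBytes(acc, 1024L)) // prints 1024: default used
    acc.add(40L)
    println(estimatedSizeInBytes(acc, 1024L)) // prints 40: accumulated size used
  }
}
```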
@@ -139,10 +127,10 @@ case class InMemoryRelation(
           rowCount += 1
         }
 
+        batchStats.add(totalSize)
+
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
 
-        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
           JavaUtils.bufferToArray(builder.build())
         }, stats)
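Taken together, the two hunks above reduce the driver-side work to a single long: each task adds its batch's byte total on the executor, and the driver only ever reads the aggregated value. Below is a runnable, hedged sketch of that accumulator round trip; the app name, sizeAcc, and the 4-bytes-per-int estimate are illustrative assumptions, not code from the patch.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the driver-side accumulator pattern used by batchStats.
object LongAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("acc-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Registered on the driver, shipped to executors with each task.
    val sizeAcc = sc.longAccumulator("cached batch size in bytes")

    // Each partition stands in for one "cached batch" and records its size.
    sc.parallelize(1 to 10, 5).foreachPartition { iter =>
      sizeAcc.add(iter.size * 4L) // pretend every row costs 4 bytes
    }

    // Only the aggregated Long ever travels back to the driver.
    println(s"total cached size = ${sizeAcc.value} bytes")
    spark.stop()
  }
}
```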
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
+
+  test("SPARK-17549: cached table size should be correctly calculated") {
+    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
+    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+
+    // Materialize the data.
+    val expectedAnswer = data.collect()
+    checkAnswer(cached, expectedAnswer)
+
+    // Check that the right size was calculated.
+    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
+  }
 }

Review comment on the new test:

- Thanks!
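For context on the assertion in the new test: the DataFrame holds 10 integer rows in a single column, and assuming the columnar INT type's defaultSize is 4 bytes, the expected accumulator value works out to 10 * 4 = 40 bytes, i.e. batchStats is expected to count only the column data itself.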
Further review discussion from the pull request, attached to a try/catch block elsewhere in the patch (the block itself is not shown above):

- When will this block fail?
- See bug.
- OK. Thanks. Seems it is a separate issue. Let's not change this part.
- Oops, I forgot to send the above comment...
- But without this fix you can't read the table I described in the bug at all, because SQL just blows up. If you want a separate patch just for that, OK, but it seems like overkill to me.
- My worry is that we will just forget about this issue if we just make it log a warning. Removing this try/catch will not fail any existing tests, right? We can create a new jira to fix this issue for Spark 2.0.
- How about my suggestion of adding the workaround and filing a bug? Then there's no worry about forgetting anything. Because it's most probably a Janino bug, fixing it might not be as simple as just making some change in Spark.
- OK. Seems this part is used to record some metrics. I guess it is fine. But let me ping @ericl, who added this method, to double-check.
- In any case, I filed SPARK-17565 to track the actual fix. This is just a workaround so Spark doesn't fail.
- This seems ok to me. We have a unit test for the metric here, so it isn't likely to break entirely without notice.