Skip to content

Commit 385edc0

Browse files
committed
address comments and added tests
1 parent 064f6d1 commit 385edc0

File tree

3 files changed

+38
-5
lines changed

3 files changed

+38
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CacheManager extends Logging {
8181
}
8282

8383
private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
84-
if (plan.conf.cboEnabled && plan.stats.rowCount.isDefined) {
84+
if (plan.stats.rowCount.isDefined) {
8585
Some(plan.stats)
8686
} else {
8787
None
@@ -156,7 +156,7 @@ class CacheManager extends Logging {
156156
storageLevel = cd.cachedRepresentation.storageLevel,
157157
child = spark.sessionState.executePlan(cd.plan).executedPlan,
158158
tableName = cd.cachedRepresentation.tableName,
159-
stats = extractStatsOfPlanForCache(cd.plan))
159+
statsOfPlanToCache = extractStatsOfPlanForCache(cd.plan))
160160
needToRecache += cd.copy(cachedRepresentation = newCache)
161161
}
162162
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ object InMemoryRelation {
3838
storageLevel: StorageLevel,
3939
child: SparkPlan,
4040
tableName: Option[String],
41-
stats: Option[Statistics]): InMemoryRelation =
41+
statsOfPlanToCache: Option[Statistics]): InMemoryRelation =
4242
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
43-
statsOfPlanToCache = stats)
43+
statsOfPlanToCache = statsOfPlanToCache)
4444
}
4545

4646

@@ -73,7 +73,10 @@ case class InMemoryRelation(
7373
@transient val partitionStatistics = new PartitionStatistics(output)
7474

7575
override def computeStats(): Statistics = {
76+
// scalastyle:off
7677
if (batchStats.value == 0L) {
78+
// The underlying columnar RDD hasn't been materialized yet, so use the stats from the plan to
79+
// cache when they are available
7780
statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
7881
child.sqlContext.conf.defaultSizeInBytes))
7982
} else {

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.columnar
2020
import java.nio.charset.StandardCharsets
2121
import java.sql.{Date, Timestamp}
2222

23-
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
23+
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
24+
import org.apache.spark.sql.catalyst.catalog._
2425
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
2526
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
2627
import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext
3031
import org.apache.spark.sql.test.SQLTestData._
3132
import org.apache.spark.sql.types._
3233
import org.apache.spark.storage.StorageLevel._
34+
import org.apache.spark.util.Utils
3335

3436
class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
3537
import testImplicits._
@@ -480,4 +482,32 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
480482
}
481483
}
482484
}
485+
486+
test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
487+
withSQLConf("spark.sql.cbo.enabled" -> "true") {
488+
// scalastyle:off
489+
val workDir = s"${Utils.createTempDir()}/table1"
490+
val data = Seq(100, 200, 300, 400).toDF("count")
491+
data.write.parquet(workDir)
492+
val dfFromFile = spark.read.parquet(workDir).cache()
493+
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
494+
case plan: InMemoryRelation => plan
495+
}.head
496+
// InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized
497+
assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)
498+
// InMemoryRelation's stats is updated after materializing RDD
499+
dfFromFile.collect()
500+
assert(inMemoryRelation.computeStats().sizeInBytes === 16)
501+
// test of catalog table
502+
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
503+
val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
504+
collect { case plan: InMemoryRelation => plan }.head
505+
// Even CBO enabled, InMemoryRelation's stats keeps as the default one before table's stats
506+
// is calculated
507+
assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
508+
// InMemoryRelation's stats should be updated after calculating stats of the table
509+
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
510+
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
511+
}
512+
}
483513
}

0 commit comments

Comments
 (0)