
Commit b9503fc

[SPARK-23312][SQL] add a config to turn off vectorized cache reader
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression for cached tables in Spark 2.3. While the investigation is still ongoing, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release.

## How was this patch tested?

A new test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20483 from cloud-fan/cache.
1 parent 19c7c7e commit b9503fc
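
Not part of the commit: a minimal sketch of how a user might flip the new flag from application code, assuming a local SparkSession; the app name and local master below are illustrative, while the config key and its default of true come from the diff further down.

import org.apache.spark.sql.SparkSession

object DisableVectorizedCacheReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cache-reader-demo")   // hypothetical app name, for illustration only
      .master("local[*]")
      .getOrCreate()

    // The flag defaults to true. Setting it to false makes InMemoryTableScanExec
    // report supportsBatch = false, so cached table scans take the non-vectorized path.
    spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "false")

    val df = spark.range(10).cache()
    df.count()  // materialize the cache

    // Inspect the physical plan; the in-memory scan should not use batch reads.
    println(df.queryExecution.executedPlan)

    spark.stop()
  }
}

With the flag set to false, the cached scan behaves as asserted by the new test added to CachedTableSuite in this commit.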

File tree

3 files changed: 22 additions, 3 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
@@ -141,6 +141,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val CACHE_VECTORIZED_READER_ENABLED =
+    buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
+      .doc("Enables vectorized reader for columnar caching.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMN_VECTOR_OFFHEAP_ENABLED =
     buildConf("spark.sql.columnVector.offheap.enabled")
       .internal()

@@ -1272,6 +1278,8 @@ class SQLConf extends Serializable with Logging {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
+
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def targetPostShuffleInputSize: Long =

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ case class InMemoryTableScanExec(
   override val supportsBatch: Boolean = {
     // In the initial implementation, for ease of review
     // support only primitive data types and # of fields is less than wholeStageMaxNumFields
-    relation.schema.fields.forall(f => f.dataType match {
+    conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType |
         FloatType | DoubleType => true
       case _ => false
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 13 additions & 2 deletions
@@ -21,15 +21,14 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.concurrent.Eventually._
-
 import org.apache.spark.CleanerListener
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{AccumulatorContext, Utils}

@@ -782,4 +781,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedDs2) == 1)
     }
   }
+
+  test("SPARK-23312: vectorized cache reader can be disabled") {
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        val df = spark.range(10).cache()
+        df.queryExecution.executedPlan.foreach {
+          case i: InMemoryTableScanExec => assert(i.supportsBatch == vectorized)
+          case _ =>
+        }
+      }
+    }
+  }
 }
