
Commit 01868ab

[SPARK-20728][SQL] Make ORCFileFormat configurable between sql/hive and sql/core
1 parent: 520d92a

21 files changed: +3770 −43 lines

21 files changed

+3770
-43
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 21 additions & 0 deletions
@@ -325,6 +325,23 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
+    .doc("When true, new ORCFileFormat in sql/core module is used instead of sql/hive module.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ORC_COLUMNAR_BATCH_READER_ENABLED =
+    buildConf("spark.sql.orc.columnarBatchReader.enabled")
+      .doc("Enables both vectorized orc decoding and columnar batch in whole-stage code gen.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ORC_VECTORIZED_READER_ENABLED =
+    buildConf("spark.sql.orc.vectorizedReader.enabled")
+      .doc("Enables vectorized orc decoding.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf

@@ -1007,6 +1024,10 @@ class SQLConf extends Serializable with Logging {
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def orcColumnarBatchReaderEnabled: Boolean = getConf(ORC_COLUMNAR_BATCH_READER_ENABLED)
+
+  def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
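
The three new keys are session configurations, so they can be toggled at runtime rather than only at startup. A minimal sketch of flipping them on an existing SparkSession (the session variable `spark` is assumed; the keys and defaults are the ones defined above):

// Assumes a running SparkSession named `spark`.
// Fall back to the sql/hive ORC implementation for this session:
spark.conf.set("spark.sql.orc.enabled", "false")

// Keep the sql/core reader but turn off columnar-batch reading in whole-stage codegen:
spark.conf.set("spark.sql.orc.enabled", "true")
spark.conf.set("spark.sql.orc.columnarBatchReader.enabled", "false")

// All three keys default to true:
println(spark.conf.get("spark.sql.orc.vectorizedReader.enabled"))  // "true" unless overridden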

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 3 additions & 0 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, Filter}

@@ -170,6 +171,8 @@ case class FileSourceScanExec(
 
   val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
     SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  } else if (relation.fileFormat.isInstanceOf[OrcFileFormat]) {
+    SparkSession.getActiveSession.get.sessionState.conf.orcVectorizedReaderEnabled
   } else {
     false
   }
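
The change mirrors the existing Parquet special case: for ORC scans the answer now depends on orcVectorizedReaderEnabled instead of always being false. A hypothetical standalone restatement of the branch, for illustration only (the format classes and conf accessors are the ones in the diff; the helper function itself is not part of the commit):

import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.internal.SQLConf

// Illustrative helper, not part of the commit: same decision as the if/else above.
def needsUnsafeRowConversion(format: FileFormat, conf: SQLConf): Boolean = format match {
  case _: ParquetSource => conf.parquetVectorizedReaderEnabled
  case _: OrcFileFormat => conf.orcVectorizedReaderEnabled  // new branch in this commit
  case _                => false
}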

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 3 additions & 2 deletions
@@ -189,7 +189,7 @@ case class AlterTableAddColumnsCommand(
     columns: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+    val catalogTable = verifyAlterTableAddColumn(sparkSession, catalog, table)
 
     try {
       sparkSession.catalog.uncacheTable(table.quotedString)

@@ -219,6 +219,7 @@ case class AlterTableAddColumnsCommand(
    * For datasource table, it currently only supports parquet, json, csv.
    */
   private def verifyAlterTableAddColumn(
+      sparkSession: SparkSession,
       catalog: SessionCatalog,
       table: TableIdentifier): CatalogTable = {
     val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)

@@ -232,7 +233,7 @@
     }
 
     if (DDLUtils.isDatasourceTable(catalogTable)) {
-      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+      DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match {
         // For datasource table, this command can only support the following File format.
         // TextFileFormat only default to one column "value"
         // OrcFileFormat can not handle difference between user-specified schema and
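
The SparkSession is threaded through here only because the class returned for an ORC provider is now session-dependent, so the pattern match in verifyAlterTableAddColumn has to resolve the provider via the same session. A rough sketch of the resolution it now performs (illustrative, not the command's exact code):

// Inside verifyAlterTableAddColumn: which FileFormat class does this session resolve to?
DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match {
  case _: OrcFileFormat => // resolved to the sql/core reader (spark.sql.orc.enabled = true)
  case other            => // any other FileFormat, including the sql/hive ORC implementation
}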

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 10 additions & 3 deletions
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}

@@ -85,7 +87,7 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
+  lazy val providingClass: Class[_] = DataSource.lookupDataSource(sparkSession, className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver

@@ -557,8 +559,13 @@ object DataSource extends Logging {
     "org.apache.spark.Logging")
 
   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
+    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
+        sparkSession.conf.get(SQLConf.ORC_ENABLED)) {
+      logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}")
+      provider1 = classOf[OrcFileFormat].getCanonicalName
+    }
     val provider2 = s"$provider1.DefaultSource"
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
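
The user-visible effect of the new branch: a session with spark.sql.orc.enabled set to true (the default) routes the "orc" provider, and the old sql/hive class name, to the sql/core OrcFileFormat and logs the substitution at INFO level. A hedged usage sketch (the path is made up; `spark` is an existing SparkSession):

// Default: spark.sql.orc.enabled = true, so this resolves to
// org.apache.spark.sql.execution.datasources.orc.OrcFileFormat and logs
// "orc is replaced with org.apache.spark.sql.execution.datasources.orc.OrcFileFormat".
val dfNew = spark.read.format("orc").load("/tmp/example_orc")  // hypothetical path

// Opting out restores the previous resolution, which finds the sql/hive
// OrcFileFormat when that module is on the classpath:
spark.conf.set("spark.sql.orc.enabled", "false")
val dfOld = spark.read.format("orc").load("/tmp/example_orc")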
