
Commit e7beb02

Address comments.
1 parent 37e240c commit e7beb02

11 files changed: +26 −21 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 3 deletions
@@ -363,10 +363,13 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")

-  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
-    .doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.")
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
+    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
+      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
+      "more stable and faster.")
+    .internal()
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)

   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 1 deletion
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }

-    val cls = DataSource.lookupDataSource(sparkSession, source)
+    val cls = DataSource.lookupDataSource(sparkSession.sessionState.conf, source)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val options = new DataSourceV2Options(extraOptions.asJava)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

     assertNotBucketed("save")

-    val cls = DataSource.lookupDataSource(df.sparkSession, source)
+    val cls = DataSource.lookupDataSource(df.sparkSession.sessionState.conf, source)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       cls.newInstance() match {
         case ds: WriteSupport =>
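
Both the reader and writer changes above route provider resolution through the session's SQLConf. A minimal end-to-end sketch (assumptions: a local session without Hive support, a hypothetical scratch path) that exercises this path; with spark.sql.orc.useNewVersion=false such a session would instead hit the "must be used with Hive support" error checked in the test suites below.

  import org.apache.spark.sql.{SaveMode, SparkSession}

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val path = "/tmp/orc_lookup_sketch"  // hypothetical scratch location

  // Write and read back ORC; both calls resolve "orc" via
  // DataSource.lookupDataSource(sessionState.conf, "orc").
  spark.range(10).toDF("id").write.mode(SaveMode.Overwrite).orc(path)
  spark.read.orc(path).show()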

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 4 additions & 3 deletions
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils

@@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand(
     colsToAdd: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val catalogTable = verifyAlterTableAddColumn(sparkSession, catalog, table)
+    val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)

     try {
       sparkSession.catalog.uncacheTable(table.quotedString)

@@ -216,7 +217,7 @@
    * For datasource table, it currently only supports parquet, json, csv.
    */
   private def verifyAlterTableAddColumn(
-      sparkSession: SparkSession,
+      conf: SQLConf,
       catalog: SessionCatalog,
       table: TableIdentifier): CatalogTable = {
     val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)

@@ -230,7 +231,7 @@
     }

     if (DDLUtils.isDatasourceTable(catalogTable)) {
-      DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match {
+      DataSource.lookupDataSource(conf, catalogTable.provider.get).newInstance() match {
         // For datasource table, this command can only support the following File format.
         // TextFileFormat only default to one column "value"
         // Hive type is already considered as hive serde table, so the logic will not
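
For context, verifyAlterTableAddColumn gates which datasource tables may receive ALTER TABLE ... ADD COLUMNS; per the doc comment above it currently supports parquet, json, and csv. A minimal SQL-level sketch (illustrative table name, local session assumed):

  // Parquet is one of the formats the command's doc comment lists as supported.
  spark.sql("CREATE TABLE t_add_cols(a INT) USING parquet")
  spark.sql("ALTER TABLE t_add_cols ADD COLUMNS (b STRING)")
  spark.sql("DESCRIBE TABLE t_add_cols").show()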

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 4 additions & 4 deletions
@@ -87,7 +87,8 @@ case class DataSource(

   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

-  lazy val providingClass: Class[_] = DataSource.lookupDataSource(sparkSession, className)
+  lazy val providingClass: Class[_] =
+    DataSource.lookupDataSource(sparkSession.sessionState.conf, className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver

@@ -570,10 +571,9 @@ object DataSource extends Logging {
       "org.apache.spark.Logging")

   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
     var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
-    if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
-      sparkSession.conf.get(SQLConf.ORC_ENABLED)) {
+    if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
       logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}")
       provider1 = classOf[OrcFileFormat].getCanonicalName
     }
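
The signature change narrows lookupDataSource's dependency from a whole SparkSession to the SQLConf it actually reads, so callers that only hold a conf (like the reworked verifyAlterTableAddColumn) can use it directly. A toy sketch of the same pattern, with made-up types rather than Spark's:

  // Illustrative only; none of these types exist in Spark.
  final case class Conf(useNewOrc: Boolean)
  final case class Session(conf: Conf /* plus catalog, runtime env, ... */)

  // Before: def lookup(session: Session, provider: String): String
  // After (as in this commit): only the conf is required.
  def lookup(conf: Conf, provider: String): String =
    if (provider.equalsIgnoreCase("orc") && conf.useNewOrc) "new OrcFileFormat"
    else provider

  lookup(Conf(useNewOrc = true), "orc")  // resolves to "new OrcFileFormat"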

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 3 additions & 2 deletions
@@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     }

     // Check if the specified data source match the data source of the existing table.
-    val existingProvider = DataSource.lookupDataSource(sparkSession, existingTable.provider.get)
-    val specifiedProvider = DataSource.lookupDataSource(sparkSession, tableDesc.provider.get)
+    val conf = sparkSession.sessionState.conf
+    val existingProvider = DataSource.lookupDataSource(conf, existingTable.provider.get)
+    val specifiedProvider = DataSource.lookupDataSource(conf, tableDesc.provider.get)
     // TODO: Check that options from the resolved relation match the relation that we are
     // inserting into (i.e. using the same compression).
     if (existingProvider != specifiedProvider) {

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -2786,14 +2786,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }

   test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
-    withSQLConf(SQLConf.ORC_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "false") {
       val e = intercept[AnalysisException] {
         sql("CREATE TABLE spark_20728(a INT) USING ORC")
       }
       assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
     }

-    withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
       withTable("spark_20728") {
         sql("CREATE TABLE spark_20728(a INT) USING ORC")
         val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {

sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
     Seq(
       (true, "Unable to infer schema for ORC. It must be specified manually"),
       (false, "The ORC data source must be used with Hive support")).foreach { case (value, m) =>
-      withSQLConf(SQLConf.ORC_ENABLED.key -> s"$value") {
+      withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$value") {
         val e = intercept[AnalysisException] {
           spark.read.format("orc").load()
         }

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -478,7 +478,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   }

   test("orc - API and behavior regarding schema") {
-    withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
       // Writer
       spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
       val df = spark.read.orc(dir)

@@ -507,7 +507,7 @@
   }

   test("column nullability and comment - write and then read") {
-    withSQLConf(SQLConf.ORC_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
       Seq("json", "orc", "parquet", "csv").foreach { format =>
         val schema = StructType(
           StructField("cl1", IntegerType, nullable = false).withComment("test") ::

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ case class RelationConversions(
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
       val options = relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_ENABLED)) {
+      if (conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
         sessionCatalog.metastoreCatalog.convertToLogicalRelation(
           relation,
           options,

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -2159,7 +2159,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
       (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) =>

-      withSQLConf(SQLConf.ORC_ENABLED.key -> s"$v") {
+      withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") {
         withTable("spark_20728") {
           sql("CREATE TABLE spark_20728(a INT) USING ORC")
           val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
