[SPARK-20728][SQL] Make OrcFileFormat configurable between sql/hive and sql/core #19871

Closed · wants to merge 9 commits

@@ -363,6 +363,14 @@ object SQLConf {
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
.createWithDefault("snappy")

val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
.doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
"1.2.1. It is 'hive' by default prior to Spark 2.3.")
.internal()
.stringConf
.checkValues(Set("hive", "native"))
.createWithDefault("native")

val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
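For reference, a minimal sketch of how a user could toggle this new setting at runtime. The session, app name, and output path below are hypothetical; the config key and its allowed values come from the definition above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-impl-demo") // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// "native" selects the new ORC source in sql/core.
spark.conf.set("spark.sql.orc.impl", "native")
spark.range(10).write.mode("overwrite").orc("/tmp/orc_impl_demo")
spark.read.orc("/tmp/orc_impl_demo").show()

// "hive" selects org.apache.spark.sql.hive.orc.OrcFileFormat instead, which
// requires a session built with .enableHiveSupport().
// spark.conf.set("spark.sql.orc.impl", "hive")
```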
@@ -1,6 +1,7 @@
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
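For orientation, a brief sketch (hypothetical path, `spark` session assumed) of the two ways the newly registered format can be named from user code:

```scala
// Short name: with the lookupDataSource change further down, "orc" resolves to
// whichever implementation spark.sql.orc.impl selects.
val byShortName = spark.read.format("orc").load("/tmp/orc_impl_demo")

// Fully qualified class name: pins the native implementation registered above,
// regardless of spark.sql.orc.impl.
val byClassName = spark.read
  .format("org.apache.spark.sql.execution.datasources.orc.OrcFileFormat")
  .load("/tmp/orc_impl_demo")
```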
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)

@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

assertNotBucketed("save")

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
cls.newInstance() match {
case ds: WriteSupport =>
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
@@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand(
colsToAdd: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)
val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
@@ -216,6 +217,7 @@ case class AlterTableAddColumnsCommand(
* For datasource table, it currently only supports parquet, json, csv.
*/
private def verifyAlterTableAddColumn(
conf: SQLConf,
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
Expand All @@ -229,7 +231,7 @@ case class AlterTableAddColumnsCommand(
}

if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
DataSource.lookupDataSource(catalogTable.provider.get, conf).newInstance() match {
// For datasource table, this command can only support the following File format.
// TextFileFormat only default to one column "value"
// Hive type is already considered as hive serde table, so the logic will not
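As a usage sketch of the command this check guards (table name hypothetical; per the comment above, only the supported file-based sources such as parquet, json, and csv pass the check):

```scala
// verifyAlterTableAddColumn resolves the table's provider through
// DataSource.lookupDataSource(provider, conf) before allowing new columns.
spark.sql("CREATE TABLE add_col_demo(a INT) USING parquet")
spark.sql("ALTER TABLE add_col_demo ADD COLUMNS (b STRING)")
spark.table("add_col_demo").printSchema()
// root
//  |-- a: integer (nullable = true)
//  |-- b: string (nullable = true)
```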
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,8 @@ case class DataSource(

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val providingClass: Class[_] =
DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -537,6 +540,7 @@ object DataSource extends Logging {
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName

Map(
"org.apache.spark.sql.jdbc" -> jdbc,
@@ -553,6 +557,8 @@
"org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
"org.apache.spark.sql.hive.orc" -> orc,
"org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
"org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
Member:
This map is for backward compatibility in case we move data sources around. I think this datasources.orc is newly added. Why do we need to add these entries here?

Contributor:
Ah, good catch! It sounds like we don't need a compatibility rule for the new ORC source.

Member Author:
Just as we support USING org.apache.spark.sql.hive.orc, we want to support USING org.apache.spark.sql.execution.datasources.orc, don't we?

Member Author:
When I received that advice, I thought it was for consistency.

Member:
This is for safety. We also do it for parquet.

Member:
For parquet, this is for historical reasons; see #13311.

Previously you could use parquet via org.apache.spark.sql.execution.datasources.parquet and org.apache.spark.sql.execution.datasources.parquet.DefaultSource, so those entries are also there for backward compatibility.

For this new ORC source, that is not the case.

Member:
I think we should rename the variable and/or fix the comment there when we next touch the code around it, to prevent confusion.

Member (@HyukjinKwon, Dec 6, 2017):
The org.apache.spark.sql.execution.* path is meant to be private too, but I think it's okay to leave these entries in for practical use cases, with some comments.

Member:
These changes are pretty safe: if we ever move the ORC source to another location, these entries will still resolve to the right class.
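To make the discussion above concrete, a hedged sketch (hypothetical table names, `spark` session assumed) of what the two datasources.orc entries allow, alongside the short name:

```scala
// With the datasources.orc entries above, the full package name maps to the
// same native OrcFileFormat that USING orc resolves to when
// spark.sql.orc.impl is "native".
spark.sql(
  "CREATE TABLE orc_by_package(a INT) " +
    "USING org.apache.spark.sql.execution.datasources.orc")
spark.sql("CREATE TABLE orc_by_short_name(a INT) USING orc")
```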

"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv
@@ -568,8 +574,16 @@ object DataSource extends Logging {
"org.apache.spark.Logging")

/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case name => name
}
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
@@ -587,7 +601,8 @@
if (provider1.toLowerCase(Locale.ROOT) == "orc" ||
Member:
provider1 can't be "orc" anymore; it can only be classOf[OrcFileFormat].getCanonicalName or "org.apache.spark.sql.hive.orc.OrcFileFormat".

Member Author:
Yep. I'll remove this check: provider1.toLowerCase(Locale.ROOT) == "orc" ||.

provider1.startsWith("org.apache.spark.sql.hive.orc")) {
throw new AnalysisException(
"The ORC data source must be used with Hive support enabled")
"Hive-based ORC data source must be used with Hive support enabled. " +
"Please use native ORC data source instead")
Member:
I think we should make this more actionable by saying explicitly that spark.sql.orc.impl should be set to native.

Member Author:
@HyukjinKwon, for this one I opened #19903.

} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro") {
throw new AnalysisException(
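A small end-to-end sketch of how the new dispatch in lookupDataSource surfaces through the public API, assuming a sql/core-only build (no Hive classes on the classpath), a `spark` session, and a hypothetical path; the quoted error text is the new message from this hunk:

```scala
import org.apache.spark.sql.AnalysisException

// Native implementation: the sql/core ORC source works without Hive classes.
spark.conf.set("spark.sql.orc.impl", "native")
spark.range(5).write.mode("overwrite").format("orc").save("/tmp/lookup_demo")
println(spark.read.format("orc").load("/tmp/lookup_demo").count()) // 5

// Hive implementation: "orc" now resolves to
// org.apache.spark.sql.hive.orc.OrcFileFormat, which cannot be loaded here,
// so the lookup fails with the new, more specific message.
spark.conf.set("spark.sql.orc.impl", "hive")
try {
  spark.read.format("orc").load("/tmp/lookup_demo")
} catch {
  case e: AnalysisException =>
    // "Hive-based ORC data source must be used with Hive support enabled. ..."
    println(e.getMessage)
}
```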
@@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
val conf = sparkSession.sessionState.conf
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get, conf)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
23 changes: 22 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -1664,7 +1666,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
e = intercept[AnalysisException] {
sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
assert(e.message.contains("Hive-based ORC data source must be used with Hive support"))

e = intercept[AnalysisException] {
sql(s"select id from `com.databricks.spark.avro`.`file_path`")
@@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(spark.read.format(orc).load(path).collect().length == 2)
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
}
assert(e.message.contains("Hive-based ORC data source must be used with Hive support"))
}

withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(classOf[OrcFileFormat]))
}
}
}
}
@@ -53,13 +53,6 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
.load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
}

test("should fail to load ORC without Hive Support") {
val e = intercept[AnalysisException] {
spark.read.format("orc").load()
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}
}


@@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}


test("resolve default source") {
spark.read
.format("org.apache.spark.sql.test")
@@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
}

/**
* This only tests whether API compiles, but does not run it as orc()
* cannot be run without Hive classes.
*/
ignore("orc - API") {
// Reader, with user specified schema
// Refer to csv-specific test suites for behavior without user specified schema
spark.read.schema(userSchema).orc()
spark.read.schema(userSchema).orc(dir)
spark.read.schema(userSchema).orc(dir, dir, dir)
spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
Option(dir).map(spark.read.schema(userSchema).orc)
test("orc - API and behavior regarding schema") {
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
// Writer
spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
val df = spark.read.orc(dir)
checkAnswer(df, spark.createDataset(data).toDF())
val schema = df.schema

// Writer
spark.range(10).write.orc(dir)
// Reader, without user specified schema
intercept[AnalysisException] {
testRead(spark.read.orc(), Seq.empty, schema)
}
testRead(spark.read.orc(dir), data, schema)
testRead(spark.read.orc(dir, dir), data ++ data, schema)
testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
// Test explicit calls to single arg method - SPARK-16009
testRead(Option(dir).map(spark.read.orc).get, data, schema)

// Reader, with user specified schema, data should be nulls as schema in file different
// from user schema
val expData = Seq[String](null, null, null)
testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema)
testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema)
testRead(
spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema)
}
}

test("column nullability and comment - write and then read") {
Seq("json", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
Seq("json", "orc", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}
}
}
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


@@ -195,8 +194,19 @@ case class RelationConversions(
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}

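A hedged sketch of the conversion path this hunk changes: with a Hive-enabled session and the rule enabled via spark.sql.hive.convertMetastoreOrc (a flag not shown in this hunk), reads of a Hive ORC table go through whichever OrcFileFormat spark.sql.orc.impl selects. The table name and app name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("convert-metastore-orc-demo") // hypothetical
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.orc.impl", "native")

spark.sql("CREATE TABLE hive_orc_demo(a INT) STORED AS ORC")
spark.sql("INSERT INTO hive_orc_demo VALUES (1), (2)")

// RelationConversions now rewrites the scan to a LogicalRelation backed by
// org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.
spark.table("hive_orc_demo").show()
```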
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
makeOrcFile((1 to 10).map(Tuple1.apply), path2)
assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
Seq(
("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) =>
Member:
nit: i => orcImpl

Member Author:
For this one, I will update #19882. I've updated it locally and am running some tests.


withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(format))
}
}
}
}
}