[SPARK-20728][SQL] Make OrcFileFormat configurable between sql/hive and sql/core #19871
Changes from all commits: 37e240c, e7beb02, 8b7e88a, 2e498f9, 5474a07, 2393e1d, 8bc420a, e3f6f75, 7fac88f
DataSource.scala (sql/core)
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,8 @@ case class DataSource(

   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

-  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
+  lazy val providingClass: Class[_] =
+    DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -537,6 +540,7 @@ object DataSource extends Logging {
     val csv = classOf[CSVFileFormat].getCanonicalName
     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+    val nativeOrc = classOf[OrcFileFormat].getCanonicalName

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -553,6 +557,8 @@ object DataSource extends Logging {
       "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
       "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
       "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
+      "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv
@@ -568,8 +574,16 @@ object DataSource extends Logging {
     "org.apache.spark.Logging")

   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
+    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
+      case name if name.equalsIgnoreCase("orc") &&
+          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
+        classOf[OrcFileFormat].getCanonicalName
+      case name if name.equalsIgnoreCase("orc") &&
+          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
+        "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case name => name
+    }
     val provider2 = s"$provider1.DefaultSource"
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
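As a usage-level illustration of the new `lookupDataSource` behavior, here is a hedged sketch. It assumes the key behind `SQLConf.ORC_IMPLEMENTATION` is `spark.sql.orc.impl` (the test below uses `SQLConf.ORC_IMPLEMENTATION.key` rather than a literal string) and that `spark` is an existing `SparkSession`:

```scala
// With the native implementation selected, the "orc" short name resolves to
// org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.
spark.conf.set("spark.sql.orc.impl", "native")
val nativeDf = spark.read.format("orc").load("/tmp/orc_data")  // hypothetical path

// With the Hive implementation selected, it resolves to
// org.apache.spark.sql.hive.orc.OrcFileFormat (requires Hive support).
spark.conf.set("spark.sql.orc.impl", "hive")
val hiveDf = spark.read.format("orc").load("/tmp/orc_data")
```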
@@ -587,7 +601,8 @@ object DataSource extends Logging {
       if (provider1.toLowerCase(Locale.ROOT) == "orc" ||
           provider1.startsWith("org.apache.spark.sql.hive.orc")) {
         throw new AnalysisException(
-          "The ORC data source must be used with Hive support enabled")
+          "Hive-based ORC data source must be used with Hive support enabled. " +
+          "Please use native ORC data source instead")
       } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
           provider1 == "com.databricks.spark.avro") {
         throw new AnalysisException(

Review comments on this hunk: on the `provider1.toLowerCase(Locale.ROOT) == "orc"` condition, "Yep. I'll remove this." On the new error message, "I think we should make this more actionable."
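To make the failure mode concrete, here is a sketch of when this branch is hit (assumptions: a session built without `enableHiveSupport()`, the `spark.sql.orc.impl` key noted above, and a made-up output path):

```scala
import org.apache.spark.sql.SparkSession

// A session without Hive support on the classpath.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("orc-without-hive")  // hypothetical app name
  .getOrCreate()

// Ask for the Hive-based ORC implementation anyway.
spark.conf.set("spark.sql.orc.impl", "hive")

// lookupDataSource resolves "orc" to org.apache.spark.sql.hive.orc.OrcFileFormat,
// fails to load it, and throws:
//   AnalysisException: Hive-based ORC data source must be used with Hive support
//   enabled. Please use native ORC data source instead
spark.range(10).write.format("orc").save("/tmp/orc_no_hive")
```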
OrcQuerySuite.scala (sql/hive)
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     makeOrcFile((1 to 10).map(Tuple1.apply), path2)
     assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
   }
+
+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
+    Seq(
+      ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
+      ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) =>
+      withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) {
+        withTable("spark_20728") {
+          sql("CREATE TABLE spark_20728(a INT) USING ORC")
+          val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
+            case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
+          }
+          assert(fileFormat == Some(format))
+        }
+      }
+    }
+  }
 }

Review comment on the `Seq(...).foreach` line: "For this one, I will update #19882."
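The plan-inspection pattern used in the test can also be applied interactively to confirm which implementation backs a given table; a sketch (assumes a session where the test table, or any ORC-backed table, already exists):

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Which FileFormat class actually backs the scan? (table name is hypothetical)
val fileFormat = spark.sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
  case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
println(fileFormat)
// e.g. Some(class org.apache.spark.sql.execution.datasources.orc.OrcFileFormat)
```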
Review discussion on the `backwardCompatibilityMap` entries for the new ORC source:

This map is for backward compatibility in case we move data sources around. I think these `datasources.orc` entries are newly added. Why do we need to add them here?
Ah, good catch! Sounds like we don't need a compatibility rule for the new ORC source.
Like `USING org.apache.spark.sql.hive.orc`, we want to support `USING org.apache.spark.sql.execution.datasources.orc`, don't we?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I received the advice, I thought it's for consistency.
This is for safety. We also do it for Parquet.
For Parquet, this is for historical reasons; see #13311. Previously you could use Parquet via `org.apache.spark.sql.execution.datasources.parquet` and `org.apache.spark.sql.execution.datasources.parquet.DefaultSource`, so those entries are also for backward compatibility. For this new ORC source, that is not the case.
I think we should rename the variable and/or fix the comments there the next time we touch the surrounding code, to prevent confusion.
The `org.apache.spark.sql.execution.*` path is meant to be private too. But I think it's okay to leave it for practical use cases, with some comments.
These changes are pretty safe. In case we move the ORC source to another location, these names will still refer to the right class.