
[SPARK-16628][SQL] Don't convert Orc Metastore tables to datasource tables if metastore schema does not match schema stored in the files #14282
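
This patch changes the ORC and Parquet conversion helpers in HiveMetastoreCatalog to return Option[LogicalRelation] and skips the conversion whenever the schema inferred from the files on disk does not match the schema recorded in the metastore, so such tables keep being read through the original MetastoreRelation. Below is a minimal sketch of the scenario being guarded against, adapted from the regression test added in this PR; the `/tmp/dummy_orc` path and the SparkSession wiring are illustrative, everything else mirrors the new OrcQuerySuite test.

// Sketch only: assumes an active Hive-enabled SparkSession `spark` with
// spark.sql.hive.convertMetastoreOrc set to true; path and table name are illustrative.
import spark.implicits._

// The ORC files on disk carry a single data column `value`, partitioned by `key`.
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.write.partitionBy("key").orc("/tmp/dummy_orc")

// The metastore table declares an extra column (`value2`) that the files do not have.
spark.sql(
  """CREATE TABLE dummy_orc(value STRING, value2 STRING)
    |PARTITIONED BY (key INT)
    |STORED AS ORC
    |LOCATION '/tmp/dummy_orc'""".stripMargin)

// Previously this table was still converted to a data source relation built from the
// file-inferred schema; with this patch the two schemas are compared and, because they
// differ, the conversion is skipped and the analyzed plan keeps the MetastoreRelation.
val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key = 0")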

HiveMetastoreCatalog.scala
@@ -244,7 +244,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
options: Map[String, String],
defaultSource: FileFormat,
fileFormatClass: Class[_ <: FileFormat],
- fileType: String): LogicalRelation = {
+ fileType: String): Option[LogicalRelation] = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -285,7 +285,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec,
Some(partitionSpec))

- val hadoopFsRelation = cached.getOrElse {
+ val hadoopFsRelation = if (cached.isDefined) {
+ cached
+ } else {
val fileCatalog = new MetaStorePartitionedTableFileCatalog(
sparkSession,
new Path(metastoreRelation.catalogTable.storage.locationUri.get),
@@ -301,21 +303,27 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
}

- val relation = HadoopFsRelation(
- sparkSession = sparkSession,
- location = fileCatalog,
- partitionSchema = partitionSchema,
- dataSchema = inferredSchema,
- bucketSpec = bucketSpec,
- fileFormat = defaultSource,
- options = options)
-
- val created = LogicalRelation(
- relation,
- metastoreTableIdentifier =
- Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
- cachedDataSourceTables.put(tableIdentifier, created)
- created
+ // If the inferred schema from the data doesn't match the schema stored in metastore,
+ // we should not convert the MetastoreRelation to data source relation.
+ if (!DataType.equalsIgnoreCompatibleNullability(inferredSchema, metastoreSchema)) {
+ None
+ } else {
+ val relation = HadoopFsRelation(
+ sparkSession = sparkSession,
+ location = fileCatalog,
+ partitionSchema = partitionSchema,
+ dataSchema = inferredSchema,
+ bucketSpec = bucketSpec,
+ fileFormat = defaultSource,
+ options = options)
+
+ val created = LogicalRelation(
+ relation,
+ metastoreTableIdentifier =
+ Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+ cachedDataSourceTables.put(tableIdentifier, created)
+ Some(created)
+ }
}

hadoopFsRelation
@@ -347,9 +355,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
created
}

- logicalRelation
+ Some(logicalRelation)
}
- result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
+ result.map(_.copy(expectedOutputAttributes = Some(metastoreRelation.output)))
}

/**
@@ -362,7 +370,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
sessionState.convertMetastoreParquet
}

- private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+ private def convertToParquetRelation(relation: MetastoreRelation): Option[LogicalRelation] = {
val defaultSource = new ParquetFileFormat()
val fileFormatClass = classOf[ParquetFileFormat]

@@ -379,15 +387,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ case i @ InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
- InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+ val parquetRelation = convertToParquetRelation(r)
+ if (parquetRelation.isDefined) {
+ InsertIntoTable(parquetRelation.get, partition, child, overwrite, ifNotExists)
+ } else {
+ i
+ }

// Read path
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
- SubqueryAlias(relation.tableName, parquetRelation)
+ if (parquetRelation.isDefined) {
+ SubqueryAlias(relation.tableName, parquetRelation.get)
+ } else {
+ relation
+ }
}
}
}
@@ -402,7 +419,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
sessionState.convertMetastoreOrc
}

- private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+ private def convertToOrcRelation(relation: MetastoreRelation): Option[LogicalRelation] = {
val defaultSource = new OrcFileFormat()
val fileFormatClass = classOf[OrcFileFormat]
val options = Map[String, String]()
@@ -417,15 +434,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ case i @ InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
- InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+ val orcRelation = convertToOrcRelation(r)
+ if (orcRelation.isDefined) {
+ InsertIntoTable(orcRelation.get, partition, child, overwrite, ifNotExists)
+ } else {
+ i
+ }

// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
val orcRelation = convertToOrcRelation(relation)
- SubqueryAlias(relation.tableName, orcRelation)
+ if (orcRelation.isDefined) {
+ SubqueryAlias(relation.tableName, orcRelation.get)
+ } else {
+ relation
+ }
}
}
}

OrcQuerySuite.scala
@@ -450,6 +450,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}

test("No ORC conversion when metastore schema does not match schema stored in ORC files") {
withTempTable("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")

withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
withTable("dummy_orc") {
withTempPath { dir =>
val path = dir.getCanonicalPath
singleRowDF.write.partitionBy("key").orc(path)

// Create a Metastore ORC table with different schema.
spark.sql(
s"""
|CREATE TABLE dummy_orc(value STRING, value2 STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
val queryExecution = df.queryExecution
queryExecution.analyzed.collectFirst {
case _: MetastoreRelation => ()
}.getOrElse {
fail(s"Expecting no conversion from orc to data sources, " +
s"but got:\n$queryExecution")
}
}
}
}
}
}

test("SPARK-14962 Produce correct results on array type with isnotnull") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(Array(i)))