[SPARK-25391][SQL] Make behaviors consistent when converting parquet hive table to parquet data source #22343


Closed · wants to merge 1 commit
@@ -300,6 +300,8 @@ class ParquetFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -313,6 +315,10 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
hadoopConf.set(
ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
parquetOptions.duplicatedFieldsResolutionMode
)

ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)

@@ -69,12 +69,25 @@ class ParquetOptions(
.get(MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sqlConf.isParquetSchemaMergingEnabled)

/**
* How to resolve duplicated field names. By default, the parquet data source fails when it hits
* duplicated field names in case-insensitive mode. When converting a hive parquet table to the
* parquet data source, we need to ask the parquet data source to pick the first matched field -
* the same behavior as the hive parquet table - to keep behaviors consistent.
*/
val duplicatedFieldsResolutionMode: String = {
parameters.getOrElse(DUPLICATED_FIELDS_RESOLUTION_MODE,
Member:
I don't think we should leave this for Parquet options for now. Can we just have a SQL config to control this?

Contributor:
whether we have a SQL config for it or not, we must define an option here. The conversion happens per-query, so we must have a per-query option to switch the behavior, instead of a per-session SQL config.

Member:
The conversion itself happens per query, but my impression is that the value doesn't usually differ per query. I mean, I was wondering whether users would want to set this query by query.

Contributor:
I agree this is a little unusual. Usually we have a SQL config first, then we create an option for it if necessary. In this case, we are not adding a config/option because of a user requirement; we need it for an internal optimization.

If we can, I would suggest we make it an internal option. In any case, we shouldn't rush to add a SQL config until we get a requirement from users.
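To make the trade-off concrete, here is a hypothetical usage sketch, assuming a SparkSession named spark; the SQL config key below does not exist, and whether end users should be able to set the option at all is exactly what is being discussed:

// Per-session SQL config: one value for every query in the session (hypothetical key).
spark.conf.set("spark.sql.parquet.duplicatedFieldsResolutionMode", "FIRST_MATCH")

// Per-query data source option: only the relations that carry the option are affected,
// which is what the per-query hive-to-data-source conversion needs.
spark.read
  .option(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE, "FIRST_MATCH")
  .parquet("/path/to/some/table")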

ParquetDuplicatedFieldsResolutionMode.FAIL.toString)
}
}


object ParquetOptions {
val MERGE_SCHEMA = "mergeSchema"

val DUPLICATED_FIELDS_RESOLUTION_MODE = "duplicatedFieldsResolutionMode"

// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
@@ -90,3 +103,7 @@ object ParquetOptions {
shortParquetCompressionCodecNames(name).name()
}
}

object ParquetDuplicatedFieldsResolutionMode extends Enumeration {
val FAIL, FIRST_MATCH = Value
}
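Since the Hadoop Configuration only carries strings, the enum value is round-tripped with toString/withName. Below is a minimal sketch of the propagation, assuming a SparkSession named sparkSession; it only mirrors the ParquetFileFormat and ParquetReadSupport changes in this diff rather than adding new behavior:

import org.apache.hadoop.conf.Configuration

val parquetOptions = new ParquetOptions(
  Map(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE ->
    ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString),
  sparkSession.sessionState.conf)

// Driver side: copy the string into the Hadoop conf that is shipped to the readers.
val hadoopConf = new Configuration()
hadoopConf.set(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
  parquetOptions.duplicatedFieldsResolutionMode)

// Read side: turn the string back into an enum value, defaulting to FAIL when unset.
val mode = ParquetDuplicatedFieldsResolutionMode.withName(
  hadoopConf.get(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
    ParquetDuplicatedFieldsResolutionMode.FAIL.toString))
assert(mode == ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH)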
@@ -65,17 +65,16 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
* readers. Responsible for figuring out Parquet requested schema used for column pruning.
*/
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration

catalystRequestedSchema = {
val conf = context.getConfiguration
val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
}

val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
context.getFileSchema, catalystRequestedSchema, caseSensitive)
context.getFileSchema, catalystRequestedSchema, conf)

new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
@@ -123,9 +122,9 @@ private[parquet] object ParquetReadSupport {
def clipParquetSchema(
parquetSchema: MessageType,
catalystSchema: StructType,
caseSensitive: Boolean = true): MessageType = {
conf: Configuration): MessageType = {
val clippedParquetFields = clipParquetGroupFields(
parquetSchema.asGroupType(), catalystSchema, caseSensitive)
parquetSchema.asGroupType(), catalystSchema, conf)
if (clippedParquetFields.isEmpty) {
ParquetSchemaConverter.EMPTY_MESSAGE
} else {
@@ -137,20 +136,20 @@
}

private def clipParquetType(
parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
parquetType: Type, catalystType: DataType, conf: Configuration): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)
clipParquetListType(parquetType.asGroupType(), t.elementType, conf)

case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, conf)

case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
clipParquetGroup(parquetType.asGroupType(), t, conf)

case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -177,14 +176,14 @@
* [[StructType]].
*/
private def clipParquetListType(
parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
parquetList: GroupType, elementType: DataType, conf: Configuration): Type = {
// Precondition of this method, should only be called for lists with nested element types.
assert(!isPrimitiveCatalystType(elementType))

// Unannotated repeated group should be interpreted as required list of required element, so
// list element type is just the group itself. Clip it.
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
clipParquetType(parquetList, elementType, caseSensitive)
clipParquetType(parquetList, elementType, conf)
} else {
assert(
parquetList.getOriginalType == OriginalType.LIST,
@@ -216,7 +215,7 @@ private[parquet] object ParquetReadSupport {
Types
.buildGroup(parquetList.getRepetition)
.as(OriginalType.LIST)
.addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
.addField(clipParquetType(repeatedGroup, elementType, conf))
.named(parquetList.getName)
} else {
// Otherwise, the repeated field's type is the element type with the repeated field's
@@ -227,7 +226,7 @@
.addField(
Types
.repeatedGroup()
.addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
.addField(clipParquetType(repeatedGroup.getType(0), elementType, conf))
.named(repeatedGroup.getName))
.named(parquetList.getName)
}
@@ -243,7 +242,7 @@
parquetMap: GroupType,
keyType: DataType,
valueType: DataType,
caseSensitive: Boolean): GroupType = {
conf: Configuration): GroupType = {
// Precondition of this method, only handles maps with nested key types or value types.
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))

@@ -255,8 +254,8 @@
Types
.repeatedGroup()
.as(repeatedGroup.getOriginalType)
.addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
.addField(clipParquetType(parquetValueType, valueType, caseSensitive))
.addField(clipParquetType(parquetKeyType, keyType, conf))
.addField(clipParquetType(parquetValueType, valueType, conf))
.named(repeatedGroup.getName)

Types
@@ -275,8 +274,8 @@
* pruning.
*/
private def clipParquetGroup(
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
parquetRecord: GroupType, structType: StructType, conf: Configuration): GroupType = {
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, conf)
Types
.buildGroup(parquetRecord.getRepetition)
.as(parquetRecord.getOriginalType)
@@ -290,15 +289,17 @@
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
parquetRecord: GroupType, structType: StructType, conf: Configuration): Seq[Type] = {
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
if (caseSensitive) {
val caseSensitiveParquetFieldMap =
parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
structType.map { f =>
caseSensitiveParquetFieldMap
.get(f.name)
.map(clipParquetType(_, f.dataType, caseSensitive))
.map(clipParquetType(_, f.dataType, conf))
.getOrElse(toParquet.convertField(f))
}
} else {
@@ -310,12 +311,19 @@
.get(f.name.toLowerCase(Locale.ROOT))
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
s"$parquetTypesString in case-insensitive mode")
val resolutionMode = ParquetDuplicatedFieldsResolutionMode.withName(
conf.get(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
ParquetDuplicatedFieldsResolutionMode.FAIL.toString))
resolutionMode match {
case ParquetDuplicatedFieldsResolutionMode.FAIL =>
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
s"$parquetTypesString in case-insensitive mode")
case ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH =>
clipParquetType(parquetTypes.head, f.dataType, conf)
}
} else {
clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
clipParquetType(parquetTypes.head, f.dataType, conf)
}
}.getOrElse(toParquet.convertField(f))
}
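To make the two modes concrete, here is a sketch in the style of the tests further down in this diff; the schemas are illustrative, and clipParquetSchema is package-private, so this only compiles inside the parquet package, just like the test suite:

val parquetSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  optional int32 A;
    |  optional int32 a;
    |}
  """.stripMargin)
val catalystSchema = new StructType().add("a", IntegerType)

val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)

// Default mode (FAIL): "a" matches both parquet fields, so clipping throws a RuntimeException.
// ParquetReadSupport.clipParquetSchema(parquetSchema, catalystSchema, conf)

// FIRST_MATCH: the first matching parquet field in file order ("A") is kept in the clipped schema.
conf.set(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
  ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString)
ParquetReadSupport.clipParquetSchema(parquetSchema, catalystSchema, conf)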
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.io.ParquetDecodingException
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

@@ -1015,20 +1016,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: String,
caseSensitive: Boolean = true): Unit = {
conf: Configuration = {
val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
conf
}): Unit = {
testSchemaClipping(testName, parquetSchema, catalystSchema,
MessageTypeParser.parseMessageType(expectedSchema), caseSensitive)
MessageTypeParser.parseMessageType(expectedSchema), conf)
}

private def testSchemaClipping(
testName: String,
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: MessageType,
caseSensitive: Boolean): Unit = {
conf: Configuration): Unit = {
test(s"Clipping - $testName") {
val actual = ParquetReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive)
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf)

try {
expectedSchema.checkContains(actual)
@@ -1390,7 +1395,11 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),

expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
caseSensitive = true)
conf = {
val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
Contributor:
isn't it the default value?

Contributor Author:
@cloud-fan There is no default value for spark.sql.caseSensitive in Configuration. Let me explain in more detail below.

This is one of the overloaded methods of testSchemaClipping. I tried to give this testSchemaClipping method a default conf; however, scalac complains that

in class ParquetSchemaSuite, multiple overloaded alternatives of testSchemaClipping define default arguments

private def testSchemaClipping(
testName: String,
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: String,
conf: Configuration = {

private def testSchemaClipping(
testName: String,
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: MessageType,
conf: Configuration): Unit = {

It seems a little confusing because these two methods have different parameter types. After a brief investigation, I found that the Scala compiler simply disallows overloaded methods with default arguments, even when the methods have different parameter types.

https://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments
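For reference, a minimal self-contained sketch of the restriction (hypothetical names, Scala 2.x):

object OverloadedDefaultsDemo {
  // Does not compile: "multiple overloaded alternatives of method f define default arguments",
  // even though the two overloads differ in their parameter types.
  def f(expected: String, strict: Boolean = true): Unit = ()
  def f(expected: Int, strict: Boolean = true): Unit = ()
}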

conf
})

testSchemaClipping(
"disjoint field sets",
@@ -1572,9 +1581,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| optional int32 c;
|}
""".stripMargin,
caseSensitive = false)
conf = {
val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
conf
})

test("Clipping - case-insensitive resolution: more than one field is matched") {
test("Clipping - case-insensitive resolution with ambiguity: fail to resolve fields") {
val parquetSchema =
"""message root {
| required group A {
@@ -1590,9 +1603,45 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
.add("a", nestedType, nullable = true)
.add("c", IntegerType, nullable = true)
}

assertThrows[RuntimeException] {
val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
ParquetReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false)
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf)
}
}

testSchemaClipping(
"case-insensitive resolution with ambiguity: pick the first matched field",
parquetSchema =
"""message root {
| required group A {
| optional int32 B;
| }
| optional int32 c;
| optional int32 a;
|}
""".stripMargin,
catalystSchema = {
val nestedType = new StructType().add("b", IntegerType, nullable = true)
new StructType()
.add("a", nestedType, nullable = true)
.add("c", IntegerType, nullable = true)
},
expectedSchema =
"""message root {
| required group A {
| optional int32 B;
| }
| optional int32 c;
|}
""".stripMargin,
conf = {
val conf = new Configuration()
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
conf.set(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString)
conf
})
}
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetDuplicatedFieldsResolutionMode,
ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

@@ -181,9 +182,20 @@ case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: HiveTableRelation): Boolean = {
var isConvertible = false
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
if (serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) {
if (conf.getConf(SQLConf.CASE_SENSITIVE)) {
logWarning("The behavior must be consistent to do the conversion. We skip the " +
"conversion in case-sensitive mode because hive parquet table always do " +
"case-insensitive field resolution.")
} else {
isConvertible = true
}
} else if (serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)) {
isConvertible = true
}
isConvertible
}
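A hypothetical spark-shell scenario (made-up paths and table names) showing why the conversion would change results in case-sensitive mode:

import spark.implicits._  // spark: the SparkSession in scope

// The physical parquet field name is upper case, while the Hive table schema is lower case
// (the metastore lower-cases column names).
Seq(1, 2, 3).toDF("A").write.parquet("/tmp/spark_25391_demo")
spark.sql("CREATE EXTERNAL TABLE t (a INT) STORED AS PARQUET LOCATION '/tmp/spark_25391_demo'")

// The hive parquet reader resolves "a" against "A" case-insensitively and returns 1, 2, 3.
// With spark.sql.caseSensitive=true, the converted native reader would require an exact match
// and return nulls instead, so the conversion is skipped in that mode.
spark.sql("SELECT a FROM t").show()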

// Return true for Apache ORC and Hive ORC-related configuration names.
@@ -200,9 +212,14 @@
// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
logInfo("When converting hive parquet table to parquet data source, we switch the " +
"duplicated fields resolution mode to ask parquet data source to pick the first matched " +
"field - the same behavior as hive parquet table - to keep behaviors consistent.")
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) +
(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE ->
ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {