-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-25391][SQL] Make behaviors consistent when converting parquet hive table to parquet data source #22343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet | |||||||||||||||||||||||||
import scala.reflect.ClassTag | ||||||||||||||||||||||||||
import scala.reflect.runtime.universe.TypeTag | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
import org.apache.hadoop.conf.Configuration | ||||||||||||||||||||||||||
import org.apache.parquet.io.ParquetDecodingException | ||||||||||||||||||||||||||
import org.apache.parquet.schema.{MessageType, MessageTypeParser} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -1015,20 +1016,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | |||||||||||||||||||||||||
parquetSchema: String, | ||||||||||||||||||||||||||
catalystSchema: StructType, | ||||||||||||||||||||||||||
expectedSchema: String, | ||||||||||||||||||||||||||
caseSensitive: Boolean = true): Unit = { | ||||||||||||||||||||||||||
conf: Configuration = { | ||||||||||||||||||||||||||
val conf = new Configuration() | ||||||||||||||||||||||||||
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true) | ||||||||||||||||||||||||||
conf | ||||||||||||||||||||||||||
}): Unit = { | ||||||||||||||||||||||||||
testSchemaClipping(testName, parquetSchema, catalystSchema, | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(expectedSchema), caseSensitive) | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(expectedSchema), conf) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
private def testSchemaClipping( | ||||||||||||||||||||||||||
testName: String, | ||||||||||||||||||||||||||
parquetSchema: String, | ||||||||||||||||||||||||||
catalystSchema: StructType, | ||||||||||||||||||||||||||
expectedSchema: MessageType, | ||||||||||||||||||||||||||
caseSensitive: Boolean): Unit = { | ||||||||||||||||||||||||||
conf: Configuration): Unit = { | ||||||||||||||||||||||||||
test(s"Clipping - $testName") { | ||||||||||||||||||||||||||
val actual = ParquetReadSupport.clipParquetSchema( | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive) | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||
expectedSchema.checkContains(actual) | ||||||||||||||||||||||||||
|
@@ -1390,7 +1395,11 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | |||||||||||||||||||||||||
catalystSchema = new StructType(), | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE, | ||||||||||||||||||||||||||
caseSensitive = true) | ||||||||||||||||||||||||||
conf = { | ||||||||||||||||||||||||||
val conf = new Configuration() | ||||||||||||||||||||||||||
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true) | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't it the default value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan There is no default value for This is one of the overloaded methods of
Lines 1014 to 1019 in 95673cd
Lines 1028 to 1033 in 95673cd
It seems a little confusing, because these two methods have different parameter types. After a brief investigation, I found Scala compiler simply disallows overloaded methods with default arguments even when these methods have different parameter types. |
||||||||||||||||||||||||||
conf | ||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
testSchemaClipping( | ||||||||||||||||||||||||||
"disjoint field sets", | ||||||||||||||||||||||||||
|
@@ -1572,9 +1581,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | |||||||||||||||||||||||||
| optional int32 c; | ||||||||||||||||||||||||||
|} | ||||||||||||||||||||||||||
""".stripMargin, | ||||||||||||||||||||||||||
caseSensitive = false) | ||||||||||||||||||||||||||
conf = { | ||||||||||||||||||||||||||
val conf = new Configuration() | ||||||||||||||||||||||||||
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false) | ||||||||||||||||||||||||||
conf | ||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
test("Clipping - case-insensitive resolution: more than one field is matched") { | ||||||||||||||||||||||||||
test("Clipping - case-insensitive resolution with ambiguity: fail to resolve fields") { | ||||||||||||||||||||||||||
val parquetSchema = | ||||||||||||||||||||||||||
"""message root { | ||||||||||||||||||||||||||
| required group A { | ||||||||||||||||||||||||||
|
@@ -1590,9 +1603,45 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | |||||||||||||||||||||||||
.add("a", nestedType, nullable = true) | ||||||||||||||||||||||||||
.add("c", IntegerType, nullable = true) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
assertThrows[RuntimeException] { | ||||||||||||||||||||||||||
val conf = new Configuration() | ||||||||||||||||||||||||||
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false) | ||||||||||||||||||||||||||
ParquetReadSupport.clipParquetSchema( | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false) | ||||||||||||||||||||||||||
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
testSchemaClipping( | ||||||||||||||||||||||||||
"case-insensitive resolution with ambiguity: pick the first matched field", | ||||||||||||||||||||||||||
parquetSchema = | ||||||||||||||||||||||||||
"""message root { | ||||||||||||||||||||||||||
| required group A { | ||||||||||||||||||||||||||
| optional int32 B; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| optional int32 c; | ||||||||||||||||||||||||||
| optional int32 a; | ||||||||||||||||||||||||||
|} | ||||||||||||||||||||||||||
""".stripMargin, | ||||||||||||||||||||||||||
catalystSchema = { | ||||||||||||||||||||||||||
val nestedType = new StructType().add("b", IntegerType, nullable = true) | ||||||||||||||||||||||||||
new StructType() | ||||||||||||||||||||||||||
.add("a", nestedType, nullable = true) | ||||||||||||||||||||||||||
.add("c", IntegerType, nullable = true) | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
expectedSchema = | ||||||||||||||||||||||||||
"""message root { | ||||||||||||||||||||||||||
| required group A { | ||||||||||||||||||||||||||
| optional int32 B; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| optional int32 c; | ||||||||||||||||||||||||||
|} | ||||||||||||||||||||||||||
""".stripMargin, | ||||||||||||||||||||||||||
conf = { | ||||||||||||||||||||||||||
val conf = new Configuration() | ||||||||||||||||||||||||||
conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false) | ||||||||||||||||||||||||||
conf.set(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE, | ||||||||||||||||||||||||||
ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString) | ||||||||||||||||||||||||||
conf | ||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should leave this for Parquet options for now. Can we just have a SQL config to control this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whether we have a SQL config for it or not, we must define an option here. The conversion happens per-query, so we must have a per-query option to switch the behavior, instead of a per-session SQL config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The conversion itself happens per query but my impression is that the different values don't usually happen in per-query. I mean, I was wondering if users want to set this query by query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this is a little unusual. Usually we have a SQL config first, then we create an option for it if necessary. In this case, we are not adding a config/option from user's requirement, but we need it for an internal optimization.
If we can I would suggest we make it an internal option. But anyway we shouldn't rush to add a SQL config, until we get requirement from users.