[SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter #27888
Conversation
* As StructType only accepts a case-sensitive field-name mapping, use an explicit field-name-to-field-index mapping based on case sensitivity.
* To check case sensitivity, get the case-sensitivity flag from ParquetReadSupport.
* Also, add test cases that check column selection for each case.
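A minimal sketch of that name-to-index mapping, assuming Spark's internal CaseInsensitiveMap utility (illustrative only, not the exact PR code):

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.StructType

// Sketch: resolve a Parquet field name to its index in the requested
// Catalyst StructType, honoring the case-sensitivity flag.
def fieldIndexByName(catalystType: StructType, caseSensitive: Boolean): Map[String, Int] = {
  val byExactName = catalystType.fieldNames.zipWithIndex.toMap
  if (caseSensitive) byExactName else CaseInsensitiveMap(byExactName)
}
```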
ok to test |
Thank you for making your first contribution, @kimtkyeom . |
@@ -804,6 +804,162 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
    }
  }

  test("SPARK-31116: Select simple parquet columns correctly in case insensitive manner") {
Could you move the new test cases into FileBasedDataSourceSuite and run them with ORC/Parquet/JSON at least?
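Something along these lines could serve as a starting point (a sketch only; it assumes the helpers that FileBasedDataSourceSuite already mixes in — withSQLConf, withTempPath, checkAnswer, testImplicits, and the usual org.apache.spark.sql.types._ and Row imports — and the ORC/JSON entries may need to be commented out per the discussion below):

```scala
// Sketch: the same case-insensitive column selection check, parameterized by format.
Seq("parquet", "orc", "json").foreach { format =>
  test(s"SPARK-31116: select columns in case insensitive manner - $format") {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        // Write with a camelCase column, then read back with a lower-cased schema.
        Seq("A").toDF("camelCase").write.format(format).save(path)
        val readSchema = new StructType().add("camelcase", StringType)
        checkAnswer(
          spark.read.format(format).schema(readSchema).load(path),
          Row("A"))
      }
    }
  }
}
```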
I'll test soon. However, could these new test cases apply to ORC and JSON as well?
Yes. Please start with all three and comment out the ones where ORC/JSON fails.
I tested the ORC and JSON file formats and there are some failures.
JSON test failure
JSON passed the case-sensitive cases, but failed the case-insensitive cases:
[info] - SPARK-31116: Select simple columns correctly in case insensitive manner *** FAILED *** (4 seconds, 277 milliseconds)
[info] Results do not match for query:
[info] Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info] Timezone Env:
[info]
[info] == Parsed Logical Plan ==
[info] Relation[camelcase#56] json
[info]
[info] == Analyzed Logical Plan ==
[info] camelcase: string
[info] Relation[camelcase#56] json
[info]
[info] == Optimized Logical Plan ==
[info] Relation[camelcase#56] json
[info]
[info] == Physical Plan ==
[info] FileScan json [camelcase#56] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-95f1357a-85c9-444f-bdcc-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<camelcase:string>
[info]
[info] == Results ==
[info]
[info] == Results ==
[info] !== Correct Answer - 1 == == Spark Answer - 1 ==
[info] !struct<> struct<camelcase:string>
[info] ![A] [null] (QueryTest.scala:248)
[info] - SPARK-31116: Select nested columns correctly in case insensitive manner *** FAILED *** (2 seconds, 117 milliseconds)
[info] Results do not match for query:
[info] Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info] Timezone Env:
[info]
[info] == Parsed Logical Plan ==
[info] Relation[StructColumn#147] json
[info]
[info] == Analyzed Logical Plan ==
[info] StructColumn: struct<LowerCase:bigint,camelcase:bigint>
[info] Relation[StructColumn#147] json
[info]
[info] == Optimized Logical Plan ==
[info] Relation[StructColumn#147] json
[info]
[info] == Physical Plan ==
[info] FileScan json [StructColumn#147] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-f9ecd1a4-e5aa-4dd7-bdfd-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]
[info] == Results ==
[info]
[info] == Results ==
[info] !== Correct Answer - 1 == == Spark Answer - 1 ==
[info] !struct<> struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info] ![[0,1]] [[null,null]] (QueryTest.scala:248)
ORC test failure
ORC passed the case-insensitive test cases, but failed in the case-sensitive manner:
[info] - SPARK-31116: Select nested columns correctly in case sensitive manner *** FAILED *** (871 milliseconds)
[info] Results do not match for query:
[info] Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info] Timezone Env:
[info]
[info] == Parsed Logical Plan ==
[info] Relation[StructColumn#329] json
[info]
[info] == Analyzed Logical Plan ==
[info] StructColumn: struct<LowerCase:bigint,camelcase:bigint>
[info] Relation[StructColumn#329] json
[info]
[info] == Optimized Logical Plan ==
[info] Relation[StructColumn#329] json
[info]
[info] == Physical Plan ==
[info] FileScan json [StructColumn#329] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-612baf76-a9d0-41e5-89f4-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]
[info] == Results ==
[info]
[info] == Results ==
[info] !== Correct Answer - 1 == == Spark Answer - 1 ==
[info] !struct<> struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info] ![null] [[null,null]] (QueryTest.scala:248)
But I think the ORC failure is due to differences in how rows are materialized. Is there a clean way to test this properly?
In addition, I noticed that JSON does not respect case sensitivity even in Spark 2.4.4. Below is a test on my local machine using spark-shell:
20/03/12 19:20:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/12 19:20:24 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://61.75.36.130:4041
Spark context available as 'sc' (master = local[*], app id = local-1584008425035).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val df = Seq("A").toDF("camelCase")
df: org.apache.spark.sql.DataFrame = [camelCase: string]
scala> df.write.format("json").save("./json_simple")
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val sch2 = new StructType().add("camelcase", StringType)
sch2: org.apache.spark.sql.types.StructType = StructType(StructField(camelcase,StringType,true))
scala> spark.read.format("json").schema(sch2).load("./json_simple").show()
+---------+
|camelcase|
+---------+
|     null|
+---------+
Thank you for checking. Could you file a JIRA for regressions only?
Could you update your PR?
OK, I updated my PR and created a JIRA issue. As the other file formats (ORC and JSON) also fail these test cases, I skipped checking those formats for now and just moved the current test cases into FileBasedDataSourceSuite. I think they can be added once the regressions are fixed.
Was the ORC case only when spark.sql.optimizer.nestedSchemaPruning.enabled is enabled?
I checked the test, but it produces the same result as above regardless of the nestedSchemaPruning option.
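For reference, a sketch of how that toggle can be pinned explicitly in a test (assuming the usual withSQLConf helper from SQLTestUtils):

```scala
// Sketch: run the same nested-column selection assertion with pruning on and off.
Seq("true", "false").foreach { pruningEnabled =>
  withSQLConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> pruningEnabled) {
    // ... nested column selection check goes here ...
  }
}
```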
cc @cloud-fan , @gengliangwang and @yhuai |
cc @rxin as a release manager for 3.0.0. (Also, cc @gatorsmile ) |
Is this a regression in 3.0? If it is, do you know which commit/PR caused it? |
@cloud-fan After #22880, an IllegalArgumentException is thrown.
Is it possible to normalize column names before entering these low-level Parquet classes?
It may be possible in ParquetReadSupport, similar to clipping the Parquet requested schema, to then generate a normalized catalystRequestedSchema, I think. But I could not find a clean way to normalize it; in particular, normalization through Array types is quite complicated.
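To illustrate where the complexity comes from, a rough sketch of the recursion such a normalization would need (a hypothetical helper, not code proposed in this PR):

```scala
import java.util.Locale

import org.apache.spark.sql.types._

// Hypothetical: lower-case every struct field name, recursing through nested types.
def normalizeFieldNames(dt: DataType): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map { f =>
      f.copy(
        name = f.name.toLowerCase(Locale.ROOT),
        dataType = normalizeFieldNames(f.dataType))
    })
  case ArrayType(elementType, containsNull) =>
    ArrayType(normalizeFieldNames(elementType), containsNull)
  case MapType(keyType, valueType, valueContainsNull) =>
    MapType(normalizeFieldNames(keyType), normalizeFieldNames(valueType), valueContainsNull)
  case other => other
}
```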
This is related to spark.sql.optimizer.nestedSchemaPruning.enabled, which is enabled by default as of SPARK-29805, although some of the new configurations and fixes landed in 3.0, such as SPARK-26837, SPARK-27707, and SPARK-25407. I have seen a couple of such issues, such as SPARK-29721 and SPARK-31116, during this code freeze. Should we really enable it by default, @dbtsai?
@HyukjinKwon I haven't investigated 2.4.x deeply, but I have not seen this issue on 2.4.x.
* As there are regressions when schema pruning is enabled, keep the previous logic.
Thanks for confirmation, @kimtkyeom |
NESTED_SCHEMA_PRUNING option
Retest this please. |
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
  SQLConf.CASE_SENSITIVE.defaultValue.get)
I'm not sure why you need to pass caseSensitive across ParquetRecordMaterializer and ParquetRowConverter. Can't we just get it at ParquetRowConverter?
Can I get the runtime config at ParquetRowConverter? I don't fully understand its behavior.
SQLConf.get works, even on the executor side; see dd37529.
Thanks! I'll update it to use SQLConf instead of passing the argument across classes.
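For example, something along these lines inside the converter, instead of threading the flag through ParquetRecordMaterializer (a sketch only; the val name is illustrative):

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch: read the session conf where it is needed; SQLConf.get also works
// on the executor side, as noted above.
private val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
```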
It should be good to merge after addressing @dongjoon-hyun's comment about the test case.
* MISC: Change the exception in `ParquetRowConverter.fieldConverters` to RuntimeException
BTW, @kimtkyeom, while reviewing this PR again thoroughly, the original failure report on ORC looks wrong. You wrote it like the following, but it was
|
@dongjoon-hyun Ah, sorry, I mis-pasted the test result. ORC also shows the same result as the following, whatever the value of
|
That's a correct behavior in |
@dongjoon-hyun Ah, got it. There is no failure except the above case. (BTW, I think the materialization of rows with nested columns, where a requested column does not match, should be consistent regardless of file format, but that is out of this PR's scope.)
SQLConf.get instead. * Also, a non-existing column among the Parquet requested columns is filled with null, so do not call getOrElse; apply directly instead.
Test build #119846 has finished for PR 27888 at commit
|
LGTM, good catch!
Test build #119845 has finished for PR 27888 at commit
|
Test build #119852 has finished for PR 27888 at commit
|
+1, LGTM. Thank you, @kimtkyeom and all.
Test build #119863 has finished for PR 27888 at commit
|
Test build #119865 has finished for PR 27888 at commit
|
…nverter

### What changes were proposed in this pull request?

This PR (SPARK-31116) add caseSensitive parameter to ParquetRowConverter so that it handle materialize parquet properly with respect to case sensitivity

### Why are the changes needed?

From spark 3.0.0, below statement throws IllegalArgumentException in caseInsensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed parquet requested schema and catalyst requested schema during schema clipping in ParquetReadSupport, just follow these behavior.

```scala
val path = "/some/temp/path"
spark
  .range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.parquet(path)

val caseInsensitiveSchema = new StructType()
  .add(
    "StructColumn",
    new StructType()
      .add("LowerCase", LongType)
      .add("camelcase", LongType))

spark.read.schema(caseInsensitiveSchema).parquet(path).show()
```

### Does this PR introduce any user-facing change?

No. The changes are only in unreleased branches (`master` and `branch-3.0`).

### How was this patch tested?

Passed new test cases that check parquet column selection with respect to schemas and case sensitivities

Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity.

Authored-by: Tae-kyeom, Kim <kimtkyeom@devsisters.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit e736c62)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Merged to master/3.0. |
Thank you so much for your first contribution, @kimtkyeom . I added you |
Thanks for all reviewers with all generous review & comments! :) |
+1, LGTM. |
What changes were proposed in this pull request?
This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so that it materializes Parquet data properly with respect to case sensitivity.
Why are the changes needed?
From Spark 3.0.0, the statement below throws an IllegalArgumentException in case-insensitive mode because of the explicit field index search in ParquetRowConverter. As the Parquet requested schema and the Catalyst requested schema are already constructed during schema clipping in ParquetReadSupport, we just follow that behavior.
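The statement referred to above, reproduced from the merged commit message:

```scala
val path = "/some/temp/path"
spark
  .range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.parquet(path)

val caseInsensitiveSchema = new StructType()
  .add(
    "StructColumn",
    new StructType()
      .add("LowerCase", LongType)
      .add("camelcase", LongType))

spark.read.schema(caseInsensitiveSchema).parquet(path).show()
```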
Does this PR introduce any user-facing change?
No. The changes are only in unreleased branches (`master` and `branch-3.0`).
How was this patch tested?
Passed new test cases that check Parquet column selection with respect to schemas and case sensitivity.