[SPARK-25391][SQL] Make behaviors consistent when converting parquet hive table to parquet data source #22343
Conversation
@HyukjinKwon @cloud-fan @gatorsmile Could you please kindly help review this if you have time?
ok to test
```scala
caseSensitive = true)
conf = {
  val conf = new Configuration()
  conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
```
isn't it the default value?
@cloud-fan There is no default value for `spark.sql.caseSensitive` in `Configuration`. Let me explain in more detail below.
This is one of the overloaded methods of `testSchemaClipping`. I tried to give this `testSchemaClipping` method a default `conf`, however scalac complains:
`in class ParquetSchemaSuite, multiple overloaded alternatives of testSchemaClipping define default arguments`
Lines 1014 to 1019 in 95673cd
```scala
private def testSchemaClipping(
    testName: String,
    parquetSchema: String,
    catalystSchema: StructType,
    expectedSchema: String,
    conf: Configuration = {
```
Lines 1028 to 1033 in 95673cd
```scala
private def testSchemaClipping(
    testName: String,
    parquetSchema: String,
    catalystSchema: StructType,
    expectedSchema: MessageType,
    conf: Configuration): Unit = {
```
It seems a little confusing, because these two methods have different parameter types. After a brief investigation, I found the Scala compiler simply disallows default arguments on more than one overloaded alternative, even when these methods have different parameter types.
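As a side note, here is a minimal sketch of that restriction (the names are made up, not the actual suite code):

```scala
// Hypothetical sketch: Scala allows default arguments on at most one of a set of
// overloaded alternatives, even when their parameter types differ.
object OverloadDemo {
  def clip(schema: String, conf: Int = 0): Unit = ()

  // Does not compile: "in object OverloadDemo, multiple overloaded alternatives
  // of method clip define default arguments"
  def clip(schema: Seq[String], conf: Int = 0): Unit = ()
}
```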
@dongjoon-hyun does the orc conversion need the same fix?
Thank you for pinging me. I'll take a look tomorrow, @cloud-fan. BTW, @seancxmao, can we handle this
@seancxmao, mind fixing the PR title BTW? For instance, it looks unclear which behaviour you mean in the PR title.
```scala
 * behavior as hive parquet table - to keep behaviors consistent.
 */
val duplicatedFieldsResolutionMode: String = {
  parameters.getOrElse(DUPLICATED_FIELDS_RESOLUTION_MODE,
```
I don't think we should leave this for Parquet options for now. Can we just have a SQL config to control this?
Whether we have a SQL config for it or not, we must define an option here. The conversion happens per query, so we must have a per-query option to switch the behavior, instead of a per-session SQL config.
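To illustrate the distinction (the option and config keys below are hypothetical, not anything defined by this PR): a per-query option is attached to an individual read, while a SQL config applies to the whole session.

```scala
// Assumes an existing SparkSession named `spark`; keys are hypothetical.

// Per-query: the option travels with this particular read only.
val df = spark.read
  .option("fieldResolutionMode", "first-match")   // hypothetical option key
  .parquet("/tmp/some_table_path")

// Per-session: the config affects every subsequent query in the session.
spark.conf.set("spark.sql.fieldResolutionMode", "first-match")  // hypothetical config key
```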
The conversion itself happens per query, but my impression is that the desired value doesn't usually vary per query. I mean, I was wondering if users would want to set this query by query.
I agree this is a little unusual. Usually we have a SQL config first, then we create an option for it if necessary. In this case, we are not adding a config/option from a user requirement, but we need it for an internal optimization.
If we can, I would suggest we make it an internal option. But anyway, we shouldn't rush to add a SQL config until we get requirements from users.
@dongjoon-hyun @HyukjinKwon I created a new JIRA ticket and tried to use a more complete and clear title for this PR. What do you think?
Test build #95857 has finished for PR 22343 at commit
retest this please
Test build #95864 has finished for PR 22343 at commit
Hi, @seancxmao. Should we be consistent? IIRC, all the previous PRs raise an exception to prevent any potential issues. In this case, I have a feeling that
Hi, @dongjoon-hyun. BTW, for a parquet data source which is not converted from a hive parquet table, we raise an exception when there is ambiguity, since this is more intuitive and reasonable.
What I asked was the following, wasn't it?
Spark should not pick up the first matched field in any case, because it's considered a correctness behavior in the previous PR, which is backported to
@dongjoon-hyun It is a little complicated. There has been a discussion about this in #22184. Below are some key comments from @cloud-fan and @gatorsmile, just FYI.
BTW, it was finally decided that #22148 should not be backported to branch-2.3.
To clarify: this is just a workaround for when we hit a problematic (having case-insensitive duplicated field names in the parquet file) hive parquet table and want to read it with the native parquet reader. The hive behavior is weird, but we need to follow it as we are reading a hive table. Personally I think it's not a big deal. If the hive table is malformed, I think we don't have to follow hive's buggy behavior. If people are confused by this patch and think it isn't worth it, I'm ok to just leave it.
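For reference, a parquet file with case-insensitive duplicated field names can also be produced without Hive. A minimal sketch (assumes an existing SparkSession named `spark`; the output path is arbitrary):

```scala
import spark.implicits._

// With case-sensitive analysis enabled, "A" and "a" are treated as distinct columns,
// so the written parquet file ends up with field names that collide case-insensitively.
spark.conf.set("spark.sql.caseSensitive", "true")
Seq(("A", "a")).toDF("A", "a").write.mode("overwrite").parquet("/tmp/dup_fields")
```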
Thank you for the pointer, @seancxmao. And thank you for the clarification, @cloud-fan. It looks like we are re-creating a correctness issue somewhat in this PR when `spark.sql.caseSensitive` is true.

BEFORE THIS PR (master)
```scala
scala> sql("INSERT OVERWRITE DIRECTORY '/tmp/hive_t' STORED AS PARQUET SELECT 'A', 'a'")
scala> sql("CREATE TABLE hive_t(a STRING) STORED AS PARQUET LOCATION '/tmp/hive_t'")
scala> sql("CREATE TABLE spark_t(a STRING) USING PARQUET LOCATION '/tmp/hive_t'")
scala> sql("set spark.sql.caseSensitive=true")

scala> spark.table("hive_t").show
+---+
|  a|
+---+
|  a|
+---+

scala> spark.table("spark_t").show
+---+
|  a|
+---+
|  a|
+---+
```

AFTER THIS PR
```scala
scala> sql("set spark.sql.caseSensitive=true")

scala> spark.table("hive_t").show
+---+
|  a|
+---+
|  A|
+---+

scala> spark.table("spark_t").show
+---+
|  a|
+---+
|  a|
+---+
```
Could we see this as a behavior change? We can add a legacy conf (e.g.
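The conf name is elided above; purely as an illustration, a sketch of what such a legacy flag could look like if it were added to SQLConf (the key, doc text, and default below are hypothetical):

```scala
// Hypothetical entry; it would sit next to the other definitions in
// org.apache.spark.sql.internal.SQLConf. Nothing like this exists today.
val LEGACY_CONVERTED_PARQUET_FIELD_RESOLUTION =
  buildConf("spark.sql.legacy.convertedParquetHiveFieldResolution")
    .internal()
    .doc("When true, parquet hive tables converted to the data source reader keep " +
      "the pre-change field resolution behavior instead of mimicking the hive reader.")
    .booleanConf
    .createWithDefault(false)
```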
@seancxmao. For Hive compatibility,
Setting spark.sql.hive.convertMetastoreParquet=false keeps Hive compatibility but loses the performance benefit. We can do better by enabling the conversion and still keeping Hive compatibility. Though this makes our implementation more complex, I guess most end users may keep
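For context, the fallback mentioned here is a one-line session setting; a small sketch (table name taken from the example above):

```scala
// Read the table through the Hive SerDe path instead of the native parquet reader.
// This keeps Hive's field resolution behavior but gives up the faster data source scan.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.table("hive_t").show()
```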
Compatibility is not a golden rule if it sacrifices correctness. A fast but wrong result doesn't look like a benefit to me. Do you think the customer wants to get a wrong result like Hive?
I agree that correctness is more important. If we should not make behaviors consistent when doing the conversion, I will close this PR. @cloud-fan @gatorsmile what do you think?
Thank you for understanding, @seancxmao. Also, I want to make an additional note in this PR. The following is a well-known example of Hive incompatibility since Apache Spark 1.6.3. We get a correct result only when spark.sql.hive.convertMetastoreParquet is set to false.

```scala
scala> sql("CREATE TABLE t1(a CHAR(3))")
scala> sql("CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET")
scala> sql("INSERT INTO TABLE t1 SELECT 'a '")
scala> sql("INSERT INTO TABLE t3 SELECT 'a '")

scala> sql("SELECT a, length(a) FROM t1").show
+---+---------+
|  a|length(a)|
+---+---------+
|a  |        3|
+---+---------+

scala> sql("SELECT a, length(a) FROM t3").show
+---+---------+
|  a|length(a)|
+---+---------+
| a |        2|
+---+---------+

scala> sql("set spark.sql.hive.convertMetastoreParquet=false").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.hive.co...|false|
+--------------------+-----+

scala> sql("SELECT a, length(a) FROM t3").show
+---+---------+
|  a|length(a)|
+---+---------+
|a  |        3|
+---+---------+
```
Could you close this PR and JIRA, @seancxmao?
Sure, closing this PR. Thank you all for your time and insights.
Thank YOU for your PR and open discussion on this, @seancxmao. Let's see each other in other PRs.
What changes were proposed in this pull request?
Parquet data source tables and hive parquet tables have different behaviors regarding parquet field resolution. So, when spark.sql.hive.convertMetastoreParquet is true, users might face inconsistent behaviors. For example, when a parquet file contains field names that differ only by case, the hive reader picks the first matched field while the native parquet reader raises an exception. This PR aims to make the behaviors consistent when converting a parquet hive table to a parquet data source table.
How was this patch tested?
Unit tests added.