[SPARK-25391][SQL] Make behaviors consistent when converting parquet hive table to parquet data source #22343


Closed
seancxmao wants to merge 1 commit

Conversation

@seancxmao (Contributor) commented Sep 5, 2018

What changes were proposed in this pull request?

Parquet data source tables and Hive parquet tables have different behaviors for Parquet field resolution, so when spark.sql.hive.convertMetastoreParquet is true, users might see inconsistent results depending on whether the conversion happens.

This PR aims to make the behaviors consistent when converting a Hive parquet table to a Parquet data source table:

  • The behavior must be consistent for the conversion to be safe, so we skip the conversion in case-sensitive mode, because Hive parquet tables always do case-insensitive field resolution.
  • In case-insensitive mode, when converting a Hive parquet table to a Parquet data source, we switch the duplicated-fields resolution mode so that the Parquet data source picks the first matched field, the same behavior as a Hive parquet table, to keep behaviors consistent (see the sketch below).
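
To illustrate the inconsistency being fixed, here is a minimal spark-shell sketch (the path and table name are made up, and the expected outcomes in the comments simply restate the description above):

scala> // Write a Parquet file whose only physical field is named "A"
scala> Seq("x").toDF("A").write.parquet("/tmp/mixed_case_t")
scala> sql("CREATE TABLE mixed_case_t(a STRING) STORED AS PARQUET LOCATION '/tmp/mixed_case_t'")
scala> sql("SET spark.sql.caseSensitive=true")
scala> // With spark.sql.hive.convertMetastoreParquet=true, the converted Parquet reader
scala> // resolves column "a" case-sensitively, finds no match, and returns null; with the
scala> // conversion disabled, Hive resolves "a" case-insensitively and returns "x".
scala> spark.table("mixed_case_t").show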

How was this patch tested?

Unit tests added.

@seancxmao (Contributor, Author)

@HyukjinKwon @cloud-fan @gatorsmile Could you please help review this when you have time?

@cloud-fan (Contributor)

ok to test

caseSensitive = true)
conf = {
  val conf = new Configuration()
  conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
Contributor

isn't it the default value?

Contributor Author

@cloud-fan There is no default value for spark.sql.caseSensitive in Configuration. Let me explain in more detail below.

This is one of the overloaded methods of testSchemaClipping. I tried to give this testSchemaClipping method a default conf; however, Scalac complains that

in class ParquetSchemaSuite, multiple overloaded alternatives of testSchemaClipping define default arguments

private def testSchemaClipping(
    testName: String,
    parquetSchema: String,
    catalystSchema: StructType,
    expectedSchema: String,
    conf: Configuration = {

private def testSchemaClipping(
    testName: String,
    parquetSchema: String,
    catalystSchema: StructType,
    expectedSchema: MessageType,
    conf: Configuration): Unit = {

It seems a little confusing, because these two methods have different parameter types. After a brief investigation, I found that the Scala compiler simply disallows overloaded methods with default arguments, even when those methods have different parameter types.

https://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments
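
For reference, a standalone snippet (with illustrative names) that reproduces the restriction:

object OverloadDefaults {
  // Scalac rejects this pair with "multiple overloaded alternatives of method f
  // define default arguments", even though the defaulted parameters differ in type.
  def f(name: String, conf: Int = 0): Unit = println(s"$name: $conf")
  def f(name: String, conf: String = "default"): Unit = println(s"$name: $conf")
}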

@cloud-fan (Contributor)

@dongjoon-hyun does the ORC conversion need the same fix?

@dongjoon-hyun (Member)

Thank you for pinging me. I'll take a look tomorrow, @cloud-fan .

BTW, @seancxmao, can we handle this convertMetastoreXXX case in a new JIRA issue? The title "The behavior must be consistent to do the conversion" doesn't look good to me because it's not complete as a single patch title.

@HyukjinKwon (Member)

@seancxmao, mind fixing the PR title BTW? As it stands, it's unclear which behaviour you mean in the PR title.

 * behavior as hive parquet table - to keep behaviors consistent.
 */
val duplicatedFieldsResolutionMode: String = {
  parameters.getOrElse(DUPLICATED_FIELDS_RESOLUTION_MODE,
Member

I don't think we should leave this for Parquet options for now. Can we just have a SQL config to control this?

Contributor

whether we have a SQL config for it or not, we must define an option here. The conversion happens per-query, so we must have a per-query option to switch the behavior, instead of a per-session SQL config.
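
A rough sketch of that distinction, assuming a SparkSession named spark (the option key and value strings below are hypothetical; only the DUPLICATED_FIELDS_RESOLUTION_MODE constant appears in this diff):

// Per-query: the conversion rule can attach the option to the single relation it
// rewrites, leaving every other query in the session untouched.
val df = spark.read
  .option("duplicatedFieldsResolutionMode", "PICK_FIRST")  // hypothetical key/value
  .parquet("/path/to/table")

// Per-session: a SQL config would flip the behavior for all subsequent Parquet reads.
spark.conf.set("spark.sql.parquet.duplicatedFieldsResolutionMode", "PICK_FIRST")  // hypothetical key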

Member

The conversion itself happens per query, but my impression is that different values don't usually occur per query. I mean, I was wondering whether users would want to set this query by query.

Contributor

I agree this is a little unusual. Usually we have a SQL config first, then we create an option for it if necessary. In this case, we are not adding a config/option from a user requirement; we need it for an internal optimization.

If we can, I would suggest we make it an internal option. But anyway, we shouldn't rush to add a SQL config until we get requirements from users.

@seancxmao changed the title from "[SPARK-25132][SQL][FOLLOW-UP] The behavior must be consistent to do the conversion" to "[SPARK-25391][SQL] Make behaviors consistent when converting parquet hive table to parquet data source" on Sep 10, 2018
@seancxmao (Contributor, Author)

@dongjoon-hyun @HyukjinKwon I created a new JIRA ticket and tried to use a more complete and clearer title for this PR. What do you think?

@SparkQA

SparkQA commented Sep 10, 2018

Test build #95857 has finished for PR 22343 at commit 95673cd.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA

SparkQA commented Sep 10, 2018

Test build #95864 has finished for PR 22343 at commit 95673cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Hi, @seancxmao. Should we be consistent? IIRC, all the previous PRs raise exceptions to prevent any potential issues. In this case, I have a feeling that convertMetastoreXXX should be used to prevent the problem of the Hive behavior by raising an exception, not to hide the problem of the Hive behavior.

In case-insensitive mode, when converting a Hive parquet table to a Parquet data source, we switch the duplicated-fields resolution mode so that the Parquet data source picks the first matched field, the same behavior as a Hive parquet table, to keep behaviors consistent.

@seancxmao (Contributor, Author)

Hi, @dongjoon-hyun,
When we find duplicated field names in the convertMetastoreXXX case, we have two options:
(1) Raise an exception, as the Parquet data source does. Most end users do not know the difference between a Hive parquet table and a Parquet data source; if the conversion leads to different behaviors, they may be confused, and in some cases this can even silently lead to tricky data issues.
(2) Adjust the behavior of the Parquet data source to keep behaviors consistent. This seems friendlier to end users and avoids any potential issues introduced by the conversion.

BTW, for a Parquet data source that is not converted from a Hive parquet table, we still raise an exception when there is ambiguity, since this is more intuitive and reasonable.

@dongjoon-hyun (Member)

What I asked was the following, wasn't it?

In case-insensitive mode, when converting a Hive parquet table to a Parquet data source, we switch the duplicated-fields resolution mode so that the Parquet data source picks the first matched field, the same behavior as a Hive parquet table, to keep behaviors consistent.

Spark should not pick up the first matched field in any case, because this was considered a correctness issue in the previous PR, which was backported to branch-2.3 (#22183). I don't think we need to follow the incorrect Hive behavior.
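
For context, a minimal sketch of the exception-raising behavior referenced above (the path is made up, and the exact error message is intentionally not reproduced):

scala> sql("SET spark.sql.caseSensitive=true")
scala> Seq((1, 2)).toDF("A", "a").write.parquet("/tmp/dup_fields_t")
scala> sql("SET spark.sql.caseSensitive=false")
scala> // The requested field "a" now matches both physical fields "A" and "a", so the
scala> // Parquet data source raises an exception instead of silently picking one.
scala> spark.read.schema("a INT").parquet("/tmp/dup_fields_t").show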

@seancxmao (Contributor, Author)

seancxmao commented Sep 11, 2018

@dongjoon-hyun It is a little complicated. There has been a discussion about this in #22184. Below are some key comments from @cloud-fan and @gatorsmile, just FYI.

BTW, it was finally decided that #22148 should not be backported to branch-2.3.

@cloud-fan (Contributor)

To clarify: this is just a workaround for when we hit a problematic Hive parquet table (one having case-insensitively duplicated field names in the parquet file) and we want to read it with the native Parquet reader. The Hive behavior is weird, but we need to follow it as we are reading a Hive table.

Personally I think it's not a big deal. If the Hive table is malformed, I think we don't have to follow Hive's buggy behavior. If people are confused by this patch and think it isn't worth it, I'm OK to just leave it.

@dongjoon-hyun (Member)

Thank you for the pointer, @seancxmao . And thank you for clarification, @cloud-fan .

It looks like we are somewhat re-creating the correctness issue in this PR when caseSensitive=true.

BEFORE THIS PR (master)

scala> sql("INSERT OVERWRITE DIRECTORY '/tmp/hive_t' STORED AS PARQUET SELECT 'A', 'a'")
scala> sql("CREATE TABLE hive_t(a STRING) STORED AS PARQUET LOCATION '/tmp/hive_t'")
scala> sql("CREATE TABLE spark_t(a STRING) USING PARQUET LOCATION '/tmp/hive_t'")
scala> sql("set spark.sql.caseSensitive=true")
scala> spark.table("hive_t").show
+---+
|  a|
+---+
|  a|
+---+

scala> spark.table("spark_t").show
+---+
|  a|
+---+
|  a|
+---+

AFTER THIS PR

scala> sql("set spark.sql.caseSensitive=true")
scala> spark.table("hive_t").show
+---+
|  a|
+---+
|  A|
+---+

scala> spark.table("spark_t").show
+---+
|  a|
+---+
|  a|
+---+

@seancxmao (Contributor, Author)

Could we treat this as a behavior change? We could add a legacy conf (e.g. spark.sql.hive.legacy.convertMetastoreParquet, maybe defined in HiveUtils) to let users revert to the previous behavior for backward compatibility. If this legacy conf is set to true, behaviors would be reverted in both case-sensitive and case-insensitive modes.

| caseSensitive | legacy behavior                    | new behavior                               |
|---------------|------------------------------------|--------------------------------------------|
| true          | convert anyway                     | skip conversion, log warning message       |
| false         | convert, fail if there's ambiguity | convert, first match if there's ambiguity  |
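
If that direction were taken, the legacy conf could be declared next to the existing entries in HiveUtils, roughly as sketched below (a proposal only: the key name comes from this comment, everything else is illustrative):

import org.apache.spark.sql.internal.SQLConf.buildConf

// Hypothetical legacy switch; false keeps the new behavior proposed in this PR.
val HIVE_LEGACY_CONVERT_METASTORE_PARQUET =
  buildConf("spark.sql.hive.legacy.convertMetastoreParquet")
    .doc("When true, revert to the old conversion behavior: convert even in " +
      "case-sensitive mode, and fail on ambiguous fields in case-insensitive mode.")
    .booleanConf
    .createWithDefault(false)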

@dongjoon-hyun (Member)

@seancxmao, for Hive compatibility, spark.sql.hive.convertMetastoreParquet=false looks sufficient to me.

@seancxmao (Contributor, Author)

Setting spark.sql.hive.convertMetastoreParquet=false keeps Hive compatibility but loses the performance benefit. We can do better by enabling the conversion while still keeping Hive compatibility. Though this makes our implementation more complex, I guess most end users keep spark.sql.hive.convertMetastoreParquet=true and spark.sql.caseSensitive=false, which are the default values, so this brings benefits to end users.

@dongjoon-hyun (Member)

Compatibility is not a golden rule if it sacrifices correctness. A fast but wrong result doesn't look like a benefit to me. Do you think customers want to get a wrong result, as in Hive?

@seancxmao (Contributor, Author)

seancxmao commented Sep 12, 2018

I agree that correctness is more important. If we should not make behaviors consistent when doing the conversion, I will close this PR. @cloud-fan @gatorsmile, what do you think?

@dongjoon-hyun (Member)

Thank you for understanding, @seancxmao .

Also, I want to add a note to this PR. The following is a well-known example of Hive incompatibility since Apache Spark 1.6.3; we get a correct result only when spark.sql.hive.convertMetastoreParquet=false. Users should know what they are using.

scala> sql("CREATE TABLE t1(a CHAR(3))")
scala> sql("CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET")

scala> sql("INSERT INTO TABLE t1 SELECT 'a '")
scala> sql("INSERT INTO TABLE t3 SELECT 'a '")

scala> sql("SELECT a, length(a) FROM t1").show
+---+---------+
|  a|length(a)|
+---+---------+
|a  |        3|
+---+---------+

scala> sql("SELECT a, length(a) FROM t3").show
+---+---------+
|  a|length(a)|
+---+---------+
| a |        2|
+---+---------+

scala> sql("set spark.sql.hive.convertMetastoreParquet=false").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.hive.co...|false|
+--------------------+-----+


scala> sql("SELECT a, length(a) FROM t3").show
+---+---------+
|  a|length(a)|
+---+---------+
|a  |        3|
+---+---------+

@dongjoon-hyun (Member)

Could you close this PR and JIRA, @seancxmao ?

@seancxmao (Contributor, Author)

Sure, closing this PR. Thank you all for your time and insights.

@seancxmao closed this Sep 16, 2018
@dongjoon-hyun (Member)

Thank YOU for your PR and the open discussion on this, @seancxmao. See you in other PRs.
