
[SPARK-16948][SQL] Use metastore schema instead of inferring schema for ORC in HiveMetastoreCatalog #14537


Closed
wants to merge 20 commits

Conversation

@rajeshbalamohan commented Aug 8, 2016

What changes were proposed in this pull request?

Querying empty partitioned ORC tables from spark-sql throws an exception when spark.sql.hive.convertMetastoreOrc=true. This PR fixes it by using the metastoreSchema for ORC files in HiveMetastoreCatalog.
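
For reference, a minimal repro sketch (the table name and session setup are illustrative; assumes a Hive-enabled SparkSession):

    // Before this fix, the SELECT failed while trying to infer a schema
    // from ORC data files that do not exist yet for the empty table.
    spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
    spark.sql(
      """CREATE TABLE empty_orc_partitioned(key INT, value STRING)
        |PARTITIONED BY (p INT) STORED AS ORC
      """.stripMargin)
    spark.sql("SELECT * FROM empty_orc_partitioned").show()  // should return 0 rows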

How was this patch tested?

Included unit tests and also tested it on a small-scale cluster.

@rajeshbalamohan rajeshbalamohan changed the title [SPARK-16948][SQL] Querying empty partitioned orc tables throws excep… [SPARK-16948][SQL] Querying empty partitioned orc tables throws exceptions Aug 8, 2016
@SparkQA commented Aug 8, 2016

Test build #63352 has finished for PR 14537 at commit 5721b88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -294,7 +294,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
        }.getOrElse(metastoreSchema)
      } else {
-       defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
+       val inferredSchema =
Review comment (Contributor):
There's some code duplicated in both branches of this if expression. Can you refactor it to remove the duplication, please?

@mallman (Contributor) commented Aug 9, 2016

@rajeshbalamohan, the changes to HiveMetastoreCatalog.scala look reasonable. This mirrors the behavior of this method before the if (fileType.equals("parquet")) expression was introduced in 1e88615.

@tejasapatil, can you help review this PR? I ask because you're the author of 1e88615, which is where the code in question in HiveMetastoreCatalog.scala was written.

@SparkQA commented Aug 9, 2016

Test build #63441 has finished for PR 14537 at commit 4ae92d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -287,14 +287,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
        new Path(metastoreRelation.catalogTable.storage.locationUri.get),
        partitionSpec)

      val schema =
Review comment (Contributor):
Thanks for refactoring this.

I think it makes more sense if

defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())

is called inferredSchema and the value of the if (fileType.equals("parquet")) expression is called schema.
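
For concreteness, a sketch of the suggested naming, simplified from the surrounding HiveMetastoreCatalog code (not the exact patch; the variables are assumed from the method's context):

    // Infer once, then pick the schema per file type.
    val inferredSchema =
      defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
    val schema = if (fileType.equals("parquet")) {
      inferredSchema.map { inferred =>
        ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
      }.getOrElse(metastoreSchema)
    } else {
      inferredSchema.getOrElse(metastoreSchema)
    }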

@rajeshbalamohan (Author):

Thanks @mallman. Fixed the review comments in the latest commit.

@SparkQA commented Aug 9, 2016

Test build #63474 has finished for PR 14537 at commit 75bca1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rajeshbalamohan (Author):

@tejasapatil, @mallman - Can you please review when you find time?

@tejasapatil (Contributor):

LGTM

@mallman (Contributor) commented Aug 12, 2016

@rajeshbalamohan We'll need a committer to review your patch.

@rajeshbalamohan (Author):

@rxin Can you please review when you find time?

@rajeshbalamohan (Author):

Thank you @tejasapatil and @mallman.

val schema = Try(OrcFileOperator.readSchema(
    files.map(_.getPath.toUri.toString),
    Some(sparkSession.sessionState.newHadoopConf())))
  .recover { case _: FileNotFoundException => None }
Review comment (Contributor):
Why are we ignoring FileNotFoundException here?

@rxin (Contributor) commented Aug 17, 2016

cc @cloud-fan @gatorsmile can you also take a look at this?

@SparkQA commented Aug 22, 2016

Test build #64183 has finished for PR 14537 at commit 4004c0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rajeshbalamohan (Author):

Thanks @rxin. Incorporated the review comments.

@cloud-fan (Contributor):

Why do we infer the schema for tables? The table schema should have been persisted to the metastore when the table was created.

@rajeshbalamohan (Author):

Right. For Parquet this dates back to the initial codebase (SPARK-1251, I believe), which merges any metastore conflicts with the Parquet files. But in the case of ORC, this inference is still valid, as the column names stored in the old ORC format can differ from those in the Hive metastore (e.g. HIVE-4243). There is a separate PR, #14471, which tracks the ORC compatibility issue.
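
To illustrate the mismatch (a hypothetical example; see HIVE-4243), older Hive ORC writers recorded positional field names in the file footer:

    import org.apache.spark.sql.types._

    // What the old ORC file footer reports for a table written by older Hive:
    val physicalSchema = new StructType()
      .add("_col0", IntegerType)
      .add("_col1", StringType)
    // What the Hive metastore records for the same table:
    val metastoreSchema = new StructType()
      .add("key", IntegerType)
      .add("value", StringType)
    // Types line up positionally, but the names differ, so reading by metastore
    // names against the physical footer names needs reconciliation.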

@mallman (Contributor) commented Aug 22, 2016

@rajeshbalamohan So for ORC 2.x files, would schema inference be unnecessary?

@rajeshbalamohan (Author):

For latest ORC, if the data was written out by Hive, it would have the same mapping.

@gatorsmile (Member):

uh, I missed this ping. Will review it tonight. Thanks!

withTable("empty_text_partitioned") {
  spark.sql(
    s"""CREATE TABLE empty_text_partitioned(key INT, value STRING)
       | PARTITIONED BY (p INT) STORED AS TEXTFILE
Review comment (Member):
Testing the textfile format seems unnecessary; we do not convert it to LogicalRelation.

@rajeshbalamohan (Author):

Fixed the test case name. I haven't changed the Parquet code path, as I wasn't sure whether it would break backward compatibility.

@gatorsmile (Member):

You might have forgotten this comment: #14537 (comment)

@rajeshbalamohan (Author):

Thanks @gatorsmile. Removed the changes related to OrcFileFormat.

@rajeshbalamohan rajeshbalamohan changed the title [SPARK-16948][SQL] Querying empty partitioned orc tables throws exceptions [SPARK-16948][SQL] support empty orc table when converting hive serde table to data source table Aug 26, 2016
@rajeshbalamohan rajeshbalamohan changed the title [SPARK-16948][SQL] support empty orc table when converting hive serde table to data source table [SPARK-16948][SQL] Support empty orc table when converting hive serde table to data source table Aug 26, 2016
@SparkQA commented Aug 26, 2016

Test build #64446 has finished for PR 14537 at commit fc14e2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Aug 26, 2016

@gatorsmile Thanks for cc'ing me.

As spark.sql.hive.convertMetastoreOrc is set to false by default, this change looks fine. However, if the config is set to true and the metastore schema is inconsistent with the schema in the ORC files, I recall that reading the files will fail.

I've implemented two approaches to this issue: #14282 simply disables the ORC conversion when this case happens, and #14365 does a more complicated schema mapping. Once this is merged, I think we should fix the schema inconsistency soon.

@viirya (Member) commented Aug 26, 2016

BTW, @rajeshbalamohan, since you now use the metastore schema directly, the PR description is no longer accurate. Can you update it as well? Thanks.

@SparkQA commented Aug 26, 2016

Test build #64449 has finished for PR 14537 at commit 9ecb2ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

@rajeshbalamohan do you have time to update it? thanks!

@rajeshbalamohan rajeshbalamohan changed the title [SPARK-16948][SQL] Support empty orc table when converting hive serde table to data source table [SPARK-16948][SQL] Use metastore schema instead of inferring schema in ORC in HiveMetastoreCatalog Sep 3, 2016
@rajeshbalamohan rajeshbalamohan changed the title [SPARK-16948][SQL] Use metastore schema instead of inferring schema in ORC in HiveMetastoreCatalog [SPARK-16948][SQL] Use metastore schema instead of inferring schema for ORC in HiveMetastoreCatalog Sep 3, 2016
@rajeshbalamohan (Author):

Sorry about the delay. Updated the PR.

@cloud-fan (Contributor):

Can you address this: https://github.com/apache/spark/pull/14537/files#r76355262? Thanks!

@davies (Contributor) commented Sep 14, 2016

What's the progress on this one?

@rajeshbalamohan (Author) commented Sep 21, 2016

@cloud-fan

> For branch 2.0, we should open another PR to fix the OrcFileFormat.inferSchema, to not throw FileNotFoundException for empty table.

The code for not throwing FileNotFoundException in OrcFileFormat.inferSchema has been removed from this patch. I can create a separate JIRA for that; please let me know if it is blocking this patch.

@cloud-fan (Contributor):

LGTM, pending Jenkins.

@SparkQA commented Sep 22, 2016

Test build #65755 has finished for PR 14537 at commit fa71370.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rajeshbalamohan (Author):

@cloud-fan. The failure is related to the Parquet changes introduced for returning the metastoreSchema (it has issues with complex types). I am not very comfortable with the Parquet code path, so for the time being I will revert the last change. We can create a follow-up JIRA for the Parquet-related changes if needed; alternatively, I am fine with someone who is comfortable with the Parquet code taking this over.

@SparkQA commented Sep 22, 2016

Test build #65765 has finished for PR 14537 at commit e39715e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
  }.getOrElse(metastoreSchema)
case "orc" =>
  metastoreSchema
Review comment (Contributor):
I went through the code path again; it seems we must infer the schema here.

In the metastore, we store the table schema and the partition columns. HadoopFsRelation needs a dataSchema, which is the real schema of the data files. Normally it is just the table schema excluding the partition columns; however, Spark SQL supports a special case: partition columns can also exist in the data files (see the doc for HadoopFsRelation.dataSchema). This information is not preserved in the metastore, so we have to infer the data schema from the data files here.
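
A small sketch of the two layouts being described (schemas are illustrative):

    import org.apache.spark.sql.types._

    val tableSchema = new StructType()
      .add("key", IntegerType).add("value", StringType).add("p", IntegerType)
    val partitionColumns = Set("p")

    // The usual case: data files contain only the non-partition columns.
    val usualDataSchema =
      StructType(tableSchema.filterNot(f => partitionColumns.contains(f.name)))

    // The special case: the partition column p also exists inside the data files,
    // so the dataSchema equals the full table schema. The metastore does not record
    // which layout is on disk, hence the inference from the files.
    val specialDataSchema = tableSchema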

Review comment (Contributor):
@cloud-fan As we discussed offline yesterday, this is probably fine since ORC supports column pruning. Therefore, when reading an ORC file in a partitioned table, the reader always ignores partition columns stored inside the physical file and uses the values encoded in the partition directory path.
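
For example, with a hypothetical layout like /warehouse/t/p=1/part-00000.orc, the partition value comes from the directory name rather than from the file contents:

    // Partition discovery derives p from the path; a physical p column inside
    // the ORC file, if present, is pruned away rather than read.
    val df = spark.read.orc("/warehouse/t")
    df.select("key", "value", "p").show()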

Review comment (Contributor):
We already have a test case covering this here.

       |PARTITIONED BY (p INT) STORED AS ORC
     """.stripMargin)

  val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
Review comment (Contributor):
You don't really need .coalesce(1), since the created DataFrame wraps a LocalRelation.
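
A quick way to see why (a sketch; inspecting queryExecution is just for illustration):

    import spark.implicits._

    val emptyDF = Seq.empty[(Int, String)].toDF("key", "value")
    // The logical plan is a LocalRelation holding zero rows, so there are no
    // file partitions to coalesce before writing.
    println(emptyDF.queryExecution.logical)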

@liancheng (Contributor):
LGTM. Thanks!

val inferredSchema =
  defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
  ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
Review comment (Contributor):
I'm a little worried here. If the table is partitioned, metastoreSchema will always contain the partition columns, and thus the merged schema will contain them too. This means we always read Parquet files with partition columns; I think we may have a hidden bug.
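
To make the worry concrete (a hypothetical partitioned table with partition column p):

    import org.apache.spark.sql.types._

    // Metastore schema includes the partition column:
    val metastoreSchema = new StructType()
      .add("key", IntegerType).add("value", StringType).add("p", IntegerType)
    // Schema inferred from the Parquet data files (no p inside the files):
    val inferred = new StructType()
      .add("key", IntegerType).add("value", StringType)
    // Any merge that starts from metastoreSchema keeps p, so the resulting scan
    // would expect a physical column p that the data files may not contain.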

@viirya (Member) commented Dec 7, 2016

Schema inference was completely replaced with the metastore schema in #14690. I think we can close this now? cc @cloud-fan @liancheng

@asfgit closed this in 08d6441 on Dec 7, 2016