[SPARK-16948][SQL] Use metastore schema instead of inferring schema for ORC in HiveMetastoreCatalog #14537
Conversation
Test build #63352 has finished for PR 14537 at commit
@@ -294,7 +294,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
          ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
        }.getOrElse(metastoreSchema)
      } else {
        defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
        val inferredSchema =
There's some code duplicated in both branches of this if expression. Can you refactor it to remove the duplication, please?
@rajeshbalamohan, the changes to HiveMetastoreCatalog look reasonable to me. @tejasapatil, can you help review this PR? I ask because you're the author of 1e88615, which is where the code in question comes from.
Test build #63441 has finished for PR 14537 at commit
@@ -287,14 +287,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
        new Path(metastoreRelation.catalogTable.storage.locationUri.get),
        partitionSpec)

      val schema =
Thanks for refactoring this. I think it makes more sense if defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) is called inferredSchema, and the value of the if (fileType.equals("parquet")) expression is called schema.
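For reference, a minimal sketch of the shape being suggested, with the names proposed above. The helper, its parameters, and the merge function are hypothetical stand-ins, not the actual HiveMetastoreCatalog code:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: run inference once, bind the result to `inferredSchema`,
// and let the value of the if expression be `schema`, removing the duplication
// between the two branches.
def chooseSchema(
    fileType: String,
    metastoreSchema: StructType,
    infer: () => Option[StructType],
    merge: (StructType, StructType) => StructType): StructType = {
  val inferredSchema = infer()
  val schema = if (fileType.equals("parquet")) {
    // For Parquet, reconcile the inferred schema with the metastore schema.
    inferredSchema.map(inferred => merge(metastoreSchema, inferred)).getOrElse(metastoreSchema)
  } else {
    inferredSchema.getOrElse(metastoreSchema)
  }
  schema
}
```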
Thanks @mallman. Fixed the review comments in the latest commit.
Test build #63474 has finished for PR 14537 at commit
@tejasapatil, @mallman - Can you please review when you find time?
LGTM
@rajeshbalamohan We'll need a committer to review your patch.
@rxin Can you please review when you find time?
Thank you @tejasapatil and @mallman
val schema = Try(OrcFileOperator.readSchema(
    files.map(_.getPath.toUri.toString),
    Some(sparkSession.sessionState.newHadoopConf())))
  .recover { case _: FileNotFoundException => None }
Why are we ignoring the FileNotFoundException here?
cc @cloud-fan @gatorsmile, can you also take a look at this?
Test build #64183 has finished for PR 14537 at commit
Thanks @rxin. Incorporated the review comments.
Why do we infer the schema for tables? The table schema should have been persisted to the metastore when the table was created.
Right. For Parquet, this could be part of the initial codebase (from SPARK-1251, I believe), which merges any metastore conflicts with the Parquet files. But in the case of ORC, this inference is still valid because the column names stored in the old ORC format can differ from those in the Hive metastore (e.g. HIVE-4243). There is a separate PR, #14471, which tracks the ORC compatibility issue.
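As a concrete illustration of that mismatch (the schemas below are hypothetical, not taken from the PR): an ORC file written by older Hive versions records positional field names in its footer, while the metastore keeps the user-visible names.

```scala
import org.apache.spark.sql.types._

// Column names as recorded in the Hive metastore.
val metastoreSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))

// Field names as found in the footer of an ORC file written by older Hive
// versions (the HIVE-4243 behavior): positional names instead of real names.
val orcFileSchema = StructType(Seq(
  StructField("_col0", IntegerType),
  StructField("_col1", StringType)))

// Matching by name fails, so columns must be resolved by position or the
// metastore schema must be used directly.
assert(!metastoreSchema.fieldNames.sameElements(orcFileSchema.fieldNames))
```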
@rajeshbalamohan So for ORC 2.x files, would schema inference be unnecessary?
For the latest ORC format, if the data was written out by Hive, it would have the same column mapping.
uh, I missed this ping. Will review it tonight. Thanks!
withTable("empty_text_partitioned") { | ||
spark.sql( | ||
s"""CREATE TABLE empty_text_partitioned(key INT, value STRING) | ||
| PARTITIONED BY (p INT) STORED AS TEXTFILE |
Testing the textfile format sounds useless; we do not convert it to LogicalRelation.
Fixed the test case name. I haven't changed the Parquet code path, as I wasn't sure whether it would break backward compatibility.
You might have forgotten this comment: #14537 (comment)
Thanks @gatorsmile. Removed the changes related to OrcFileFormat.
Test build #64446 has finished for PR 14537 at commit
@gatorsmile Thanks for cc'ing me. I've implemented two approaches to this issue: #14282 simply disables the ORC conversion when this case happens, and #14365 does a more complicated schema mapping. Once this is merged, I think we should fix the schema inconsistency soon.
BTW, @rajeshbalamohan, since you now use the metastore schema directly, the PR description no longer looks correct. Can you also update it? Thanks.
Test build #64449 has finished for PR 14537 at commit
@rajeshbalamohan Do you have time to update it? Thanks!
Sorry about the delay. Updated the PR.
Can you address this https://github.com/apache/spark/pull/14537/files#r76355262? Thanks!
What's the progress on this one?
The code for not throwing FileNotFoundException in OrcFileFormat.inferSchema was removed from this patch. I can create a separate JIRA for that; please let me know if that is blocking this patch.
LGTM, pending Jenkins.
Test build #65755 has finished for PR 14537 at commit
@cloud-fan The failure is related to the Parquet changes introduced for returning metastoreSchema (it has issues with complex types). I am not very comfortable with the Parquet code path, so for the time being I will revert the last change. We can create a follow-up JIRA if needed for the Parquet-related changes; alternatively, I am fine with someone who is comfortable with the Parquet code taking this over.
Test build #65765 has finished for PR 14537 at commit
    ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
  }.getOrElse(metastoreSchema)
case "orc" =>
  metastoreSchema
I went through the code path again; it seems we must infer the schema here. In the metastore, we store the table schema and partition columns. HadoopFsRelation needs a dataSchema, which is the real schema of the data files. Normally that is just the table schema excluding partition columns; however, Spark SQL supports a special case: partition columns can also exist in the data files (see the doc for HadoopFsRelation.dataSchema). This information is not preserved in the metastore, so we have to infer the data schema from the data files here.
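A small sketch of the distinction being described; the column names are hypothetical:

```scala
import org.apache.spark.sql.types._

// Schema stored in the metastore: data columns plus the partition column `p`.
val tableSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType),
  StructField("p", IntegerType)))
val partitionColumns = Set("p")

// The usual dataSchema is the table schema without partition columns...
val dataSchema = StructType(tableSchema.filterNot(f => partitionColumns.contains(f.name)))

// ...but Spark SQL also allows partition columns to appear physically inside the
// data files, in which case the data schema is the full table schema. The
// metastore does not record which case applies, hence the inference step.
```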
@cloud-fan As we discussed offline yesterday, this is probably fine since ORC supports column pruning. Therefore, when reading an ORC file in a partitioned table, the reader always ignores partition columns stored inside the physical file and uses the values encoded in the partition directory path.
We already have a test case for this case here.
        |PARTITIONED BY (p INT) STORED AS ORC
      """.stripMargin)

val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
You don't really need .coalesce(1), since the created DataFrame wraps a LocalRelation.
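A minimal sketch of that point, assuming an active SparkSession named spark:

```scala
import spark.implicits._

// The local Seq is planned as a LocalRelation, so an explicit .coalesce(1)
// adds nothing here.
val emptyDF = Seq.empty[(Int, String)].toDF("key", "value")
```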
LGTM. Thanks!
val inferredSchema =
  defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
  ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
I'm a little worried here. If the table is partitioned, metastoreSchema will always contain partition columns, and thus the merged schema will contain partition columns too. This means we always read Parquet files with partition columns; I think we may have a hidden bug.
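A hypothetical illustration of that worry; the schemas and the name-based merge below are stand-ins for the real mergeMetastoreParquetSchema:

```scala
import org.apache.spark.sql.types._

// Metastore schema of a partitioned table: it still carries the partition column `p`.
val metastoreSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType),
  StructField("p", IntegerType)))

// Schema actually present in the Parquet data files.
val parquetFileSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))

// Any name-based merge that starts from the metastore schema keeps `p`, so the
// resulting data schema would ask the Parquet reader for a column the files lack.
val merged = StructType(metastoreSchema.map { f =>
  parquetFileSchema.find(_.name == f.name).getOrElse(f)
})
assert(merged.fieldNames.contains("p"))
```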
Schema inference has been completely replaced with the metastore schema in #14690. I think we can close this now? cc @cloud-fan @liancheng
What changes were proposed in this pull request?
Querying empty partitioned ORC tables from spark-sql throws an exception when spark.sql.hive.convertMetastoreOrc=true. This PR fixes it by using the metastore schema for ORC files in HiveMetastoreCatalog.
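A minimal repro sketch of the reported failure; the table name is illustrative, and it assumes a Hive-enabled SparkSession named spark with spark.sql.hive.convertMetastoreOrc=true:

```scala
// Create an empty partitioned ORC table (no partitions, no data files yet).
spark.sql(
  """CREATE TABLE empty_orc_partitioned (key INT, value STRING)
    |PARTITIONED BY (p INT) STORED AS ORC""".stripMargin)

// Before this patch, this query failed because Spark tried to infer the ORC schema
// from data files that do not exist yet; with the metastore schema it simply
// returns an empty result.
spark.sql("SELECT * FROM empty_orc_partitioned").show()
```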
How was this patch tested?
Added unit tests and also tested on a small-scale cluster.