[SPARK-15474][SQL] Write and read back non-empty schema with empty dataframe #19571
Conversation
val fs = FileSystem.get(conf)
val options = OrcFile.readerOptions(conf).filesystem(fs)
files.map(_.getPath).flatMap(readSchema(_, options))
  .headOption.map { schema =>
Seems that you just take the first available schema. Looks like we don't need to read the other files once we find the first available schema.
Yes. This is based on the existing OrcFileOperator.readSchema.
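A minimal sketch of that "take the first available schema" idea, assuming the ORC 1.4.1 reader API; the helper name firstAvailableSchema is hypothetical and the PR's actual readSchema may differ in details.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.{OrcFile, TypeDescription}

def firstAvailableSchema(paths: Seq[Path], conf: Configuration): Option[TypeDescription] = {
  val options = OrcFile.readerOptions(conf)
  // Streams are lazy, so reading stops at the first file with a usable schema.
  paths.toStream.flatMap { p =>
    val schema = OrcFile.createReader(p, options).getSchema
    // Skip files whose schema is the empty struct<> written by older Spark versions.
    if (schema.getFieldNames.isEmpty) None else Some(schema)
  }.headOption
}
```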
@@ -252,6 +253,13 @@ private[orc] class OrcOutputWriter(
  override def close(): Unit = {
    if (recordWriterInstantiated) {
      recordWriter.close(Reporter.NULL)
    } else {
      // SPARK-15474 Write empty orc file with correct schema
      val conf = context.getConfiguration()
Looks like the behavior of skipping the creation of an empty file when no rows are written is deliberate. Is there any impact on the current behavior?
Previously, only ORC skipped creating an empty file, which caused further issues like SPARK-22258 (#19477) and SPARK-21762. This change makes ORC consistent with the other data sources like Parquet.
@@ -252,6 +253,13 @@ private[orc] class OrcOutputWriter(
  override def close(): Unit = {
    if (recordWriterInstantiated) {
Btw, according to the existing comment, it seems we could simply remove recordWriterInstantiated to allow an empty file to be created.
In this PR, I'm focusing on empty files. We will replace the whole writer and reader with ORC 1.4.1 eventually. The newly added test case in this PR will help us transition safely.
Test build #83030 has finished for PR 19571 at commit
val writer = org.apache.orc.OrcFile.createWriter(
  new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer)
writer.close()
So, if I understood correctly, it writes out via org.apache.orc.mapreduce.OrcMapreduceRecordWriter
when the output is empty, but via org.apache.hadoop.hive.ql.io.orc.OrcRecordWriter
when the output is non-empty? I think we should use the same writer for both paths if possible, and this looks rather like a band-aid fix. It won't block this PR, but I wonder if this is the only way we can do it for now.
Yep. That's the correct understanding. This PR intentionally focuses only on handling empty files and inferring the schema. This will help us transition safely from the old Hive ORC to the new Apache ORC 1.4.1.
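Putting the two diff hunks above together, a sketch of the close() path being discussed (the fields context, path, recordWriter, and recordWriterInstantiated come from the surrounding OrcOutputWriter class; comments are added here for explanation only):

```scala
override def close(): Unit = {
  if (recordWriterInstantiated) {
    // Non-empty output: rows were written through the old Hive ORC record writer.
    recordWriter.close(Reporter.NULL)
  } else {
    // SPARK-15474: no rows were written, so create an empty ORC file that still
    // carries the correct schema, using the Apache ORC 1.4.1 writer API.
    val conf = context.getConfiguration()
    val writer = org.apache.orc.OrcFile.createWriter(
      new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
    new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer)
    writer.close()
  }
}
```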
@@ -73,6 +70,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

    val configuration = job.getConfiguration

    configuration.set(
      MAPRED_OUTPUT_SCHEMA.getAttribute,
      org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.getSchemaString(dataSchema))
Do we always need to set this?
Yes. This is the correct schema to be written.
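A sketch of what the getSchemaString helper referenced above could look like, assuming it maps a Spark StructType to the ORC schema string (e.g. struct&lt;a:boolean,b:int,c:string&gt;); the PR's actual implementation may differ.

```scala
import org.apache.spark.sql.types.StructType

def getSchemaString(schema: StructType): String = {
  // Render each field as name:type and wrap the result in struct<...>.
  schema.fields
    .map(field => s"${field.name}:${field.dataType.catalogString}")
    .mkString("struct<", ",", ">")
}
```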
withTempDir { dir =>
  val path = dir.getCanonicalPath
  val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
  emptyDf.write.format(format).mode("overwrite").save(path)
Hm, why is withTempPath { path => not used instead, without overwrite?
Thanks. No problem. I'll use withTempPath.
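A sketch of the test shape suggested above, assuming the usual Spark SQL test harness (withTempPath, spark, and testImplicits in scope) and ORC as the format; the PR's actual test may differ.

```scala
withTempPath { path =>
  val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
  emptyDf.write.format("orc").save(path.getCanonicalPath)

  // With the fix, the non-empty schema survives the round trip even with zero rows.
  val readBack = spark.read.format("orc").load(path.getCanonicalPath)
  assert(readBack.schema === emptyDf.schema)
  assert(readBack.count() === 0)
}
```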
  files.map(_.getPath.toString),
  Some(sparkSession.sessionState.newHadoopConf())
)
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.readSchema(sparkSession, files)
I am not sure about this one either. This looks like a complete rewrite of org.apache.spark.sql.hive.orc.OrcFileOperator.readSchema. Is this change required to fix this issue?
Yes. It's intentional. OrcFileOperator will be completely replaced later. I made this PR as small as possible for review.
Thank you for the review, @viirya and @HyukjinKwon. So far, this is the first PR that actually reads and writes using the ORC 1.4.1 library.
I updated the PR.
What is the backward compatibility of ORC 1.4.1? Can we create multiple ORC files with the previous versions and ensure they are not broken?
I checked how we introduced the new Parquet reader before, and maybe we can follow that: #4308. Basically, we leave the old ORC data source as it is and implement a new ORC 1.4.1 data source in the sql/core module. Then we add an internal config to switch the implementation (by default preferring the new implementation), and remove the old implementation after one or two releases.
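A sketch (not the actual Spark config) of the kind of internal switch described above, using SQLConf's builder style; the config name "spark.sql.orc.impl" and its values are illustrative assumptions here.

```scala
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
  .internal()
  .doc("When 'native', use the new ORC 1.4.1 data source in sql/core; " +
    "when 'hive', use the old ORC data source in sql/hive.")
  .stringConf
  .checkValues(Set("native", "hive"))
  .createWithDefault("native")
```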
Test build #83055 has finished for PR 19571 at commit
That's a good point, and I think it's better to have these tests in the ORC project. If they don't have them, we can take over and add these tests.
Thank you for the review, @gatorsmile and @cloud-fan. In particular, @cloud-fan's opinion matches my original approach in #17980 and #18953 (before Aug 16). I couldn't agree more.
BTW, I'm wondering what has changed since you commented the following on that PR on Aug 16.
Test build #83072 has finished for PR 19571 at commit
@gatorsmile and @cloud-fan.
To be clear, for ORC file versions, there exist some ORC test cases against version 0.11, but that's not in our scope because Spark (and Hive 1.2) uses 0.12. There are 6 versions with 0.12:
0 = original
Sorry, I misunderstood the problem at the beginning. I thought the new ORC version just changed the existing APIs a little, but it turns out the new ORC version has a new set of read/write APIs.
I see. Then, can we continue on #17980?
Yes, please.
This is resolved in #19651.
What changes were proposed in this pull request?
Previously, the ORC file format could not write a correct schema for an empty dataframe; instead, it created an empty ORC file with the empty schema struct<>. As a result, Spark users could not write and read back ORC files with a non-empty schema and no rows. This PR uses the new Apache ORC 1.4.1 library to create an empty ORC file with the correct schema, and also always uses ORC 1.4.1 to infer the schema.
BEFORE
AFTER
How was this patch tested?
Pass the Jenkins with newly added test cases.
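For illustration only (the original BEFORE/AFTER console output is not reproduced above), a minimal round trip of the behavior this PR fixes; the path and column names are made up.

```scala
val df = Seq((true, 1, "str")).toDF("a", "b", "c").limit(0)
df.write.mode("overwrite").orc("/tmp/empty_orc")

// Before the fix, reading this back failed to infer a usable schema (the file
// only carried struct<>); with the fix, the three-column schema is preserved.
spark.read.orc("/tmp/empty_orc").printSchema()
```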