
[SPARK-15474][SQL] Write and read back non-empty schema with empty dataframe #19571


Closed · wants to merge 1 commit

Conversation

@dongjoon-hyun (Member) commented Oct 25, 2017

What changes were proposed in this pull request?

Previously, the ORC file format could not write a correct schema for an empty DataFrame. Instead, it created an empty ORC file with an empty schema, struct<>, so Spark users could not write and then read back an ORC file that has a non-empty schema but no rows. This PR uses the new Apache ORC 1.4.1 library to create an empty ORC file with the correct schema, and it also switches schema inference to always use ORC 1.4.1.

BEFORE

scala> val emptyDf = Seq((true, 1, "str")).toDF("a", "b", "c").limit(0)
scala> emptyDf.write.format("orc").mode("overwrite").save("/tmp/empty")
scala> spark.read.format("orc").load("/tmp/empty").printSchema
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;

AFTER

scala> spark.read.format("orc").load("/tmp/empty").printSchema
root
 |-- a: boolean (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: string (nullable = true)
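
For reference, the footer-level schema of the written files can also be checked directly with the ORC 1.4 reader API, independently of Spark's schema inference. This is only an illustrative sketch; it assumes the /tmp/empty path from the example above and the standard ORC/Hadoop APIs (OrcFile.createReader, Reader.getSchema).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

val conf = new Configuration()
val dir = new Path("/tmp/empty")
val fs = dir.getFileSystem(conf)

// Print the footer schema of every ORC part file Spark wrote.
fs.listStatus(dir).map(_.getPath).filter(_.getName.endsWith(".orc")).foreach { p =>
  val reader = OrcFile.createReader(p, OrcFile.readerOptions(conf).filesystem(fs))
  println(s"$p -> ${reader.getSchema}") // expected: struct<a:boolean,b:int,c:string>
}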

How was this patch tested?

Pass Jenkins with the newly added test cases.

val fs = FileSystem.get(conf)
val options = OrcFile.readerOptions(conf).filesystem(fs)
files.map(_.getPath).flatMap(readSchema(_, options))
  .headOption.map { schema =>
Member:

It seems you just take the first available schema. It looks like we don't need to read the other files once we have found the first available schema.

Member Author:

Yes. This is based on the existing OrcFileOperator.readSchema.
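
As a side note on the reviewer's point, the lookup could be made lazy so that footers are only read until the first schema is found. A minimal sketch, assuming readSchema(Path, ReaderOptions) returns an Option as the flatMap above suggests:

// Lazily scan footers and stop at the first file that yields a schema.
val lazySchemas = files.iterator.map(_.getPath).flatMap(path => readSchema(path, options))
// hasNext stops reading further footers as soon as one file yields a schema.
val firstSchema = if (lazySchemas.hasNext) Some(lazySchemas.next()) else None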

@@ -252,6 +253,13 @@ private[orc] class OrcOutputWriter(
  override def close(): Unit = {
    if (recordWriterInstantiated) {
      recordWriter.close(Reporter.NULL)
    } else {
      // SPARK-15474 Write empty orc file with correct schema
      val conf = context.getConfiguration()
Member:

It looks like skipping the creation of an empty file when no rows are written was deliberate. Is there any impact on the current behavior?

Member Author:

Previously, only ORC skipped creating the file, and that caused more issues such as SPARK-22258 (#19477) and SPARK-21762. The new behavior is consistent with the other data sources such as Parquet.

@@ -252,6 +253,13 @@ private[orc] class OrcOutputWriter(
  override def close(): Unit = {
    if (recordWriterInstantiated) {
Member:

Btw, according to the existing comment, it seems we could simply remove recordWriterInstantiated to allow the empty file to be created.

Member Author:

In this PR, I'm focusing on the empty-file case. We will eventually replace the whole writer and reader with ORC 1.4.1. The test case newly added in this PR will help us make that transition safely.
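
For readers following the thread, recordWriterInstantiated refers to the lazy-instantiation pattern in the Hive-based OrcOutputWriter. The toy below is a self-contained paraphrase of that pattern, not the actual Spark source: the underlying writer is created only when the first row arrives, so a task with zero rows never opens a file.

class LazyRecordWriter(openUnderlying: () => java.io.Closeable) {
  private var instantiated = false

  // Created on first use; opening the underlying writer is what creates the output file.
  private lazy val underlying: java.io.Closeable = {
    instantiated = true
    openUnderlying()
  }

  def write(row: String): Unit = {
    underlying // the first write forces creation of the real writer
  }

  def close(): Unit = {
    // Closing only when something was written is why empty outputs left no file behind.
    if (instantiated) underlying.close()
  }
}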

@SparkQA commented Oct 25, 2017

Test build #83030 has finished for PR 19571 at commit be7ba9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val writer = org.apache.orc.OrcFile.createWriter(
  new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer)
writer.close()
Member:

So, if I understood correctly, the output will be written by org.apache.orc.mapreduce.OrcMapreduceRecordWriter when it is empty but by org.apache.hadoop.hive.ql.io.orc.OrcRecordWriter when it is non-empty? I thought we should use the same writer for both paths if possible, and this looks rather like a band-aid fix. It won't block this PR, but I wonder if this is the only way we can do it for now.

Member Author:

Yep, that's the correct understanding. This PR intentionally focuses only on handling empty files and inferring the schema. This will help us transition safely from the old Hive ORC to the new Apache ORC 1.4.1.

@@ -73,6 +70,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

val configuration = job.getConfiguration

configuration.set(
  MAPRED_OUTPUT_SCHEMA.getAttribute,
  org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.getSchemaString(dataSchema))
Member:

Do we always need to set this?

Member Author:

Yes. This is the correct schema to be written.
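
For illustration, MAPRED_OUTPUT_SCHEMA here is ORC's OrcConf.MAPRED_OUTPUT_SCHEMA key, and the value written into the configuration is the schema in ORC's type-description syntax. A small sketch of what that string looks like for the example DataFrame above; the actual getSchemaString implementation lives in the new sql/core OrcFileFormat.

import org.apache.orc.TypeDescription

// The example DataFrame's schema in ORC type-description form.
val orcSchema = TypeDescription.fromString("struct<a:boolean,b:int,c:string>")
println(orcSchema) // struct<a:boolean,b:int,c:string>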

withTempDir { dir =>
  val path = dir.getCanonicalPath
  val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
  emptyDf.write.format(format).mode("overwrite").save(path)
Member:

Hm, why not use withTempPath { path => instead, without overwrite?

Member Author:

Thanks. No problem. I'll use withTempPath.
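
A sketch of what the adjusted test could look like, assuming the suite mixes in SQLTestUtils (which provides withTempPath) and that format names the data source under test; withTempPath hands over a path that does not exist yet, so mode("overwrite") becomes unnecessary:

withTempPath { path =>
  val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
  emptyDf.write.format(format).save(path.getCanonicalPath)

  // Reading back should recover the non-empty schema even though there are no rows.
  val readBack = spark.read.format(format).load(path.getCanonicalPath)
  assert(readBack.schema === emptyDf.schema)
  assert(readBack.count() === 0)
}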

files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf())
)
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.readSchema(sparkSession, files)
Member:

I am not sure about this one either. This looks like a complete rewrite of org.apache.spark.sql.hive.orc.OrcFileOperator.readSchema. Is this change required to fix this issue?

Member Author:

Yes, it's intentional. OrcFileOperator will later be replaced completely. I kept this PR as small as possible for review.

@dongjoon-hyun (Member Author) commented Oct 25, 2017

Thank you for the review, @viirya and @HyukjinKwon.
As you know, I tried to introduce a new OrcFileFormat in sql/core before, but it was too big to review. Following @cloud-fan's advice, I'm trying to upgrade the existing OrcFileFormat piece by piece.

So far,

  • We introduced the new ORC 1.4.1 dependency.
  • We introduced new Spark SQL ORC parameters and replaced the Hive constants with the new ORC parameters.

This is the first PR that actually reads and writes using the ORC 1.4.1 library.

  • It reads ORC files only to infer the schema.
  • It writes only empty ORC files.

@dongjoon-hyun (Member Author):

I updated the PR.
Could you review this PR again, @viirya, @HyukjinKwon, @gatorsmile, @cloud-fan?

@gatorsmile (Member):

What is the backward compatibility story for ORC 1.4.1? Can we take ORC files created by previous versions and ensure they are not broken?

@cloud-fan (Contributor) commented Oct 25, 2017

I checked how we introduced the new Parquet reader before, and maybe we can follow that approach: #4308

Basically, we leave the old ORC data source as it is and implement a new ORC 1.4.1 data source in the sql/core module. Then we add an internal config to switch the implementation (preferring the new implementation by default) and remove the old implementation after one or two releases.
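
A rough sketch of what such an internal switch could look like as a SQLConf entry, assuming SQLConf's internal buildConf helper; the config name and values below are purely illustrative, not what was finally committed:

// Illustrative only: an internal flag to choose between the Hive-based ORC support
// and a new ORC 1.4.1-based implementation, preferring the new one by default.
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
  .internal()
  .doc("When 'native', use the ORC 1.4.1-based data source in sql/core; " +
    "when 'hive', use the existing ORC support in sql/hive.")
  .stringConf
  .checkValues(Set("native", "hive"))
  .createWithDefault("native")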

@SparkQA commented Oct 25, 2017

Test build #83055 has finished for PR 19571 at commit 8b4fc96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

What is the backward compatibility story for ORC 1.4.1? Can we take ORC files created by previous versions and ensure they are not broken?

That's a good point, and I think it's better to have these tests in the ORC project. If they don't have them, then we can take over and add those tests.

@dongjoon-hyun (Member Author):

Thank you for the review, @gatorsmile and @cloud-fan. In particular, @cloud-fan's suggestion matches my original approach in #17980 and #18953 (before Aug 16). I couldn't agree more.

Basically we leave the old orc data source as it is, and implement a new orc 1.4.1 data source in sql core module. Then we have an internal config to switch the implementation(by default prefer the new implementation), and remove the old implementation after one or two releases.

BTW, I'm wondering what has changed since you left the following comment on that PR on Aug 16.

Have the ORC APIs changed a lot in 1.4? I was expecting a small patch to upgrade the current ORC data source, without moving it to sql/core.

@SparkQA commented Oct 26, 2017

Test build #83072 has finished for PR 19571 at commit 8d212f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author):

@gatorsmile and @cloud-fan:
Regarding ORC compatibility, I checked the ORC code, but it is not clearly tested.
I'll try to add a test suite as a separate issue.

@dongjoon-hyun (Member Author):

To be clear, regarding ORC file versions, there are some ORC test cases against version 0.11, but that is out of scope for us because Spark (and Hive 1.2) uses 0.12 with HIVE-8732.

Within 0.12, there are the following writer versions:

0 = original
1 = HIVE-8732 fixed (fixed stripe/file maximum statistics & string statistics use utf8 for min/max)
2 = HIVE-4243 fixed (use real column names from Hive tables)
3 = HIVE-12055 fixed (vectorized writer implementation)
4 = HIVE-13083 fixed (decimals write present stream correctly)
5 = ORC-101 fixed (bloom filters use utf8 consistently)
6 = ORC-135 fixed (timestamp statistics use utc)
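
For completeness, the file format version and the writer version recorded in a file's footer can be inspected with the ORC reader API. A minimal sketch, with a hypothetical path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

val reader = OrcFile.createReader(new Path("/tmp/example.orc"), OrcFile.readerOptions(new Configuration()))
println(reader.getFileVersion)   // file format version, e.g. V_0_12 (format 0.12)
println(reader.getWriterVersion) // which of the fixes listed above the writer included, e.g. ORC_101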

@cloud-fan (Contributor):

Sorry, I misunderstood the problem at the beginning. I thought the new ORC version just changed the existing APIs a little, but it turns out it has a whole new set of read/write APIs.

@dongjoon-hyun (Member Author) commented Oct 29, 2017

I see. Then, can we continue with #17980 (Make ORCFileFormat configurable between sql/hive and sql/core)?

@cloud-fan (Contributor):

yes please

@dongjoon-hyun (Member Author):

This is resolved in #19651.

@dongjoon-hyun deleted the SPARK-15474 branch on January 7, 2019 07:04