[SPARK-15474][SQL] Write and read back non-empty schema with empty dataframe #19571

Closed · wants to merge 1 commit
Changes from all commits
@@ -17,12 +17,18 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.TypeDescription
import java.io._

import org.apache.spark.sql.AnalysisException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.orc.{OrcFile, TypeDescription}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

private[sql] object OrcFileFormat {
private[sql] object OrcFileFormat extends Logging {
private def checkFieldName(name: String): Unit = {
try {
TypeDescription.fromString(s"struct<$name:int>")
@@ -39,4 +45,32 @@ private[sql] object OrcFileFormat {
schema.fieldNames.foreach(checkFieldName)
schema
}

def getSchemaString(schema: StructType): String = {
schema.fields.map(f => s"${f.name}:${f.dataType.catalogString}").mkString("struct<", ",", ">")
}

private def readSchema(file: Path, conf: Configuration, fs: FileSystem) = {
try {
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
val reader = OrcFile.createReader(file, readerOptions)
val schema = reader.getSchema
if (schema.getFieldNames.size == 0) {
None
} else {
Some(schema)
}
} catch {
case _: IOException => None
}
}

def readSchema(sparkSession: SparkSession, files: Seq[FileStatus]): Option[StructType] = {
val conf = sparkSession.sparkContext.hadoopConfiguration
val fs = FileSystem.get(conf)
files.map(_.getPath).flatMap(readSchema(_, conf, fs)).headOption.map { schema =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
}
}
}
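The new readSchema above relies on org.apache.orc.TypeDescription.toString and Catalyst's catalogString sharing the same struct<name:type,...> syntax, so an ORC footer schema can be handed straight to CatalystSqlParser. A minimal sketch of that round trip (the field names below are made up for illustration):

import org.apache.orc.TypeDescription

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

// A hypothetical ORC schema, e.g. one read from a file footer.
val orcSchema = TypeDescription.fromString("struct<id:bigint,name:string>")

// TypeDescription.toString prints the same struct<...> form that the
// Catalyst SQL parser accepts as a data type, so it can be parsed back
// into a Catalyst type.
val parsed = CatalystSqlParser.parseDataType(orcSchema.toString)

// For a top-level row schema this is a StructType.
val schema = parsed.asInstanceOf[StructType]
// schema.catalogString == "struct<id:bigint,name:string>"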
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.orc.OrcConf.COMPRESS
import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
@@ -58,10 +58,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf())
)
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.readSchema(sparkSession, files)
Member: I am not sure about this one either. This looks like a complete rewrite of org.apache.spark.sql.hive.orc.OrcFileOperator.readSchema. Is this change required to fix this issue?

Member Author: Yes, it is intentional. OrcFileOperator will be completely replaced later. I made this PR as small as possible for review.

}

override def prepareWrite(
@@ -73,6 +70,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

val configuration = job.getConfiguration

configuration.set(
MAPRED_OUTPUT_SCHEMA.getAttribute,
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.getSchemaString(dataSchema))
Member: Do we always need to set this?

Member Author: Yes. This is the correct schema to be written.
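For reference, getSchemaString simply renders the Catalyst schema in the struct<name:type,...> form that the ORC MapReduce writer reads from the MAPRED_OUTPUT_SCHEMA configuration key. A rough sketch of what gets set for the three-column tuple dataframe used in the new test:

import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("_1", BooleanType),
  StructField("_2", IntegerType),
  StructField("_3", StringType)))

// Mirrors getSchemaString: name:catalogString pairs wrapped in struct<...>.
val schemaString = dataSchema.fields
  .map(f => s"${f.name}:${f.dataType.catalogString}")
  .mkString("struct<", ",", ">")
// schemaString == "struct<_1:boolean,_2:int,_3:string>"

// This is the value stored under MAPRED_OUTPUT_SCHEMA.getAttribute above,
// so the record writer (and the empty-file path in close()) always knows
// the full output schema.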


configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
configuration match {
case conf: JobConf =>
@@ -252,6 +253,12 @@ private[orc] class OrcOutputWriter(
override def close(): Unit = {
if (recordWriterInstantiated) {
Member: Btw, according to the existing comment, it seems we can simply remove recordWriterInstantiated to allow an empty file to be created.

Member Author: In this PR, I'm focusing on the empty file case. We will eventually replace the whole writer and reader with ORC 1.4.1. The newly added test case in this PR will let us make that transition safely.

recordWriter.close(Reporter.NULL)
} else {
// SPARK-15474 Write empty orc file with correct schema
val conf = context.getConfiguration()
Member: It looks like the behavior of skipping empty file creation when no rows are written is deliberate. Is there any impact on the current behavior?

Member Author: Previously, only ORC skipped creating the file, which caused issues like SPARK-22258 (#19477) and SPARK-21762. This change makes ORC consistent with the other data sources such as Parquet.

val writer = org.apache.orc.OrcFile.createWriter(
new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer).close(context)
}
}
}
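As a sanity check of the new else branch, the footer of a zero-row file written this way can be inspected directly with the ORC reader API. A minimal sketch, assuming the part file for the empty dataframe landed at the hypothetical path below:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// Hypothetical location of the part file written for the empty dataframe.
val file = new Path("/tmp/spark-15474/part-00000.orc")
val reader = OrcFile.createReader(file, OrcFile.readerOptions(new Configuration()))

// The file holds no rows, but its footer still carries the full schema,
// which is exactly what the new readSchema picks up on the read path.
println(reader.getNumberOfRows)  // 0
println(reader.getSchema)        // e.g. struct<_1:boolean,_2:int,_3:string>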
@@ -2153,4 +2153,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

Seq("orc", "parquet").foreach { format =>
test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
withTempPath { file =>
val path = file.getCanonicalPath
val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
emptyDf.write.format(format).save(path)

val df = spark.read.format(format).load(path)
assert(df.schema.sameType(emptyDf.schema))
checkAnswer(df, emptyDf)
}
}
}
}
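For reference, the end-to-end behavior the new test pins down looks roughly like this in a Spark shell (the path is illustrative):

// Write a dataframe that has a non-empty schema but zero rows.
val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
emptyDf.write.format("orc").save("/tmp/spark-15474")

// Before this PR the ORC writer produced no file at all, so the read below
// failed with an AnalysisException along the lines of
// "Unable to infer schema for ORC. It must be specified manually.".
// After this PR an empty file carrying the schema is written, and the read
// returns an empty dataframe with the original schema.
val df = spark.read.format("orc").load("/tmp/spark-15474")
df.printSchema()  // boolean, int, string columns; zero rows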