
Commit bd00f10

dima-asana authored and srowen committed
[MINOR][SQL][DOC] Correct parquet nullability documentation
## What changes were proposed in this pull request?

Parquet files appear to have nullability info when being written, not being read.

## How was this patch tested?

Some test code (running Spark 2.3, but the relevant code in DataSource looks identical on master):

    case class NullTest(bo: Boolean, opbol: Option[Boolean])
    val testDf = spark.createDataFrame(Seq(NullTest(true, Some(false))))

    defined class NullTest
    testDf: org.apache.spark.sql.DataFrame = [bo: boolean, opbol: boolean]

    testDf.write.parquet("s3://asana-stats/tmp_dima/parquet_check_schema")
    spark.read.parquet("s3://asana-stats/tmp_dima/parquet_check_schema/part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet4").printSchema()
    root
     |-- bo: boolean (nullable = true)
     |-- opbol: boolean (nullable = true)

Meanwhile, the Parquet file that was written does have nullability info:

    []batchprod-report000:/tmp/dimakamalov-batch$ aws s3 ls s3://asana-stats/tmp_dima/parquet_check_schema/
    2018-10-17 21:03:52          0 _SUCCESS
    2018-10-17 21:03:50        504 part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet
    []batchprod-report000:/tmp/dimakamalov-batch$ aws s3 cp s3://asana-stats/tmp_dima/parquet_check_schema/part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet .
    download: s3://asana-stats/tmp_dima/parquet_check_schema/part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet to ./part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet
    []batchprod-report000:/tmp/dimakamalov-batch$ java -jar parquet-tools-1.8.2.jar schema part-00000-b1bf4a19-d9fe-4ece-a2b4-9bbceb490857-c000.snappy.parquet
    message spark_schema {
      required boolean bo;
      optional boolean opbol;
    }

Closes #22759 from dima-asana/dima-asana-nullable-parquet-doc.

Authored-by: dima-asana <42555784+dima-asana@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent 9b7679a · commit bd00f10

File tree

2 files changed (+42, -4 lines)


docs/sql-data-sources-parquet.md

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ displayTitle: Parquet Files
 
 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
 Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
-of the original data. When writing Parquet files, all columns are automatically converted to be nullable for
+of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
 compatibility reasons.
 
 ### Loading Data Programmatically
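To illustrate the corrected sentence, here is a minimal sketch of the read-side behavior, assuming a local SparkSession; the output path and column names are illustrative and not part of the commit:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Local session just for the sketch.
val spark = SparkSession.builder().master("local[*]").appName("nullability-sketch").getOrCreate()

// cl1 is declared non-nullable, cl2 nullable.
val schema = StructType(
  StructField("cl1", IntegerType, nullable = false) ::
  StructField("cl2", IntegerType, nullable = true) :: Nil)
val df = spark.createDataFrame(spark.sparkContext.parallelize(Row(3, 4) :: Nil), schema)

// The written Parquet footer keeps cl1 as required (non-nullable) ...
df.write.mode("overwrite").parquet("/tmp/parquet_nullability_sketch")

// ... but on read Spark reports every column as nullable, for compatibility reasons.
spark.read.parquet("/tmp/parquet_nullability_sketch").printSchema()
// root
//  |-- cl1: integer (nullable = true)
//  |-- cl2: integer (nullable = true)
```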

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 41 additions & 3 deletions
@@ -23,6 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkContext
@@ -31,6 +38,7 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -522,11 +530,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       Seq("json", "orc", "parquet", "csv").foreach { format =>
         val schema = StructType(
           StructField("cl1", IntegerType, nullable = false).withComment("test") ::
-            StructField("cl2", IntegerType, nullable = true) ::
-            StructField("cl3", IntegerType, nullable = true) :: Nil)
+          StructField("cl2", IntegerType, nullable = true) ::
+          StructField("cl3", IntegerType, nullable = true) :: Nil)
         val row = Row(3, null, 4)
         val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
 
+        // if we write and then read, the read will enforce schema to be nullable
         val tableName = "tab"
         withTable(tableName) {
           df.write.format(format).mode("overwrite").saveAsTable(tableName)
@@ -536,12 +545,41 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
             Row("cl1", "test") :: Nil)
           // Verify the schema
           val expectedFields = schema.fields.map(f => f.copy(nullable = true))
-          assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+          assert(spark.table(tableName).schema === schema.copy(fields = expectedFields))
         }
       }
     }
   }
 
+  test("parquet - column nullability -- write only") {
+    val schema = StructType(
+      StructField("cl1", IntegerType, nullable = false) ::
+      StructField("cl2", IntegerType, nullable = true) :: Nil)
+    val row = Row(3, 4)
+    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      df.write.mode("overwrite").parquet(path)
+      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), new Configuration())
+      val f = ParquetFileReader.open(hadoopInputFile)
+      val parquetSchema = f.getFileMetaData.getSchema.getColumns.asScala
+        .map(_.getPrimitiveType)
+      f.close()
+
+      // the write keeps nullable info from the schema
+      val expectedParquetSchema = Seq(
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "cl1"),
+        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "cl2")
+      )
+
+      assert (expectedParquetSchema === parquetSchema)
+    }
+
+  }
+
   test("SPARK-17230: write out results of decimal calculation") {
     val df = spark.range(99, 101)
       .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
