
Commit 215609d

mallman and dongjoon-hyun authored and committed
[SPARK-25407][SQL] Allow nested access for non-existent field for Parquet file when nested pruning is enabled
## What changes were proposed in this pull request?

As part of schema clipping in `ParquetReadSupport.scala`, we add fields in the Catalyst requested schema which are missing from the Parquet file schema to the Parquet clipped schema. However, nested schema pruning requires that we ignore unrequested field data when reading from a Parquet file. Therefore we pass two schemas to `ParquetRecordMaterializer`: the schema of the file data we want to read and the schema of the rows we want to return. The reader is responsible for reconciling the differences between the two.

Aside from checking whether schema pruning is enabled, there is an additional complication in constructing the Parquet requested schema: Spark's two Parquet readers reconcile the differences between the Parquet requested schema and the Catalyst requested schema in different ways. Spark's vectorized reader does not (currently) support reading Parquet files with complex types in their schema. Further, it assumes that the Parquet requested schema includes all fields requested in the Catalyst requested schema, and it includes logic in its read path to skip fields in the Parquet requested schema which are not present in the file.

Spark's parquet-mr based reader supports reading Parquet files with any kind of complex schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the parquet-mr reader requires that the Parquet requested schema include only those fields present in the underlying Parquet file's schema. Therefore, in the case where we use the parquet-mr reader, we intersect the Parquet clipped schema with the Parquet file's schema to construct the Parquet requested schema that is set in the `ReadContext`.

_Additional description (by HyukjinKwon):_

Let's suppose that we have a Parquet schema as below:

```
message spark_schema {
  required int32 id;
  optional group name {
    optional binary first (UTF8);
    optional binary last (UTF8);
  }
  optional binary address (UTF8);
}
```

Currently, the clipped schema for a query that selects `name.middle` and `address` is as follows:

```
message spark_schema {
  optional group name {
    optional binary middle (UTF8);
  }
  optional binary address (UTF8);
}
```

Parquet-MR does not support access to a nested non-existent field (`name.middle`). To work around this, this PR removes the `name.middle` request entirely from the schema passed to the Parquet reader:

```
Parquet requested schema:
message spark_schema {
  optional binary address (UTF8);
}
```

and still produces the `name.middle` column properly, per the requested Catalyst schema:

```
root
 |-- name: struct (nullable = true)
 |    |-- middle: string (nullable = true)
 |-- address: string (nullable = true)
```

I think this is technically something the Parquet library should support, since Parquet made a design decision to produce `null` for non-existent fields, IIRC. This PR targets to work around it.

## How was this patch tested?

A previously ignored test case which exercises the failure scenario this PR addresses has been enabled.

This closes #22880

Closes #24307 from dongjoon-hyun/SPARK-25407.

Lead-authored-by: Michael Allman <msa@allman.ms>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
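For illustration, here is a minimal spark-shell sketch of the failure scenario described above (not part of the patch; the `Contact`/`Name` case classes, the `/tmp/contacts` path, and the use of `spark.sql.optimizer.nestedSchemaPruning.enabled` as the key behind `SQLConf.NESTED_SCHEMA_PRUNING_ENABLED` are illustrative assumptions). With nested schema pruning enabled and the parquet-mr reader in use, selecting the missing subfield `name.middle` should now yield `null` instead of failing:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Write a Parquet file whose `name` struct has only `first` and `last` (no `middle`).
case class Name(first: String, last: String)
case class Contact(id: Int, name: Name, address: String)
Seq(Contact(1, Name("Jane", "Doe"), "123 Main St"))
  .toDF().write.mode("overwrite").parquet("/tmp/contacts")

// Assumed config key for SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

// Read with a schema that also declares `name.middle`, which is absent from the file,
// then select the missing subfield. The struct column keeps the vectorized reader off,
// so the parquet-mr path touched by this commit is exercised.
val readSchema = "id INT, name STRUCT<first: STRING, middle: STRING, last: STRING>, address STRING"
spark.read.schema(readSchema).parquet("/tmp/contacts")
  .selectExpr("name.middle", "address")
  .show() // expected: middle is null, address is "123 Main St"
```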
1 parent 02e9f93 commit 215609d

File tree: 4 files changed (+88, -31 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -310,6 +310,9 @@ class ParquetFileFormat
     hadoopConf.set(
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
     hadoopConf.setBoolean(
       SQLConf.CASE_SENSITIVE.key,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
@@ -424,11 +427,12 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
+        val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[UnsafeRow](readSupport)
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before `initialization`.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 68 additions & 20 deletions
```diff
@@ -49,34 +49,65 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
-    extends ReadSupport[UnsafeRow] with Logging {
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+    enableVectorizedReader: Boolean)
+  extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
     // used in the vectorized reader, where we get the convertTz value directly, and the value here
     // is ignored.
-    this(None)
+    this(None, enableVectorizedReader = true)
   }
 
   /**
    * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
    * readers. Responsible for figuring out Parquet requested schema used for column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
-    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
-      context.getFileSchema, catalystRequestedSchema, caseSensitive)
-
+    val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
+    val parquetFileSchema = context.getFileSchema
+    val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
+      catalystRequestedSchema, caseSensitive)
+
+    // We pass two schema to ParquetRecordMaterializer:
+    // - parquetRequestedSchema: the schema of the file data we want to read
+    // - catalystRequestedSchema: the schema of the rows we want to return
+    // The reader is responsible for reconciling the differences between the two.
+    val parquetRequestedSchema = if (schemaPruningEnabled && !enableVectorizedReader) {
+      // Parquet-MR reader requires that parquetRequestedSchema include only those fields present
+      // in the underlying parquetFileSchema. Therefore, we intersect the parquetClippedSchema
+      // with the parquetFileSchema
+      ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
+        .map(groupType => new MessageType(groupType.getName, groupType.getFields))
+        .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+    } else {
+      // Spark's vectorized reader only support atomic types currently. It also skip fields
+      // in parquetRequestedSchema which are not present in the file.
+      parquetClippedSchema
+    }
+    logDebug(
+      s"""Going to read the following fields from the Parquet file with the following schema:
+         |Parquet file schema:
+         |$parquetFileSchema
+         |Parquet clipped schema:
+         |$parquetClippedSchema
+         |Parquet requested schema:
+         |$parquetRequestedSchema
+         |Catalyst requested schema:
+         |${catalystRequestedSchema.treeString}
+       """.stripMargin)
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
 
@@ -90,19 +121,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema
-
-    logInfo {
-      s"""Going to read the following fields from the Parquet file:
-         |
-         |Parquet form:
-         |$parquetRequestedSchema
-         |Catalyst form:
-         |$catalystRequestedSchema
-       """.stripMargin
-    }
-
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
@@ -322,6 +341,35 @@ private[parquet] object ParquetReadSupport {
     }
   }
 
+  /**
+   * Computes the structural intersection between two Parquet group types.
+   * This is used to create a requestedSchema for ReadContext of Parquet-MR reader.
+   * Parquet-MR reader does not support the nested field access to non-existent field
+   * while parquet library does support to read the non-existent field by regular field access.
+   */
+  private def intersectParquetGroups(
+      groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
+    val fields =
+      groupType1.getFields.asScala
+        .filter(field => groupType2.containsField(field.getName))
+        .flatMap {
+          case field1: GroupType =>
+            val field2 = groupType2.getType(field1.getName)
+            if (field2.isPrimitive) {
+              None
+            } else {
+              intersectParquetGroups(field1, field2.asGroupType)
+            }
+          case field1 => Some(field1)
+        }
+
+    if (fields.nonEmpty) {
+      Some(groupType1.withNewFields(fields.asJava))
+    } else {
+      None
+    }
+  }
+
   def expandUDT(schema: StructType): StructType = {
     def expand(dataType: DataType): DataType = {
       dataType match {
```
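Below is a standalone sketch (not from the patch) of the structural-intersection idea that `intersectParquetGroups` implements, applied to the schemas from the commit description; `intersect` is a local stand-in for the private helper, and only parquet-mr's public schema API is assumed:

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, MessageType, MessageTypeParser, Type}

// Keep only the fields of g1 that also exist in g2, recursing into nested groups.
def intersect(g1: GroupType, g2: GroupType): Option[GroupType] = {
  val fields: Seq[Type] = g1.getFields.asScala
    .filter(f => g2.containsField(f.getName))
    .flatMap {
      case f1: GroupType =>
        val f2 = g2.getType(f1.getName)
        if (f2.isPrimitive) None else intersect(f1, f2.asGroupType)
      case f1 => Some(f1)
    }
  if (fields.nonEmpty) Some(g1.withNewFields(fields.asJava)) else None
}

val fileSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 id;
    |  optional group name { optional binary first (UTF8); optional binary last (UTF8); }
    |  optional binary address (UTF8);
    |}""".stripMargin)

val clippedSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group name { optional binary middle (UTF8); }
    |  optional binary address (UTF8);
    |}""".stripMargin)

// `name` drops out entirely because its only requested subfield is missing from the file,
// leaving just `address` as the Parquet requested schema.
val requested = intersect(clippedSchema, fileSchema)
  .map(g => new MessageType(g.getName, g.getFields))
println(requested)
```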

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 13 additions & 8 deletions
```diff
@@ -130,8 +130,8 @@
   extends ParquetGroupConverter(updater) with Logging {
 
   assert(
-    parquetType.getFieldCount == catalystType.length,
-    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+    parquetType.getFieldCount <= catalystType.length,
+    s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
        |
        |Parquet schema:
        |$parquetType
@@ -182,18 +182,19 @@
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
+    parquetType.getFields.asScala.map { parquetField =>
+      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val catalystField = catalystType(fieldIndex)
+      // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
+      newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
     }.toArray
   }
 
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
 
   override def end(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.end()
       i += 1
     }
@@ -203,10 +204,14 @@
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
-      fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    i = 0
+    while (i < fieldConverters.length) {
+      fieldConverters(i).updater.start()
+      i += 1
+    }
   }
 
   /**
```
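As a small, hypothetical illustration (not from the patch) of the by-name matching the converter now performs: each Parquet field is resolved to its Catalyst ordinal with `StructType.fieldIndex` rather than by position, so a Parquet requested schema with fewer fields than the Catalyst schema still writes into the correct row cells:

```scala
import org.apache.spark.sql.types._

// Catalyst requested schema: name (with middle) at ordinal 0, address at ordinal 1.
val catalystType = StructType(Seq(
  StructField("name", StructType(Seq(StructField("middle", StringType)))),
  StructField("address", StringType)))

// Suppose the Parquet requested schema only contains `address` (see the intersection above).
val parquetFieldNames = Seq("address")

// Resolve each Parquet field to its Catalyst ordinal by name, as the converter now does.
val targetOrdinals = parquetFieldNames.map(catalystType.fieldIndex)
println(targetOrdinals) // List(1): `address` lands in cell 1; cell 0 (`name`) stays null.
```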

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -135,7 +135,7 @@ abstract class SchemaPruningSuite
       Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
   }
 
-  ignore("partial schema intersection - select missing subfield") {
+  testSchemaPruning("partial schema intersection - select missing subfield") {
     val query = sql("select name.middle, address from contacts where p=2")
     checkScan(query, "struct<name:struct<middle:string>,address:string>")
     checkAnswer(query.orderBy("id"),
```
