[SPARK-25407][SQL] Ensure we pass a compatible pruned schema to ParquetRowConverter #22880
@@ -49,34 +49,82 @@ import org.apache.spark.sql.types._
 * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
 * to [[prepareForRead()]], but use a private `var` for simplicity.
 */
private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
    usingVectorizedReader: Boolean)
  extends ReadSupport[UnsafeRow] with Logging {
  private var catalystRequestedSchema: StructType = _

  def this() {
    // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
    // used in the vectorized reader, where we get the convertTz value directly, and the value here
    // is ignored.
    this(None)
    this(None, usingVectorizedReader = true)
  }

  /**
   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
   * readers. Responsible for figuring out Parquet requested schema used for column pruning.
   */
  override def init(context: InitContext): ReadContext = {
    val conf = context.getConfiguration
    catalystRequestedSchema = {
      val conf = context.getConfiguration
      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
      assert(schemaString != null, "Parquet requested schema not set.")
      StructType.fromString(schemaString)
    }

    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
    val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
      SQLConf.CASE_SENSITIVE.defaultValue.get)
    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
      context.getFileSchema, catalystRequestedSchema, caseSensitive)

    val parquetFileSchema = context.getFileSchema
    val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
      catalystRequestedSchema, caseSensitive)

    // As part of schema clipping, we add fields in catalystRequestedSchema which are missing
    // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires
    // that we ignore unrequested field data when reading from a Parquet file. Therefore we pass
    // two schemas to ParquetRecordMaterializer: the schema of the file data we want to read
    // (parquetRequestedSchema), and the schema of the rows we want to return
    // (catalystRequestedSchema). The reader is responsible for reconciling the differences between
    // the two.
    //
    // Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there
    // is an additional complication to constructing parquetRequestedSchema. The manner in which
    // Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and
    // catalystRequestedSchema differs. Spark's vectorized reader does not (currently) support
    // reading Parquet files with complex types in their schema. Further, it assumes that
    // parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes
    // logic in its read path to skip fields in parquetRequestedSchema which are not present in the
    // file.
    //
    // Spark's parquet-mr based reader supports reading Parquet files of any kind of complex
    // schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the
    // parquet-mr reader requires that parquetRequestedSchema include only those fields present in
    // the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader
    // we intersect the parquetClippedSchema with the parquetFileSchema to construct the
    // parquetRequestedSchema set in the ReadContext.
    val parquetRequestedSchema =
      if (schemaPruningEnabled && !usingVectorizedReader) {
        ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
          .map(intersectionGroup =>
            new MessageType(intersectionGroup.getName, intersectionGroup.getFields))
          .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
      } else {
        parquetClippedSchema
      }
    log.debug {
      s"""Going to read the following fields from the Parquet file with the following schema:
         |Parquet file schema:
         |$parquetFileSchema
         |Parquet clipped schema:
         |$parquetClippedSchema
         |Parquet requested schema:
         |$parquetRequestedSchema
         |Catalyst requested schema:
         |${catalystRequestedSchema.treeString}
       """.stripMargin
    }
    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
  }
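To make the comment block above concrete, here is a hedged usage sketch (not part of the patch) of how the pruned read path is exercised end to end. The output path, column names, and session setup are illustrative assumptions; `spark.sql.optimizer.nestedSchemaPruning.enabled` is the key of the `SQLConf.NESTED_SCHEMA_PRUNING_ENABLED` flag checked in `init()` above.

```scala
import org.apache.spark.sql.SparkSession

object NestedPruningExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nested-pruning-example")   // hypothetical app name
      .master("local[*]")
      .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a small Parquet file with a nested struct column `name`.
    Seq(("Jane", "Doe", 1L), ("John", "Smith", 2L))
      .toDF("first", "last", "id")
      .selectExpr("named_struct('first', first, 'last', last) AS name", "id")
      .write.mode("overwrite").parquet("/tmp/contacts")   // hypothetical path

    // Only name.first is selected. With pruning enabled, init() above should send the file
    // a requested schema containing just that leaf.
    spark.read.parquet("/tmp/contacts").select("name.first").show()

    spark.stop()
  }
}
```

Because the schema contains a struct column, Spark should fall back from the vectorized reader to the parquet-mr reader, which is the branch above that intersects the clipped schema with the file schema.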
@@ -90,16 +138,15 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType,
      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
    val parquetRequestedSchema = readContext.getRequestedSchema

    logInfo {
      s"""Going to read the following fields from the Parquet file:
         |
         |Parquet form:
    log.debug {
      s"""Going to read the following fields from the Parquet file with the following schema:
         |Parquet file schema:
         |$fileSchema
         |Parquet read schema:
         |$parquetRequestedSchema
         |Catalyst form:
         |$catalystRequestedSchema
         |Catalyst read schema:
         |${catalystRequestedSchema.treeString}
       """.stripMargin
    }

Review discussion on this logging change:

Reviewer: This might increase a lot of log data. Do we need to output …

Author: This detailed, formatted information was very helpful in developing and debugging this patch. Perhaps this should be logged at the debug level instead? Even the original message does seem rather technical for info-level logging. What do you think?

Reviewer: I think it is useful for debugging this patch, but may not be useful for end users and will increase log size. Making it debug level sounds good to me. But let's wait for others' opinions too.

Reviewer: Yea, we should maybe change this into debug level for them. I would additionally log them somewhere at debug level.
@@ -322,6 +369,27 @@ private[parquet] object ParquetReadSupport {
    }
  }

  /**
   * Computes the structural intersection between two Parquet group types.
   */
  private def intersectParquetGroups(
      groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
    val fields =
      groupType1.getFields.asScala
        .filter(field => groupType2.containsField(field.getName))
        .flatMap {
          case field1: GroupType =>
            intersectParquetGroups(field1, groupType2.getType(field1.getName).asGroupType)
          case field1 => Some(field1)
        }

    if (fields.nonEmpty) {
      Some(groupType1.withNewFields(fields.asJava))
    } else {
      None
    }
  }

  def expandUDT(schema: StructType): StructType = {
    def expand(dataType: DataType): DataType = {
      dataType match {
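For illustration, here is a self-contained sketch that mirrors the structural intersection above using only the parquet-mr schema API. The example schemas are hand-written assumptions: the clipped schema requests a column `id` that the file does not contain, so the intersection drops it.

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, MessageType, MessageTypeParser}

object IntersectGroupsSketch {
  // Mirrors intersectParquetGroups: keep only the fields of `left` that also exist (by name)
  // in `right`, recursing into nested groups.
  def intersect(left: GroupType, right: GroupType): Option[GroupType] = {
    val fields = left.getFields.asScala
      .filter(field => right.containsField(field.getName))
      .flatMap {
        case group: GroupType => intersect(group, right.getType(group.getName).asGroupType)
        case primitive => Some(primitive)
      }
    if (fields.nonEmpty) Some(left.withNewFields(fields.asJava)) else None
  }

  def main(args: Array[String]): Unit = {
    // Clipped schema: what the query asks for, including `id`, which the file lacks.
    val clipped = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  optional group name { optional binary first (UTF8); }
        |  optional int64 id;
        |}""".stripMargin)
    // Schema physically stored in the Parquet file.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  optional group name { optional binary first (UTF8); optional binary last (UTF8); }
        |}""".stripMargin)

    val requested = intersect(clipped, fileSchema)
      .map(group => new MessageType(group.getName, group.getFields))
    // Prints a message type containing only name.first; `id` has been dropped.
    requested.foreach(println)
  }
}
```

When nothing intersects, the helper returns `None`; in `init()` above that case maps to `ParquetSchemaConverter.EMPTY_MESSAGE`.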
@@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter(
  extends ParquetGroupConverter(updater) with Logging {

  assert(
    parquetType.getFieldCount == catalystType.length,
    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
    parquetType.getFieldCount <= catalystType.length,
    s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
       |
       |Parquet schema:
       |$parquetType

Review discussion on the relaxed assertion:

Reviewer: Can we assert this only when this pruning is enabled? We could fix the condition like …

Author: Why do you ask? Is it for safety, clarity? My concern is around reducing complexity, but I'm not strictly against this.
@@ -182,18 +182,19 @@ private[parquet] class ParquetRowConverter(

  // Converters for each field.
  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
      case ((parquetFieldType, catalystField), ordinal) =>
        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
    parquetType.getFields.asScala.map { parquetField =>
      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
      val catalystField = catalystType(fieldIndex)
      // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
      newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
    }.toArray
  }

  override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

  override def end(): Unit = {
    var i = 0
    while (i < currentRow.numFields) {
    while (i < fieldConverters.length) {
      fieldConverters(i).updater.end()
      i += 1
    }
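A small illustration (field names are made up) of the name-based lookup introduced above: each Parquet field is located in the Catalyst schema with `StructType.fieldIndex`, so the pruned Parquet schema may be a reordered subset of the Catalyst schema and the converters still write to the correct row cells.

```scala
import org.apache.spark.sql.types._

object FieldIndexSketch {
  def main(args: Array[String]): Unit = {
    val catalystType = StructType(Seq(
      StructField("first", StringType),
      StructField("last", StringType),
      StructField("id", LongType)))

    // Suppose the pruned Parquet requested schema carries only these fields, in this order.
    val parquetFieldNames = Seq("id", "first")

    parquetFieldNames.foreach { name =>
      // A converter for `name` writes into this ordinal of the output row,
      // not into its position within the Parquet schema.
      val fieldIndex = catalystType.fieldIndex(name)
      println(s"$name -> row ordinal $fieldIndex")
    }
  }
}
```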
@@ -202,8 +203,12 @@ private[parquet] class ParquetRowConverter(

  override def start(): Unit = {
    var i = 0
    while (i < currentRow.numFields) {
    while (i < fieldConverters.length) {
      fieldConverters(i).updater.start()
      i += 1
    }
    i = 0
    while (i < currentRow.numFields) {
      currentRow.setNullAt(i)
      i += 1
    }

Review discussion on the second loop:

Reviewer: Can we loop once with an if?

Author: Yes, but I think it's clearer this way. If @viirya has an opinion either way I'll take it as a "tie-breaker".
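For reference, a toy, self-contained simulation of the single-pass alternative raised in the review above; the arrays stand in for `fieldConverters` and `currentRow`. After pruning, `fieldConverters` can be shorter than the row, so every cell must be nulled while only the existing converters are started. The patch keeps two separate loops for clarity; the behavior is assumed to be equivalent.

```scala
object SingleLoopStartSketch {
  def main(args: Array[String]): Unit = {
    val numRowFields = 5    // stands in for currentRow.numFields
    val numConverters = 3   // stands in for fieldConverters.length (<= numRowFields, per the assert)
    val started = Array.fill(numConverters)(false)
    val nulledOut = Array.fill(numRowFields)(false)

    // Single pass over the row: start each converter that exists, null out every cell.
    var i = 0
    while (i < numRowFields) {
      if (i < numConverters) started(i) = true
      nulledOut(i) = true
      i += 1
    }

    println(s"started:   ${started.mkString(", ")}")
    println(s"nulledOut: ${nulledOut.mkString(", ")}")
  }
}
```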
Review discussion on the use of intersectParquetGroups in init():

Reviewer: For the vectorized reader, even if we do this additional intersectParquetGroups, will it cause any problem?

Author: Yes. The relevant passage being … If we break this assumption by giving the vectorized reader a Parquet requested schema which does not include all of the fields in the Catalyst requested schema, then it will fail with an exception. This scenario is covered by the tests. (Comment out the relevant code below and run the tests to see.)

Reviewer: Ok. I see.