
Commit 8f2c066

chenhao-db authored and cloud-fan committed
[SPARK-50615][FOLLOWUP][SQL] Avoid dropping metadata in the push rule
### What changes were proposed in this pull request?

There is a bug in the optimizer rule where the `output` of the relation is rebuilt based on the schema of the `HadoopFsRelation`. This schema doesn't include file metadata (the `_metadata` column). This PR fixes the bug. The new implementation no longer requires `hadoopFsRelation.schema` and `relation.output` to have the same order, which I don't think is guaranteed.

### Why are the changes needed?

It is a necessary bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test. It would fail without the fix.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50121 from chenhao-db/fix_variant_pushdown_metadata.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 14fff2a commit 8f2c066
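
For context, a minimal sketch of the scenario the fix covers, assuming a local Spark session and a Parquet-backed table `T` with a variant column `v`. The table name and setup are illustrative assumptions; the projection itself mirrors the test added in this commit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained sketch; `T` and its contents are illustrative only.
object VariantMetadataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // A file-backed (Parquet) table with a variant column `v`.
    spark.sql("""create table T using parquet as select parse_json('{"a": 1}') as v""")
    // Read the file-source `_metadata` column alongside a variant field that the
    // optimizer pushes into the scan. Before this fix, rebuilding the relation
    // output from `hadoopFsRelation.schema` dropped `_metadata`.
    spark.sql("select variant_get(v, '$.a', 'int') as a, _metadata from T").show()
    spark.stop()
  }
}
```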

File tree

2 files changed (+35, -20 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScanSuite.scala


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala

Lines changed: 24 additions & 20 deletions
@@ -289,13 +289,13 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
       relation: LogicalRelation,
       hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
     val variants = new VariantInRelation
-    val defaultValues = ResolveDefaultColumns.existenceDefaultValues(hadoopFsRelation.schema)
-    // I'm not aware of any case that an attribute `relation.output` can have a different data type
-    // than the corresponding field in `hadoopFsRelation.schema`. Other code seems to prefer using
-    // the data type in `hadoopFsRelation.schema`, let's also stick to it.
-    val schemaWithAttributes = hadoopFsRelation.schema.fields.zip(relation.output)
-    for (((f, attr), defaultValue) <- schemaWithAttributes.zip(defaultValues)) {
-      variants.addVariantFields(attr.exprId, f.dataType, defaultValue, Nil)
+
+    val schemaAttributes = relation.resolve(hadoopFsRelation.dataSchema,
+      hadoopFsRelation.sparkSession.sessionState.analyzer.resolver)
+    val defaultValues = ResolveDefaultColumns.existenceDefaultValues(StructType(
+      schemaAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))
+    for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+      variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
     }
     if (variants.mapping.isEmpty) return originalPlan
 
@@ -304,24 +304,28 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
     // `collectRequestedFields` may have removed all variant columns.
     if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
 
-    val (newFields, newOutput) = schemaWithAttributes.map {
-      case (f, attr) =>
-        if (variants.mapping.get(attr.exprId).exists(_.nonEmpty)) {
-          val newType = variants.rewriteType(attr.exprId, f.dataType, Nil)
-          val newAttr = AttributeReference(f.name, newType, f.nullable, f.metadata)()
-          (f.copy(dataType = newType), newAttr)
-        } else {
-          (f, attr)
-        }
-    }.unzip
+    val attributeMap = schemaAttributes.map { a =>
+      if (variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
+        val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+        val newAttr = AttributeReference(a.name, newType, a.nullable, a.metadata)(
+          qualifier = a.qualifier)
+        (a.exprId, newAttr)
+      } else {
+        // `relation.resolve` actually returns `Seq[AttributeReference]`, although the return type
+        // is `Seq[Attribute]`.
+        (a.exprId, a.asInstanceOf[AttributeReference])
+      }
+    }.toMap
+    val newFields = schemaAttributes.map { a =>
+      val dataType = attributeMap(a.exprId).dataType
+      StructField(a.name, dataType, a.nullable, a.metadata)
+    }
+    val newOutput = relation.output.map(a => attributeMap.getOrElse(a.exprId, a))
 
     val newHadoopFsRelation = hadoopFsRelation.copy(dataSchema = StructType(newFields))(
       hadoopFsRelation.sparkSession)
     val newRelation = relation.copy(relation = newHadoopFsRelation, output = newOutput.toIndexedSeq)
 
-    val attributeMap = relation.output.zip(newOutput).map {
-      case (oldAttr, newAttr) => oldAttr.exprId -> newAttr
-    }.toMap
     val withFilter = if (filters.nonEmpty) {
       Filter(filters.map(variants.rewriteExpr(_, attributeMap)).reduce(And), newRelation)
     } else {
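
The structural change above: rewritten attributes are now keyed by `exprId` and `relation.output` is mapped through that map, rather than positionally zipping `hadoopFsRelation.schema.fields` with `relation.output`. Columns absent from the data schema, such as `_metadata`, get no entry in the map and pass through unchanged, and no particular ordering of the output is assumed. A minimal sketch of that idea with simplified stand-in types (illustrative only, not the actual Spark internals):

```scala
// Illustrative sketch with simplified stand-in types (not Spark internals).
object RewriteOutputSketch {
  case class ExprId(id: Long)
  case class Attr(name: String, exprId: ExprId, dataType: String)

  // Map the full relation output through an exprId-keyed map of rewritten
  // attributes; anything without an entry (e.g. `_metadata`) is kept as-is.
  def rewriteOutput(output: Seq[Attr], rewritten: Map[ExprId, Attr]): Seq[Attr] =
    output.map(a => rewritten.getOrElse(a.exprId, a))

  def main(args: Array[String]): Unit = {
    val v = Attr("v", ExprId(1), "variant")
    val meta = Attr("_metadata", ExprId(2), "struct")  // not in the data schema
    val rewritten = Map(ExprId(1) -> v.copy(dataType = "struct<a:int>"))
    assert(rewriteOutput(Seq(v, meta), rewritten) ==
      Seq(v.copy(dataType = "struct<a:int>"), meta))
  }
}
```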

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScanSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -77,6 +77,17 @@ class PushVariantIntoScanSuite extends SharedSparkSession {
       case _ => fail()
     }
 
+    // Validate _metadata works.
+    sql("select variant_get(v, '$.a', 'int') as a, _metadata from T")
+      .queryExecution.optimizedPlan match {
+      case Project(projectList, l: LogicalRelation) =>
+        val output = l.output
+        val v = output(0)
+        checkAlias(projectList(0), "a", GetStructField(v, 0))
+        assert(projectList(1).dataType.isInstanceOf[StructType])
+      case _ => fail()
+    }
+
     sql("select 1 from T where isnotnull(v)")
       .queryExecution.optimizedPlan match {
       case Project(projectList, Filter(condition, l: LogicalRelation)) =>
