
Commit 93a5d34

peter-toth authored and cloud-fan committed
[SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
### What changes were proposed in this pull request?

This bug was introduced by SPARK-30428 in Apache Spark 3.0.0. This PR fixes `FileScan.equals()`.

### Why are the changes needed?

- Without this fix, `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- The partition filters and data filters added to `FileScan` (in #27112 and #27157) caused the canonicalized forms of some `BatchScanExec` nodes not to match, which prevents some reuse opportunities.

### Does this PR introduce _any_ user-facing change?

Yes. Before this fix, incorrect reuse of `FileScan`, and hence of `BatchScanExec`, could have happened, causing correctness issues.

### How was this patch tested?

Added new UTs.

Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
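The "doesn't take `fileIndex` and `readSchema` into account" part of the bug comes from a dropped `&&`: in the old `equals` (see the `FileScan.scala` diff below), the first comparison line is a complete expression on its own, so Scala's semicolon inference turns it into a discarded statement and only the filter comparison decides the result. A minimal, self-contained sketch of that pitfall (the names and values are made up for illustration; this is not the Spark code):

```scala
// Illustration of the inferred-semicolon pitfall: without a trailing `&&`, the first
// comparison is evaluated and thrown away, so it never affects the returned value.
object DanglingComparisonSketch extends App {
  def buggyEquals(index1: Int, index2: Int, schema1: String, schema2: String): Boolean = {
    index1 == index2 // complete expression: inferred semicolon, result silently dropped
    schema1 == schema2 // only this comparison becomes the return value
  }

  def fixedEquals(index1: Int, index2: Int, schema1: String, schema2: String): Boolean =
    index1 == index2 && // trailing && keeps both comparisons in one expression
      schema1 == schema2

  println(buggyEquals(1, 2, "s", "s")) // true, even though the "indexes" differ
  println(fixedEquals(1, 2, "s", "s")) // false
}
```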
1 parent: e00afd3 · commit: 93a5d34

File tree: 4 files changed (+446, −4 lines)

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.spark.sql.FileScanSuiteBase
+import org.apache.spark.sql.v2.avro.AvroScan
+
+class AvroScanSuite extends FileScanSuiteBase {
+  val scanBuilders = Seq[(String, ScanBuilder, Seq[String])](
+    ("AvroScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) => AvroScan(s, fi, ds, rds, rps, o, f, pf, df),
+      Seq.empty))
+
+  run(scanBuilders)
+}
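The builder lambda above presumably maps the suite's generic argument order (session, file index, data schema, read data schema, read partition schema, pushed filters, options, partition filters, data filters) onto `AvroScan`'s constructor, which is why `o` and `f` appear swapped in the call; that mapping is an inference from the lambda, not something stated in this diff. As a rough, self-contained analogue of the registration pattern (not `FileScanSuiteBase`'s real API), each entry pairs a scan name with a builder, and a shared `run` exercises every builder the same way:

```scala
// Hypothetical analogue of the scanBuilders/run pattern, with a plain case class
// standing in for a real FileScan implementation. It only shows the shape: register
// named builders, then run one shared equality check over all of them.
object BuilderRegistrationSketch extends App {
  case class FakeScan(readSchema: String, dataFilters: Seq[String])

  type Builder = (String, Seq[String]) => FakeScan

  val builders = Seq[(String, Builder)](
    ("FakeScan", (schema, filters) => FakeScan(schema, filters)))

  def run(builders: Seq[(String, Builder)]): Unit =
    builders.foreach { case (name, build) =>
      val a = build("col INT", Seq("col > 1"))
      val b = build("col INT", Seq("col > 1"))
      assert(a == b, s"$name built twice from the same inputs should compare equal")
      println(s"$name: equality check passed")
    }

  run(builders)
}
```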

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 18 additions & 4 deletions

@@ -24,8 +24,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
@@ -84,11 +85,24 @@ trait FileScan extends Scan

   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

+  private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
+    val output = readSchema().toAttributes
+    val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
+    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
+    val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
+      QueryPlan.normalizeExpressions(_,
+        output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
+    val normalizedDataFilters = ExpressionSet(dataFilters.map(
+      QueryPlan.normalizeExpressions(_,
+        output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
+    (normalizedPartitionFilters, normalizedDataFilters)
+  }
+
   override def equals(obj: Any): Boolean = obj match {
     case f: FileScan =>
-      fileIndex == f.fileIndex && readSchema == f.readSchema
-        ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
-        ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
+      fileIndex == f.fileIndex && readSchema == f.readSchema &&
+        normalizedPartitionFilters == f.normalizedPartitionFilters &&
+        normalizedDataFilters == f.normalizedDataFilters

     case _ => false
   }
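The `normalizedPartitionFilters`/`normalizedDataFilters` comparison above presumably makes `equals` insensitive to attribute expression IDs: semantically identical filters coming from different query plans can reference attributes whose only difference is their ID, which is what kept canonicalized `BatchScanExec` nodes from matching per the commit message. A simplified, self-contained sketch of the idea (these case classes are stand-ins, not Spark's `AttributeReference` or `QueryPlan.normalizeExpressions`):

```scala
// Two semantically identical filters that differ only in attribute IDs compare as
// unequal; mapping each attribute to a canonical one per column name before the
// comparison makes the check ID-insensitive.
object NormalizationSketch extends App {
  // Hypothetical stand-ins for an attribute reference and a predicate expression.
  case class Attr(name: String, exprId: Long)
  case class GreaterThan(attr: Attr, value: Int)

  // Replace the attribute with the canonical one that has the same name.
  def normalize(f: GreaterThan, canonical: Map[String, Attr]): GreaterThan =
    f.copy(attr = canonical.getOrElse(f.attr.name, f.attr))

  val canonical = Map("p" -> Attr("p", exprId = 0L))

  // The "same" predicate produced by two different plans, hence different exprIds.
  val filterA = GreaterThan(Attr("p", exprId = 7L), 1)
  val filterB = GreaterThan(Attr("p", exprId = 42L), 1)

  println(filterA == filterB)                                             // false: IDs differ
  println(normalize(filterA, canonical) == normalize(filterB, canonical)) // true
}
```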
