diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 21503fda53e01..9b6f99328663b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -83,15 +83,14 @@ trait FileScan extends Scan
   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
   private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
-    val output = readSchema().toAttributes
     val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
-    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
     val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
-      QueryPlan.normalizeExpressions(_,
-        output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
+      QueryPlan.normalizeExpressions(_, fileIndex.partitionSchema.toAttributes
+        .map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
+    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
     val normalizedDataFilters = ExpressionSet(dataFilters.map(
-      QueryPlan.normalizeExpressions(_,
-        output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
+      QueryPlan.normalizeExpressions(_, dataSchema.toAttributes
+        .map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
     (normalizedPartitionFilters, normalizedDataFilters)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 0fa2c7195dbb9..0854c6ba45747 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4028,6 +4028,36 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper
     }
   }
 
+  test("SPARK-40245: Fix FileScan canonicalization when partition or data filter columns are not " +
+    "read") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath { path =>
+        spark.range(5)
+          .withColumn("p", $"id" % 2)
+          .write
+          .mode("overwrite")
+          .partitionBy("p")
+          .parquet(path.toString)
+        withTempView("t") {
+          spark.read.parquet(path.toString).createOrReplaceTempView("t")
+          val df = sql(
+            """
+              |SELECT t1.id, t2.id, t3.id
+              |FROM t AS t1
+              |JOIN t AS t2 ON t2.id = t1.id
+              |JOIN t AS t3 ON t3.id = t2.id
+              |WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1
+              |""".stripMargin)
+          df.collect()
+          val reusedExchanges = collect(df.queryExecution.executedPlan) {
+            case r: ReusedExchangeExec => r
+          }
+          assert(reusedExchanges.size == 1)
+        }
+      }
+    }
+  }
+
   test("SPARK-35331: Fix resolving original expression in RepartitionByExpression after aliased") {
     Seq("CLUSTER", "DISTRIBUTE").foreach { keyword =>
       Seq("a", "substr(a, 0, 3)").foreach { expr =>