Commit 507b771
andygrove authored and viirya committed
[SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use
### What changes were proposed in this pull request?

AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?

When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes apache#32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <tgraves@apache.org>
1 parent b5856dd commit 507b771

1 file changed: +2 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 2 additions & 1 deletion
@@ -418,7 +418,8 @@ case class AdaptiveSparkPlanExec(
           // Check the `stageCache` again for reuse. If a match is found, ditch the new stage
           // and reuse the existing stage found in the `stageCache`, otherwise update the
           // `stageCache` with the new stage.
-          val queryStage = context.stageCache.getOrElseUpdate(e.canonicalized, newStage)
+          val queryStage = context.stageCache.getOrElseUpdate(
+            newStage.plan.canonicalized, newStage)
           if (queryStage.ne(newStage)) {
             newStage = reuseQueryStage(queryStage, e)
           }
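
The effect of the key change can be illustrated with a small, self-contained sketch. It is not Spark code: `Plan`, `Exchange`, `ColumnarExchange`, `Stage`, and both lookup helpers are hypothetical stand-ins, and the canonical forms are plain strings chosen for illustration. It only assumes, as the commit message describes, that a plugin may turn the same original exchange into either a columnar or a row-based stage.

```scala
import scala.collection.mutable

// Toy stand-ins for plan nodes; these are NOT real Spark classes.
sealed trait Plan { def canonicalized: String }
final case class Exchange(child: String) extends Plan {
  def canonicalized: String = s"Exchange($child)"
}
final case class ColumnarExchange(child: String) extends Plan {
  def canonicalized: String = s"ColumnarExchange($child)"
}

// A query stage wraps the plan that will actually be executed.
final case class Stage(id: Int, plan: Plan)

object StageCacheSketch {
  // Before the patch: the cache key comes from the original exchange,
  // so it cannot tell a columnar stage from a row-based one.
  def lookupByExchange(
      cache: mutable.Map[String, Stage], e: Exchange, newStage: Stage): Stage =
    cache.getOrElseUpdate(e.canonicalized, newStage)

  // After the patch: the cache key comes from the new stage's own plan.
  def lookupByStagePlan(
      cache: mutable.Map[String, Stage], newStage: Stage): Stage =
    cache.getOrElseUpdate(newStage.plan.canonicalized, newStage)

  def main(args: Array[String]): Unit = {
    val e = Exchange("scan t")
    // Assumption: a plugin made the first stage columnar and left the second row-based.
    val columnar = Stage(0, ColumnarExchange("scan t"))
    val rowBased = Stage(1, e)

    val oldCache = mutable.Map.empty[String, Stage]
    lookupByExchange(oldCache, e, columnar)
    // Same key as before, so the columnar stage is handed back for the row-based request.
    println(lookupByExchange(oldCache, e, rowBased).id) // prints 0

    val newCache = mutable.Map.empty[String, Stage]
    lookupByStagePlan(newCache, columnar)
    // Different keys, so the row-based stage is cached and returned as itself.
    println(lookupByStagePlan(newCache, rowBased).id) // prints 1
  }
}
```

In this sketch, two stages whose plans canonicalize identically still share a cache entry under the new key, so reuse between compatible stages is preserved.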
