Skip to content

[SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules #33021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.reuse

import scala.collection.mutable

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.util.ReuseMap

/**
* Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
Expand All @@ -36,24 +37,34 @@ case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
val exchanges = new ReuseMap[Exchange, SparkPlan]()
val subqueries = new ReuseMap[BaseSubqueryExec, SparkPlan]()
val exchanges = mutable.Map.empty[SparkPlan, Exchange]
val subqueries = mutable.Map.empty[SparkPlan, BaseSubqueryExec]

def reuse(plan: SparkPlan): SparkPlan = {
plan.transformUpWithPruning(_.containsAnyPattern(EXCHANGE, PLAN_EXPRESSION)) {
case exchange: Exchange if conf.exchangeReuseEnabled =>
exchanges.reuseOrElseAdd(exchange, ReusedExchangeExec(exchange.output, _))
val cachedExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (cachedExchange.ne(exchange)) {
ReusedExchangeExec(exchange.output, cachedExchange)
} else {
cachedExchange
}

case other =>
other.transformExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case sub: ExecSubqueryExpression =>
val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
sub.withNewPlan(
if (conf.subqueryReuseEnabled) {
subqueries.reuseOrElseAdd(subquery, ReusedSubqueryExec(_))
val newSubquery = if (conf.subqueryReuseEnabled) {
val cachedSubquery = subqueries.getOrElseUpdate(subquery.canonicalized, subquery)
if (cachedSubquery.ne(subquery)) {
ReusedSubqueryExec(cachedSubquery)
} else {
subquery
})
cachedSubquery
}
} else {
subquery
}
sub.withNewPlan(newSubquery)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,9 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
test("broadcast join where streamed side's output partitioning is PartitioningCollection") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
Copy link
Contributor Author

@peter-toth peter-toth Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These slight changes are needed to avoid ReuseExchange nodes in the test plans and keep the original test logic as this PR changes the ReuseExchangeAndSubquery rule to identify reuses based on canonicalized plans only (the schema of the exchanges are no longer need to match).
Without these changes the canonicalized plans of LocalTableScanExec in t1, t2 and t4 are the same as they have the same content.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth are you saying that the new logic may reuse an exchange even if the schema doesn't match? The canonicalized form of the plan doesn't consider schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema contains the name of the output attributes as well, but in canonicalized plans we always replace names to "none" and only the structure of plan matters.
Please also note that ReusedExchangeExec captures the output of the original exchange so that can we restore the schema in a ReusedExchangeExec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so the new logic is more effective (can trigger reuse in more cases), and we need to update the test here to not trigger reuse.

Copy link
Contributor Author

@peter-toth peter-toth Jun 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is, but I don't think this comes up frequently in real world applications.
Changing t2 and t4 seemed easier than preparing the test for reused exchanged.
I also looked into why the issue didn't come up with the AQE version of the suite (BroadcastJoinSuiteAE) before this PR and that's because the test doesn't run the queries before inspecting the plans and the initial AQE plans don't contain reuses (but final plans do).

val t2 = (0 until 100).map(i => (i % 5, i % 14)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
val t4 = (0 until 100).map(i => (i % 5, i % 15)).toDF("i4", "j4")

// join1 is a sort merge join (shuffle on the both sides).
val join1 = t1.join(t2, t1("i1") === t2("i2"))
Expand Down