[SPARK-20897][SQL] cached self-join should not fail #18121
Conversation
```diff
@@ -58,6 +59,10 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
     child.executeBroadcast()
   }
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
```
`ReusedExchangeExec` can have distinct sets of output attribute ids. Shall we also update `outputPartitioning` and `outputOrdering`, if its output is different from `child.output`?
good catch!
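For context, the fix that grew out of this exchange remaps the child's attribute ids onto the `ReusedExchangeExec` output before exposing partitioning and ordering. Below is a minimal sketch of that idea, placed inside `ReusedExchangeExec`; it illustrates the approach and is not necessarily the exact code merged in this PR.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

// Sketch: remap attribute ids from child.output to this node's output, so that
// outputPartitioning/outputOrdering refer to the attributes this node actually produces.
private lazy val updateAttr: Expression => Expression = {
  val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
  e => e.transform {
    case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr)
  }
}

override def outputPartitioning: Partitioning = child.outputPartitioning match {
  case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
  case other => other
}

override def outputOrdering: Seq[SortOrder] =
  child.outputOrdering.map(s => updateAttr(s).asInstanceOf[SortOrder])
```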
LGTM except one comment.
Test build #77426 has finished for PR 18121 at commit
Test build #77445 has finished for PR 18121 at commit
LGTM pending Jenkins
Test build #77454 has finished for PR 18121 at commit
Thanks! Merging to master/2.2.
## What changes were proposed in this pull request?

The failing test case has a `SortMergeJoinExec` for a self-join, which means there is a `ReusedExchange` node in the query plan. It works fine without caching, but throws an exception in `SortMergeJoinExec.outputPartitioning` if we cache it.

The root cause is that `ReusedExchange` doesn't propagate the output partitioning from its child, so in `SortMergeJoinExec.outputPartitioning` we create a `PartitioningCollection` with a hash partitioning and an unknown partitioning, and fail.

This bug is mostly harmless, because inserting the `ReusedExchange` is the last step in preparing the physical plan; we won't call `SortMergeJoinExec.outputPartitioning` again after this. However, if the dataframe is cached, its physical plan becomes an `InMemoryTableScanExec`, which contains another physical plan representing the cached query. That inner plan has gone through the entire planning phase and may contain a `ReusedExchange`. The planner then calls `InMemoryTableScanExec.outputPartitioning`, which in turn calls `SortMergeJoinExec.outputPartitioning` and triggers this bug.

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18121 from cloud-fan/bug.

(cherry picked from commit 08ede46)

Signed-off-by: Xiao Li <gatorsmile@gmail.com>
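As a rough illustration of the scenario covered by the regression test, a cached self-join can be reproduced along these lines. This is a hedged sketch, assuming an active `SparkSession` named `spark`; the column names and config value are illustrative and not necessarily those of the actual test added by this PR.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cached-self-join").getOrCreate()
import spark.implicits._

// Disable broadcast joins so the self-join is planned as a SortMergeJoinExec,
// which introduces a ReusedExchange for the shared shuffle.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df = Seq(1 -> "a").toDF("i", "j")
val joined = df.as("t1").join(df.as("t2"), $"t1.i" === $"t2.i")

// Before the fix, caching the self-joined dataframe and then using it would throw
// from SortMergeJoinExec.outputPartitioning; after the fix this simply returns 1 row.
assert(joined.cache().count() == 1)
```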
```scala
    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
    case other => other
  }
```
@cloud-fan @viirya Could you help explain why we only consider `HashPartitioning` here? How about `RangePartitioning`?
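For illustration only: if `RangePartitioning` also needed its attribute ids remapped, the match above could hypothetically be extended as follows. This is not part of this PR; it reuses the same `updateAttr` helper sketched earlier.

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}

override def outputPartitioning: Partitioning = child.outputPartitioning match {
  case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
  // Hypothetical: rewrite the sort expressions of a RangePartitioning as well,
  // since its ordering also refers to the child's attribute ids.
  case r: RangePartitioning =>
    r.copy(ordering = r.ordering.map(s => updateAttr(s).asInstanceOf[SortOrder]))
  case other => other
}
```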