[SPARK-20897][SQL] cached self-join should not fail #18121


Closed · cloud-fan wants to merge 1 commit into master from cloud-fan/bug

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

In the failing test case, we have a `SortMergeJoinExec` for a self-join, which means there is a `ReusedExchange` node in the query plan. The query works fine without caching, but throws an exception in `SortMergeJoinExec.outputPartitioning` once we cache it.

The root cause is that `ReusedExchange` doesn't propagate the output partitioning from its child, so in `SortMergeJoinExec.outputPartitioning` we build a `PartitioningCollection` from a hash partitioning and an unknown partitioning, which fails.

This bug is mostly harmless, because inserting the `ReusedExchange` is the last step in preparing the physical plan, and we never call `SortMergeJoinExec.outputPartitioning` again after that.

However, if the DataFrame is cached, its physical plan becomes an `InMemoryTableScanExec`, which contains another physical plan representing the cached query. That inner plan has gone through the entire planning phase and may contain a `ReusedExchange`. The planner then calls `InMemoryTableScanExec.outputPartitioning`, which in turn calls `SortMergeJoinExec.outputPartitioning` and triggers this bug.
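The failure mode described above can be modeled with a small, self-contained sketch. These are simplified stand-ins, not Spark's actual classes, but they mirror the relevant invariant: Spark's `PartitioningCollection` requires all of its members to report the same number of partitions, which an `UnknownPartitioning(0)` from the buggy `ReusedExchange` violates.

```scala
// Simplified stand-ins for Spark's partitioning classes, to illustrate the
// failure described above. Not Spark's actual API.
sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

// Like Spark's PartitioningCollection, require every member to agree on the
// number of partitions.
case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
  require(partitionings.map(_.numPartitions).distinct.size == 1,
    "All partitionings must report the same number of partitions")
  def numPartitions: Int = partitionings.head.numPartitions
}

object Repro {
  def main(args: Array[String]): Unit = {
    // Both sides of the self-join report a real hash partitioning: fine.
    val ok = PartitioningCollection(
      Seq(HashPartitioning(Seq("i"), 200), HashPartitioning(Seq("i"), 200)))
    println(ok.numPartitions)

    // The buggy ReusedExchange reported UnknownPartitioning(0) instead of its
    // child's hash partitioning, so building the collection failed.
    val failed =
      try {
        PartitioningCollection(
          Seq(HashPartitioning(Seq("i"), 200), UnknownPartitioning(0)))
        false
      } catch { case _: IllegalArgumentException => true }
    println(s"mixing unknown partitioning failed: $failed")
  }
}
```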

How was this patch tested?

A new regression test.

@cloud-fan (Contributor, Author)

cc @gatorsmile @davies

@@ -58,6 +59,10 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
child.executeBroadcast()
}

override def outputPartitioning: Partitioning = child.outputPartitioning
Member

ReusedExchangeExec can have distinct sets of output attribute ids. Shall we also update `outputPartitioning` and `outputOrdering` if its output differs from `child.output`?

Contributor (Author)

good catch!

@viirya (Member)

viirya commented May 26, 2017

LGTM except one comment.

@SparkQA

SparkQA commented May 26, 2017

Test build #77426 has finished for PR 18121 at commit ca4a3d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 27, 2017

Test build #77445 has finished for PR 18121 at commit e91311c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) left a comment

LGTM pending Jenkins

@SparkQA

SparkQA commented May 27, 2017

Test build #77454 has finished for PR 18121 at commit 460f072.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

Thanks! Merging to master/2.2.

asfgit pushed a commit that referenced this pull request May 27, 2017

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18121 from cloud-fan/bug.

(cherry picked from commit 08ede46)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
@asfgit closed this in 08ede46 on May 27, 2017
case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
case other => other
}
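The attribute remapping in the snippet above can be illustrated with a self-contained sketch. These are simplified stand-in types, not Spark's actual classes; the name `updateAttr` comes from the patch, but the signatures here are illustrative. The idea is to pair each child output attribute with the reused node's attribute at the same position, then rewrite the partitioning expressions through that mapping.

```scala
// Simplified model of how ReusedExchangeExec can rewrite its child's
// HashPartitioning so that it refers to the reused node's own attribute ids.
// Stand-in types, not Spark's actual classes.
case class Attribute(name: String, id: Long)
case class HashPartitioning(expressions: Seq[Attribute], numPartitions: Int)

def remap(part: HashPartitioning,
          childOutput: Seq[Attribute],
          reusedOutput: Seq[Attribute]): HashPartitioning = {
  // Pair each child output attribute id with the reused node's attribute at
  // the same position, then rewrite the partitioning expressions through it.
  val byId = childOutput.map(_.id).zip(reusedOutput).toMap
  def updateAttr(a: Attribute): Attribute = byId.getOrElse(a.id, a)
  part.copy(expressions = part.expressions.map(updateAttr))
}

object Demo {
  def main(args: Array[String]): Unit = {
    val childOut  = Seq(Attribute("i", 1), Attribute("j", 2))
    val reusedOut = Seq(Attribute("i", 10), Attribute("j", 20))
    val p = remap(HashPartitioning(Seq(Attribute("i", 1)), 200), childOut, reusedOut)
    println(p) // HashPartitioning(List(Attribute(i,10)),200)
  }
}
```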

Contributor

@cloud-fan @viirya Could you help explain why we only consider HashPartitioning here?
How about RangePartitioning?
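For context on the question: Spark's `RangePartitioning` also embeds attribute references, through the expressions inside its `SortOrder`s, so the same remapping concern applies to it in principle. A sketch with simplified stand-in types (illustrative only, not Spark's API, and not an answer to whether the patch needed to handle it):

```scala
// Stand-in types showing that a range partitioning also embeds attribute
// references (inside its sort orders), just like a hash partitioning does.
case class Attribute(name: String, id: Long)
case class SortOrder(child: Attribute, ascending: Boolean)
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)

// The same positional id-remapping used for HashPartitioning would apply here.
def remapRange(part: RangePartitioning,
               childOutput: Seq[Attribute],
               reusedOutput: Seq[Attribute]): RangePartitioning = {
  val byId = childOutput.map(_.id).zip(reusedOutput).toMap
  part.copy(ordering = part.ordering.map { o =>
    o.copy(child = byId.getOrElse(o.child.id, o.child))
  })
}

object RangeDemo {
  def main(args: Array[String]): Unit = {
    val childOut  = Seq(Attribute("i", 1))
    val reusedOut = Seq(Attribute("i", 10))
    val r = remapRange(
      RangePartitioning(Seq(SortOrder(Attribute("i", 1), ascending = true)), 8),
      childOut, reusedOut)
    println(r.ordering.head.child.id) // 10
  }
}
```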
