-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions #22326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions #22326
Changes from all commits
9c579bd
9ea1cf6
b6b0aa6
b626fa7
53dd028
4ca7fd1
1109eb3
c6345fe
83660d5
fdc86ca
fbf32f4
292b09c
6749a96
a598a4e
a2c8ddd
005bb3f
b0dfab3
306fcb9
98cd3cc
d1db33a
87f0f50
d2739af
7f66954
2b6977d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer | |||||||
|
||||||||
import scala.annotation.tailrec | ||||||||
|
||||||||
import org.apache.spark.sql.AnalysisException | ||||||||
import org.apache.spark.sql.catalyst.expressions._ | ||||||||
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins | ||||||||
import org.apache.spark.sql.catalyst.plans._ | ||||||||
|
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { | |||||||
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF | ||||||||
* and pull them out from join condition. For python udf accessing attributes from only one side, | ||||||||
* they are pushed down by operation push down rules. If not (e.g. user disables filter push | ||||||||
* down rules), we need to pull them out in this rule too. | ||||||||
*/ | ||||||||
object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { | ||||||||
def hasPythonUDF(expression: Expression): Boolean = { | ||||||||
expression.collectFirst { case udf: PythonUDF => udf }.isDefined | ||||||||
xuanyuanking marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
} | ||||||||
|
||||||||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||||||||
case j @ Join(_, _, joinType, condition) | ||||||||
if condition.isDefined && hasPythonUDF(condition.get) => | ||||||||
if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { | ||||||||
xuanyuanking marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
// The current strategy only support InnerLike and LeftSemi join because for other type, | ||||||||
// it breaks SQL semantic if we run the join condition as a filter after join. If we pass | ||||||||
// the plan here, it'll still get a an invalid PythonUDF RuntimeException with message | ||||||||
// `requires attributes from more than one child`, we throw firstly here for better | ||||||||
// readable information. | ||||||||
throw new AnalysisException("Using PythonUDF in join condition of join type" + | ||||||||
s" $joinType is not supported.") | ||||||||
} | ||||||||
// If condition expression contains python udf, it will be moved out from | ||||||||
// the new join conditions. | ||||||||
val (udf, rest) = | ||||||||
splitConjunctivePredicates(condition.get).partition(hasPythonUDF) | ||||||||
val newCondition = if (rest.isEmpty) { | ||||||||
logWarning(s"The join condition:$condition of the join plan contains PythonUDF only," + | ||||||||
s" it will be moved out and the join plan will be turned to cross join.") | ||||||||
None | ||||||||
} else { | ||||||||
Some(rest.reduceLeft(And)) | ||||||||
} | ||||||||
val newJoin = j.copy(condition = newCondition) | ||||||||
joinType match { | ||||||||
case _: InnerLike => Filter(udf.reduceLeft(And), newJoin) | ||||||||
case LeftSemi => | ||||||||
Project( | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we are simulating a left semi join here. Seems we can do the same thing for left anti join. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me try. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried two ways to implement LeftAnti here:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, let's leave left anti join then, thanks for trying! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks :) |
||||||||
j.left.output.map(_.toAttribute), | ||||||||
Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner))) | ||||||||
case _ => | ||||||||
throw new AnalysisException("Using PythonUDF in join condition of join type" + | ||||||||
s" $joinType is not supported.") | ||||||||
} | ||||||||
} | ||||||||
} |
Uh oh!
There was an error while loading. Please reload this page.