[SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions #22326
Conversation
// the new join conditions, if all conditions are unevaluable, we should
// change the join type to CrossJoin.
val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
I think we should at least warn, or leave a note, that an unevaluable expression (or Python UDF) in the join condition will be ignored and the join turned into a cross join.
Makes sense, I'll leave a warning log here.
What about also checking spark.sql.crossJoin.enabled and allowing the transformation only in that case?
python/pyspark/sql/tests.py
Outdated
right = self.spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())
df = left.crossJoin(right).filter(f("a", "b"))
self.assertEqual(df.collect(), [Row(a=1, b=1)])
self.assertEqual(df.collect(), [Row(a=1, b=1)])
Looks duplicated
Yep, sorry for the mess here, a leftover line from another commit. I'll fix it soon. How do I cancel the test run?
python/pyspark/sql/tests.py
Outdated
left = self.spark.createDataFrame([Row(a=1)])
right = self.spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())
df = left.crossJoin(right).filter(f("a", "b"))
BTW, why do we explicitly test cross join here?
Ditto, the correct test is df = left.join(right, f("a", "b")).
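For reference, a self-contained version of that suggested test could look like the sketch below. This is not the exact code merged; it assumes a plain SparkSession and, with the final fix, spark.sql.crossJoin.enabled=true, since a UDF-only join condition gets planned as a cross join:

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
# A UDF-only join condition becomes a cross join, so the
# cartesian-product check has to be disabled explicitly:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())
# The Python UDF sits in the join condition itself and references
# attributes from both sides of the join.
df = left.join(right, f("a", "b"))
assert df.collect() == [Row(a=1, b=1)]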
Test build #95653 has finished for PR 22326 at commit
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
    }
    assert(qualifiedPlanNodes.size == 1)
  }

  test("Python UDF refers to the attributes from more than one child in join condition") {
I would add a JIRA prefix here - this change sounds more like fixing a particular problem.
Got it, will add in this commit.
// the new join conditions, if all conditions are unevaluable, we should
// change the join type to CrossJoin.
val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
I think this should be done only in this case: #22326 (comment)
Thanks for the reminder, crossJoinEnabled should be checked here.
if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
    s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
    s"turned to cross join.")
The s interpolator should be removed (there is no interpolation in that string).
Thanks, done in a86a7d5.
val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
    logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
      s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
not sure that inlining the plan here makes this warning very readable...
The log will look like this:
16:13:35.218 WARN org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the join plan:
Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))
:- LocalRelation [a#5, b#6]
+- LocalRelation [c#14, d#15]
is unevaluable, it will be ignored and the join plan will be turned to cross join.
yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.
@xuanyuanking @mgaido91 In the above example, the UDFs refer to attributes from distinct legs of the join. Can we not plan this better than a cross join in this case? I am wondering why we can't do:
Join Inner, leftAlias1 = rightAlias1
:- Project dummyUDF(a, b) as leftAlias1
:  +- LocalRelation(a, b)
+- Project dummyUDF(c, d) as rightAlias1
   +- LocalRelation(c, d)
Perhaps I am missing something...
@dilipbiswal I haven't checked the particular plan posted in that comment; for that one I think you are right, we could handle it as you suggested. But I was checking the case in the UT and in the description of this PR, i.e. when the input for the Python UDF contains attributes from both sides. In that case I don't have a better suggestion.
@mgaido91 Thanks. Marco, do you know if there are instances when we pick a cross join implicitly? It wouldn't perform very well, right? I am wondering if we should error out or pick a bad plan. I guess, like you, I am not sure what's the right thing to do here.
One other thing, Marco: for join types other than inner and left semi, we still have the same issue, no?
@dilipbiswal Thanks for your detailed check, I should have made the case more typical. The case we want to solve here is a UDF accessing attributes from both sides; I'll change the case to dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")) in the next commit.
yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.
@mgaido91 Thanks for your advice, will do the refactor in the next commit.
@dilipbiswal there are cases when "trivial conditions" are removed from a join, so we turn an inner join into a cross one, for instance. The performance would be awful, you're right. The point is that I am not sure there is a better way to achieve this: since we have no clue what the UDF does, we need to compare all the rows from both sides, i.e. we need to perform a cartesian product.
Wondering if we should error out or pick a bad plan
This is, indeed, arguable. I think that letting the user choose is a good idea. If the user runs the query and gets an AnalysisException because he/she is trying to perform a cartesian product, he/she can decide: OK, I am doing a wrong thing, let's change it. Or he/she can say: well, one of my two tables involved contains 10 rows, not a big deal, I want to perform it nonetheless; let's set spark.sql.crossJoin.enabled=true and run it.
for join types other than inner and left semi, we still have the same issue, no?
I think the current PR properly handles only the inner case; for the left semi case this PR returns an incorrect result IIUC. This needs to be fixed as well.
This is, indeed, arguable. I think that letting the user choose is a good idea. If the user runs the query and gets an AnalysisException because he/she is trying to perform a cartesian product, he/she can decide: OK, I am doing a wrong thing, let's change it. Or he/she can say: well, one of my two tables involved contains 10 rows, not a big deal, I want to perform it nonetheless; let's set spark.sql.crossJoin.enabled=true and run it.
Sounds reasonable.
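In PySpark, the opt-in flow being described would look roughly like this (a sketch reusing the left/right/f definitions from the tests above, not code from this PR):

# With the default spark.sql.crossJoin.enabled=false, a UDF-only join
# condition fails with an AnalysisException about a cartesian product:
#   left.join(right, f("a", "b"))
# If one side is known to be tiny, the user can opt in explicitly:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
df = left.join(right, f("a", "b"))  # now planned as a cross join plus a filter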
s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + | ||
s"turned to cross join.") | ||
Cross | ||
} else joinType |
} else {
joinType
}
Thanks, done in a86a7d5.
Test build #95662 has finished for PR 22326 at commit
val join = Join(newLeft, newRight, newJoinType, newJoinCond)
if (others.nonEmpty) {
  Filter(others.reduceLeft(And), join)
as pointed out by @dilipbiswal, this is correct only in the case of InnerJoin
Thanks, no need to add an extra Filter in the LeftSemi case.
// [[CheckCartesianProducts]], we throw firstly here for better readable
// information.
throw new AnalysisException("Detected the whole commonJoinCondition:" +
  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
missing s
Thanks, done in 82e50d5.
throw new AnalysisException("Detected the whole commonJoinCondition:" +
  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
  " join to cross join by setting the configuration variable" +
  " spark.sql.crossJoin.enabled = true.")
What about using the conf val in SQLConf?
Makes sense, also changed this in CheckCartesianProducts. Done in 82e50d5.
if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
  Filter(others.reduceLeft(And), join)
} else {
  join
This means that when we have a SemiJoin we are dropping the condition without doing anything, which is wrong. All this logic can be applied only to the Inner case; in the other cases this fix is wrong. Moreover, please add a UT enforcing correctness in the LeftSemi join case, so we can be sure that a wrong fix doesn't go in. Thanks.
It means that in the left_semi join the output of the Join operator should contain only the attributes from the left side, so attributes from the right side should not be referenced after the join. Therefore the plan should be invalid. I am a bit surprised that it works, it would be great to understand why. Thanks.
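Concretely, in PySpark (a sketch reusing the left/right/f definitions above, with spark.sql.crossJoin.enabled=true as before):

# A left semi join keeps only the left side's schema:
df = left.join(right, f("a", "b"), "left_semi")
assert df.columns == ["a"]
# So a Filter referencing b placed above this join would point at an
# attribute that no longer exists in the join's output.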
Thanks @mgaido91 and @dilipbiswal!
I fixed this in 63fbcce. The main problem is the semi join with both deterministic and non-deterministic conditions: a filter after the semi join will fail. I also added more tests on both the Python and Scala side, covering semi join, inner join and the complex scenario described below.
Considering left semi made the strategy difficult to read, so in 63fbcce I split the logic for semi join and inner join.
I am a bit surprised that it works, it would be great to understand why. Thanks.
Sorry for the bad test; it was too special and the result was only right by accident. The original implementation made every semi join return [] in PySpark.
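A minimal regression check for that behavior might look like this (a sketch under the same assumptions as the snippets above):

df = left.join(right, f("a", "b"), "left_semi")
# The original implementation returned [] for every semi join in PySpark;
# the fixed rewrite (inner join + filter + project) keeps the matching row.
assert df.collect() == [Row(a=1)]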
Test build #95676 has finished for PR 22326 at commit
Test build #95678 has finished for PR 22326 at commit
Test build #95716 has finished for PR 22326 at commit
Test build #95718 has finished for PR 22326 at commit
Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal, many thanks for the advice, please have a look when you have time.
  Cross
} else {
  // if the crossJoinEnabled is false, an AnalysisException will throw by
  // [[CheckCartesianProducts]], we throw firstly here for better readable
Tiny nit: we don't necessarily need [[..]] in inlined comments. We can just leave it as is, or use `...` if you feel you should. Feel free to address this along with other comments.
Thanks, done in 87440b0. I'll also pay attention in future work.
}
case _: InnerLike =>
  // push down the single side only join filter for both sides sub queries
  val newLeft = leftJoinConditions.
Can we deduplicate the code here?
No problem, done in 87440b0.
  }
}

test("join condition pushdown: deterministic and non-deterministic in left semi join") {
I didn't add SPARK-25314 because it may be a supplement to test("join condition pushdown: deterministic and non-deterministic").
 * Generate new left and right child of join by pushing down the side only join filter,
 * split commonJoinCondition based on the expression can be evaluated within join or not.
 *
 * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin)
nit: this is not very useful, we can see that these are the names returned...
Got it, see the demo here: https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140. Will remove it in the next commit.
Join(newLeft, newRight, joinType, newJoinCond)
val join = Join(newLeft, newRight, newJoinType, newJoinCond)
if (others.nonEmpty) {
  Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
Before the patch we were not doing this projection, and I am not sure why.
cc @hvanhovell @davies who I see worked on this previously.
Could I try to answer this? In this scenario the projection is only used for a left semi join turned into a cross join, to ensure the output contains only left-side attributes.
val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
  j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
// only need to add cross join when whole commonJoinCondition are unevaluable
val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
Mmh... why do we have this check here, while later for the filter we check others.nonEmpty? Shouldn't they be the same?
Thanks, after detailed checking I changed this to others.nonEmpty; the original check was an unnecessary worry about commonJoinCondition containing both unevaluable and evaluable conditions. I also added a test in the next commit to ensure this.
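A sketch of such a mixed-condition test (reusing left/right/f from above; the column equality is evaluable inside the join, so only the UDF part is pulled out and no cross join is produced):

df = left.join(right, f("a", "b") & (left["a"] == right["b"]))
assert df.collect() == [Row(a=1, b=1)]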
Test build #95790 has finished for PR 22326 at commit
Test build #95789 has finished for PR 22326 at commit
nit: attibutes in the title probably mean attributes
@holdenk Thanks, sorry for the typo.
Test build #95830 has finished for PR 22326 at commit
}

/**
 * Generate new join by pushing down the side only join filter, split commonJoinCondition
nit: filters
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
      "x.a".attr === Rand(10) && "y.b".attr === 5))
    val correctAnswer =
      x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
        condition = Some("x.a".attr === Rand(10)))
        joinType = Cross).where("x.a".attr === Rand(10))
this is not a change we want, right?
Yes, I changed this to make the test pass. The original thought was that nondeterministic expressions in join conditions are not supported yet, so it should not be a big problem:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
Line 105 in 0736e72
// Non-deterministic expressions are not allowed as join conditions.
spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
Lines 1158 to 1159 in 0736e72
// CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
// TODO support nondeterministic expressions in join condition.
But now I think I should be more careful about this and limit the cross join change to the PythonUDF case only. WDYT @mgaido91? Thanks.
From the code in canEvaluateWithinJoin we get the scope relation: (CannotEvaluateWithinJoin = nonDeterministic + Unevaluable) > Unevaluable > PythonUDF.
So for safety maybe I should limit the change scope to the smallest, PythonUDF only. I need some advice from you, thanks :)
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
Lines 104 to 120 in 0736e72

protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
  // Non-deterministic expressions are not allowed as join conditions.
  case e if !e.deterministic => false
  case _: ListQuery | _: Exists =>
    // A ListQuery defines the query which we want to search in an IN subquery expression.
    // Currently the only way to evaluate an IN subquery is to convert it to a
    // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
    // It cannot be evaluated as part of a Join operator.
    // An Exists shouldn't be push into a Join operator too.
    false
  case e: SubqueryExpression =>
    // non-correlated subquery will be replaced as literal
    e.children.isEmpty
  case a: AttributeReference => true
  case e: Unevaluable => false
  case e => e.children.forall(canEvaluateWithinJoin)
}
cc @cloud-fan @gatorsmile @hvanhovell for advice on this. It may well be OK, as it enables supporting a case which was not supported before. But I am not sure about the added value, as performing a cross join is often practically impossible.
Thanks @mgaido91 for the detailed review and advice. I would choose to limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for others' advice.
IIUC, you are pulling the join condition with the Python UDF out and creating a filter above the join. Then the join becomes a cross join, which usually runs very slowly. I think we should keep the cross join check for this case.
@cloud-fan Thanks for your comment.
Yes, that's right.
Yes, as Marco suggested, the current behavior is to control the cross join via spark.sql.crossJoin.enabled.
@@ -995,7 +995,8 @@ class Dataset[T] private[sql](
  // After the cloning, left and right side will have distinct expression ids.
  val plan = withPlan(
    Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
    .queryExecution.analyzed.asInstanceOf[Join]
    .queryExecution.analyzed
  val joinPlan = plan.collectFirst { case j: Join => j }.get
For reviewers: we need this change because the rule HandlePythonUDFInJoinCondition breaks the assumption that the analyzed join plan will always be a Join node. After adding the rule handling Python UDFs, a Filter or Project node may sit on top of the Join.
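The resulting plan shape can also be inspected from PySpark (a sketch reusing the frames above; the exact plan text varies by version):

df = left.join(right, f("a", "b"))
df.explain(True)
# The analyzed plan now shows the Python UDF in a Filter above a Cross join
# instead of inside the Join condition, which is why collectFirst is needed.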
Test build #96617 has finished for PR 22326 at commit
Test build #96621 has finished for PR 22326 at commit
/**
 * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
 * and pull them out from join condition. For python udf accessing attributes from only one side,
 * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
nits:
- they are
- missing space before (
Thanks, done in d2739af.
s" $joinType is not supported.") | ||
} | ||
// If condition expression contains python udf, it will be moved out from | ||
// the new join conditions. If join condition has python udf only, it will be turned |
I think we don't need the second sentence here, i.e. the one starting with "If join condition ...".
Makes sense, it duplicates the log message. Done in d2739af.
// the new join conditions. If join condition has python udf only, it will be turned
// to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
val (udf, rest) =
  condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
nit: splitConjunctivePredicates(condition.get).partition(...) seems clearer to me
Thanks, done in d2739af.
val newCondition = if (rest.isEmpty) {
  logWarning(s"The join condition:$condition of the join plan contains " +
    "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
    s"join. This plan shows below:\n $j")
can we at least remove the whole plan from the warning? Plans can be pretty big...
Got it, done in d2739af.
case LeftSemi =>
  Project(
    j.left.output.map(_.toAttribute),
    Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
nit: indentation
Thanks, done in d2739af.
Just one comment; other than that, my only concern is that we are introducing a lot of end-to-end tests while we have no test targeting only the newly introduced optimizer rule. So I'd prefer having one or two end-to-end tests and creating a new suite testing only the rule and the plan transformation, both for lower testing time and for finer grained tests checking that the output plan is indeed the expected one (not only checking the result of the query).
Apart from this, the change looks fine to me.
}
assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
// Test with spark.sql.crossJoin.enabled=true
spark.conf.set("spark.sql.crossJoin.enabled", "true")
please use withSQLConf
Thanks, done in 7f66954.
So I'd prefer having one or two end-to-end tests and creating a new suite testing only the rule and the plan transformation, both for lower testing time and for finer grained tests checking that the output plan is indeed the expected one (not only checking the result of the query).
Makes sense, will add a plan test for this rule.
Test build #96630 has finished for PR 22326 at commit
Test build #96631 has finished for PR 22326 at commit
Test build #96635 has finished for PR 22326 at commit
python/pyspark/sql/tests.py
Outdated
with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
    self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])

def test_udf_and_filter_in_join_condition(self):
This test (and the corresponding one for left semi join) is not very useful: the filter in the join condition will be pushed down, so this test is basically the same as test_udf_in_join_condition (see the sketch below).
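For example (a sketch reusing the frames above, not code from this PR), these two formulations converge to essentially the same plan once the deterministic predicate is pushed down to the left side:

df1 = left.join(right, f("a", "b") & (left["a"] == 1))
df2 = left.where(left["a"] == 1).join(right, f("a", "b"))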
Makes sense, they were just for checking during implementation; deleted both in 2b6977d.
@@ -100,6 +105,29 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
    }
    assert(qualifiedPlanNodes.size == 1)
  }

  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
This is still an end-to-end test, I don't think we need it
Got it, I used this for mocking the Python UDF in the IDE. Will do this in a follow-up PR with a new test suite in org.apache.spark.sql.catalyst.optimizer; reverted in 2b6977d.
LGTM except some unnecessary end-to-end tests. +1 for @mgaido91's idea about unit tests, something like the test suites under org.apache.spark.sql.catalyst.optimizer.
Test build #96659 has finished for PR 22326 at commit
thanks, merging to master/2.4!
… of join in join conditions

## What changes were proposed in this pull request?
Thanks for bahchis reporting this. It is more like a follow up work for #16581, this PR fix the scenario of Python UDF accessing attributes from both side of join in join condition.

## How was this patch tested?
Add regression tests in PySpark and `BatchEvalPythonExecSuite`.

Closes #22326 from xuanyuanking/SPARK-25314.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2a8cbfd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks everyone for your review and advice.
late LGTM
…dition

#22326 made a mistake that, not all python UDFs are unevaluable in join condition. Only python UDFs that refer to attributes from both join side are unevaluable. This PR fixes this mistake.

a new test

Closes #23153 from cloud-fan/join.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit affe809)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
As comment in apache#22326 (comment), we test the new added optimizer rule by end-to-end test in python side, need to add suites under `org.apache.spark.sql.catalyst.optimizer` like other optimizer rules.

## How was this patch tested?
new added UT

Closes apache#22955 from xuanyuanking/SPARK-25949.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Thanks to @bahchis for reporting this. It is a follow-up to #16581; this PR fixes the scenario of a Python UDF accessing attributes from both sides of a join in the join condition.
How was this patch tested?
Added regression tests in PySpark and BatchEvalPythonExecSuite.