[SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions #22326


Closed · wants to merge 24 commits

Conversation

xuanyuanking (Member)

What changes were proposed in this pull request?

Thanks to @bahchis for reporting this. This is more of a follow-up to #16581; this PR fixes the scenario of a Python UDF accessing attributes from both sides of a join in the join condition.

How was this patch tested?

Add regression tests in PySpark and BatchEvalPythonExecSuite.
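
To make the scenario concrete, here is a minimal PySpark sketch of the kind of query this PR fixes (the data and names are illustrative, not taken from the PR's test suite):

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(b=1)])

# The UDF reads one attribute from each side of the join, so the condition
# cannot be evaluated inside the join operator and must be pulled out.
f = udf(lambda a, b: a == b, BooleanType())

df = left.join(right, f(left["a"], right["b"]))
df.show()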

// the new join conditions, if all conditions are unevaluable, we should
// change the join type to CrossJoin.
val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
Member

I think we should at least warn or leave a note that an unevaluable expression (e.g. a Python UDF) in the join condition will be ignored and the join turned into a cross join.

Member Author (@xuanyuanking, Sep 4, 2018)

Makes sense, I'll leave a warning log here.

Contributor

what about also checking spark.sql.crossJoin.enabled and allowing the transformation only in that case?

right = self.spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())
df = left.crossJoin(right).filter(f("a", "b"))
self.assertEqual(df.collect(), [Row(a=1, b=1)])
self.assertEqual(df.collect(), [Row(a=1, b=1)])
Member

Looks duplicated

Member Author

Yep, sorry for the mess here, this is a leftover from another commit. I'll fix it soon. How do I cancel the test run?

left = self.spark.createDataFrame([Row(a=1)])
right = self.spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())
df = left.crossJoin(right).filter(f("a", "b"))
Member

BTW, why do we explicitly test cross join here?

Member Author

ditto, the correct test is df = left.join(right, f("a", "b")).

@SparkQA commented Sep 4, 2018

Test build #95653 has finished for PR 22326 at commit 23b1028.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
}
assert(qualifiedPlanNodes.size == 1)
}

test("Python UDF refers to the attributes from more than one child in join condition") {
Member

I would add a JIRA prefix here - this change sounds more like fixing a particular problem.

Member Author

Got it, will add in this commit.

// the new join conditions, if all conditions are unevaluable, we should
// change the join type to CrossJoin.
val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
Contributor

I think this should be done only in this case: #22326 (comment)

Member Author

Thanks for the reminder, crossJoinEnabled should be checked here.

if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
    s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
    s"turned to cross join.")
Contributor

the `s` on the last string literal should be removed; there is no interpolation in it

Member Author

Thanks, done in a86a7d5.

val newJoinType =
  if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
    logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
      s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
Contributor

not sure that inlining the plan here makes this warning very readable...

Member Author

The log will be shown like this:

16:13:35.218 WARN org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the join plan:
 Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))
:- LocalRelation [a#5, b#6]
+- LocalRelation [c#14, d#15]
 is unevaluable, it will be ignored and the join plan will be turned to cross join.

Contributor

Yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.

Contributor

@xuanyuanking @mgaido91 In the above example, the UDFs refer to attributes from distinct legs of the join. Can we not plan this better than a cross join in this case? I am wondering why we can't do:

  Join Inner, leftAlias1 = rightAlias1
      Project dummyUDF(a, b) as leftAlias1
         LocalRelation(a, b)
      Project dummyUDF(c, d) as rightAlias1
         LocalRelation(c, d)

Perhaps I am missing something...

Contributor

@dilipbiswal I haven't checked the particular plan posted in that comment; for that one I think you are right, we can handle it as you suggested. But I was checking the case in the UT and in the description of this PR, i.e. when the input for the Python UDF contains attributes from both sides. In that case I don't have a better suggestion.

Contributor (@dilipbiswal, Sep 4, 2018)

@mgaido91 Thanks. Marco, do you know if there are instances when we pick a cross join implicitly? It wouldn't perform very well, right? Wondering if we should error out or pick a bad plan. I guess, like you, I am not sure what's the right thing to do here.

One other thing, Marco: for join types other than inner and left semi, we still have the same issue, no?

Member Author

@dilipbiswal Thanks for your detailed check; I should make the test case more typical. The case we want to solve here is a UDF accessing attributes from both sides, so I'll change the case to dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")) in the next commit.

Member Author

Yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.

@mgaido91 Thanks for your advice, will do the refactor in the next commit.

Contributor

@dilipbiswal there are cases when "trivial conditions" are removed from a join, so we make an inner join a cross one, for instance. The performance would be awful, you're right. The point is that I am not sure there is a better way to achieve this. I mean, since we have no clue what the UDF does, we need to compare all the rows from both sides, i.e. we need to perform a cartesian product.

Wondering if we should error out or pick a bad plan

This is, indeed, arguable. I think that letting the user choose is a good idea. If the user runs the query and gets an AnalysisException because they are trying to perform a cartesian product, they can decide: OK, I am doing the wrong thing, let me change it. Or they can say: well, one of the two tables involved contains 10 rows, not a big deal, I want to perform it nonetheless; let me set spark.sql.crossJoin.enabled=true and run it.

for join types other than inner and left semi, we still have the same issue, no?

I think the current PR properly handles only the inner case; for left semi this PR returns an incorrect result, IIUC. This needs to be fixed as well.

Contributor

@mgaido91

This is, indeed, arguable. I think that letting the user choose is a good idea. If the user runs the query and gets an AnalysisException because they are trying to perform a cartesian product, they can decide: OK, I am doing the wrong thing, let me change it. Or they can say: well, one of the two tables involved contains 10 rows, not a big deal, I want to perform it nonetheless; let me set spark.sql.crossJoin.enabled=true and run it.

Sounds reasonable.
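
As a sketch of the user-facing choice described above, under the Spark 2.x config name quoted in this thread (tables and UDF are illustrative):

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(b=1)])
f = udf(lambda a, b: a == b, BooleanType())

# With the default spark.sql.crossJoin.enabled=false, the implicit cartesian
# product is rejected with an AnalysisException; opting in lets it run.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
left.join(right, f(left["a"], right["b"])).show()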

s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
s"turned to cross join.")
Cross
} else joinType
Contributor

} else {
  joinType
}

Member Author

Thanks, done in a86a7d5.

@SparkQA commented Sep 4, 2018

Test build #95662 has finished for PR 22326 at commit d58f3a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val join = Join(newLeft, newRight, newJoinType, newJoinCond)
if (others.nonEmpty) {
  Filter(others.reduceLeft(And), join)
Contributor

as pointed out by @dilipbiswal, this is correct only in the case of InnerJoin

Member Author

Thanks, there is no need to add an extra Filter in the LeftSemi case.

// [[CheckCartesianProducts]]; we throw first here for more readable
// information.
throw new AnalysisException("Detected the whole commonJoinCondition:" +
  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
Contributor

missing `s` interpolator; without it `$commonJoinCondition` will not be substituted

Member Author

Thanks, done in 82e50d5.

throw new AnalysisException("Detected the whole commonJoinCondition:" +
  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
  " join to cross join by setting the configuration variable" +
  " spark.sql.crossJoin.enabled = true.")
Contributor

What about using the conf val in SQLConf?

Member Author

Makes sense; also changed this in CheckCartesianProducts. Done in 82e50d5.

if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
  Filter(others.reduceLeft(And), join)
} else {
  join
Contributor

this means that when we have a SemiJoin we are dropping the condition without doing anything, which is wrong. All this logic can be applied only to the Inner case; in the other cases this fix is wrong. Moreover, please add a UT to enforce correctness in the LeftSemi join case, so we can be sure that a wrong fix doesn't go in. Thanks.

Member Author

Thanks, I'll do more testing on the SemiJoin here, but the current PySpark tests pass, so maybe I misunderstood what you both mean by wrong: do you mean correctness, or just a performance regression?

Contributor

It means that in a left_semi join the output of the Join operator should contain only the attributes from the left side, so attributes from the right side should not be referenced after the join. Therefore the plan should be invalid. I am a bit surprised that it works; it would be great to understand why. Thanks.
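
A small PySpark sketch of the left semi semantics at issue (illustrative data):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame([Row(a=1), Row(a=2)])
right = spark.createDataFrame([Row(b=1)])

# A left semi join outputs only left-side columns, so a filter placed after
# the join cannot reference right-side attributes such as b.
df = left.join(right, left["a"] == right["b"], "left_semi")
df.show()  # the output has only column a, here the single row a=1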

Member Author

Thanks @mgaido91 and @dilipbiswal!
I fixed this in 63fbcce. The main problem is that for a semi join, with both deterministic and non-deterministic conditions, a filter after the semi join will fail. I also added more tests on both the Python and Scala side, covering semi join, inner join and the complex scenario described below.
Considering left semi made the strategy difficult to read, so in 63fbcce I split the logic for semi join and inner join.

Member Author

I am a bit surprised that it works; it would be great to understand why. Thanks.

Sorry for the bad test; it was too special a case and the result was only right by accident. The original implementation makes all semi joins return [] in PySpark.

@SparkQA commented Sep 4, 2018

Test build #95676 has finished for PR 22326 at commit a86a7d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2018

Test build #95678 has finished for PR 22326 at commit 82e50d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 5, 2018

Test build #95716 has finished for PR 22326 at commit 63fbcce.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 5, 2018

Test build #95718 has finished for PR 22326 at commit 777b881.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member Author)

Gentle ping @mgaido91 @HyukjinKwon @dilipbiswal, many thanks for the advice; please have a look when you have time.

  Cross
} else {
  // if crossJoinEnabled is false, an AnalysisException will be thrown by
  // [[CheckCartesianProducts]]; we throw first here for more readable
Member

tiny nit: we don't necessarily need [[..]] in inline comments. We can just leave it as is, or use `...` if you feel like you should. Feel free to address this along with other comments.

Member Author

Thanks, done in 87440b0. I'll also pay attention in future work.

}
case _: InnerLike =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
Member

Can we deduplicate the code here?

Member Author

No problem, done in 87440b0.

}
}

test("join condition pushdown: deterministic and non-deterministic in left semi join") {
Member Author

I didn't add SPARK-25314 here because it may be a supplement to test("join condition pushdown: deterministic and non-deterministic").

* Generate new left and right child of join by pushing down the side only join filter,
* split commonJoinCondition based on the expression can be evaluated within join or not.
*
* @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin)
Contributor

nit: this is not very useful, we can see that these are the names returned...

Member Author

- Join(newLeft, newRight, joinType, newJoinCond)
+ val join = Join(newLeft, newRight, newJoinType, newJoinCond)
if (others.nonEmpty) {
Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
Contributor

Before the patch we were not doing this projection. I am not sure why.

cc @hvanhovell @davies who I see worked on this previously.

Member Author

Could I try to answer this? The projection is only used when a left semi join is turned into a cross join in this scenario, to ensure the output only contains left-side attributes.

val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
// only need to switch to a cross join when the whole commonJoinCondition is unevaluable
val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
Contributor

mmh... why do we have this check here, while later for the filter we check others.nonEmpty? Shouldn't they be the same?

Member Author

Thanks, after a detailed check I changed this to others.nonEmpty; the original check was an unnecessary worry about commonJoinCondition containing both unevaluable and evaluable conditions. I also added a test in the next commit to ensure this.

@SparkQA commented Sep 7, 2018

Test build #95790 has finished for PR 22326 at commit 4d546e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 7, 2018

Test build #95789 has finished for PR 22326 at commit 87440b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented Sep 7, 2018

nit: 'attibutes' in the title probably means 'attributes'

@xuanyuanking (Member Author)

@holdenk Thanks, sorry for the typo.

@xuanyuanking xuanyuanking changed the title [SPARK-25314][SQL] Fix Python UDF accessing attibutes from both side of join in join conditions [SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions Sep 8, 2018
@SparkQA commented Sep 8, 2018

Test build #95830 has finished for PR 22326 at commit 6875719.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* Generate new join by pushing down the side only join filter, split commonJoinCondition
Contributor

nit: filters

@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
"x.a".attr === Rand(10) && "y.b".attr === 5))
val correctAnswer =
x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
-   condition = Some("x.a".attr === Rand(10)))
+   joinType = Cross).where("x.a".attr === Rand(10))
Contributor

this is not a change we want, right?

Member Author

Yes, I changed this to let the test pass. The original thought was that nondeterministic expressions in join conditions are not supported yet, so it was no big problem.

// Non-deterministic expressions are not allowed as join conditions.

// CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
// TODO support nondeterministic expressions in join condition.

But now I think I should be more careful about this and limit the cross join change to the PythonUDF case only. WDYT @mgaido91? Thanks.

Member Author

As the code in canEvaluateWithinJoin shows, we get the containment relation: (CannotEvaluateWithinJoin = nonDeterministic + Unevaluable) > Unevaluable > PythonUDF.
So to be safe, maybe I should limit the change scope to the smallest set, PythonUDF only. I need some advice from you, thanks :)

protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
  // Non-deterministic expressions are not allowed as join conditions.
  case e if !e.deterministic => false
  case _: ListQuery | _: Exists =>
    // A ListQuery defines the query which we want to search in an IN subquery expression.
    // Currently the only way to evaluate an IN subquery is to convert it to a
    // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
    // It cannot be evaluated as part of a Join operator.
    // An Exists shouldn't be pushed into a Join operator either.
    false
  case e: SubqueryExpression =>
    // A non-correlated subquery will be replaced with a literal.
    e.children.isEmpty
  case a: AttributeReference => true
  case e: Unevaluable => false
  case e => e.children.forall(canEvaluateWithinJoin)
}
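
As a hedged illustration of the first case above (assuming, as the quoted comments state, that the analyzer rejects nondeterministic join conditions):

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(b=1)])

# rand() is nondeterministic, so canEvaluateWithinJoin returns false for it
# and the analyzer refuses it as a join condition.
try:
    left.join(right, (left["a"] + rand()) == right["b"]).collect()
except AnalysisException:
    print("rejected as expected")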

Contributor (@mgaido91, Sep 11, 2018)

cc @cloud-fan @gatorsmile @hvanhovell for advice on this. It is probably OK, as it adds support for a case which was not supported before. But I am not sure about the added value, as performing a cross join is often an impossible operation.

Member Author

Thanks @mgaido91 for the detailed review and advice. For my part, I would choose to limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for others' advice.

@cloud-fan (Contributor)

IIUC, you are pulling the join condition with the Python UDF out and creating a filter above the join. Then the join becomes a cross join, which usually runs very slowly. I think we should keep the cross join check for this case.

@xuanyuanking (Member Author)

@cloud-fan Thanks for your comment.

IIUC, you are pulling the join condition with the Python UDF out and creating a filter above the join. Then the join becomes a cross join, which usually runs very slowly.

Yes, that's right.

I think we should keep the cross join check for this case.

Yes, per Marco's suggestion, the current behavior is to control the cross join via the crossJoinEnabled config; if crossJoinEnabled = false, it will throw an AnalysisException.

@@ -995,7 +995,8 @@ class Dataset[T] private[sql](
// After the cloning, left and right side will have distinct expression ids.
val plan = withPlan(
Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
-   .queryExecution.analyzed.asInstanceOf[Join]
+   .queryExecution.analyzed
+ val joinPlan = plan.collectFirst { case j: Join => j }.get
Member Author

For reviewers: we need this change because the rule HandlePythonUDFInJoinCondition breaks the assumption that the analyzed join plan will always be a Join node. After we add the rule handling the Python UDF, a Filter or Project node may be added on top of the Join.

@SparkQA commented Sep 26, 2018

Test build #96617 has finished for PR 22326 at commit 98cd3cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 26, 2018

Test build #96621 has finished for PR 22326 at commit d1db33a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
* and pull them out from join condition. For python udf accessing attributes from only one side,
* they would be pushed down by operation push down rules. If not(e.g. user disables filter push
Contributor

nits:

  • they are
  • missing space before (

Member Author

Thanks, done in d2739af.

s" $joinType is not supported.")
}
// If condition expression contains python udf, it will be moved out from
// the new join conditions. If join condition has python udf only, it will be turned
Contributor

I think we don't need the second sentence here, i.e. the one starting with 'If join condition ...'

Member Author

Makes sense, it duplicates the log message. Done in d2739af.

// the new join conditions. If join condition has python udf only, it will be turned
// to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
val (udf, rest) =
condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
Contributor

nit: splitConjunctivePredicates(condition.get).partition(...) seems clearer to me

Member Author

Thanks, done in d2739af.

val newCondition = if (rest.isEmpty) {
logWarning(s"The join condition:$condition of the join plan contains " +
"PythonUDF only, it will be moved out and the join plan will be turned to cross " +
s"join. This plan shows below:\n $j")
Contributor

can we at least remove the whole plan from the warning? Plans can be pretty big...

Member Author

Got it, done in d2739af.

case LeftSemi =>
Project(
j.left.output.map(_.toAttribute),
Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
Contributor

nit: indentation

Member Author

Thanks, done in d2739af.

Contributor (@mgaido91) left a comment

Just one comment; other than that, my only concern is that here we are introducing a lot of end-to-end tests and have no test targeting only the newly introduced optimizer rule. So I'd prefer having one or two end-to-end tests and creating a new suite testing only the rule and the plan transformation, both for lower testing time and for finer-grained tests checking that the output plan is indeed the expected one (not only checking the result of the query).

Apart from this, the change looks fine to me.

}
assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
// Test with spark.sql.crossJoin.enabled=true
spark.conf.set("spark.sql.crossJoin.enabled", "true")
Contributor

please use withSQLConf

Member Author

Thanks, done in 7f66954.

So I'd prefer having one or two end-to-end tests and creating a new suite testing only the rule and the plan transformation, both for lower testing time and for finer-grained tests checking that the output plan is indeed the expected one (not only checking the result of the query).

Makes sense, will add a plan test for this rule.

@SparkQA commented Sep 26, 2018

Test build #96630 has finished for PR 22326 at commit 87f0f50.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 26, 2018

Test build #96631 has finished for PR 22326 at commit d2739af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 26, 2018

Test build #96635 has finished for PR 22326 at commit 7f66954.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])

def test_udf_and_filter_in_join_condition(self):
Contributor

This test (and the corresponding one for left semi join) is not very useful. The filter in the join condition will be pushed down, so this test is basically the same as test_udf_in_join_condition.

Member Author

Makes sense, these were just for checking during implementation; deleted both in 2b6977d.

@@ -100,6 +105,29 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
}
assert(qualifiedPlanNodes.size == 1)
}

test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
Contributor

This is still an end-to-end test; I don't think we need it.

Member Author

Got it, I used this to mock the Python UDF in the IDE. I'll do this in a follow-up PR with a new test suite in org.apache.spark.sql.catalyst.optimizer; reverted in 2b6977d.

@cloud-fan (Contributor)

LGTM except some unnecessary end-to-end tests.

+1 for @mgaido91's idea about unit tests, something like the test suites under org.apache.spark.sql.catalyst.optimizer. I'm OK with doing it in a follow-up, since it will be the first UT for the Python rules.

@SparkQA commented Sep 27, 2018

Test build #96659 has finished for PR 22326 at commit 2b6977d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master/2.4!

asfgit pushed a commit that referenced this pull request Sep 27, 2018
… of join in join conditions

## What changes were proposed in this pull request?

Thanks to bahchis for reporting this. This is more of a follow-up to #16581; this PR fixes the scenario of a Python UDF accessing attributes from both sides of a join in the join condition.

## How was this patch tested?

Add regression tests in PySpark and `BatchEvalPythonExecSuite`.

Closes #22326 from xuanyuanking/SPARK-25314.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2a8cbfd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit asfgit closed this in 2a8cbfd Sep 27, 2018
@xuanyuanking (Member Author)

Thanks everyone for your review and advice.

@xuanyuanking xuanyuanking deleted the SPARK-25314 branch September 27, 2018 18:07
@HyukjinKwon (Member)

late LGTM

asfgit pushed a commit that referenced this pull request Nov 28, 2018
…dition

#22326 made a mistake: not all Python UDFs are unevaluable in join conditions. Only Python UDFs that refer to attributes from both join sides are unevaluable.

This PR fixes this mistake.

a new test

Closes #23153 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit affe809)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

As commented in apache#22326 (comment), we tested the newly added optimizer rule only with end-to-end tests on the Python side; we need to add suites under `org.apache.spark.sql.catalyst.optimizer` like for other optimizer rules.

## How was this patch tested?
Newly added UT.

Closes apache#22955 from xuanyuanking/SPARK-25949.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>