-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck #15668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck #15668
Conversation
…hen the UDAF has a foldable TypeCheck
OK to test |
ok to test |
Test build #67698 has finished for PR 15668 at commit
|
@windpiger I have taken a quick look at your PR. I think your approach has merit, however I was wondering if it is easier not to extract/move the foldable expressions at all? |
+1 on @hvanhovell 's idea, literals don't need to be pre-executed. |
@hvanhovell @cloud-fan Do you mean that just change distinctAggChildren/regularAggChildren to represent the unfoldable expressions, and then the followings code could be unchanged? ... val regularAggChildren = regularAggExprs.flatMap(.aggregateFunction.children).distinct.**filter(!.foldable)** |
It looks to me that after this PR, we still have literals in expand, can you call and how about this? https://github.com/apache/spark/compare/master...cloud-fan:showcase?expand=1 It's a little simpler and do avoid puting literals in expand. |
@cloud-fan yes,It is simpler,I check the query plan, my test case But, I miss one case that the distinct on a constant like: |
…hild;if has unfoldable children,it will only expand the unfoldable children
Test build #68267 has finished for PR 15668 at commit
|
@cloud-fan I rewrite the expand logic:
for example: explained: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is shaping up nicely. I left a few minor comments.
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl | |||
import org.apache.spark.sql.catalyst.rules.Rule | |||
import org.apache.spark.sql.types.IntegerType | |||
|
|||
/** | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you revert this? this breaks scaladoc.
// NamedExpression. This is done to prevent collisions between distinct and regular aggregate | ||
// children, in this case attribute reuse causes the input of the regular aggregate to bound to | ||
// the (nulled out) input of the distinct aggregate. | ||
// We are creating a new reference here instead of reusing the attribute in case of a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert this change
@@ -237,8 +251,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { | |||
|
|||
// Construct the second aggregate | |||
val transformations: Map[Expression, Expression] = | |||
(distinctAggOperatorMap.flatMap(_._2) ++ | |||
regularAggOperatorMap.map(e => (e._1, e._3))).toMap | |||
(distinctAggOperatorMap.flatMap(_._2) ++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert this change
@@ -150,6 +150,24 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { | |||
} | |||
|
|||
test("Generic UDAF aggregates") { | |||
checkAnswer(sql("SELECT percentile_approx(2, 0.99999), " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use multiline strings for these tests.
val distinctAggGroups = aggExpressions | ||
.filter(_.isDistinct) | ||
.groupBy(_.aggregateFunction.children.toSet) | ||
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Space between groupBy and bracket.
.groupBy(_.aggregateFunction.children.toSet) | ||
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{ | ||
e => | ||
if (e.aggregateFunction.children.exists(!_.foldable)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just materialize the nonFoldables. Instead of filtering them twice.
af: AggregateFunction)( | ||
attrs: Expression => Expression): AggregateFunction = { | ||
af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction] | ||
def patchAggregateFunctionChildren(af: AggregateFunction)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Style, please keep this the way it was.
// count(distinct 1) will be explained to count(1) after the rewrite function. | ||
// Generally, the distinct aggregateFunction should not run | ||
// foldable TypeCheck for the first child. | ||
e.aggregateFunction.children.take(1).toSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good catch. It would be great if we could git rid of this by constant folding (not needed in this PR). Another way of getting rid of this, would be by creating a separate processing group for these distincts.
Test build #68270 has finished for PR 15668 at commit
|
Test build #68274 has finished for PR 15668 at commit
|
retest this please |
@cloud-fan @hvanhovell could you please help to retest this? |
retest this please |
add to whitelist |
val distinctAggGroups = aggExpressions | ||
.filter(_.isDistinct) | ||
.groupBy(_.aggregateFunction.children.toSet) | ||
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
...groupBy { e =>
...
}
LGTM, pending jenkins |
Test build #68315 has finished for PR 15668 at commit
|
Test build #68323 has finished for PR 15668 at commit
|
LGTM. Merging to master/2.1/2.0. Thanks! |
…when a UDAF has a foldable TypeCheck ## What changes were proposed in this pull request? In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception. In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value). **Before sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000) > at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92) > at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261) **After sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > [498.0,309,79136] ## How was this patch tested? Add a test case in HiveUDFSuit. Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)> Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep. (cherry picked from commit c291bd2) Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
…when a UDAF has a foldable TypeCheck ## What changes were proposed in this pull request? In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception. In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value). **Before sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000) > at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92) > at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261) **After sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > [498.0,309,79136] ## How was this patch tested? Add a test case in HiveUDFSuit. Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)> Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep. (cherry picked from commit c291bd2) Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@windpiger do you have a JIRA username? So I can credit you on the JIRA. |
@hvanhovell username is Song Jun |
…when a UDAF has a foldable TypeCheck ## What changes were proposed in this pull request? In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception. In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value). **Before sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000) > at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92) > at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261) **After sql result** > select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1 > [498.0,309,79136] ## How was this patch tested? Add a test case in HiveUDFSuit. Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)> Closes apache#15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.
What changes were proposed in this pull request?
In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.
In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).
Before sql result
After sql result
How was this patch tested?
Add a test case in HiveUDFSuit.