Skip to content

[SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck #15668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

windpiger
Copy link
Contributor

What changes were proposed in this pull request?

In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.

In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).

Before sql result

select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.key AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)

After sql result

select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
[498.0,309,79136]

How was this patch tested?

Add a test case in HiveUDFSuit.

@cloud-fan
Copy link
Contributor

OK to test

@windpiger windpiger changed the title [SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException w… [SPARK-18137][SQL]Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck Oct 28, 2016
@cloud-fan
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Oct 28, 2016

Test build #67698 has finished for PR 15668 at commit 7029e89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor

@windpiger I have taken a quick look at your PR. I think your approach has merit, however I was wondering if it is easier not to extract/move the foldable expressions at all?

@cloud-fan
Copy link
Contributor

+1 on @hvanhovell 's idea, literals don't need to be pre-executed.

@windpiger
Copy link
Contributor Author

@hvanhovell @cloud-fan Do you mean that just change distinctAggChildren/regularAggChildren to represent the unfoldable expressions, and then the followings code could be unchanged?

...
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct.filter(!_.foldable)
...

val regularAggChildren = regularAggExprs.flatMap(.aggregateFunction.children).distinct.**filter(!.foldable)**

@cloud-fan
Copy link
Contributor

It looks to me that after this PR, we still have literals in expand, can you call df.explain(true) to double check the query plan?

and how about this? https://github.com/apache/spark/compare/master...cloud-fan:showcase?expand=1 It's a little simpler and do avoid puting literals in expand.

@windpiger
Copy link
Contributor Author

@cloud-fan yes,It is simpler,I check the query plan, my test case
sql("SELECTpercentile_approx(key, 0.99999)" + ", count(distinct key),sum(distinct key) FROM src LIMIT 1")
explain is ok as follows:
+- *Expand [List(null, null, 0, cast(key#29 as double)), List(key#29, null, 1, null), List(null, cast(key#29 as bigint), 2, null)], [src.key#38, CAST(src.keyAS BIGINT)#39L, gid#37, CAST(src.keyAS DOUBLE)#40] +- HiveTableScan [key#29], MetastoreRelation default, src

But, I miss one case that the distinct on a constant like:
sql("SELECTpercentile_approx(key, 0.99999)" + ", count(distinct key),sum(distinct key),count(distinct 1) FROM src LIMIT 1")
explain with a literal expand as follows:
+- *Expand [List(null, null, null, 0, cast(key#69 as double)), List(1, null, null, 1, null), List(null, key#69, null, 2, null), List(null, null, cast(key#69 as bigint), 3, null)], [src.key#80, CAST(src.keyAS BIGINT)#81L, gid#79, CAST(src.keyAS DOUBLE)#82] +- HiveTableScan [key#69], MetastoreRelation default, src

…hild;if has unfoldable children,it will only expand the unfoldable children
@SparkQA
Copy link

SparkQA commented Nov 7, 2016

Test build #68267 has finished for PR 15668 at commit 8a6dd8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger
Copy link
Contributor Author

windpiger commented Nov 7, 2016

@cloud-fan I rewrite the expand logic:

  1. If the distinct aggFunction has unfoldable children,it will only expand the unfoldable children;
  2. If the distinct aggFunction only has foldable children, it will only expand the first children, generally the aggFunction should not run foldable TypeChecker for the first child(this is the way to avoid foldable typechecker for the first child).

for example:
select percentile_approx(2,0.99999),sum(distinct 1),count(distinct 1,2,3,1) from src limit 1

explained:
== Physical Plan == CollectLimit 1 +- *HashAggregate(keys=[], functions=[first(if ((gid#177 = 0)) percentile_approx(CAST(2 AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)#180 else null, true), sum(if ((gid#177 = 1)) CAST(1 AS BIGINT)#178L else null), count(if ((gid#177 = 2)) 1#179 else null, 2, 3, if ((gid#177 = 2)) 1#179 else null)]) +- Exchange SinglePartition +- *HashAggregate(keys=[], functions=[partial_first(if ((gid#177 = 0)) percentile_approx(CAST(2 AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)#180 else null, true), partial_sum(if ((gid#177 = 1)) CAST(1 AS BIGINT)#178L else null), partial_count(if ((gid#177 = 2)) 1#179 else null, 2, 3, if ((gid#177 = 2)) 1#179 else null)]) +- SortAggregate(key=[CAST(1 AS BIGINT)#178L, 1#179, gid#177], functions=[percentile_approx(2.0, 0.99999, 10000, 0, 0)]) +- *Sort [CAST(1 AS BIGINT)#178L ASC NULLS FIRST, 1#179 ASC NULLS FIRST, gid#177 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(CAST(1 AS BIGINT)#178L, 1#179, gid#177, 200) +- SortAggregate(key=[CAST(1 AS BIGINT)#178L, 1#179, gid#177], functions=[partial_percentile_approx(2.0, 0.99999, 10000, 0, 0)]) +- *Sort [CAST(1 AS BIGINT)#178L ASC NULLS FIRST, 1#179 ASC NULLS FIRST, gid#177 ASC NULLS FIRST], false, 0 +- *Expand [List(null, null, 0), List(1, null, 1), List(null, 1, 2)], [CAST(1 AS BIGINT)#178L, 1#179, gid#177] +- HiveTableScan MetastoreRelation default, src
and the result is:
+--------------------------------------------------------------------+---------------+--------------------------+ |percentile_approx(CAST(2 AS DOUBLE), CAST(0.99999 AS DOUBLE), 10000)|sum(DISTINCT 1)|count(DISTINCT 1, 2, 3, 1)| +--------------------------------------------------------------------+---------------+--------------------------+ | 2.0| 1| 1| +--------------------------------------------------------------------+---------------+--------------------------+

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is shaping up nicely. I left a few minor comments.

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPl
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.IntegerType

/**
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you revert this? this breaks scaladoc.

// NamedExpression. This is done to prevent collisions between distinct and regular aggregate
// children, in this case attribute reuse causes the input of the regular aggregate to bound to
// the (nulled out) input of the distinct aggregate.
// We are creating a new reference here instead of reusing the attribute in case of a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this change

@@ -237,8 +251,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {

// Construct the second aggregate
val transformations: Map[Expression, Expression] =
(distinctAggOperatorMap.flatMap(_._2) ++
regularAggOperatorMap.map(e => (e._1, e._3))).toMap
(distinctAggOperatorMap.flatMap(_._2) ++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this change

@@ -150,6 +150,24 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}

test("Generic UDAF aggregates") {
checkAnswer(sql("SELECT percentile_approx(2, 0.99999), " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use multiline strings for these tests.

val distinctAggGroups = aggExpressions
.filter(_.isDistinct)
.groupBy(_.aggregateFunction.children.toSet)
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Space between groupBy and bracket.

.groupBy(_.aggregateFunction.children.toSet)
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy{
e =>
if (e.aggregateFunction.children.exists(!_.foldable)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just materialize the nonFoldables. Instead of filtering them twice.

af: AggregateFunction)(
attrs: Expression => Expression): AggregateFunction = {
af.withNewChildren(af.children.map(attrs)).asInstanceOf[AggregateFunction]
def patchAggregateFunctionChildren(af: AggregateFunction)(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Style, please keep this the way it was.

// count(distinct 1) will be explained to count(1) after the rewrite function.
// Generally, the distinct aggregateFunction should not run
// foldable TypeCheck for the first child.
e.aggregateFunction.children.take(1).toSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch. It would be great if we could git rid of this by constant folding (not needed in this PR). Another way of getting rid of this, would be by creating a separate processing group for these distincts.

@SparkQA
Copy link

SparkQA commented Nov 7, 2016

Test build #68270 has finished for PR 15668 at commit c5b3a3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 7, 2016

Test build #68274 has finished for PR 15668 at commit 67fc72d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger
Copy link
Contributor Author

retest this please

@windpiger
Copy link
Contributor Author

@cloud-fan @hvanhovell could you please help to retest this?

@cloud-fan
Copy link
Contributor

retest this please

@cloud-fan
Copy link
Contributor

add to whitelist

val distinctAggGroups = aggExpressions
.filter(_.isDistinct)
.groupBy(_.aggregateFunction.children.toSet)
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

...groupBy { e =>
  ...
}

@cloud-fan
Copy link
Contributor

LGTM, pending jenkins

@SparkQA
Copy link

SparkQA commented Nov 8, 2016

Test build #68315 has finished for PR 15668 at commit 67fc72d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 8, 2016

Test build #68323 has finished for PR 15668 at commit 6e58167.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor

hvanhovell commented Nov 8, 2016

LGTM. Merging to master/2.1/2.0. Thanks!

asfgit pushed a commit that referenced this pull request Nov 8, 2016
…when a UDAF has a foldable TypeCheck

## What changes were proposed in this pull request?

In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.

In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).

**Before sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
>     at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)

**After sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> [498.0,309,79136]
## How was this patch tested?

Add a test case in HiveUDFSuit.

Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>

Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.

(cherry picked from commit c291bd2)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 8, 2016
…when a UDAF has a foldable TypeCheck

## What changes were proposed in this pull request?

In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.

In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).

**Before sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
>     at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)

**After sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> [498.0,309,79136]
## How was this patch tested?

Add a test case in HiveUDFSuit.

Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>

Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.

(cherry picked from commit c291bd2)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@asfgit asfgit closed this in c291bd2 Nov 8, 2016
@hvanhovell
Copy link
Contributor

@windpiger do you have a JIRA username? So I can credit you on the JIRA.

@windpiger
Copy link
Contributor Author

@hvanhovell username is Song Jun

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…when a UDAF has a foldable TypeCheck

## What changes were proposed in this pull request?

In RewriteDistinctAggregates rewrite funtion,after the UDAF's childs are mapped to AttributeRefference, If the UDAF(such as ApproximatePercentile) has a foldable TypeCheck for the input, It will failed because the AttributeRefference is not foldable,then the UDAF is not resolved, and then nullify on the unresolved object will throw a Exception.

In this PR, only map Unfoldable child to AttributeRefference, this can avoid the UDAF's foldable TypeCheck. and then only Expand Unfoldable child, there is no need to Expand a static value(foldable value).

**Before sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
>     at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)

**After sql result**

> select percentile_approxy(key,0.99999),count(distinct key),sume(distinc key) from src limit 1
> [498.0,309,79136]
## How was this patch tested?

Add a test case in HiveUDFSuit.

Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>

Closes apache#15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.
davidnavas pushed a commit to davidnavas/spark that referenced this pull request Jun 7, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants