[SPARK-22390][SPARK-32833][SQL] JDBC V2 Datasource aggregate push down #29695


Closed
huaxingao wants to merge 23 commits

Conversation

huaxingao
Contributor

@huaxingao huaxingao commented Sep 9, 2020

What changes were proposed in this pull request?

Push down JDBC aggregates to the data source layer.

  1. Push down only if all the aggregates can be pushed down.
  2. Use the JDBC option pushDownAggregate to control whether to push down aggregates; the default is false (see the sketch below).
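
A minimal sketch of enabling the option (illustrative only; it assumes the H2-backed JDBCTableCatalog used by this PR's tests, and the catalog name, JDBC URL, and driver are placeholders):

import org.apache.spark.sql.SparkSession

// Register an H2 JDBC catalog named "h2" and enable aggregate push down.
// The option key pushDownAggregate and its default of false come from JDBCOptions in this PR.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.h2", "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .config("spark.sql.catalog.h2.pushDownAggregate", "true")
  .getOrCreate()

// The aggregate below can now be pushed to the JDBC source (see the plans under "Examples").
spark.sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0 group by DEPT").explain(true)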

Examples:

sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0 group by DEPT")

before push down:

== Parsed Logical Plan ==
'Aggregate ['DEPT], [unresolvedalias('MAX('SALARY), None), unresolvedalias('MIN('BONUS), None)]
+- 'Filter ('dept > 0)
   +- 'UnresolvedRelation [h2, test, employee], [], false

== Analyzed Logical Plan ==
max(SALARY): decimal(20,2), min(BONUS): double
Aggregate [DEPT#0], [max(SALARY#2) AS max(SALARY)#6, min(BONUS#3) AS min(BONUS)#7]
+- Filter (dept#0 > 0)
   +- SubqueryAlias h2.test.employee
      +- RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3] test.employee

== Optimized Logical Plan ==
Aggregate [DEPT#0], [max(SALARY#2) AS max(SALARY)#6, min(BONUS#3) AS min(BONUS)#7]
+- RelationV2[DEPT#0, SALARY#2, BONUS#3] test.employee

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEPT#0], functions=[max(SALARY#2), min(BONUS#3)], output=[max(SALARY)#6, min(BONUS)#7])
   +- Exchange hashpartitioning(DEPT#0, 5), ENSURE_REQUIREMENTS, [id=#11]
      +- HashAggregate(keys=[DEPT#0], functions=[partial_max(SALARY#2), partial_min(BONUS#3)], output=[DEPT#0, max#18, min#19])
         +- Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@377949f1 [DEPT#0,SALARY#2,BONUS#3] PushedAggregates: [], PushedFilters: [IsNotNull(dept), GreaterThan(dept,0)], PushedGroupby: [], ReadSchema: struct<DEPT:int,SALARY:decimal(20,2),BONUS:double>

after push down:

== Parsed Logical Plan ==
'Aggregate ['DEPT], [unresolvedalias('MAX('SALARY), None), unresolvedalias('MIN('BONUS), None)]
+- 'Filter ('dept > 0)
   +- 'UnresolvedRelation [h2, test, employee], [], false

== Analyzed Logical Plan ==
max(SALARY): decimal(20,2), min(BONUS): decimal(6,2)
Aggregate [DEPT#253], [max(SALARY#255) AS max(SALARY)#259, min(BONUS#256) AS min(BONUS)#260]
+- Filter (dept#253 > 0)
   +- SubqueryAlias h2.test.employee
      +- RelationV2[DEPT#253, NAME#254, SALARY#255, BONUS#256] test.employee

== Optimized Logical Plan ==
Aggregate [DEPT#253], [max(Max(SALARY,DecimalType(20,2))#266) AS max(SALARY)#259, min(Min(BONUS,DecimalType(6,2))#267) AS min(BONUS)#260]
+- RelationV2[Max(SALARY,DecimalType(20,2))#266, Min(BONUS,DecimalType(6,2))#267, DEPT#253] test.employee

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEPT#253], functions=[max(Max(SALARY,DecimalType(20,2))#266), min(Min(BONUS,DecimalType(6,2))#267)], output=[max(SALARY)#259, min(BONUS)#260])
   +- Exchange hashpartitioning(DEPT#253, 5), ENSURE_REQUIREMENTS, [id=#397]
      +- HashAggregate(keys=[DEPT#253], functions=[partial_max(Max(SALARY,DecimalType(20,2))#266), partial_min(Min(BONUS,DecimalType(6,2))#267)], output=[DEPT#253, max#270, min#271])
         +- Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@30437e9c [Max(SALARY,DecimalType(20,2))#266,Min(BONUS,DecimalType(6,2))#267,DEPT#253] PushedAggregates: [*Max(SALARY,DecimalType(20,2)), *Min(BONUS,DecimalType(6,2))], PushedFilters: [IsNotNull(dept), GreaterThan(dept,0)], PushedGroupby: [*DEPT], ReadSchema: struct<Max(SALARY,DecimalType(20,2)):decimal(20,2),Min(BONUS,DecimalType(6,2)):decimal(6,2),DEPT:...
sql("select max(id), min(id) FROM h2.test.people where id > 0")

before push down:

== Parsed Logical Plan ==
'Project [unresolvedalias('MAX('ID), None), unresolvedalias('MIN('ID), None)]
+- 'Filter ('id > 0)
   +- 'UnresolvedRelation [h2, test, people], [], false

== Analyzed Logical Plan ==
max(ID): int, min(ID): int
Aggregate [max(ID#35) AS max(ID)#38, min(ID#35) AS min(ID)#39]
+- Filter (id#35 > 0)
   +- SubqueryAlias h2.test.people
      +- RelationV2[NAME#34, ID#35] test.people

== Optimized Logical Plan ==
Aggregate [max(ID#35) AS max(ID)#38, min(ID#35) AS min(ID)#39]
+- RelationV2[ID#35] test.people

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(ID#35), min(ID#35)], output=[max(ID)#38, min(ID)#39])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#83]
      +- HashAggregate(keys=[], functions=[partial_max(ID#35), partial_min(ID#35)], output=[max#46, min#47])
         +- Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@34beadce [ID#35] PushedAggregates: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0)], PushedGroupby: [], ReadSchema: struct<ID:int>

after push down:

== Parsed Logical Plan ==
'Project [unresolvedalias('MAX('ID), None), unresolvedalias('MIN('ID), None)]
+- 'Filter ('id > 0)
   +- 'UnresolvedRelation [h2, test, people], [], false

== Analyzed Logical Plan ==
max(ID): int, min(ID): int
Aggregate [max(ID#290) AS max(ID)#293, min(ID#290) AS min(ID)#294]
+- Filter (id#290 > 0)
   +- SubqueryAlias h2.test.people
      +- RelationV2[NAME#289, ID#290] test.people

== Optimized Logical Plan ==
Aggregate [max(Max(ID,IntegerType)#298) AS max(ID)#293, min(Min(ID,IntegerType)#299) AS min(ID)#294]
+- RelationV2[Max(ID,IntegerType)#298, Min(ID,IntegerType)#299] test.people

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(Max(ID,IntegerType)#298), min(Min(ID,IntegerType)#299)], output=[max(ID)#293, min(ID)#294])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#469]
      +- HashAggregate(keys=[], functions=[partial_max(Max(ID,IntegerType)#298), partial_min(Min(ID,IntegerType)#299)], output=[max#302, min#303])
         +- Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@1368e2f7 [Max(ID,IntegerType)#298,Min(ID,IntegerType)#299] PushedAggregates: [*Max(ID,IntegerType), *Min(ID,IntegerType)], PushedFilters: [IsNotNull(id), GreaterThan(id,0)], PushedGroupby: [], ReadSchema: struct<Max(ID,IntegerType):int,Min(ID,IntegerType):int>

Why are the changes needed?

For better performance.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new tests.

@huaxingao
Contributor Author

cc @viirya

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128461 has finished for PR 29695 at commit 8257314.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Avg (column: String) extends Aggregate
  • case class Min (column: String) extends Aggregate
  • case class Max (column: String) extends Aggregate
  • case class Sum (column: String) extends Aggregate
  • trait PrunedFilteredAggregateScan

@viirya
Member

viirya commented Sep 9, 2020

Thanks for pinging me. Let's sync with master and fix the compilation error?

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128463 has finished for PR 29695 at commit 8ccefd9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 9, 2020

Test build #128466 has finished for PR 29695 at commit f88e896.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 11, 2020

Test build #128536 has finished for PR 29695 at commit 78bc331.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 11, 2020

Test build #128546 has finished for PR 29695 at commit b11b24b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 11, 2020

Test build #128545 has finished for PR 29695 at commit a30fd6d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Aggregation(aggregateExpressions: Seq[AggregateFunction],
  • case class Avg (column: String) extends AggregateFunction
  • case class Min (column: String) extends AggregateFunction
  • case class Max (column: String) extends AggregateFunction
  • case class Sum (column: String) extends AggregateFunction

@huaxingao
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 12, 2020

Test build #128588 has finished for PR 29695 at commit b11b24b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines 25 to 31
case class Avg (column: String) extends AggregateFunction

case class Min (column: String) extends AggregateFunction

case class Max (column: String) extends AggregateFunction

case class Sum (column: String) extends AggregateFunction
Member

nit: remove extra space like Avg(column: String)

* Pushes down Aggregation and returns aggregates that need to be evaluated after scanning.
* The Aggregation can be pushed down only if all the Aggregate Functions can
* be pushed down.
* @since 3.1.0
Member

Do we need @since for the methods here? pushFilters and pushedFilters don't have it.

}

aggregates.aggregateFunction match {
case aggregate.Min(child) => Some(Min(columnAsString(child)))
Member

If child is not an AttributeReference, can this work?

Comment on lines 140 to 143
case Min(column) => Some(quote(column) -> s"MIN(${quote(column)})")
case Max(column) => Some(quote(column) -> s"MAX(${quote(column)})")
case Sum(column) => Some(quote(column) -> s"Sum(${quote(column)})")
case Avg(column) => Some(quote(column) -> s"Avg(${quote(column)})")
Member

Why are MIN and MAX all upper-case, while Sum and Avg are not?

case _ => None
}
if (!compiledAggregates.contains(None)) {
compiledAggregates.flatMap(x => x).toMap
Member

Since column is the key, each column can have only one aggregate function?

if (untranslatableExprs.isEmpty) r.pushAggregates(translatedAggregates.toArray)

// push down only if all the aggregates can be pushed down
if (!r.pushedAggregates.isEmpty) (r.pushedAggregates, Nil) else (Nil, aggregates)
Member

Even if we can push down the aggregate functions, do we not need post-scan aggregates at all?

Contributor Author

I actually still do the aggregate at the Spark layer, regardless of whether the aggregate is pushed down or not.

@SparkQA

SparkQA commented Sep 14, 2020

Test build #128663 has finished for PR 29695 at commit 134a83e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Avg(column: String) extends AggregateFunction
  • case class Min(column: String) extends AggregateFunction
  • case class Max(column: String) extends AggregateFunction
  • case class Sum(column: String) extends AggregateFunction

@SparkQA

SparkQA commented Sep 14, 2020

Test build #128664 has finished for PR 29695 at commit c45a2b6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128673 has finished for PR 29695 at commit c45a2b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 21, 2020

Test build #128925 has finished for PR 29695 at commit adf1588.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
  • case class Avg(column: String) extends AggregateFunc
  • case class Min(column: String) extends AggregateFunc
  • case class Max(column: String) extends AggregateFunc
  • case class Sum(column: String) extends AggregateFunc

@huaxingao
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 21, 2020

Test build #128936 has finished for PR 29695 at commit adf1588.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
  • case class Avg(column: String) extends AggregateFunc
  • case class Min(column: String) extends AggregateFunc
  • case class Max(column: String) extends AggregateFunc
  • case class Sum(column: String) extends AggregateFunc

} else {
aggregates
}
}
Contributor Author

will remove this method

}

val r = buildLogicalPlan(newOutput, relation, wrappedScan, newOutput,
normalizedProjects, postScanFilters)
Contributor Author

Use the new output to build the plan.
E.g., original output: JDBCScan$$anon$1@226de93c [ID#18]; new output: JDBCScan$$anon$1@3f6f9cef [max(ID)#77,min(ID)#78]

}
agg.transform {
case a: aggregate.AggregateFunction => aggFunction
}
Contributor Author

Update the optimized logical plan.
Before the update:

Aggregate [max(id#18) AS max(id)#21, min(id#18) AS min(id)#22]
+- RelationV2[ID#18] test.people

After the update:

Aggregate [max(max(ID)#77) AS max(ID)#72, min(min(ID)#78) AS min(ID)#73]
+- RelationV2[max(ID)#77, min(ID)#78] test.people

* @since 3.1.0
*/
def unhandledAggregates(aggregates: Array[AggregateFunc]): Array[AggregateFunc] =
aggregates
Contributor Author

This is not used. Will remove

@huaxingao
Contributor Author

also cc @cloud-fan @maropu @MaxGekk
I did some initial work for aggregate push down. There are still quite a few places to improve, but could you please take a look and see if this is OK? Thanks!

Member
@maropu maropu left a comment

Other questions I have are as follows:

  • Can we support aggregate pushdown with all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.
  • How does Spark receive aggregated values on database sides? It seems the data types of input/aggregated values are different in some databases, e.g., sum(bigint)=>numeric in PostgreSQL.
  • How does Spark handle overflows on database sides?


def columnAsString(e: Expression): String = e match {
case AttributeReference(name, _, _, _) => name
case Cast(child, _, _) => child match {
Member

Is this safe? Aggregating casted values and casting aggregated values look different.

Contributor Author

I guess this is probably OK?
For example, if I have sum(cast(SALARY as bigInt)) here, I will remove the cast and push down sum(SALARY) to the data source. Then I will cast the output of sum(SALARY) to bigInt.

Member
@maropu maropu Oct 1, 2020

What if SALARY is a tiny int? It looks like sum(SALARY) can easily overflow on the data source side. Ah, I misunderstood a bit.

Member

For example, if t exists on the database side, it seems we can get different answers with/without pushdown?

scala> Seq(0.6, 0.7, 0.8).toDF("v").write.saveAsTable("t")
scala> sql("select sum(cast(v as int)) from t").show()
+-------------------+
|sum(CAST(v AS INT))|
+-------------------+
|                  0|
+-------------------+

Contributor Author

Thanks for the example. I actually only want to strip off the cast added by Spark. For example, when doing sum, Spark casts the integral type to long:

case Sum(e @ IntegralType()) if e.dataType != LongType => Sum(Cast(e, LongType))

For a cast added by Spark, I will remove the cast, push down the aggregate, and do the same cast on the database side.
If the cast comes from the user, I will keep the cast and NOT push down aggregate(cast(col)) for now.
To differentiate a user's explicit cast from a Spark-added cast, I will add a flag somewhere. Does this sound OK to you?

Member

For example, how about the fractional case? If e is float, we should not remove the cast, but pass it to the database side for computing double-typed partial aggregated values?

case Sum(e @ FractionalType()) if e.dataType != DoubleType => Sum(Cast(e, DoubleType))

Contributor Author

If the column's data type is float, I think we should remove the cast and pass it to the database side. On the database side, cast it to double:

if (cast by Spark)
  remove cast from sum
  push down Sum(col) to database
  if (datatype of col is float)
    cast to Double, something like SELECT  Sum(CAST(col AS DOUBLE)) FROM table
else // cast by user
  not remove cast and not push down aggregate

Contributor

@maropu @huaxingao

Such casts are removed by Spark in #31079.

case _ => ""
}

aggregates.aggregateFunction match {
Member

What if aggregates has isDistinct=true or filter?

Contributor Author

I will need to change the following to add isDistinct and filter, and also change translateAggregate accordingly. When pushing down the aggregates, we need to check the filter to make sure it can be pushed down too.

case class Avg(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc

case class Min(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc

case class Max(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc

case class Sum(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc

// == Physical Plan ==
// *(2) HashAggregate(keys=[DEPT#0], functions=[max(max(SALARY)#13), min(min(BONUS)#14)], output=[max(SALARY)#6, min(BONUS)#7])
// +- Exchange hashpartitioning(DEPT#0, 5), true, [id=#13]
// +- *(1) HashAggregate(keys=[DEPT#0], functions=[partial_max(max(SALARY)#13), partial_min(min(BONUS)#14)], output=[DEPT#0, max#17, min#18])
Member

Do we need this partial aggregate in this case? Can we replace it with a Project?

Contributor Author

It seems the partial aggregate is not needed any more. We probably need to add a method like AggUtils.planAggregateWithoutPartial. I will figure it out.

@huaxingao
Contributor Author

@maropu Thank you very much for your review.

Can we support aggregate pushdown with all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.

Sorry I am not familiar with this. Could you please give me an example of the different implementations?

How does Spark receive aggregated values on database sides? It seems the data types of input/aggregated values are different in some databases, e.g., sum(bigint)=>numeric in PostgreSQL.

I will cast the output of the aggregates to the type that Spark expects.
For example, Spark expects bigint from sum(int), so the output of sum(int) from the database needs to be cast to bigint.
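
For illustration only (not code from this PR), a sketch of the type contract described above, reusing the h2 catalog from the examples:

// Spark types sum over an INT column as BIGINT, so whatever the database returns for the
// pushed-down SUM must be read back (or cast) as a 64-bit integer to match this schema.
// A hypothetical generated query could look like: SELECT CAST(SUM("ID") AS BIGINT) FROM "test"."people"
spark.sql("SELECT SUM(ID) FROM h2.test.people").printSchema()
// root
//  |-- sum(ID): long (nullable = true)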

How does Spark handle overflows on database sides?

Not sure how to handle this yet. I will try to figure it out. Please let me know if you have a good idea.

@maropu
Member

maropu commented Oct 1, 2020

Can we support aggregate pushdown with all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.

Sorry I am not familiar with this. Could you please give me an example of the different implementations?

For example, since PostgreSQL can have pretty large scale/precision in numeric, I think partial aggregated values can go over the value range of Spark decimals. So, I'm not sure that we can keep the same behaviour between with/without aggregate pushdown.

@maropu
Member

maropu commented Oct 1, 2020

How does Spark handle overflows on database sides?

Not sure how to handle this yet. I will try to figure it out. Please let me know if you have a good idea.

I don't have a smart idea either. Is any other system implementing this feature? If one exists, it's better to refer to it.

@SparkQA

SparkQA commented Feb 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39907/

@SparkQA

SparkQA commented Feb 22, 2021

Test build #135329 has finished for PR 29695 at commit 69813c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 22, 2021

Test build #135327 has finished for PR 29695 at commit e9e984e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Avg(column: String, dataType: DataType, isDistinct: Boolean) extends AggregateFunc
  • case class Min(column: String, dataType: DataType) extends AggregateFunc
  • case class Max(column: String, dataType: DataType) extends AggregateFunc
  • case class Sum(column: String, dataType: DataType, isDistinct: Boolean) extends AggregateFunc

@baibaichen
Contributor

Thanks @huaxingao

We did some tests on aggregate push down in a real production environment last month. Here are the results:

  1. Dataset: 550M records
  2. 4 ClickHouse nodes
               1 User   10 Users   20 Users   60 Users
QPS            2.76     6.1        4.43       4.45
90% (sec)      0.4      2.1        7          17
slowest (sec)  0.45     3.3        12         27

We didn't test without aggregate push down, because it is 10x slower than with push down.

However, the current PR has some limitations:

  1. It doesn't support count.
  2. It doesn't support AVG in the case of multiple shards.
  3. We don't know how to extend the implementation to support more aggregation cases, for example sum(if()).

Thanks
Chang

@viirya
Member

viirya commented Mar 17, 2021

Can we support aggregate pushdown with all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.

Sorry I am not familiar with this. Could you please give me an example of the different implementations?

For example, since PostgreSQL can have pretty large scale/precision in numeric, I think partial aggregated values can go over the value range of Spark decimals. So, I'm not sure that we can keep the same behaviour between with/without aggregate pushdown.

For the simple PostgreSQL JDBC case, if Spark queries decimal values which have large scale/precision in PostgreSQL, how does Spark handle this? Cast the values to fit Spark's scale/precision?

* A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
* push down aggregates to the data source.
*
* @since 3.1.0
Member

Maybe 3.2.0 now.

void pushAggregation(Aggregation aggregation);

/**
* Returns the aggregates that are pushed to the data source via
Member

I think this returns Aggregation?

@@ -700,6 +704,41 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
Member

For predicate pushdown, it seems we simplify the cases to handle by only looking at the column name.

This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.

Member

Let's wait for others and see if there are any other voices.

Member

This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.

+1

Member

+1. It also seems strange to convert a binary expression into a "magic" string form that is (or seems) special to JDBC data sources.

I also wonder if we should handle nested columns the same way as PushableColumnBase.

stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
stmt.setQueryTimeout(options.queryTimeout)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

Member

unnecessary change?

if (sb.length == 0) "1" else sb.substring(1)
}

private def getAggregateColumnsList(
Member

Shall we add a comment here to explain what getAggregateColumnsList does and why it is needed?

updatedSchema = schema
columns.foreach(x => sb.append(",").append(x))
} else {
getAggregateColumnsList(sb, compiledAgg, aggDataType)
Member

columns is empty for this case?

case a: AttributeReference => replaceAlias(a, aliasMap)
}
}
val normalizedgroupingExpressions =
Member

normalizedGroupingExpressions

val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
if (postScanFilters.nonEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
} else { // only push down aggregate of all the filers can be push down
Member

of -> if

child match {
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
Member

I can see there is a dependency between filter pushdown and aggregate pushdown, since we need to check whether all filters are pushed down.

I think an alternative approach is to not touch filter pushdown, but to check whether filter pushdown has happened and there is still a Filter on top of the scan relation.

I feel that can simplify the code here, and we don't need to call pushDownFilter twice for aggregates and filters.

Comment on lines 96 to 98
for (groupBy <- groupingExpressions) {
aggOutputBuilder += groupBy.asInstanceOf[AttributeReference]
}
Member

groupingExpressions is Seq[Expression]; are we sure they are all AttributeReference?

@huaxingao
Contributor Author

For the simple PostgreSQL JDBC case, if Spark queries decimal values which have large scale/precision in PostgreSQL, how does Spark handle this? Cast the values to fit Spark's scale/precision?

I did a quick test using H2. Spark actually throws an exception if the underlying database returns a larger precision than Spark's MAX_PRECISION, which is 38.
Is this the correct behavior?

      conn.prepareStatement(
        "CREATE TABLE \"test\".\"test_decimal\" (C1 DECIMAL(45, 30), C2 INTEGER NOT NULL)")
        .executeUpdate()
      conn.prepareStatement("INSERT INTO \"test\".\"test_decimal\" VALUES " +
        "(123456789012345.5432154321543215432154321, 1)").executeUpdate()


     sql("SELECT C1, C2 FROM h2.test.test_decimal").show(false)

The test failed with

java.lang.ArithmeticException: Decimal precision 45 exceeds max precision 38

Here is the relevant code. Spark first maps the JDBC decimal to a Spark DecimalType bounded by the maximum precision of 38:

  private def getCatalystType(
      sqlType: Int,
      precision: Int,
      scale: Int,
      signed: Boolean): DataType = {

      ......

      case java.sql.Types.DECIMAL
        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)

  private[sql] def bounded(precision: Int, scale: Int): DecimalType = {
    DecimalType(min(precision, MAX_PRECISION), min(scale, MAX_SCALE))
  }

It then throws Exception here:

  def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
    DecimalType.checkNegativeScale(scale)
    this.decimalVal = decimal.setScale(scale, ROUND_HALF_UP)
    if (decimalVal.precision > precision) {
      throw new ArithmeticException(
        s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
    }

@SparkQA

SparkQA commented Mar 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40902/

@SparkQA

SparkQA commented Mar 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40902/

@SparkQA

SparkQA commented Mar 22, 2021

Test build #136320 has finished for PR 29695 at commit ef4bab9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Count(column: String, dataType: DataType, isDistinct: Boolean) extends AggregateFunc

@huaxingao
Contributor Author

@viirya Thanks for reviewing. I have addressed your comments. Could you please check one more time?

@huaxingao
Contributor Author

cc @cloud-fan @maropu

@huaxingao huaxingao changed the title [SPARK-22390][SPARK-32833][SQL] [WIP]JDBC V2 Datasource aggregate push down [SPARK-22390][SPARK-32833][SQL] JDBC V2 Datasource aggregate push down Mar 22, 2021
@huaxingao
Contributor Author

also cc @sunchao @dongjoon-hyun

@@ -191,6 +191,9 @@ class JDBCOptions(
// An option to allow/disallow pushing down predicate into JDBC data source
val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean

// An option to allow/disallow pushing down aggregate into JDBC data source
val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, "false").toBoolean

@@ -77,15 +77,25 @@ case class Count(children: Seq[Expression]) extends DeclarativeAggregate {

override def defaultResult: Option[Literal] = Option(Literal(0L))

private[sql] var pushDown: Boolean = false
Member

I feel putting this variable here for the pushdown feature does not look like a good design. Can't we extend Count for pushdown-specific counting?

@@ -700,6 +704,41 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
Member

This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.

+1

}

protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {

Member

nit: unnecessary blank.

case count: aggregate.Count =>
val columnName = count.children.head match {
case Literal(_, _) =>
"1"
Member

nit: case Literal(_, _) => "1"

extends RDD[InternalRow](sc, Nil) {

/**
* Retrieve the list of partitions corresponding to this RDD.
*/
override def getPartitions: Array[Partition] = partitions

private var updatedSchema: StructType = new StructType()
Member

Could we avoid using var here?

@github-actions github-actions bot added the DOCS label Mar 27, 2021
@SparkQA

SparkQA commented Mar 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41183/

@SparkQA

SparkQA commented Mar 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41183/

@SparkQA

SparkQA commented Mar 27, 2021

Test build #136600 has finished for PR 29695 at commit 782a0a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Count(children: Seq[Expression]) extends CountBase(children)
  • abstract class CountBase(children: Seq[Expression]) extends DeclarativeAggregate
  • case class PushDownCount(children: Seq[Expression]) extends CountBase(children)


import org.apache.spark.sql.types.DataType

case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
Member

I think these need some docs since they are user-facing, and maybe some examples of how to handle aggregateExpressions and groupByExpressions. For the latter, should we also name it groupByColumns?

@@ -700,6 +704,41 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
Member

+1. It also seems strange to convert a binary expression into a "magic" string form that is (or seems) special to JDBC data sources.

I also wonder if we should handle nested columns the same way as PushableColumnBase.

@@ -700,6 +704,49 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
case AttributeReference(name, _, _, _) => name
case Cast(child, _, _) => columnAsString (child)
Member

nit: extra space after columnAsString.

columnAsString(left) + " * " + columnAsString(right)
case Divide(left, right, _) =>
columnAsString(left) + " / " + columnAsString(right)
case CheckOverflow(child, _, _) => columnAsString (child)
Member

nit: extra space after columnAsString.

if (colName.nonEmpty) Some(Sum(colName, sum.dataType, aggregates.isDistinct)) else None
case count: aggregate.Count =>
val columnName = count.children.head match {
case Literal(_, _) => "1"
Member

Why is this "1"? Also, should we check whether there is more than one element in children?


val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, newFilters, relation)
if (postScanFilters.nonEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
Member

perhaps we should return the original plan node rather than a new Aggregate?

}

if (aggregation.aggregateExpressions.isEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
Member

ditto

val aggOutputBuilder = ArrayBuilder.make[AttributeReference]
for (i <- 0 until aggregates.length) {
aggOutputBuilder += AttributeReference(
aggregation.aggregateExpressions(i).toString, aggregates(i).dataType)()
Member

hmm is this correct?

/**
* @since 3.1.0
*/
trait PrunedFilteredAggregateScan {
Member

It's a bit strange that this is a DSv1 API but is only used by the DSv2 JDBC scan. Is it possible that a V1 data source implements this and goes through the V1 code path (i.e., through DataSourceStrategy)?

import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
case Aggregate(groupingExpressions, resultExpressions, child) =>
Member

This is a little hard to read. Maybe we can better separate the logic for pushing down aggregates from the logic for pushing down filters. Also, some comments would help.

@moderakh

moderakh commented May 4, 2021

We are developing a Spark connector (https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12) using the DataSourceV2 API and are interested in aggregate push-down support.

We are excited about seeing your PR. :-)

@huaxingao @sunchao @viirya I wonder what the status of aggregate push-down for Spark 3 is. Are you planning to include this change in Spark 3.2?

@huaxingao
Contributor Author

@moderakh Thanks for the interest in this PR. We are working with @cloud-fan on this aggregate push down feature. Hopefully we can reach a consensus soon and be able to move forward.

@huaxingao
Contributor Author

JDBC aggregate push down was merged into 3.2 via PR #33352. There are several limitations:

  1. Avg is not supported yet. I will probably work on this in 3.3.
  2. Aggregate over an alias is not pushed down.
  3. Aggregate over an arithmetic operation is not pushed down.

@rf972 If you have time, please try it out and let me know if there are any issues. Thanks a lot!

I will close this PR. Thank you, everyone, for helping me with reviewing and testing! I really appreciate all your help!

@huaxingao huaxingao closed this Jul 28, 2021
@rf972

rf972 commented Aug 9, 2021

@huaxingao Thanks for merging aggregate push down! We have tested the Spark 3.2 aggregate push down support with our own push-down-capable data source for S3 and HDFS. We have run all our tests and have not found any issues.

@huaxingao
Contributor Author

@rf972 Thank you very much for helping us test this! I really appreciate your help!
