[SPARK-22390][SPARK-32833][SQL] JDBC V2 Datasource aggregate push down #29695
Conversation
cc @viirya
Test build #128461 has finished for PR 29695 at commit
Thanks for pinging me. Let's sync with master and fix the compilation error?
Test build #128463 has finished for PR 29695 at commit
Test build #128466 has finished for PR 29695 at commit
Test build #128536 has finished for PR 29695 at commit
Test build #128546 has finished for PR 29695 at commit
Test build #128545 has finished for PR 29695 at commit
retest this please
Test build #128588 has finished for PR 29695 at commit
case class Avg (column: String) extends AggregateFunction
case class Min (column: String) extends AggregateFunction
case class Max (column: String) extends AggregateFunction
case class Sum (column: String) extends AggregateFunction
nit: remove extra space like Avg(column: String)
* Pushes down Aggregation and returns aggregates that need to be evaluated after scanning.
* The Aggregation can be pushed down only if all the Aggregate Functions can
* be pushed down.
* @since 3.1.0
do we need @since for the methods here? pushFilters and pushedFilters don't have it.
}

aggregates.aggregateFunction match {
case aggregate.Min(child) => Some(Min(columnAsString(child)))
If child is not an AttributeReference, can this work?
case Min(column) => Some(quote(column) -> s"MIN(${quote(column)})")
case Max(column) => Some(quote(column) -> s"MAX(${quote(column)})")
case Sum(column) => Some(quote(column) -> s"Sum(${quote(column)})")
case Avg(column) => Some(quote(column) -> s"Avg(${quote(column)})")
Why are MIN and MAX all upper-case, while Sum and Avg are not?
case _ => None
}
if (!compiledAggregates.contains(None)) {
compiledAggregates.flatMap(x => x).toMap
Since column is the key, each column can have only one aggregate function?
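A minimal sketch of the alternative this question points at, assuming the case classes added in this PR and a hypothetical quote helper for identifier quoting: compiling into a Seq of SQL fragments instead of a column-keyed Map keeps both MIN(ID) and MAX(ID) on the same column.

trait AggregateFunc                                         // stand-in for the interface in this PR
case class Min(column: String) extends AggregateFunc
case class Max(column: String) extends AggregateFunc
case class Sum(column: String) extends AggregateFunc
case class Avg(column: String) extends AggregateFunc

def compileAggregates(aggs: Seq[AggregateFunc], quote: String => String): Option[Seq[String]] = {
  val compiled = aggs.map {
    case Min(col) => Some(s"MIN(${quote(col)})")
    case Max(col) => Some(s"MAX(${quote(col)})")
    case Sum(col) => Some(s"SUM(${quote(col)})")
    case Avg(col) => Some(s"AVG(${quote(col)})")
    case _        => None
  }
  // Push down only if every aggregate function could be compiled.
  if (compiled.forall(_.isDefined)) Some(compiled.flatten) else None
}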
if (untranslatableExprs.isEmpty) r.pushAggregates(translatedAggregates.toArray)

// push down only if all the aggregates can be pushed down
if (!r.pushedAggregates.isEmpty) (r.pushedAggregates, Nil) else (Nil, aggregates)
Even if we can push down aggregate functions, do we really not need post-scan aggregates at all?
I actually still do the aggregate at the Spark layer regardless of whether the aggregate is pushed down or not.
Test build #128663 has finished for PR 29695 at commit
Test build #128664 has finished for PR 29695 at commit
retest this please
Test build #128673 has finished for PR 29695 at commit
Test build #128925 has finished for PR 29695 at commit
retest this please
Test build #128936 has finished for PR 29695 at commit
} else {
aggregates
}
}
will remove this method
}

val r = buildLogicalPlan(newOutput, relation, wrappedScan, newOutput,
normalizedProjects, postScanFilters)
Use the new output to build the plan. E.g. original output: JDBCScan$$anon$1@226de93c [ID#18]; new output: JDBCScan$$anon$1@3f6f9cef [max(ID)#77, min(ID)#78]
}
agg.transform {
case a: aggregate.AggregateFunction => aggFunction
}
Update the optimized logical plan. Before the update:
Aggregate [max(id#18) AS max(id)#21, min(id#18) AS min(id)#22]
+- RelationV2[ID#18] test.people
After the update:
Aggregate [max(max(ID)#77) AS max(ID)#72, min(min(ID)#78) AS min(ID)#73]
+- RelationV2[max(ID)#77, min(ID)#78] test.people
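A tiny illustration of why the Spark-side Aggregate on top stays harmless after the rewrite (a sketch, assuming one pushed-down result row per JDBC partition; the values are made up):

// Each JDBC partition returns its pushed-down (max(ID), min(ID)) pair;
// Spark's final Aggregate just re-combines them, i.e. max(max(ID)), min(min(ID)).
val partialResults = Seq((42L, 7L), (40L, 3L))   // hypothetical per-partition rows
val finalMax = partialResults.map(_._1).max      // 42
val finalMin = partialResults.map(_._2).min      // 3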
* @since 3.1.0
*/
def unhandledAggregates(aggregates: Array[AggregateFunc]): Array[AggregateFunc] =
aggregates
This is not used. Will remove
also cc @cloud-fan @maropu @MaxGekk
Other questions I have are as follows:
- Can we support aggregate pushdown with all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.
- How does Spark receive aggregated values on database sides? It seems the data types of input/aggregated values are different in some databases, e.g., sum(bigint)=>numeric in PostgreSQL.
- How does Spark handle overflows on database sides?
def columnAsString(e: Expression): String = e match {
case AttributeReference(name, _, _, _) => name
case Cast(child, _, _) => child match {
Is this safe? Aggregating casted values and casting aggregated values look different.
I guess this is probably OK? For example, if I have sum(cast(SALARY as bigInt)) here, I will remove the cast and push down sum(SALARY) to the data source. Then I will cast the output of sum(SALARY) to bigInt.
Ah, I misunderstood a bit. What if SALARY is a tiny int? Looks like sum(SALARY) can easily overflow on the datasource side.
For example, if t exists on the database side, it seems we can get different answers with/without pushdown?
scala> Seq(0.6, 0.7, 0.8).toDF("v").write.saveAsTable("t")
scala> sql("select sum(cast(v as int)) from t").show()
+-------------------+
|sum(CAST(v AS INT))|
+-------------------+
|                  0|
+-------------------+
Thanks for the example. I actually only want to strip off the cast added by Spark. For example, when doing sum, Spark casts the integral type to long.
case Sum(e @ IntegralType()) if e.dataType != LongType => Sum(Cast(e, LongType))
For the casting added by Spark, I will remove the casting, push down the aggregate, and do the same casting on the database side.
If the cast is from the user, I will keep the cast and NOT push down aggregate(cast(col)) for now.
To differentiate the user's explicit casting from Spark-added casting, I will add a flag somewhere. Does this sound OK to you?
For example, how about the fractional case? If e is float, we should not remove the cast but pass it to the database side for computing double-typed partial aggregated values?
case Sum(e @ FractionalType()) if e.dataType != DoubleType => Sum(Cast(e, DoubleType))
If the col data type is float, I think we should remove the cast and pass it to the database side. On the database side, cast it to double:
if (cast by Spark)
  remove cast from sum
  push down Sum(col) to database
  if (datatype of col is float)
    cast to Double, something like SELECT Sum(CAST(col AS DOUBLE)) FROM table
else // cast by user
  not remove cast and not push down aggregate
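A rough Scala sketch of that branching (the helper name and the sparkAddedCast flag are hypothetical illustrations, not the PR's actual code):

// sparkAddedCast: whether the Cast around the sum child was inserted by Spark's
// own type coercion (strippable) rather than written by the user (not strippable).
def pushDownSum(column: String, columnType: String, sparkAddedCast: Boolean): Option[String] = {
  if (sparkAddedCast) {
    if (columnType == "float") Some(s"SUM(CAST($column AS DOUBLE))")  // let the database widen the type
    else Some(s"SUM($column)")
  } else {
    None  // user-written cast: keep the Aggregate in Spark, do not push down
  }
}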
Such casts are removed by Spark in #31079.
case _ => ""
}

aggregates.aggregateFunction match {
What if aggregates has isDistinct=true or a filter?
I will need to change the following to add isDistinct and filter. Also change translateAggregate accordingly. When pushing down the aggregates, we need to check the filter to make sure it can be pushed down too.
case class Avg(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Min(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Max(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Sum(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
// == Physical Plan ==
// *(2) HashAggregate(keys=[DEPT#0], functions=[max(max(SALARY)#13), min(min(BONUS)#14)], output=[max(SALARY)#6, min(BONUS)#7])
// +- Exchange hashpartitioning(DEPT#0, 5), true, [id=#13]
// +- *(1) HashAggregate(keys=[DEPT#0], functions=[partial_max(max(SALARY)#13), partial_min(min(BONUS)#14)], output=[DEPT#0, max#17, min#18])
Do we need this partial aggregate in this case? Can we replace it with a Project?
Seems the partial aggregate is not needed any more. Probably need to add a method AggUtils.planAggregateWithoutPartial. I will figure it out.
@maropu Thank you very much for your review.
Sorry I am not familiar with this. Could you please give me an example of the different implementations?
I will cast the output of the aggregates to the type that Spark expects.
Not sure how to handle this yet. I will try to figure it out. Please let me know if you have a good idea.
For example, since PostgreSQL can have pretty large scale/precision in numeric, I think partial aggregated values can go over the value range of Spark decimals. So, I'm not sure that we can keep the same behaviour with/without aggregate pushdown.
I don't have a smart idea either. Is any other system implementing this feature? If one exists, it's better to refer to it.
Kubernetes integration test status failure
Test build #135329 has finished for PR 29695 at commit
Test build #135327 has finished for PR 29695 at commit
Thanks @huaxingao, we did some tests on aggregate push down in a real production environment last month. Here are the results.
We didn't test without aggregate push down, because it is 10X slower than with push down. However, the current PR has some limitations:
Thanks
For the simple PostgreSQL JDBC case, if Spark queries decimal values which have large scale/precision in PostgreSQL, how does Spark handle this? Cast the values to fit Spark's scale/precision?
* A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
* push down aggregates to the data source.
*
* @since 3.1.0
Maybe 3.2.0 now.
void pushAggregation(Aggregation aggregation);

/**
* Returns the aggregates that are pushed to the data source via
I think this returns Aggregation?
@@ -700,6 +704,41 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
For predicate pushdown, it seems we simplify the cases to handle by only looking at the column name.
This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.
Let's wait for others and see if there are any other voices.
This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.
+1
+1. It also seems strange to convert a binary expression into a "magic" string form that is (or seems) special to JDBC datasources.
I also wonder if we should handle nested columns the same way as PushableColumnBase.
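For reference, the "simplest case first" variant being suggested could look roughly like this (a sketch; returning Option so callers can tell when translation failed):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

// Translate only a bare column reference; give up on casts, arithmetic and
// nested fields until more cases are deliberately supported.
def columnAsString(e: Expression): Option[String] = e match {
  case a: AttributeReference => Some(a.name)
  case _                     => None
}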
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
stmt.setQueryTimeout(options.queryTimeout)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
unnecessary change?
if (sb.length == 0) "1" else sb.substring(1)
}

private def getAggregateColumnsList(
Shall we add a comment here to explain what getAggregateColumnsList does and why it is needed?
updatedSchema = schema
columns.foreach(x => sb.append(",").append(x))
} else {
getAggregateColumnsList(sb, compiledAgg, aggDataType)
columns is empty for this case?
case a: AttributeReference => replaceAlias(a, aliasMap)
}
}
val normalizedgroupingExpressions =
normalizedGroupingExpressions
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
if (postScanFilters.nonEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
} else { // only push down aggregate of all the filers can be push down
of -> if
child match {
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
I can see there is a dependency between filter pushdown and aggregate pushdown, as we need to check if all filters are pushed down.
I think an alternative approach is to not touch filter pushdown, but to check whether filter pushdown has happened and there is still a Filter on top of the scan relation.
I feel that can simplify the code here, and we don't need to call pushDownFilter twice for aggregate and filter.
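A minimal sketch of that alternative (tryPushDownAggregate is a hypothetical placeholder for the rewrite this PR implements):

import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}

def tryPushDownAggregate(agg: Aggregate): LogicalPlan = agg  // placeholder for the actual rewrite

// Run after filter pushdown: only attempt aggregate pushdown when no post-scan
// Filter is left between the Aggregate and the scan relation.
def pushAggregates(plan: LogicalPlan): LogicalPlan = plan transformDown {
  case agg: Aggregate if agg.child.isInstanceOf[Filter] => agg   // leftover post-scan filter: skip
  case agg: Aggregate => tryPushDownAggregate(agg)
}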
for (groupBy <- groupingExpressions) {
aggOutputBuilder += groupBy.asInstanceOf[AttributeReference]
}
groupingExpressions is Seq[Expression], are we sure they are all AttributeReference?
I did a quick test using H2. Spark actually threw an Exception if the underlying database returns a larger precision than Spark's MAX_PRECISION, which is 38.
The test failed with
Here is the code: Spark first maps the JDBC Decimal to Spark DecimalType with the maximum precision of 38.
It then throws an Exception here:
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #136320 has finished for PR 29695 at commit
@viirya Thanks for reviewing. I have addressed your comments. Could you please check one more time?
also cc @sunchao @dongjoon-hyun
@@ -191,6 +191,9 @@ class JDBCOptions(
// An option to allow/disallow pushing down predicate into JDBC data source
val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean

// An option to allow/disallow pushing down aggregate into JDBC data source
val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, "false").toBoolean
I think this should be documented in https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
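For the docs, a hypothetical usage example could look like this (the option key mirrors JDBC_PUSHDOWN_AGGREGATE above and defaults to false; the URL, table name, and the existing `spark` SparkSession are placeholders/assumptions):

import org.apache.spark.sql.functions.{max, min}

val df = spark.read                        // `spark` is an existing SparkSession
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")     // placeholder connection URL
  .option("dbtable", "test.people")
  .option("pushDownAggregate", "true")     // opt in; the default is false
  .load()

df.agg(max("ID"), min("ID")).show()        // MAX/MIN can now be evaluated by the database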
@@ -77,15 +77,25 @@ case class Count(children: Seq[Expression]) extends DeclarativeAggregate {

override def defaultResult: Option[Literal] = Option(Literal(0L))

private[sql] var pushDown: Boolean = false
I feel putting this variable in for the pushdown feature does not look like a good design. Can't we extend Count for pushdown-specific counting?
}

protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
nit: unnecessary blank.
case count: aggregate.Count =>
val columnName = count.children.head match {
case Literal(_, _) =>
"1"
nit: case Literal(_, _) => "1"
extends RDD[InternalRow](sc, Nil) {

/**
* Retrieve the list of partitions corresponding to this RDD.
*/
override def getPartitions: Array[Partition] = partitions

private var updatedSchema: StructType = new StructType()
Could we avoid using var here?
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #136600 has finished for PR 29695 at commit
import org.apache.spark.sql.types.DataType

case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
I think these need some docs since they are user-facing, and maybe some examples on how to handle aggregateExpressions and groupByExpressions. For the latter, should we also name it groupByColumns?
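A sketch of what the documented version might look like (groupByColumns follows the suggested rename; the field types and the example columns are assumptions, and AggregateFunc is stubbed here only to keep the sketch self-contained):

trait AggregateFunc  // stand-in for the interface added in this PR

/**
 * An aggregation to be pushed down to the data source, e.g. for
 * SELECT MAX(SALARY), MIN(BONUS) FROM t GROUP BY DEPT:
 *   aggregateExpressions = Seq(Max("SALARY"), Min("BONUS"))
 *   groupByColumns       = Seq("DEPT")
 */
case class Aggregation(
    aggregateExpressions: Seq[AggregateFunc],
    groupByColumns: Seq[String])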
@@ -700,6 +704,49 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
case AttributeReference(name, _, _, _) => name
case Cast(child, _, _) => columnAsString (child)
nit: extra space after columnAsString.
columnAsString(left) + " * " + columnAsString(right)
case Divide(left, right, _) =>
columnAsString(left) + " / " + columnAsString(right)
case CheckOverflow(child, _, _) => columnAsString (child)
nit: extra space after columnAsString.
if (colName.nonEmpty) Some(Sum(colName, sum.dataType, aggregates.isDistinct)) else None
case count: aggregate.Count =>
val columnName = count.children.head match {
case Literal(_, _) => "1"
Why is this "1"? Also, should we check if there is more than one element in children?
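On the "1": Spark rewrites count(*) to a count with a literal child, so translating the literal to "1" yields COUNT(1) in the generated SQL. A sketch of the arity check being asked for (hypothetical helper, not the PR's code):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}

def countColumn(children: Seq[Expression]): Option[String] = children match {
  case Seq(Literal(_, _))         => Some("1")      // count(*) / count(literal) -> COUNT(1)
  case Seq(a: AttributeReference) => Some(a.name)   // count(col)
  case _                          => None           // multiple children: don't push down
}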
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, newFilters, relation)
if (postScanFilters.nonEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
perhaps we should return the original plan node rather than a new Aggregate?
}

if (aggregation.aggregateExpressions.isEmpty) {
Aggregate(groupingExpressions, resultExpressions, child)
ditto
val aggOutputBuilder = ArrayBuilder.make[AttributeReference]
for (i <- 0 until aggregates.length) {
aggOutputBuilder += AttributeReference(
aggregation.aggregateExpressions(i).toString, aggregates(i).dataType)()
hmm is this correct?
/**
* @since 3.1.0
*/
trait PrunedFilteredAggregateScan {
It's a bit strange that this is a DSv1 API but is only used by the DSv2 JDBC scan. Is it possible that a V1 data source implements this and goes through the V1 code path (i.e., through DataSourceStrategy)?
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
case Aggregate(groupingExpressions, resultExpressions, child) =>
This is a little hard to read. Maybe we can better separate the logic for pushing down aggregates from pushing down filters. Also, some comments can help.
We are developing a Spark connector (https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12) using the DataSourceV2 API and are interested in aggregate push-down support. We are excited about seeing your PR. :-) @huaxingao @sunchao @viirya I wonder what the status of aggregate push-down for Spark 3 is. Are you planning to include this change in Spark 3.2?
@moderakh Thanks for the interest in this PR. We are working with @cloud-fan on this aggregate push down feature. Hopefully we can reach a consensus soon and be able to move forward.
JDBC aggregate push down is merged in 3.2 via PR #33352. There are several limitations:
@rf972 If you have time, please try it out and let me know if there are any issues. Thanks a lot! I will close this PR. Thank you everyone for helping me review and test! Really appreciate all your help!
@huaxingao Thanks for merging aggregate push down! We have tested this Spark 3.2 aggregate push down support with our own push-down-capable datasource for S3 and HDFS. We have run all our tests and have not found any issues.
@rf972 Thank you very much for helping us test this! Really appreciate your help!
What changes were proposed in this pull request?
Push down JDBC aggregate to datasource layer
Add a new JDBC option pushDownAggregate to control whether to push down aggregates or not. The default is false.
Examples:
before push down:
after push down:
before push down:
after push down:
Why are the changes needed?
for better performance
Does this PR introduce any user-facing change?
No
How was this patch tested?
add new tests