[SPARK-22390][SPARK-32833][SQL] JDBC V2 Datasource aggregate push down #29695
huaxingao wants to merge 23 commits into apache:master
Conversation
cc @viirya
Thanks for pinging me. Let's sync with master and fix the compilation error?
case class Avg (column: String) extends AggregateFunction
case class Min (column: String) extends AggregateFunction
case class Max (column: String) extends AggregateFunction
case class Sum (column: String) extends AggregateFunction
nit: remove extra space like Avg(column: String)
 * Pushes down Aggregation and returns aggregates that need to be evaluated after scanning.
 * The Aggregation can be pushed down only if all the Aggregate Functions can
 * be pushed down.
 * @since 3.1.0
Do we need @since for the methods here? pushFilters and pushedFilters don't have it.
}

aggregates.aggregateFunction match {
  case aggregate.Min(child) => Some(Min(columnAsString(child)))
If child is not an AttributeReference, can this work?
case Min(column) => Some(quote(column) -> s"MIN(${quote(column)})")
case Max(column) => Some(quote(column) -> s"MAX(${quote(column)})")
case Sum(column) => Some(quote(column) -> s"Sum(${quote(column)})")
case Avg(column) => Some(quote(column) -> s"Avg(${quote(column)})")
Why are MIN and MAX upper-case while Sum and Avg are not?
case _ => None
}
if (!compiledAggregates.contains(None)) {
  compiledAggregates.flatMap(x => x).toMap
Since the column is the key, can each column have only one aggregate function?
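To make the concern concrete, here is a tiny standalone sketch (values are illustrative, not the PR's code) showing how a column-keyed map silently drops one of two aggregates over the same column:

```scala
// The column name is the map key, so two aggregates over the same column collide.
val compiledAggregates: Seq[Option[(String, String)]] = Seq(
  Some("\"SALARY\"" -> "MIN(\"SALARY\")"),
  Some("\"SALARY\"" -> "MAX(\"SALARY\")")
)
val result = compiledAggregates.flatMap(x => x).toMap
// result == Map("\"SALARY\"" -> "MAX(\"SALARY\")") -- the MIN entry is silently lost
```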
if (untranslatableExprs.isEmpty) r.pushAggregates(translatedAggregates.toArray)

// push down only if all the aggregates can be pushed down
if (!r.pushedAggregates.isEmpty) (r.pushedAggregates, Nil) else (Nil, aggregates)
Even if we can push down the aggregate functions, do we really not need post-scan aggregates at all?
I actually still do the aggregate at the Spark layer regardless of whether the aggregate is pushed down or not.
} else {
  aggregates
}
}
will remove this method
}

val r = buildLogicalPlan(newOutput, relation, wrappedScan, newOutput,
  normalizedProjects, postScanFilters)
Use the new output to build the plan. E.g. original output: JDBCScan$$anon$1@226de93c [ID#18]; new output: JDBCScan$$anon$1@3f6f9cef [max(ID)#77, min(ID)#78].
}
agg.transform {
  case a: aggregate.AggregateFunction => aggFunction
}
Update the optimized logical plan.
Before update:
Aggregate [max(id#18) AS max(id)#21, min(id#18) AS min(id)#22]
+- RelationV2[ID#18] test.people
After update:
Aggregate [max(max(ID)#77) AS max(ID)#72, min(min(ID)#78) AS min(ID)#73]
+- RelationV2[max(ID)#77, min(ID)#78] test.people
 * @since 3.1.0
 */
def unhandledAggregates(aggregates: Array[AggregateFunc]): Array[AggregateFunc] =
  aggregates
This is not used. Will remove
also cc @cloud-fan @maropu @MaxGekk
maropu left a comment:
Other questions I have are as follows:
- Can we support aggregate pushdown for all the data types? For example, aggregating decimal values seems to have different behaviours between database implementations.
- How does Spark receive aggregated values from the database side? It seems the data types of the input/aggregated values differ in some databases, e.g., sum(bigint) => numeric in PostgreSQL.
- How does Spark handle overflows on the database side?
def columnAsString(e: Expression): String = e match {
  case AttributeReference(name, _, _, _) => name
  case Cast(child, _, _) => child match {
Is this safe? Aggregating casted values and casting aggregated values look like different things.
I guess this is probably OK?
For example, if I have sum(cast(SALARY as bigInt)) here, I will remove the cast and push down sum(SALARY) to the data source. Then I will cast the output of sum(SALARY) to bigInt.
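A minimal sketch of that rewrite on a toy expression tree (these case classes are illustrative stand-ins, not Spark's internal ones): strip the Spark-inserted cast under the sum, push the bare aggregate, and remember the target type so the returned value can be cast back:

```scala
sealed trait Expr
case class Col(name: String) extends Expr
case class CastTo(child: Expr, to: String) extends Expr
case class SumAgg(child: Expr) extends Expr

// Sum(Cast(SALARY, bigint)) is pushed down as Sum(SALARY); the saved type tells
// Spark what to cast the returned aggregate value back to.
def stripSparkInsertedCast(agg: SumAgg): (SumAgg, Option[String]) = agg.child match {
  case CastTo(c, to) => (SumAgg(c), Some(to))
  case _             => (agg, None)
}

val (pushed, castBack) = stripSparkInsertedCast(SumAgg(CastTo(Col("SALARY"), "bigint")))
// pushed == SumAgg(Col("SALARY")), castBack == Some("bigint")
```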
Ah, I misunderstood a bit. What if SALARY is a tiny int? Looks like sum(SALARY) can easily overflow on the datasource side.
For example, if t exists on the database side, it seems we can get different answers with/without pushdown?
scala> Seq(0.6, 0.7, 0.8).toDF("v").write.saveAsTable("t")
scala> sql("select sum(cast(v as int)) from t").show()
+-------------------+
|sum(CAST(v AS INT))|
+-------------------+
|                  0|
+-------------------+
Thanks for the example. I actually only want to strip off the cast added by Spark. For example, when doing sum, Spark casts integral types to long:
case Sum(e @ IntegralType()) if e.dataType != LongType => Sum(Cast(e, LongType))
For the casting added by Spark, I will remove the cast, push down the aggregate, and do the same casting on the database side.
If the cast is from the user, I will keep the cast and NOT push down aggregate(cast(col)) for now.
To differentiate a user's explicit cast from a Spark-added cast, I will add a flag somewhere. Does this sound OK to you?
For example, how about the fractional case? If e is float, we should not remove the cast but pass it to the database side for computing double-typed partial aggregated values?
case Sum(e @ FractionalType()) if e.dataType != DoubleType => Sum(Cast(e, DoubleType))
If the col data type is float, I think we should remove the cast, push it to the database side, and cast it to double on the database side:
if (cast added by Spark)
    remove cast from sum
    push down Sum(col) to database
    if (data type of col is float)
        cast to Double, something like SELECT Sum(CAST(col AS DOUBLE)) FROM table
else // cast added by user
    do not remove cast and do not push down aggregate
  case _ => ""
}

aggregates.aggregateFunction match {
What if aggregates has isDistinct=true or a filter?
I will need to change the following to add isDistinct and filter, and also change translateAggregate accordingly. When pushing down the aggregates, we need to check the filter to make sure it can be pushed down too (a small sketch of that check follows the case classes below).
case class Avg(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Min(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Max(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
case class Sum(column: String, isDistinct: Boolean, filter: Option[Filter]) extends AggregateFunc
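A rough sketch of the extra filter check mentioned above (a hypothetical helper, not code from this PR):

```scala
import org.apache.spark.sql.sources.Filter

// An aggregate with a FILTER clause can only be pushed if that filter is itself
// something the data source can handle (handled is supplied by the dialect/source).
def canPushAggregate(filter: Option[Filter], handled: Filter => Boolean): Boolean =
  filter.forall(handled)
```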
// == Physical Plan ==
// *(2) HashAggregate(keys=[DEPT#0], functions=[max(max(SALARY)#13), min(min(BONUS)#14)], output=[max(SALARY)#6, min(BONUS)#7])
// +- Exchange hashpartitioning(DEPT#0, 5), true, [id=#13]
//    +- *(1) HashAggregate(keys=[DEPT#0], functions=[partial_max(max(SALARY)#13), partial_min(min(BONUS)#14)], output=[DEPT#0, max#17, min#18])
Do we need this partial aggregate in this case? Can we replace it with a Project?
Seems the partial aggregate is not needed any more. I probably need to add a method AggUtils.planAggregateWithoutPartial. I will figure it out.
@maropu Thank you very much for your review.
Sorry, I am not familiar with this. Could you please give me an example of the different implementations?
I will cast the output of the aggregates to the type that Spark expects.
Not sure how to handle this yet. I will try to figure it out. Please let me know if you have a good idea.
For example, since PostgreSQL can have a pretty large scale/precision in numeric, I think partially aggregated values can go over the value range of Spark decimals. So I'm not sure that we can keep the same behaviour with and without aggregate pushdown.
I don't have a smart idea either. Is any other system implementing this feature? If one exists, it's better to refer to it.
Thanks @huaxingao, we did some tests on aggregate push down in a real production environment last month. Here are the results:
We didn't test without aggregate push down because it is 10x slower than with push down. However, the current PR has some limitations:
Thanks
For the simple PostgreSQL JDBC case, if Spark queries decimal values which have large scale/precision on the PostgreSQL side, how does Spark handle this? Cast the values to fit Spark's scale/precision?
 * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
 * push down aggregates to the data source.
 *
 * @since 3.1.0
void pushAggregation(Aggregation aggregation);

/**
 * Returns the aggregates that are pushed to the data source via
I think this returns an Aggregation?
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}

private def columnAsString(e: Expression): String = e match {
For predicate pushdown, it seems we simplify the cases to handle by only looking at the column name.
This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.
Let's wait for others and see if there are any other voices.
This covers a lot of cases but also makes it easy to break. We can begin with the simplest case and add more support later.
+1
+1. It also seems strange to convert a binary expression into a "magic" string form that is (or seems) special to JDBC datasources.
I also wonder if we should handle nested columns the same way as PushableColumnBase.
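A minimal "simplest case first" variant of the translation, as suggested in this thread (a sketch, not the PR's code): only plain column references are translated, so arithmetic, casts, and nested fields simply stay un-pushed:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

object SimpleColumnTranslation {
  // Only a bare attribute reference yields a column name; anything more complex
  // (arithmetic, casts, nested fields) is not translated and therefore not pushed.
  def columnAsString(e: Expression): Option[String] = e match {
    case a: AttributeReference => Some(a.name)
    case _                     => None
  }
}
```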
stmt.setQueryTimeout(options.queryTimeout)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
if (sb.length == 0) "1" else sb.substring(1)
}

private def getAggregateColumnsList(
Shall we add a comment here to explain what getAggregateColumnsList does and why it is needed?
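For context, a rough illustration of what the aggregate column list ends up contributing to the generated JDBC query (column names are taken from the plan example earlier in the thread; this is an illustration, not the method itself):

```scala
// Pushed aggregates MAX(SALARY), MIN(BONUS) plus group-by column DEPT produce a
// select list such as "DEPT,MAX(SALARY),MIN(BONUS)", which ends up in a query like
//   SELECT DEPT,MAX(SALARY),MIN(BONUS) FROM <table> GROUP BY DEPT
val sb = new StringBuilder
Seq("DEPT", "MAX(SALARY)", "MIN(BONUS)").foreach(c => sb.append(",").append(c))
val selectList = if (sb.length == 0) "1" else sb.substring(1)
// selectList == "DEPT,MAX(SALARY),MIN(BONUS)"
```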
updatedSchema = schema
columns.foreach(x => sb.append(",").append(x))
} else {
  getAggregateColumnsList(sb, compiledAgg, aggDataType)
  case a: AttributeReference => replaceAlias(a, aliasMap)
  }
}
val normalizedgroupingExpressions =
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
if (postScanFilters.nonEmpty) {
  Aggregate(groupingExpressions, resultExpressions, child)
} else { // only push down the aggregate if all the filters can be pushed down
child match {
  case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
    val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
    val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, filters, relation)
I can see there is a dependency between filter pushdown and aggregate pushdown, as we need to check whether all filters are pushed down.
I think an alternative approach is to not touch filter pushdown, but to check whether filter pushdown has happened and there is still a Filter on top of the scan relation.
I feel that can simplify the code here, and we don't need to call pushDownFilter twice for aggregate and filter.
for (groupBy <- groupingExpressions) {
  aggOutputBuilder += groupBy.asInstanceOf[AttributeReference]
}
groupingExpressions is Seq[Expression]; are we sure they are all AttributeReference?
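A defensive variant of the quoted loop (a sketch under the assumption that non-attribute grouping expressions should block the pushdown rather than fail with a ClassCastException):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

// Collect grouping attributes only when every grouping expression really is an
// AttributeReference; otherwise report that the aggregate cannot be pushed down.
def groupingAttributes(groupingExpressions: Seq[Expression]): Option[Seq[AttributeReference]] = {
  val attrs = groupingExpressions.collect { case a: AttributeReference => a }
  if (attrs.length == groupingExpressions.length) Some(attrs) else None
}
```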
I did a quick test using H2. Spark actually threw an Exception when the underlying database returned a larger precision than Spark's MAX_PRECISION, which is 38. The test failed with
Here is the code: Spark first maps the JDBC Decimal to a Spark DecimalType with the maximum precision of 38. It then throws the Exception here:
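A small illustration of the limit described above (a sketch of the behaviour, not code from this PR):

```scala
import org.apache.spark.sql.types.DecimalType

// Spark caps decimal precision at 38 digits, so a JDBC DECIMAL(p, s) column with
// p > 38 cannot be represented exactly and reading such values fails as described.
assert(DecimalType.MAX_PRECISION == 38)
val widestSparkDecimal = DecimalType(DecimalType.MAX_PRECISION, 18) // DecimalType(38, 18)
```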
@viirya Thanks for reviewing. I have addressed your comments. Could you please check one more time?
also cc @sunchao @dongjoon-hyun
val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean

// An option to allow/disallow pushing down aggregate into JDBC data source
val pushDownAggregate = parameters.getOrElse(JDBC_PUSHDOWN_AGGREGATE, "false").toBoolean
I think this should be documented in https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
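For reference, enabling the new option on a JDBC read looks roughly like this (the URL and table name are placeholders; the option name and its false default come from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-agg-pushdown-example").getOrCreate()

// pushDownAggregate defaults to false, so aggregate pushdown is opt-in per read.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")   // placeholder URL
  .option("dbtable", "employees")        // placeholder table
  .option("pushDownAggregate", "true")
  .load()
```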
override def defaultResult: Option[Literal] = Option(Literal(0L))

private[sql] var pushDown: Boolean = false
I feel putting this variable here for the pushdown feature does not look like a good design. Can't we extend Count for pushdown-specific counting?
}

protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
case count: aggregate.Count =>
  val columnName = count.children.head match {
    case Literal(_, _) =>
      "1"
 */
override def getPartitions: Array[Partition] = partitions

private var updatedSchema: StructType = new StructType()
import org.apache.spark.sql.types.DataType

case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
I think these need some docs since they are user-facing, and maybe some examples on how to handle aggregateExpressions and groupByExpressions. For the latter, should we also name it groupByColumns?
private def columnAsString(e: Expression): String = e match {
  case AttributeReference(name, _, _, _) => name
  case Cast(child, _, _) => columnAsString (child)
nit: extra space after columnAsString.
  columnAsString(left) + " * " + columnAsString(right)
case Divide(left, right, _) =>
  columnAsString(left) + " / " + columnAsString(right)
case CheckOverflow(child, _, _) => columnAsString (child)
nit: extra space after columnAsString.
if (colName.nonEmpty) Some(Sum(colName, sum.dataType, aggregates.isDistinct)) else None
case count: aggregate.Count =>
  val columnName = count.children.head match {
    case Literal(_, _) => "1"
Why is this "1"? Also, should we check whether there is more than one element in children?
val (pushedFilters, postScanFilters) = pushDownFilter(scanBuilder, newFilters, relation)
if (postScanFilters.nonEmpty) {
  Aggregate(groupingExpressions, resultExpressions, child)
perhaps we should return the original plan node rather than a new Aggregate?
}

if (aggregation.aggregateExpressions.isEmpty) {
  Aggregate(groupingExpressions, resultExpressions, child)

val aggOutputBuilder = ArrayBuilder.make[AttributeReference]
for (i <- 0 until aggregates.length) {
  aggOutputBuilder += AttributeReference(
    aggregation.aggregateExpressions(i).toString, aggregates(i).dataType)()
/**
 * @since 3.1.0
 */
trait PrunedFilteredAggregateScan {
It's a bit strange that this is a DSv1 API but is only used by the DSv2 JDBC scan. Is it possible that a V1 data source implements this and goes through the V1 code path (i.e., through DataSourceStrategy)?
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
  case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
    val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
  case Aggregate(groupingExpressions, resultExpressions, child) =>
This is a little hard to read. Maybe we can better separate the logic for pushing down aggregates from the logic for pushing down filters. Also, some comments would help.
We are developing a Spark connector (https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12) using the DataSourceV2 API and are interested in aggregate push-down support. We are excited about seeing your PR. :-) @huaxingao @sunchao @viirya I wonder what the status of aggregate push-down for Spark 3 is. Are you planning to include this change in Spark 3.2?
@moderakh Thanks for the interest in this PR. We are working with @cloud-fan on this aggregate push down feature. Hopefully we can reach a consensus soon and be able to move forward.
JDBC aggregate push down was merged in 3.2 via PR #33352. There are several limitations:
@rf972 If you have time, please try it out and let me know if there are any issues. Thanks a lot! I will close this PR. Thank you everyone for helping me with reviewing and testing! Really appreciate all your help!
@huaxingao Thanks for merging aggregate push down! We have tested this Spark 3.2 aggregate push down support with our own push-down-capable datasource for S3 and HDFS. We have run all our tests and we have not found any issues.
@rf972 Thank you very much for helping us test this! Really appreciate your help!
What changes were proposed in this pull request?
Push down JDBC aggregates to the datasource layer.
A new JDBC option pushDownAggregate controls whether to push down or not; the default is false.
Examples:
before push down:
after push down:
before push down:
after push down:
Why are the changes needed?
For better performance.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added new tests.