Commit 4ad7386

beliefer authored and cloud-fan committed
[SPARK-38978][SQL] DS V2 supports push down OFFSET operator
### What changes were proposed in this pull request?

Currently, DS V2 push-down supports `LIMIT` but not `OFFSET`. If we can push down `OFFSET` to the JDBC data source, performance will be better.

### Why are the changes needed?

Pushing down `OFFSET` can improve performance.

### Does this PR introduce _any_ user-facing change?

No. This is a new feature.

### How was this patch tested?

New tests.

Closes #36295 from beliefer/SPARK-38978.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
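For illustration only (not part of the patch itself), enabling the new option on a V2 JDBC catalog and skipping rows with `Dataset.offset` might look like the sketch below; the catalog name, URL and table are placeholders:

// Hypothetical setup: register a V2 JDBC catalog and opt in to OFFSET push-down.
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")  // placeholder URL
spark.conf.set("spark.sql.catalog.h2.pushDownOffset", "true")

// With push-down enabled, skipping the first two rows can be evaluated by the
// database instead of by Spark.
spark.table("h2.test.employee").offset(2).show()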
1 parent a077701 commit 4ad7386

File tree: 17 files changed, +540 −41 lines

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java

Lines changed: 2 additions & 1 deletion
@@ -23,7 +23,8 @@
  * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ
  * interfaces to do operator push down, and keep the operator push down result in the returned
  * {@link Scan}. When pushing down operators, the push down order is:
- * sample -> filter -> aggregate -> limit -> column pruning.
+ * sample -> filter -> aggregate -> limit/top-n(sort + limit) -> offset ->
+ * column pruning.
  *
  * @since 3.0.0
  */

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java

Lines changed: 2 additions & 2 deletions
@@ -21,8 +21,8 @@
 
 /**
  * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
- * push down LIMIT. Please note that the combination of LIMIT with other operations
- * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down.
+ * push down LIMIT. We can push down LIMIT with many other operations if they follow the
+ * operator order we defined in {@link ScanBuilder}'s class doc.
  *
  * @since 3.3.0
  */
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
+ * push down OFFSET. We can push down OFFSET with many other operations if they follow the
+ * operator order we defined in {@link ScanBuilder}'s class doc.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsPushDownOffset extends ScanBuilder {
+
+  /**
+   * Pushes down OFFSET to the data source.
+   */
+  boolean pushOffset(int offset);
+}
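A minimal sketch of how a connector might adopt the new mix-in; `MyScan` and `MyScanBuilder` are invented names, not part of this commit:

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownOffset}
import org.apache.spark.sql.types.StructType

// Invented example scan: a real implementation would skip `offset` rows when reading.
class MyScan(offset: Int, schema: StructType) extends Scan {
  override def readSchema(): StructType = schema
}

class MyScanBuilder(schema: StructType) extends ScanBuilder with SupportsPushDownOffset {
  private var pushedOffset: Int = 0

  // Returning true tells Spark this source will skip the first `offset` rows itself.
  override def pushOffset(offset: Int): Boolean = {
    pushedOffset = offset
    true
  }

  override def build(): Scan = new MyScan(pushedOffset, schema)
}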

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownTopN.java

Lines changed: 11 additions & 12 deletions
@@ -22,23 +22,22 @@
 
 /**
  * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
- * push down top N(query with ORDER BY ... LIMIT n). Please note that the combination of top N
- * with other operations such as AGGREGATE, GROUP BY, CLUSTER BY, DISTRIBUTE BY, etc.
- * is NOT pushed down.
+ * push down top N(query with ORDER BY ... LIMIT n). We can push down top N with many other
+ * operations if they follow the operator order we defined in {@link ScanBuilder}'s class doc.
  *
  * @since 3.3.0
  */
 @Evolving
 public interface SupportsPushDownTopN extends ScanBuilder {
 
-  /**
-   * Pushes down top N to the data source.
-   */
-  boolean pushTopN(SortOrder[] orders, int limit);
+  /**
+   * Pushes down top N to the data source.
+   */
+  boolean pushTopN(SortOrder[] orders, int limit);
 
-  /**
-   * Whether the top N is partially pushed or not. If it returns true, then Spark will do top N
-   * again. This method will only be called when {@link #pushTopN} returns true.
-   */
-  default boolean isPartiallyPushed() { return true; }
+  /**
+   * Whether the top N is partially pushed or not. If it returns true, then Spark will do top N
+   * again. This method will only be called when {@link #pushTopN} returns true.
+   */
+  default boolean isPartiallyPushed() { return true; }
 }

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion
@@ -2103,7 +2103,7 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Returns a new Dataset by skipping the first `m` rows.
+   * Returns a new Dataset by skipping the first `n` rows.
    *
    * @group typedrel
    * @since 3.4.0
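A quick illustration of the documented behavior (values are arbitrary):

// Sort first so that "the first 5 rows" is deterministic, then skip them.
val ds = spark.range(1, 11).toDF("id")
ds.sort("id").offset(5).show()  // leaves the rows with id 6..10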

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 6 additions & 3 deletions
@@ -148,9 +148,11 @@ case class RowDataSourceScanExec(
         s"ORDER BY ${seqToString(pushedDownOperators.sortValues.map(_.describe()))}" +
           s" LIMIT ${pushedDownOperators.limit.get}"
         Some("PushedTopN" -> pushedTopN)
-      } else {
-        pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
-      }
+      } else {
+        pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
+      }
+
+    val offsetInfo = pushedDownOperators.offset.map(value => "PushedOffset" -> s"OFFSET $value")
 
     val pushedFilters = if (pushedDownOperators.pushedPredicates.nonEmpty) {
       seqToString(pushedDownOperators.pushedPredicates.map(_.describe()))
@@ -164,6 +166,7 @@ case class RowDataSourceScanExec(
         Map("PushedAggregates" -> seqToString(v.aggregateExpressions.map(_.describe())),
           "PushedGroupByExpressions" -> seqToString(v.groupByExpressions.map(_.describe())))} ++
       topNOrLimitInfo ++
+      offsetInfo ++
       pushedDownOperators.sample.map(v => "PushedSample" ->
         s"SAMPLE (${(v.upperBound - v.lowerBound) * 100}) ${v.withReplacement} SEED(${v.seed})"
       )
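A hedged sketch of observing the new entry on the physical scan, assuming a V2 JDBC catalog `h2` with `pushDownOffset` enabled as sketched earlier (the JDBC source falls back to the V1 read path, which is why `RowDataSourceScanExec` carries this metadata):

import org.apache.spark.sql.execution.RowDataSourceScanExec

val df = spark.table("h2.test.employee").offset(2)
// Look up the "PushedOffset" entry built by offsetInfo above.
val pushedOffset = df.queryExecution.executedPlan.collectFirst {
  case scan: RowDataSourceScanExec => scan.metadata.get("PushedOffset")
}.flatten
assert(pushedOffset.contains("OFFSET 2"))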

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 3 additions & 3 deletions
@@ -346,7 +346,7 @@ object DataSourceStrategy
         l.output.toStructType,
         Set.empty,
         Set.empty,
-        PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
         None) :: Nil
@@ -420,7 +420,7 @@ object DataSourceStrategy
         requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
-        PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.catalogTable.map(_.identifier))
@@ -443,7 +443,7 @@ object DataSourceStrategy
         requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
-        PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
+        PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation,
         relation.catalogTable.map(_.identifier))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 5 additions & 0 deletions
@@ -196,6 +196,10 @@ class JDBCOptions(
   // This only applies to Data Source V2 JDBC
   val pushDownLimit = parameters.getOrElse(JDBC_PUSHDOWN_LIMIT, "false").toBoolean
 
+  // An option to allow/disallow pushing down OFFSET into V2 JDBC data source
+  // This only applies to Data Source V2 JDBC
+  val pushDownOffset = parameters.getOrElse(JDBC_PUSHDOWN_OFFSET, "false").toBoolean
+
   // An option to allow/disallow pushing down TABLESAMPLE into JDBC data source
   // This only applies to Data Source V2 JDBC
   val pushDownTableSample = parameters.getOrElse(JDBC_PUSHDOWN_TABLESAMPLE, "false").toBoolean
@@ -283,6 +287,7 @@ object JDBCOptions {
   val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
   val JDBC_PUSHDOWN_AGGREGATE = newOption("pushDownAggregate")
   val JDBC_PUSHDOWN_LIMIT = newOption("pushDownLimit")
+  val JDBC_PUSHDOWN_OFFSET = newOption("pushDownOffset")
   val JDBC_PUSHDOWN_TABLESAMPLE = newOption("pushDownTableSample")
   val JDBC_KEYTAB = newOption("keytab")
   val JDBC_PRINCIPAL = newOption("principal")
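The new flag is parsed like the existing `pushDownLimit` option and defaults to false, so OFFSET push-down stays opt-in. A small sketch (URL and table are placeholders):

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb",   // placeholder URL
  "dbtable" -> "test.employee",    // placeholder table
  "pushDownOffset" -> "true"))
assert(opts.pushDownOffset)        // would be false if the option were omitted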

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 8 additions & 4 deletions
@@ -124,7 +124,8 @@ object JDBCRDD extends Logging {
       groupByColumns: Option[Array[String]] = None,
       sample: Option[TableSampleInfo] = None,
       limit: Int = 0,
-      sortOrders: Array[String] = Array.empty[String]): RDD[InternalRow] = {
+      sortOrders: Array[String] = Array.empty[String],
+      offset: Int = 0): RDD[InternalRow] = {
     val url = options.url
     val dialect = JdbcDialects.get(url)
     val quotedColumns = if (groupByColumns.isEmpty) {
@@ -145,7 +146,8 @@
       groupByColumns,
       sample,
       limit,
-      sortOrders)
+      sortOrders,
+      offset)
   }
   // scalastyle:on argcount
 }
@@ -167,7 +169,8 @@
     groupByColumns: Option[Array[String]],
     sample: Option[TableSampleInfo],
     limit: Int,
-    sortOrders: Array[String])
+    sortOrders: Array[String],
+    offset: Int)
   extends RDD[InternalRow](sc, Nil) {
 
   /**
@@ -305,10 +308,11 @@
     }
 
     val myLimitClause: String = dialect.getLimitClause(limit)
+    val myOffsetClause: String = dialect.getOffsetClause(offset)
 
     val sqlText = options.prepareQuery +
       s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" +
-      s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause"
+      s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause $myOffsetClause"
     stmt = conn.prepareStatement(sqlText,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
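`getOffsetClause` itself is not shown in this excerpt; presumably it mirrors `getLimitClause` on `JdbcDialect` and returns an empty string when no OFFSET was pushed, so the interpolated SQL above stays valid. A hedged sketch of that shape:

// Presumed shape only (mirroring getLimitClause); the actual dialect method may differ.
def getOffsetClause(offset: Integer): String = {
  if (offset > 0) s"OFFSET $offset" else ""
}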

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 4 additions & 2 deletions
@@ -304,7 +304,8 @@ private[sql] case class JDBCRelation(
       groupByColumns: Option[Array[String]],
       tableSample: Option[TableSampleInfo],
       limit: Int,
-      sortOrders: Array[String]): RDD[Row] = {
+      sortOrders: Array[String],
+      offset: Int): RDD[Row] = {
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
@@ -317,7 +318,8 @@
       groupByColumns,
       tableSample,
       limit,
-      sortOrders).asInstanceOf[RDD[Row]]
+      sortOrders,
+      offset).asInstanceOf[RDD[Row]]
   }
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 14 additions & 1 deletion
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeS
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.expressions.SortOrder
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
@@ -130,6 +130,19 @@ object PushDownUtils extends PredicateHelper {
     }
   }
 
+  /**
+   * Pushes down OFFSET to the data source Scan.
+   *
+   * @return the Boolean value represents whether to push down.
+   */
+  def pushOffset(scanBuilder: ScanBuilder, offset: Int): Boolean = {
+    scanBuilder match {
+      case s: SupportsPushDownOffset =>
+        s.pushOffset(offset)
+      case _ => false
+    }
+  }
+
   /**
    * Pushes down top N to the data source Scan.
    *
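A rough sketch of how a planner rule could use the new helper; the real wiring lives in the V2 push-down rule (`V2ScanRelationPushDown`), which is not shown in this excerpt:

import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.execution.datasources.v2.PushDownUtils

// Returns true if the builder accepted the OFFSET, in which case the caller can
// remove Spark's own Offset operator from the plan; otherwise Spark skips the rows.
def tryPushOffset(builder: ScanBuilder, offset: Int): Boolean =
  PushDownUtils.pushOffset(builder, offset)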

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ case class PushedDownOperators(
     aggregation: Option[Aggregation],
     sample: Option[TableSampleInfo],
     limit: Option[Int],
+    offset: Option[Int],
     sortValues: Seq[SortOrder],
     pushedPredicates: Seq[Predicate]) {
   assert((limit.isEmpty && sortValues.isEmpty) || limit.isDefined)
