Commit 1149727

Remove offset only
1 parent 4f22a7a commit 1149727

8 files changed, +33 -125 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 6 additions & 14 deletions
@@ -349,13 +349,9 @@ trait CheckAnalysis extends PredicateHelper {
 
           case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
             && o.children.exists(_.isInstanceOf[Offset]) =>
-            if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
-              failAnalysis(
-                s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
-                   | clause found in: ${o.nodeName}. If you know exactly that OFFSET clause
-                   | does not cause excessive overhead and still want to use it, set
-                   | ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
-            }
+            failAnalysis(
+              s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
+                 | clause found in: ${o.nodeName}.""".stripMargin)
 
           case _: Union | _: SetOperation if operator.children.length > 1 =>
             def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)

@@ -787,13 +783,9 @@ trait CheckAnalysis extends PredicateHelper {
     plan match {
       case Offset(offsetExpr, _) =>
         checkLimitLikeClause("offset", offsetExpr)
-        if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
-          failAnalysis(
-            s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
-               | clause is found to be the outermost node. If you know exactly that OFFSET
-               | clause does not cause excessive overhead and still want to use it, set
-               | ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
-        }
+        failAnalysis(
+          s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
+             | clause is found to be the outermost node.""".stripMargin)
       case _ =>
     }
   }
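
With the escape hatch removed, a standalone OFFSET is rejected unconditionally at analysis time. Below is a minimal, hedged sketch of the user-visible behavior on this branch; the local SparkSession setup and the temp view name `t` are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.{AnalysisException, SparkSession}

object OffsetRequiresLimitDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("offset-demo").getOrCreate()
    spark.range(100).createOrReplaceTempView("t")  // hypothetical view name

    // OFFSET combined with LIMIT is still analyzed, planned and executed as before.
    spark.sql("SELECT id FROM t ORDER BY id LIMIT 20 OFFSET 39").show()

    // OFFSET without LIMIT now always fails analysis; before this commit it could be
    // forced through by setting spark.sql.forceUsingOffsetWithoutLimit=true.
    try {
      spark.sql("SELECT id FROM t ORDER BY id OFFSET 90").collect()
    } catch {
      case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}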

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 0 additions & 4 deletions
@@ -1405,10 +1405,6 @@ object RewriteOffsets extends Rule[LogicalPlan] {
       GlobalLimit(le, Greatest(Seq(noe, oe)), grandChild)
     case LocalLimit(le, oe, Offset(noe, grandChild)) =>
       Offset(noe, LocalLimit(le, Greatest(Seq(noe, oe)), grandChild))
-    case Offset(oe, Offset(noe, grandChild)) =>
-      Offset(Greatest(Seq(noe, oe)), grandChild)
-    case Offset(oe @ Literal(v, IntegerType), child) =>
-      Limit(Limit.INVALID_LIMIT, oe, child)
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 0 additions & 1 deletion
@@ -797,7 +797,6 @@ case class Pivot(
  * So we introduced LocalLimit and GlobalLimit in the logical plan node for limit pushdown.
  */
 object Limit {
-  val INVALID_LIMIT = Literal(-1)
   def apply(
       limitExpr: Expression,
       offsetExpr: Expression = Literal(0),

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 8 deletions
@@ -2158,12 +2158,6 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.addPartitionInBatch.size must be positive")
     .createWithDefault(100)
 
-  val FORCE_USING_OFFSET_WITHOUT_LIMIT = buildConf("spark.sql.forceUsingOffsetWithoutLimit")
-    .doc("When this option is set to true, although OFFSET may have large overhead, " +
-      "still use it. Otherwise, an analysis exception is thrown.")
-    .booleanConf
-    .createWithDefault(false)
-
   /**
    * Holds information about keys that have been deprecated.
    *

@@ -2736,8 +2730,6 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
-  def forceUsingOffsetWithoutLimit: Boolean = getConf(SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT)
-
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 4 deletions
@@ -82,13 +82,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), IntegerLiteral(offset), Sort(order, true, child))
-            if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
         case Limit(
             IntegerLiteral(limit),
             IntegerLiteral(offset),
             Project(projectList, Sort(order, true, child)))
-            if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
           CollectLimitExec(limit, offset, planLater(child)) :: Nil

@@ -97,13 +97,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), IntegerLiteral(offset), Sort(order, true, child))
-          if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+          if limit < conf.topKSortFallbackThreshold =>
        TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
      case Limit(
          IntegerLiteral(limit),
          IntegerLiteral(offset),
          Project(projectList, Sort(order, true, child)))
-          if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+          if limit < conf.topKSortFallbackThreshold =>
        TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
      case _ => Nil
    }
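
With Limit.INVALID_LIMIT gone, the only remaining guard on the top-K path is the limit against conf.topKSortFallbackThreshold. A hedged spark-shell sketch of what that guard selects; the view name `t` and the explicit threshold override are illustrative assumptions, not part of this commit.

// In spark-shell, where `spark` is predefined:
spark.range(1000).createOrReplaceTempView("t")

// A limit below the threshold should plan the ORDER BY + LIMIT/OFFSET query
// with a TakeOrderedAndProject node.
spark.sql("SELECT id FROM t ORDER BY id LIMIT 20 OFFSET 39").explain()

// Lowering the threshold below the limit makes the guard fail, so the planner
// falls back to a full sort followed by CollectLimit.
spark.conf.set("spark.sql.execution.topKSortFallbackThreshold", "10")
spark.sql("SELECT id FROM t ORDER BY id LIMIT 20 OFFSET 39").explain()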

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 19 additions & 53 deletions
@@ -46,11 +46,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = {
-    if (limit == Limit.INVALID_LIMIT.value) {
-      child.executeCollect().drop(offset)
-    } else {
-      child.executeTake(limit + offset).drop(offset)
-    }
+    child.executeTake(limit + offset).drop(offset)
   }
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
   private lazy val writeMetrics =

@@ -59,11 +55,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
-    val locallyLimited = if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitionsInternal(_.take(limit + offset))
-    }
+    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit + offset))
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
         locallyLimited,

@@ -72,11 +64,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
         serializer,
         writeMetrics),
       readMetrics)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      shuffled.mapPartitionsInternal(_.drop(offset))
-    } else {
-      shuffled.mapPartitionsInternal(_.drop(offset).take(limit))
-    }
+    shuffled.mapPartitionsInternal(_.drop(offset).take(limit))
   }
 }
 
@@ -150,11 +138,7 @@ case class LocalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extends
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def doExecute(): RDD[InternalRow] = {
-    if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitions { iter => iter.take(limit + offset)}
-    }
+    child.execute().mapPartitions { iter => iter.take(limit + offset)}
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {

@@ -163,16 +147,12 @@ case class LocalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extends
     // operators in one query.
     ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
     ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      s"${consume(ctx, input)}"
-    } else {
-      s"""
-         | if ($countTerm < ${limit + offset}) {
-         |   $countTerm += 1;
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    }
+    s"""
+       | if ($countTerm < ${limit + offset}) {
+       |   $countTerm += 1;
+       |   ${consume(ctx, input)}
+       | }
+     """.stripMargin
   }
 }
 
@@ -189,11 +169,7 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extend
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecute(): RDD[InternalRow] = {
-    val rdd = if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitions { iter => iter.take(limit + offset)}
-    }
+    val rdd = child.execute().mapPartitions { iter => iter.take(limit + offset)}
     val skips = rdd.take(offset)
     rdd.filter(!skips.contains(_))
   }

@@ -204,24 +180,14 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extend
     // operators in one query.
     ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
     ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else {
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    } else {
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else if ($countTerm < $limit) {
-         |   $countTerm += 1;
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    }
+    s"""
+       | if ($skipTerm < $offset) {
+       |   $skipTerm += 1;
+       | } else if ($countTerm < $limit) {
+       |   $countTerm += 1;
+       |   ${consume(ctx, input)}
+       | }
+     """.stripMargin
   }
 }
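
After these deletions every execution path assumes a real LIMIT is present: fetch limit + offset rows, then discard the first offset of them. A minimal sketch of that arithmetic on plain Scala collections (not Spark internals), just to make the take/drop order explicit:

object LimitOffsetSketch {
  // Mirrors what CollectLimitExec.executeCollect now does unconditionally:
  // take limit + offset rows first, then drop the leading offset rows.
  def limitWithOffset[T](rows: Iterator[T], limit: Int, offset: Int): Iterator[T] =
    rows.take(limit + offset).drop(offset)

  def main(args: Array[String]): Unit = {
    val out = limitWithOffset((1 to 100).iterator, limit = 20, offset = 39).toList
    println(out)  // 40 through 59: 20 rows after skipping the first 39
  }
}

The shuffled path applies the same bound in the other order, drop(offset).take(limit), which selects the same window of rows.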

sql/core/src/test/resources/sql-tests/inputs/postgreSQL/limit.sql

Lines changed: 3 additions & 5 deletions
@@ -21,11 +21,9 @@ SELECT '' AS zero, unique1, unique2, stringu1
 SELECT '' AS eleven, unique1, unique2, stringu1
 FROM onek WHERE unique1 < 50
 ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
-SET spark.sql.forceUsingOffsetWithoutLimit = true;
-SELECT '' AS ten, unique1, unique2, stringu1
-FROM onek
-ORDER BY unique1 OFFSET 990;
-SET spark.sql.forceUsingOffsetWithoutLimit = false;
+-- SELECT '' AS ten, unique1, unique2, stringu1
+-- FROM onek
+-- ORDER BY unique1 OFFSET 990;
 -- SELECT '' AS five, unique1, unique2, stringu1
 -- FROM onek
 -- ORDER BY unique1 OFFSET 990 LIMIT 5;

sql/core/src/test/resources/sql-tests/results/postgreSQL/limit.sql.out

Lines changed: 1 addition & 36 deletions
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 15
+-- Number of queries: 12
 
 
 -- !query

@@ -80,41 +80,6 @@ struct<eleven:string,unique1:int,unique2:int,stringu1:string>
 0 998 AAAAAA
 
 
--- !query
-SET spark.sql.forceUsingOffsetWithoutLimit = true
--- !query schema
-struct<key:string,value:string>
--- !query output
-spark.sql.forceUsingOffsetWithoutLimit true
-
-
--- !query
-SELECT '' AS ten, unique1, unique2, stringu1
-FROM onek
-ORDER BY unique1 OFFSET 990
--- !query schema
-struct<ten:string,unique1:int,unique2:int,stringu1:string>
--- !query output
-990 369 CMAAAA
-991 426 DMAAAA
-992 363 EMAAAA
-993 661 FMAAAA
-994 695 GMAAAA
-995 144 HMAAAA
-996 258 IMAAAA
-997 21 JMAAAA
-998 549 KMAAAA
-999 152 LMAAAA
-
-
--- !query
-SET spark.sql.forceUsingOffsetWithoutLimit = false
--- !query schema
-struct<key:string,value:string>
--- !query output
-spark.sql.forceUsingOffsetWithoutLimit false
-
-
 -- !query
 SELECT '' AS five, unique1, unique2, stringu1
 FROM onek
