Commit 223d510

use optimized plan
1 parent ecefc2a

File tree

2 files changed: 12 additions & 12 deletions


externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala

Lines changed: 8 additions & 8 deletions
@@ -25,8 +25,9 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, SparkPlan, SQLExecution, TakeOrderedAndProjectExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -294,18 +295,17 @@ object SparkDatasetHelper extends Logging {
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }

-  private[kyuubi] def planLimit(plan: SparkPlan): Option[Int] = plan match {
-    case tp: TakeOrderedAndProjectExec => Option(tp.limit)
-    case c: CollectLimitExec => Option(c.limit)
-    case ap: AdaptiveSparkPlanExec => planLimit(ap.inputPlan)
-    case _ => None
-  }
+  private[kyuubi] def optimizedPlanLimit(queryExecution: QueryExecution): Option[Long] =
+    queryExecution.optimizedPlan match {
+      case globalLimit: GlobalLimit => globalLimit.maxRows
+      case _ => None
+    }

   def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
     if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
       return false
     }
-    val finalLimit = planLimit(result.queryExecution.sparkPlan) match {
+    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
       case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
       case Some(limit) => limit
       case None => resultMaxRows
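
The refactor swaps a pattern match over physical operators (TakeOrderedAndProjectExec, CollectLimitExec, plus an AdaptiveSparkPlanExec unwrap) for a single match on the optimized logical plan: any top-level LIMIT surfaces there as one GlobalLimit node whose maxRows the optimizer fills in, so one case covers every physical strategy Spark may pick. A minimal standalone sketch of the idea (the demo object and limitOf helper are illustrative names, not Kyuubi code; assumes Spark on the classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.spark.sql.execution.QueryExecution

object OptimizedLimitDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()

  // Same shape as the new helper: read the limit off the optimized logical
  // plan, where any top-level LIMIT is a GlobalLimit with maxRows populated.
  def limitOf(qe: QueryExecution): Option[Long] = qe.optimizedPlan match {
    case g: GlobalLimit => g.maxRows // maxRows is Option[Long] on logical plans
    case _ => None
  }

  val df = spark.range(100).orderBy("id").limit(10)
  println(limitOf(df.queryExecution)) // expected: Some(10)

  spark.stop()
}

One knock-on effect visible in shouldSaveResultToFs: the limit is now Option[Long] (the type of LogicalPlan.maxRows) rather than Option[Int], which math.min absorbs by widening the Int resultMaxRows operand to Long.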

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -33,13 +33,13 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
       " SELECT * FROM VALUES(1),(2),(3),(4) AS t(id)")

     val topKStatement = s"SELECT * FROM(SELECT * FROM tv ORDER BY id LIMIT ${topKThreshold - 1})"
-    assert(SparkDatasetHelper.planLimit(
-      spark.sql(topKStatement).queryExecution.sparkPlan) === Option(topKThreshold - 1))
+    assert(SparkDatasetHelper.optimizedPlanLimit(
+      spark.sql(topKStatement).queryExecution) === Option(topKThreshold - 1))

     val collectLimitStatement =
       s"SELECT * FROM (SELECT * FROM tv ORDER BY id LIMIT $topKThreshold)"
-    assert(SparkDatasetHelper.planLimit(
-      spark.sql(collectLimitStatement).queryExecution.sparkPlan) === Option(topKThreshold))
+    assert(SparkDatasetHelper.optimizedPlanLimit(
+      spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
   }
 }
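
For context on why the suite probes both LIMIT ${topKThreshold - 1} and LIMIT $topKThreshold: Spark plans ORDER BY + LIMIT n as TakeOrderedAndProjectExec while n is below spark.sql.execution.topKSortFallbackThreshold, and falls back to a full sort plus CollectLimitExec at or above it. The old planLimit needed a case per physical operator; the optimized logical plan is a GlobalLimit either way, which is what optimizedPlanLimit relies on. A hedged sketch of that boundary (the demo object name and the threshold value 3 are illustrative, presumably mirroring the suite's topKThreshold):

import org.apache.spark.sql.SparkSession

object TopKBoundaryDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo")
    .config("spark.sql.execution.topKSortFallbackThreshold", "3")
    .getOrCreate()
  spark.range(10).createOrReplaceTempView("tv")

  // n = 2 (< threshold): expect TakeOrderedAndProjectExec in the physical plan.
  spark.sql("SELECT * FROM tv ORDER BY id LIMIT 2").explain()
  // n = 3 (>= threshold): expect a Sort followed by CollectLimit instead.
  spark.sql("SELECT * FROM tv ORDER BY id LIMIT 3").explain()

  spark.stop()
}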
