[KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows

# 🔍 Description
Follow-up to #5591.
Support getting the existing limit from more Spark plans and take the configured result max rows into account.
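At a glance, the change prefers a limit found on a `GlobalLimit` node in the optimized logical plan and caps it with the engine's result max rows. A minimal sketch of that resolution logic (the standalone helpers here are illustrative; the actual code lives in `SparkDatasetHelper`, see the diff below):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit

// Illustrative sketch only: mirrors the optimizedPlanLimit helper added in this patch.
def optimizedPlanLimit(df: DataFrame): Option[Long] =
  df.queryExecution.optimizedPlan match {
    case globalLimit: GlobalLimit => globalLimit.maxRows // e.g. SELECT ... LIMIT 3
    case _ => None
  }

// Combine the plan's limit with the engine's result max rows (0 means "no cap").
def effectiveLimit(df: DataFrame, resultMaxRows: Int): Long =
  optimizedPlanLimit(df) match {
    case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
    case Some(limit) => limit
    case None => resultMaxRows.toLong
  }
```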
## Issue References 🔗

This pull request fixes #

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #5963 from turboFei/incremental_save.

Closes #5377

223d510 [Fei Wang] use optimized plan
ecefc2a [Fei Wang] use spark plan
57091e5 [Fei Wang] minor
2096144 [Fei Wang] for logical plan
0f734ee [Fei Wang] ut
fdc1155 [Fei Wang] save
f8e405a [Fei Wang] math.min

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
turboFei committed Feb 4, 2024
1 parent 91840d4 commit 576379c
Showing 2 changed files with 59 additions and 7 deletions.
@@ -25,9 +25,9 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -295,16 +295,23 @@ object SparkDatasetHelper extends Logging {
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
 
+  private[kyuubi] def optimizedPlanLimit(queryExecution: QueryExecution): Option[Long] =
+    queryExecution.optimizedPlan match {
+      case globalLimit: GlobalLimit => globalLimit.maxRows
+      case _ => None
+    }
+
   def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
     if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
       return false
     }
-    lazy val limit = result.queryExecution.executedPlan match {
-      case collectLimit: CollectLimitExec => collectLimit.limit
-      case _ => resultMaxRows
+    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
+      case Some(limit) => limit
+      case None => resultMaxRows
     }
-    lazy val stats = if (limit > 0) {
-      limit * EstimationUtils.getSizePerRow(
+    lazy val stats = if (finalLimit > 0) {
+      finalLimit * EstimationUtils.getSizePerRow(
         result.queryExecution.executedPlan.output)
     } else {
       result.queryExecution.optimizedPlan.stats.sizeInBytes
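For intuition, a hypothetical walk-through of the new size estimate (all numbers are made up; the final comparison against `minSize` sits outside the displayed hunk):

```scala
// Hypothetical values, for illustration only.
val resultMaxRows = 10000                      // engine-side cap on returned rows
val planLimit: Option[Long] = Some(100L)       // GlobalLimit found in the optimized plan
val sizePerRow = 48L                           // stand-in for EstimationUtils.getSizePerRow(output)
val minSize = 1L << 20                         // assumed 1 MiB threshold

val finalLimit = planLimit match {
  case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows) // 100
  case Some(limit) => limit
  case None => resultMaxRows.toLong
}
val estimatedBytes = finalLimit * sizePerRow   // 4800 bytes
// The result would be saved to the filesystem only if the estimate reaches minSize,
// so this small, tightly limited query stays on the regular collect path.
```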
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kyuubi

import org.apache.spark.sql.internal.SQLConf

import org.apache.kyuubi.engine.spark.WithSparkSQLEngine

class SparkDatasetHelperSuite extends WithSparkSQLEngine {
  override def withKyuubiConf: Map[String, String] = Map.empty

  test("get limit from spark plan") {
    Seq(true, false).foreach { aqe =>
      val topKThreshold = 3
      spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, aqe)
      spark.sessionState.conf.setConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, topKThreshold)
      spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv AS" +
        " SELECT * FROM VALUES(1),(2),(3),(4) AS t(id)")

      val topKStatement = s"SELECT * FROM(SELECT * FROM tv ORDER BY id LIMIT ${topKThreshold - 1})"
      assert(SparkDatasetHelper.optimizedPlanLimit(
        spark.sql(topKStatement).queryExecution) === Option(topKThreshold - 1))

      val collectLimitStatement =
        s"SELECT * FROM (SELECT * FROM tv ORDER BY id LIMIT $topKThreshold)"
      assert(SparkDatasetHelper.optimizedPlanLimit(
        spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
    }
  }
}
