Skip to content

Commit

Permalink
[SPARK-33278][SQL] Improve the performance for FIRST_VALUE
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
#29800 provides a performance improvement for `NTH_VALUE`.
`FIRST_VALUE` could also use `UnboundedOffsetWindowFunctionFrame` and `UnboundedPrecedingOffsetWindowFunctionFrame`.

### Why are the changes needed?
Improve the performance for `FIRST_VALUE`.

### Does this PR introduce _any_ user-facing change?
 'No'.

### How was this patch tested?
Jenkins test.

Closes #30178 from beliefer/SPARK-33278.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
2 people authored and cloud-fan committed Nov 12, 2020
1 parent a3d2954 commit 2f07c56
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
// Operator combine
CollapseRepartition,
CollapseProject,
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
CombineLimits,
Expand Down Expand Up @@ -806,6 +807,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
}
}

/**
 * Replaces first(col) with nth_value(col, 1) for better performance.
 *
 * The rewrite is applied only to window expressions whose window spec has a non-empty
 * ordering and an explicitly specified ROWS frame. Every other window expression is
 * returned unchanged.
 */
object OptimizeWindowFunctions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
    case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec)
        if spec.orderSpec.nonEmpty && (spec.frameSpecification match {
          // Match on the frame's type instead of casting it: a frame that is not a
          // SpecifiedWindowFrame must make the rule skip the expression, not fail with
          // a ClassCastException.
          case frame: SpecifiedWindowFrame => frame.frameType == RowFrame
          case _ => false
        }) =>
      // Same child and null handling as the original first(); only the function changes.
      we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls))
  }
}

/**
* Collapse Adjacent Window Expression.
* - If the partition specs and order specs are the same and the window expression are
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

/**
 * Checks when [[OptimizeWindowFunctions]] rewrites first(col) into nth_value(col, 1)
 * and when it leaves the plan as it is.
 */
class OptimizeWindowFunctionsSuite extends PlanTest {
  // Runs only the rule under test.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("OptimizeWindowFunctions", FixedPoint(10),
      OptimizeWindowFunctions) :: Nil
  }

  val testRelation = LocalRelation('a.double, 'b.double, 'c.string)
  val a = testRelation.output(0)
  val b = testRelation.output(1)
  val c = testRelation.output(2)

  /** Frame spanning UNBOUNDED PRECEDING to CURRENT ROW for the given frame type. */
  private def unboundedToCurrent(frameType: FrameType): SpecifiedWindowFrame =
    SpecifiedWindowFrame(frameType, UnboundedPreceding, CurrentRow)

  /** Projects `function` over a window partitioned by `b` with the given ordering and frame. */
  private def selectOver(
      function: Expression,
      ordering: Seq[SortOrder],
      frameType: FrameType): LogicalPlan = {
    val windowSpec = WindowSpecDefinition(b :: Nil, ordering, unboundedToCurrent(frameType))
    testRelation.select(WindowExpression(function, windowSpec))
  }

  /** first(a) without ignoring nulls, wrapped as an aggregate expression. */
  private def firstOfA: Expression = First(a, false).toAggregateExpression()

  test("replace first(col) by nth_value(col, 1)") {
    val original = selectOver(firstOfA, c.asc :: Nil, RowFrame)
    val expected = selectOver(NthValue(a, Literal(1), false), c.asc :: Nil, RowFrame)

    assert(Optimize.execute(original) == expected)
  }

  test("can't replace first(col) by nth_value(col, 1) if the window frame type is range") {
    val original = selectOver(firstOfA, c.asc :: Nil, RangeFrame)

    // A RANGE frame must be left untouched by the rule.
    assert(Optimize.execute(original) == original)
  }

  test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") {
    val original = selectOver(firstOfA, Nil, RowFrame)

    // Without an ordering the rule must not fire.
    assert(Optimize.execute(original) == original)
  }
}
66 changes: 35 additions & 31 deletions sql/core/src/test/resources/sql-tests/inputs/window.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,104 +146,108 @@ SELECT val, cate,
count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
FROM testData ORDER BY cate, val;

-- nth_value() over ()
-- nth_value()/first_value() over ()
SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary
RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING)
ORDER BY salary;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary
first_value(employee_name) OVER w highest_salary,
nth_value(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
ORDER BY salary DESC;

SELECT
employee_name,
department,
salary,
NTH_VALUE(employee_name, 2) OVER (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) second_highest_salary
FIRST_VALUE(employee_name) OVER w highest_salary,
NTH_VALUE(employee_name, 2) OVER w second_highest_salary
FROM
basic_pays
WINDOW w AS (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
ORDER BY department;
Loading

0 comments on commit 2f07c56

Please sign in to comment.