Skip to content

Commit

Permalink
Improve the performance for first_value
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Oct 29, 2020
1 parent a6d0741 commit 181186c
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,22 @@ import org.apache.spark.sql.types._
group = "agg_funcs",
since = "2.0.0")
case class First(child: Expression, ignoreNulls: Boolean)
extends DeclarativeAggregate with ExpectsInputTypes {
extends DeclarativeAggregate with OffsetWindowSpec with ExpectsInputTypes {

def this(child: Expression) = this(child, false)

def this(child: Expression, ignoreNullsExpr: Expression) = {
this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "first"))
}

override val input = child

override val offset = Literal.create(1, IntegerType)

override lazy val default = Literal.create(null, input.dataType)

override val isRelative = false

override def children: Seq[Expression] = child :: Nil

override def nullable: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ trait WindowExecBase extends UnaryExecNode {
case e @ WindowExpression(function, spec) =>
val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
function match {
case AggregateExpression(f: OffsetWindowSpec, _, _, _, _) if !f.ignoreNulls &&
frame.frameType == RowFrame && frame.lower == UnboundedPreceding =>
frame.upper match {
case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f)
case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f)
case _ => collect("AGGREGATE", frame, e, f)
}
case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f)
case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", frame, e, f)
case f: OffsetWindowSpec if !f.ignoreNulls &&
Expand Down
32 changes: 31 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/window.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ SELECT val, cate,
count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
FROM testData ORDER BY cate, val;

-- nth_value() over ()
-- nth_value()/first_value() over ()
SELECT
employee_name,
salary,
first_value(employee_name) OVER (ORDER BY salary DESC) highest_salary,
nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary
FROM
basic_pays
Expand All @@ -158,6 +159,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
Expand All @@ -168,6 +172,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary
Expand All @@ -178,6 +185,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary
RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary
RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING) second_highest_salary
Expand All @@ -188,6 +198,9 @@ ORDER BY salary;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) second_highest_salary
Expand All @@ -198,6 +211,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) second_highest_salary
Expand All @@ -208,6 +224,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
Expand All @@ -218,6 +237,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) second_highest_salary
Expand All @@ -228,6 +250,9 @@ ORDER BY salary DESC;
SELECT
employee_name,
salary,
first_value(employee_name) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) highest_salary,
nth_value(employee_name, 2) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary
Expand All @@ -239,6 +264,11 @@ SELECT
employee_name,
department,
salary,
FIRST_VALUE(employee_name) OVER (
PARTITION BY department
ORDER BY salary DESC
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) highest_salary,
NTH_VALUE(employee_name, 2) OVER (
PARTITION BY department
ORDER BY salary DESC
Expand Down
Loading

0 comments on commit 181186c

Please sign in to comment.