Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32934][SQL] Improve the performance for NTH_VALUE and reactor the OffsetWindowFunction #29800

Closed
wants to merge 63 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
4a6f903
Reuse completeNextStageWithFetchFailure
beliefer Jun 19, 2020
96456e2
Merge remote-tracking branch 'upstream/master'
beliefer Jul 1, 2020
4314005
Merge remote-tracking branch 'upstream/master'
beliefer Jul 3, 2020
d6af4a7
Merge remote-tracking branch 'upstream/master'
beliefer Jul 9, 2020
f69094f
Merge remote-tracking branch 'upstream/master'
beliefer Jul 16, 2020
b86a42d
Merge remote-tracking branch 'upstream/master'
beliefer Jul 25, 2020
2ac5159
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 25, 2020
9021d6c
Merge remote-tracking branch 'upstream/master'
beliefer Jul 28, 2020
74a2ef4
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 28, 2020
9828158
Merge remote-tracking branch 'upstream/master'
beliefer Jul 31, 2020
9cd1aaf
Merge remote-tracking branch 'upstream/master'
beliefer Aug 5, 2020
abfcbb9
Merge remote-tracking branch 'upstream/master'
beliefer Aug 26, 2020
07c6c81
Merge remote-tracking branch 'upstream/master'
beliefer Sep 1, 2020
c9a96c3
Support window function nth_value
beliefer Sep 1, 2020
580130b
Merge remote-tracking branch 'upstream/master'
beliefer Sep 2, 2020
bc0c308
Remove some code.
beliefer Sep 3, 2020
d95a7b7
Merge branch 'support-nth_value' of github.com:beliefer/spark
beliefer Sep 4, 2020
addcdbc
Update golden file.
beliefer Sep 4, 2020
3712808
Merge branch 'master' of github.com:beliefer/spark
beliefer Sep 11, 2020
6107413
Merge remote-tracking branch 'upstream/master'
beliefer Sep 11, 2020
2aee591
Fix conflict
beliefer Sep 11, 2020
4b799b4
Merge remote-tracking branch 'upstream/master'
beliefer Sep 14, 2020
a9c17a4
Try to optimize nth_value
beliefer Sep 15, 2020
8778412
Optimize code and add test cases.
beliefer Sep 16, 2020
f190dd8
Merge branch 'support-nth_value' into optimize-nth_value
beliefer Sep 16, 2020
db2b1d4
Optimize code.
beliefer Sep 16, 2020
7ff1815
Merge branch 'support-nth_value' into optimize-nth_value
beliefer Sep 16, 2020
5cb4686
Fix bug.
beliefer Sep 16, 2020
97f6376
Update golden file.
beliefer Sep 16, 2020
4002aaf
Update golden file.
beliefer Sep 17, 2020
501d564
Improve comments.
beliefer Sep 18, 2020
51fc8eb
Merge branch 'support-nth_value' into optimize-nth_value
beliefer Sep 18, 2020
9610c86
Merge branch 'master' into optimize-nth_value
beliefer Sep 18, 2020
ee0ecbf
Merge remote-tracking branch 'upstream/master'
beliefer Sep 18, 2020
1dd2d36
Optimize code.
beliefer Sep 21, 2020
d4e1cb2
Optimize code.
beliefer Sep 21, 2020
fb7778e
Optimize code
beliefer Sep 21, 2020
f1f80d3
Add comments.
beliefer Sep 21, 2020
596bc61
Merge remote-tracking branch 'upstream/master'
beliefer Sep 24, 2020
b5a48b8
Fix conflict.
beliefer Sep 24, 2020
ae71344
Fix bug.
beliefer Sep 24, 2020
875b92b
Supplement test cases.
beliefer Sep 25, 2020
0164e2f
Merge remote-tracking branch 'upstream/master'
beliefer Sep 27, 2020
90b79fc
Merge remote-tracking branch 'upstream/master'
beliefer Sep 29, 2020
ae73c47
Merge branch 'master' into optimize-nth_value
beliefer Sep 29, 2020
dab9e30
Improve comments.
beliefer Sep 29, 2020
37bbff4
Optimize code
beliefer Oct 12, 2020
fcd3f44
Optimize code
beliefer Oct 12, 2020
4d21236
Optimize code
beliefer Oct 12, 2020
0f2ab8c
Optimize code
beliefer Oct 12, 2020
2cef3a9
Merge remote-tracking branch 'upstream/master'
beliefer Oct 13, 2020
c26b64f
Merge remote-tracking branch 'upstream/master'
beliefer Oct 19, 2020
28dc1a2
Fix conflict.
beliefer Oct 19, 2020
cd31649
Optimize code.
beliefer Oct 22, 2020
d7b9862
Optimize code.
beliefer Oct 22, 2020
22adc21
Optimize code.
beliefer Oct 23, 2020
9e84f54
Optimize code.
beliefer Oct 23, 2020
cde6170
Optimize codde.
beliefer Oct 27, 2020
ca574d9
Optimize code
beliefer Oct 27, 2020
fd59f6e
Optimize code
beliefer Oct 27, 2020
72e7805
Optimize code
beliefer Oct 27, 2020
9c35ddd
Optimize code
beliefer Oct 27, 2020
1c0e82b
Optimize code
beliefer Oct 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2974,7 +2974,7 @@ class Analyzer(
*/
object ResolveWindowFrame extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case WindowExpression(wf: OffsetWindowFunction,
case WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f =>
failAnalysis(s"Cannot specify window frame for ${wf.prettyName} function")
case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ trait CheckAnalysis extends PredicateHelper {
case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")

case w @ WindowExpression(_: OffsetWindowFunction,
case w @ WindowExpression(_: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
if order.isEmpty || !frame.isOffset =>
failAnalysis("An offset window function can only be evaluated in an ordered " +
Expand All @@ -176,7 +176,8 @@ trait CheckAnalysis extends PredicateHelper {
// Only allow window functions with an aggregate expression or an offset window
// function or a Pandas window UDF.
e match {
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
_: AggregateWindowFunction =>
w
case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) =>
w
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,25 +327,14 @@ object WindowFunctionType {
}
}


/**
* An offset window function is a window function that returns the value of the input column offset
* by a number of rows within the partition. For instance: an OffsetWindowfunction for value x with
* offset -2, will get the value of x 2 rows back in the partition.
*/
abstract class OffsetWindowFunction
extends Expression with WindowFunction with Unevaluable with ImplicitCastInputTypes {
trait OffsetWindowSpec extends Expression {
/**
* Input expression to evaluate against a row which a number of rows below or above (depending on
* the value and sign of the offset) the current row.
* the value and sign of the offset) the starting row (current row if isRelative=true, or the
* first row of the window frame otherwise).
*/
val input: Expression

/**
* Default result value for the function when the `offset`th row does not exist.
*/
val default: Expression
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved

/**
* (Foldable) expression that contains the number of rows between the current row and the row
* where the input expression is evaluated. If `offset` is a positive integer, it means that
Expand All @@ -355,6 +344,36 @@ abstract class OffsetWindowFunction
*/
val offset: Expression

/**
* Default result value for the function when the `offset`th row does not exist.
*/
val default: Expression

/**
* An optional specification that indicates the offset window function should skip null values in
* the determination of which row to use.
*/
val ignoreNulls: Boolean

/**
* Whether the offset is starts with the current row. If `isRelative` is true, `offset` means
* the offset is start with the current row. otherwise, the offset is starts with the first
* row of the entire window frame.
*/
val isRelative: Boolean

lazy val fakeFrame = SpecifiedWindowFrame(RowFrame, offset, offset)
}

/**
* A frameless offset window function is a window function that cannot specify window frame and
* returns the value of the input column offset by a number of rows within the partition.
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
* For instance: a FrameLessOffsetWindowFunction for value x with offset -2, will get the value of
* x 2 rows back in the partition.
*/
abstract class FrameLessOffsetWindowFunction
extends WindowFunction with OffsetWindowSpec with Unevaluable with ImplicitCastInputTypes {

override def children: Seq[Expression] = Seq(input, offset, default)

/*
Expand All @@ -370,7 +389,11 @@ abstract class OffsetWindowFunction

override def nullable: Boolean = default == null || default.nullable || input.nullable

override lazy val frame: WindowFrame = SpecifiedWindowFrame(RowFrame, offset, offset)
override val ignoreNulls = false

override val isRelative = true

override lazy val frame: WindowFrame = fakeFrame

override def checkInputDataTypes(): TypeCheckResult = {
val check = super.checkInputDataTypes()
Expand Down Expand Up @@ -425,7 +448,7 @@ abstract class OffsetWindowFunction
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class Lead(input: Expression, offset: Expression, default: Expression)
extends OffsetWindowFunction {
extends FrameLessOffsetWindowFunction {

def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))

Expand Down Expand Up @@ -467,7 +490,7 @@ case class Lead(input: Expression, offset: Expression, default: Expression)
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class Lag(input: Expression, inputOffset: Expression, default: Expression)
extends OffsetWindowFunction {
extends FrameLessOffsetWindowFunction {

def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))

Expand Down Expand Up @@ -579,7 +602,6 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
}

// scalastyle:off line.size.limit line.contains.tab

@ExpressionDescription(
usage = """
_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
Expand Down Expand Up @@ -607,12 +629,16 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
since = "3.1.0",
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
extends AggregateWindowFunction with ImplicitCastInputTypes {
case class NthValue(input: Expression, offset: Expression, ignoreNulls: Boolean)
extends AggregateWindowFunction with OffsetWindowSpec with ImplicitCastInputTypes {

def this(child: Expression, offset: Expression) = this(child, offset, false)

override def children: Seq[Expression] = input :: offsetExpr :: Nil
override lazy val default = Literal.create(null, input.dataType)

override val isRelative = false

override def children: Seq[Expression] = input :: offset :: Nil

override val frame: WindowFrame = UnspecifiedFrame
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -624,35 +650,35 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool
val check = super.checkInputDataTypes()
if (check.isFailure) {
check
} else if (!offsetExpr.foldable) {
TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
} else if (offset <= 0) {
} else if (!offset.foldable) {
TypeCheckFailure(s"Offset expression '$offset' must be a literal.")
} else if (offsetVal <= 0) {
TypeCheckFailure(
s"The 'offset' argument of nth_value must be greater than zero but it is $offset.")
s"The 'offset' argument of nth_value must be greater than zero but it is $offsetVal.")
} else {
TypeCheckSuccess
}
}

private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
private lazy val offsetVal = offset.eval().asInstanceOf[Int].toLong
private lazy val result = AttributeReference("result", input.dataType)()
private lazy val count = AttributeReference("count", LongType)()
override lazy val aggBufferAttributes: Seq[AttributeReference] = result :: count :: Nil

override lazy val initialValues: Seq[Literal] = Seq(
/* result = */ Literal.create(null, input.dataType),
/* result = */ default,
/* count = */ Literal(1L)
)

override lazy val updateExpressions: Seq[Expression] = {
if (ignoreNulls) {
Seq(
/* result = */ If(count === offset && input.isNotNull, input, result),
/* result = */ If(count === offsetVal && input.isNotNull, input, result),
/* count = */ If(input.isNull, count, count + 1L)
)
} else {
Seq(
/* result = */ If(count === offset, input, result),
/* result = */ If(count === offsetVal, input, result),
/* count = */ count + 1L
)
}
Expand All @@ -662,7 +688,7 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool

override def prettyName: String = "nth_value"
override def sql: String =
s"$prettyName(${input.sql}, ${offsetExpr.sql})${if (ignoreNulls) " ignore nulls" else ""}"
s"$prettyName(${input.sql}, ${offset.sql})${if (ignoreNulls) " ignore nulls" else ""}"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType,
* 3. CURRENT ROW AND 1 FOLLOWING
* 4. 1 PRECEDING AND 1 FOLLOWING
* 5. 1 FOLLOWING AND 2 FOLLOWING
* - Offset frame: The frame consist of one row, which is an offset number of rows away from the
* current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame.
* - Offset frame: The frame consist of one row, which is an offset number of rows. There are three
* implement of offset frame.
* 1. [[FrameLessOffsetWindowFunction]] returns the value of the input column offset by a number
* of rows according to the current row.
* 2. [[UnboundedOffsetWindowFunctionFrame]] and [[UnboundedPrecedingOffsetWindowFunctionFrame]]
* returns the value of the input column offset by a number of rows within the frame.
*
* Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
* boundary can be either Row or Range based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,15 @@ trait WindowExecBase extends UnaryExecNode {
val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
function match {
case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f)
case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", frame, e, f)
case f: OffsetWindowSpec if !f.ignoreNulls &&
frame.frameType == RowFrame && frame.lower == UnboundedPreceding =>
frame.upper match {
case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f)
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f)
case _ => collect("AGGREGATE", frame, e, f)
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
}
case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
case f: PythonUDF => collect("AGGREGATE", frame, e, f)
case f => sys.error(s"Unsupported window function: $f")
}
Expand Down Expand Up @@ -171,18 +178,42 @@ trait WindowExecBase extends UnaryExecNode {

// Create the factory to produce WindowFunctionFrame.
val factory = key match {
// Offset Frame
case ("OFFSET", _, IntegerLiteral(offset), _) =>
// Frameless offset Frame
case ("FRAME_LESS_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow =>
new OffsetWindowFunctionFrame(
new FrameLessOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowFunctions.
functions.map(_.asInstanceOf[OffsetWindowFunction]),
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
case ("UNBOUNDED_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow => {
new UnboundedOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
}
case ("UNBOUNDED_PRECEDING_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow => {
new UnboundedPrecedingOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
}

// Entire Partition Frame.
case ("AGGREGATE", _, UnboundedPreceding, UnboundedFollowing) =>
Expand Down
Loading