[SPARK-25842][SQL] Deprecate rangeBetween APIs introduced in SPARK-21608
## What changes were proposed in this pull request?
See https://issues.apache.org/jira/browse/SPARK-25841 for the detailed reasoning on why these APIs should be deprecated and redesigned.

This patch also reverts apache@8acb51f, which applies to 2.4.

## How was this patch tested?
Only deprecation and doc changes.

Closes apache#22841 from rxin/SPARK-25842.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
rxin authored and cloud-fan committed Oct 26, 2018
1 parent 86d469a commit 89d748b
Showing 5 changed files with 28 additions and 175 deletions.
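For orientation before the diff: the APIs being deprecated are the Column-based frame boundaries added by SPARK-21608, and the deprecation messages below point users back to the Long/integer-based boundaries and the `Window` constants. A before-and-after sketch in PySpark (illustrative column names; the deprecated form exists only in builds that include SPARK-21608):

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Deprecated by this patch: Column-based frame boundaries (SPARK-21608).
old_spec = (Window.partitionBy("category").orderBy("id")
            .rangeBetween(F.currentRow(), F.lit(1)))

# Recommended replacement: plain integers and the Window boundary constants.
new_spec = (Window.partitionBy("category").orderBy("id")
            .rangeBetween(Window.currentRow, 1))
```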
30 changes: 0 additions & 30 deletions python/pyspark/sql/functions.py
@@ -858,36 +858,6 @@ def ntile(n):
    return Column(sc._jvm.functions.ntile(int(n)))


@since(2.4)
def unboundedPreceding():
    """
    Window function: returns the special frame boundary that represents the first row
    in the window partition.
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.unboundedPreceding())


@since(2.4)
def unboundedFollowing():
    """
    Window function: returns the special frame boundary that represents the last row
    in the window partition.
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.unboundedFollowing())


@since(2.4)
def currentRow():
    """
    Window function: returns the special frame boundary that represents the current row
    in the window partition.
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.currentRow())
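The three helpers above are removed from `pyspark.sql.functions`; the same special boundaries remain available as constants on `pyspark.sql.Window`, as the updated docstrings below recommend. A minimal sketch of the replacement (column names are illustrative):

```python
from pyspark.sql import Window

# The special frame boundaries formerly exposed as functions.unboundedPreceding(),
# functions.unboundedFollowing() and functions.currentRow() are available as
# constants on the Window class.
frame = (Window.partitionBy("category").orderBy("id")
         .rangeBetween(Window.unboundedPreceding, Window.currentRow))
```

The resulting `WindowSpec` is then used with `.over()` exactly as before.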


# ---------------------- Date/Timestamp functions ------------------------------

@since(1.5)
70 changes: 18 additions & 52 deletions python/pyspark/sql/window.py
@@ -16,11 +16,9 @@
#

import sys
- if sys.version >= '3':
-     long = int

from pyspark import since, SparkContext
- from pyspark.sql.column import Column, _to_seq, _to_java_column
+ from pyspark.sql.column import _to_seq, _to_java_column

__all__ = ["Window", "WindowSpec"]

@@ -126,45 +124,20 @@ def rangeBetween(start, end):
and "5" means the five off after the current row.
We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
- ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
- ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
- to specify special boundary values, rather than using integral values directly.
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``Window.unboundedPreceding``,
- a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``Window.unboundedFollowing``,
- a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
- >>> from pyspark.sql import functions as F, SparkSession, Window
- >>> spark = SparkSession.builder.getOrCreate()
- >>> df = spark.createDataFrame(
- ...     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
- >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(
- ...     F.currentRow(), F.lit(1))
- >>> df.withColumn("sum", F.sum("id").over(window)).show()
- +---+--------+---+
- | id|category|sum|
- +---+--------+---+
- |  1|       b|  3|
- |  2|       b|  5|
- |  3|       b|  3|
- |  1|       a|  4|
- |  1|       a|  4|
- |  2|       a|  2|
- +---+--------+---+
"""
- if isinstance(start, (int, long)) and isinstance(end, (int, long)):
-     if start <= Window._PRECEDING_THRESHOLD:
-         start = Window.unboundedPreceding
-     if end >= Window._FOLLOWING_THRESHOLD:
-         end = Window.unboundedFollowing
- elif isinstance(start, Column) and isinstance(end, Column):
-     start = start._jc
-     end = end._jc
+ if start <= Window._PRECEDING_THRESHOLD:
+     start = Window.unboundedPreceding
+ if end >= Window._FOLLOWING_THRESHOLD:
+     end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
return WindowSpec(jspec)
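For reference, the doctest removed above can be rewritten against the recommended integer boundaries; under that assumption the frame, and therefore the output, is unchanged (a sketch, not part of this diff):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

# Range frame from the current row's id value to that value + 1, per category.
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", F.sum("id").over(window)).show()
# Same sums as in the removed doctest: 3, 5, 3 for category "b"
# and 4, 4, 2 for category "a" (row order may differ).
```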
@@ -239,34 +212,27 @@ def rangeBetween(self, start, end):
and "5" means the five off after the current row.
We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
- ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
- ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
- to specify special boundary values, rather than using integral values directly.
+ and ``Window.currentRow`` to specify special boundary values, rather than using integral
+ values directly.
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``Window.unboundedPreceding``,
- a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
+ The frame is unbounded if this is ``Window.unboundedPreceding``, or
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``Window.unboundedFollowing``,
- a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
+ The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
- if isinstance(start, (int, long)) and isinstance(end, (int, long)):
-     if start <= Window._PRECEDING_THRESHOLD:
-         start = Window.unboundedPreceding
-     if end >= Window._FOLLOWING_THRESHOLD:
-         end = Window.unboundedFollowing
- elif isinstance(start, Column) and isinstance(end, Column):
-     start = start._jc
-     end = end._jc
+ if start <= Window._PRECEDING_THRESHOLD:
+     start = Window.unboundedPreceding
+ if end >= Window._FOLLOWING_THRESHOLD:
+     end = Window.unboundedFollowing
return WindowSpec(self._jspec.rangeBetween(start, end))
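A small sketch of the threshold handling implemented just above: integer boundaries at or beyond ``-sys.maxsize`` / ``sys.maxsize`` are normalized to the unbounded markers, so the two specs below describe the same fully unbounded frame. Illustrative only:

```python
import sys
from pyspark.sql import Window

explicit = Window.orderBy("id").rangeBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
clamped = Window.orderBy("id").rangeBetween(-sys.maxsize, sys.maxsize)
```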


def _test():
import doctest
SparkContext('local[4]', 'PythonTest')
- (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
+ (failure_count, test_count) = doctest.testmod()
if failure_count:
sys.exit(-1)

sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -215,52 +215,10 @@ object Window {
}

/**
* Creates a [[WindowSpec]] with the frame boundaries defined,
* from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative to the current row. For example, "lit(0)" means
* "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
* five off after the current row.
*
* Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
* [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
* transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical/date/timestamp data type. An exception can be made when the
* offset is unbounded, because no value modification is needed, in this case multiple and
* non-numerical/date/timestamp data type ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
* val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
* .toDF("id", "category")
* val byCategoryOrderedById =
* Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
* df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
*
* +---+--------+---+
* | id|category|sum|
* +---+--------+---+
* | 1| b| 3|
* | 2| b| 5|
* | 3| b| 3|
* | 1| a| 4|
* | 1| a| 4|
* | 2| a| 2|
* +---+--------+---+
* }}}
*
* @param start boundary start, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
* @param end boundary end, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
* @since 2.3.0
*/
@deprecated("Use the version with Long parameter types", "2.4.0")
def rangeBetween(start: Column, end: Column): WindowSpec = {
spec.rangeBetween(start, end)
}
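The offset semantics spelled out in the Scaladoc above (an ORDER BY value of 10 with a lower-bound offset of -3 yields a lower bound of 7) still apply to the non-deprecated integer boundaries; a PySpark sketch with made-up values:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(7,), (8,), (10,), (14,)], ["value"])

# Range frame [value - 3, value]: for the row with value 10 the lower bound is 7,
# so the rows with values 7, 8 and 10 fall inside its frame (sum = 25).
w = Window.orderBy("value").rangeBetween(-3, Window.currentRow)
df.withColumn("sum", F.sum("value").over(w)).show()
```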
sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -210,51 +210,10 @@ class WindowSpec private[sql](
}

/**
* Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative to the current row. For example, "lit(0)" means
* "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
* five off after the current row.
*
* Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
* [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
* transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical/date/timestamp data type. An exception can be made when the
* offset is unbounded, because no value modification is needed, in this case multiple and
* non-numerical/date/timestamp data type ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
* val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
* .toDF("id", "category")
* val byCategoryOrderedById =
* Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
* df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
*
* +---+--------+---+
* | id|category|sum|
* +---+--------+---+
* | 1| b| 3|
* | 2| b| 5|
* | 3| b| 3|
* | 1| a| 4|
* | 1| a| 4|
* | 2| a| 2|
* +---+--------+---+
* }}}
*
* @param start boundary start, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
* @param end boundary end, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
* @since 2.3.0
*/
@deprecated("Use the version with Long parameter types", "2.4.0")
def rangeBetween(start: Column, end: Column): WindowSpec = {
new WindowSpec(
partitionSpec,
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -830,30 +830,30 @@ object functions {
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
/**
* Window function: returns the special frame boundary that represents the first row in the
* window partition.
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
*
* @group window_funcs
* @since 2.3.0
*/
@deprecated("Use Window.unboundedPreceding", "2.4.0")
def unboundedPreceding(): Column = Column(UnboundedPreceding)

/**
* Window function: returns the special frame boundary that represents the last row in the
* window partition.
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
*
* @group window_funcs
* @since 2.3.0
*/
@deprecated("Use Window.unboundedFollowing", "2.4.0")
def unboundedFollowing(): Column = Column(UnboundedFollowing)

/**
* Window function: returns the special frame boundary that represents the current row in the
* window partition.
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
*
* @group window_funcs
* @since 2.3.0
*/
@deprecated("Use Window.currentRow", "2.4.0")
def currentRow(): Column = Column(CurrentRow)

/**
