[SPARK-40386][PS][SQL] Implement ddof in DataFrame.cov #37829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed · wants to merge 1 commit
31 changes: 25 additions & 6 deletions python/pyspark/pandas/frame.py
@@ -8738,8 +8738,7 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True)
        internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
        self._update_internal_frame(internal, check_same_anchor=False)

-    # TODO: ddof should be implemented.
-    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+    def cov(self, min_periods: Optional[int] = None, ddof: int = 1) -> "DataFrame":
        """
        Compute pairwise covariance of columns, excluding NA/null values.
@@ -8755,8 +8754,7 @@ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
        below this threshold will be returned as ``NaN``.

        This method is generally used for the analysis of time series data to
-        understand the relationship between different measures
-        across time.
+        understand the relationship between different measures across time.

        .. versionadded:: 3.3.0
@@ -8765,6 +8763,11 @@ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
        min_periods : int, optional
            Minimum number of observations required per pair of columns
            to have a valid result.
+        ddof : int, default 1
+            Delta degrees of freedom. The divisor used in calculations
+            is ``N - ddof``, where ``N`` represents the number of elements.
+
+            .. versionadded:: 3.4.0

        Returns
        -------
@@ -8794,6 +8797,20 @@ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
        c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
        d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
        e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+        >>> df.cov(ddof=2)
+                  a         b         c         d         e
+        a  0.999439 -0.020181  0.059336 -0.008952  0.014159
+        b -0.020181  1.060413 -0.008551 -0.024762  0.009836
+        c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
+        d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
+        e  0.014159  0.009836 -0.000271 -0.013705  0.978775
+        >>> df.cov(ddof=-1)
+                  a         b         c         d         e
+        a  0.996444 -0.020121  0.059158 -0.008926  0.014116
+        b -0.020121  1.057235 -0.008526 -0.024688  0.009807
+        c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
+        d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
+        e  0.014116  0.009807 -0.000270 -0.013664  0.975842

        **Minimum number of periods**

@@ -8813,6 +8830,8 @@ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
        b       NaN  1.248003  0.191417
        c -0.150812  0.191417  0.895202
        """
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
        min_periods = 1 if min_periods is None else min_periods

        # Only compute covariance for Boolean and Numeric except Decimal
@@ -8891,8 +8910,8 @@ def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
            step += r
            for c in range(r, num_cols):
                cov_scols.append(
-                    F.covar_samp(
-                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+                    SF.covar(
+                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double"), ddof
                    )
                    if count_not_null[r * num_cols + c - step] >= min_periods
                    else F.lit(None)
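The `ddof` semantics above follow `numpy.cov`: the accumulated co-moment is divided by `N - ddof`, so `ddof=1` reproduces the previous `covar_samp` behavior, `ddof=0` gives the population covariance, and a negative value simply enlarges the divisor (hence the `ddof=-1` doctest). A minimal sketch of the arithmetic checked against NumPy; the sample data here is illustrative, not from this PR:

```python
import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([2.0, 1.0, 4.0, 3.0])
n = len(x)

# cov(x, y) = sum((x - mean(x)) * (y - mean(y))) / (n - ddof)
for ddof in (-1, 0, 1, 2):
    manual = ((x - x.mean()) * (y - y.mean())).sum() / (n - ddof)
    assert np.isclose(manual, np.cov(x, y, ddof=ddof)[0, 1])
```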
5 changes: 5 additions & 0 deletions python/pyspark/pandas/spark/functions.py
@@ -51,6 +51,11 @@ def mode(col: Column, dropna: bool) -> Column:
    return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))


+def covar(col1: Column, col2: Column, ddof: int) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasCovar(col1._jc, col2._jc, ddof))
+
+
def repeat(col: Column, n: Union[int, Column]) -> Column:
    """
    Repeats a string column n times, and returns it as a new string column.
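`SF.covar` is a thin Py4J shim: it hands the two Java column expressions and `ddof` to the JVM-side helper added below and wraps the returned expression back into a Python `Column`, following the same pattern as `mode` above. A hedged usage sketch, assuming a Spark build that already includes this patch; the DataFrame and column names are made up for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.pandas.spark import functions as SF

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 2.0), (2.0, 1.0), (3.0, 4.0)], ["x", "y"])

# Population covariance (divisor N - 0) instead of the covar_samp default.
sdf.agg(SF.covar(F.col("x"), F.col("y"), 0).alias("cov_xy")).show()
```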
10 changes: 10 additions & 0 deletions python/pyspark/pandas/tests/test_dataframe.py
@@ -6958,6 +6958,16 @@ def test_cov(self):
        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))

+        # ddof
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.cov(ddof="ddof")
+        for ddof in [-1, 0, 2]:
+            self.assert_eq(pdf.cov(ddof=ddof), psdf.cov(ddof=ddof), almost=True)
+            self.assert_eq(
+                pdf.cov(min_periods=4, ddof=ddof), psdf.cov(min_periods=4, ddof=ddof), almost=True
+            )
+            self.assert_eq(pdf.cov(min_periods=5, ddof=ddof), psdf.cov(min_periods=5, ddof=ddof))
+
        # bool
        pdf = pd.DataFrame(
            {
@@ -143,3 +143,25 @@ case class CovSample(
  override protected def withNewChildrenInternal(
    newLeft: Expression, newRight: Expression): CovSample = copy(left = newLeft, right = newRight)
}
+
+/**
+ * Covariance in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
+ * Refer to numpy.cov.
+ */
+case class PandasCovar(
+    override val left: Expression,
+    override val right: Expression,
+    ddof: Int)
+  extends Covariance(left, right, true) {
+
+  override val evaluateExpression: Expression = {
+    If(n === 0.0, Literal.create(null, DoubleType),
+      If(n === ddof, divideByZeroEvalResult, ck / (n - ddof)))
+  }
+  override def prettyName: String = "pandas_covar"
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): PandasCovar =
+    copy(left = newLeft, right = newRight)
+}
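For intuition: the parent `Covariance` aggregate maintains a running count `n` and co-moment `ck`, and `PandasCovar` only changes the final division, returning null when `n == 0` or when the divisor would be zero (it passes `nullOnDivideByZero = true` to `Covariance`). Below is a single-partition Python sketch of the same computation in the spirit of Welford's online algorithm; it is a simplification, not a line-for-line port of the Catalyst aggregate, which additionally needs a merge step for distributed execution:

```python
import math

def pandas_covar(pairs, ddof):
    """Streaming co-moment (Welford style), finalized as ck / (n - ddof)."""
    n = x_avg = y_avg = ck = 0.0
    for x, y in pairs:
        n += 1.0
        dx = x - x_avg          # deviation from the previous x mean
        x_avg += dx / n
        y_avg += (y - y_avg) / n
        ck += dx * (y - y_avg)  # y_avg is already updated here
    if n == 0.0 or n == ddof:
        return None  # mirrors the null and divide-by-zero guards in evaluateExpression
    return ck / (n - ddof)

# ddof=1 reproduces the sample covariance (the old covar_samp behavior).
assert math.isclose(pandas_covar([(1.0, 2.0), (2.0, 1.0), (3.0, 4.0), (4.0, 3.0)], 1), 1.0)
```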
@@ -138,6 +138,10 @@ private[sql] object PythonSQLUtils extends Logging {
  def pandasMode(e: Column, ignoreNA: Boolean): Column = {
    Column(PandasMode(e.expr, ignoreNA).toAggregateExpression(false))
  }
+
+  def pandasCovar(col1: Column, col2: Column, ddof: Int): Column = {
+    Column(PandasCovar(col1.expr, col2.expr, ddof).toAggregateExpression(false))
+  }
}

/**