[SPARK-44289][SPARK-43874][SPARK-43869][SPARK-43607][PS] Support indexer_between_time for pandas 2.0.0 & enabling more tests. #42533

Closed · wants to merge 3 commits
python/pyspark/pandas/groupby.py (7 changes: 4 additions & 3 deletions)
@@ -4165,6 +4165,7 @@ def value_counts(

Examples
--------
>>> import numpy as np
>>> df = ps.DataFrame({'A': [1, 2, 2, 3, 3, 3],
... 'B': [1, 1, 2, 3, 3, np.nan]},
... columns=['A', 'B'])
@@ -4183,7 +4184,7 @@ def value_counts(
2 1.0 1
2.0 1
3 3.0 2
Name: B, dtype: int64
Name: count, dtype: int64

Don't include counts of NaN when dropna is False.

@@ -4195,7 +4196,7 @@ def value_counts(
2.0 1
3 3.0 2
NaN 1
Name: B, dtype: int64
Name: count, dtype: int64
"""
warnings.warn(
"The resulting Series will have a fixed name of 'count' from 4.0.0.",
@@ -4232,7 +4233,7 @@ def value_counts(
psser._internal.data_fields[0].copy(name=name)
for psser, name in zip(groupkeys, groupkey_names)
],
column_labels=[self._agg_columns[0]._column_label],
column_labels=[("count",)],
data_spark_columns=[scol_for(sdf, agg_column)],
)
return first_series(DataFrame(internal))
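
Note on the doctest and column-label changes above: pandas 2.0 gives the Series returned by value_counts a fixed name of "count" (or "proportion" when normalize=True) instead of reusing the original column name, and pandas-on-Spark pins the label to ("count",) to match. A minimal sketch of the pandas behavior being matched (the frame mirrors the docstring example; it is illustrative, not part of the PR):

    import numpy as np
    import pandas as pd

    pdf = pd.DataFrame({"A": [1, 2, 2, 3, 3, 3],
                        "B": [1, 1, 2, 3, 3, np.nan]})
    counts = pdf.groupby("A")["B"].value_counts()
    # On pandas >= 2.0 the result Series is named "count" rather than "B",
    # which is what the updated doctest output and the ("count",) label reflect.
    print(counts.name)
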
python/pyspark/pandas/indexes/datetimes.py (20 changes: 14 additions & 6 deletions)
@@ -730,24 +730,32 @@ def indexer_between_time(

Examples
--------
>>> psidx = ps.date_range("2000-01-01", periods=3, freq="T") # doctest: +SKIP
>>> psidx # doctest: +SKIP
>>> psidx = ps.date_range("2000-01-01", periods=3, freq="T")
>>> psidx
DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
'2000-01-01 00:02:00'],
dtype='datetime64[ns]', freq=None)

>>> psidx.indexer_between_time("00:01", "00:02").sort_values() # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02").sort_values()
Index([1, 2], dtype='int64')

>>> psidx.indexer_between_time("00:01", "00:02", include_end=False) # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02", include_end=False)
Index([1], dtype='int64')

>>> psidx.indexer_between_time("00:01", "00:02", include_start=False) # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02", include_start=False)
Index([2], dtype='int64')
"""

def pandas_between_time(pdf) -> ps.DataFrame[int]: # type: ignore[no-untyped-def]
return pdf.between_time(start_time, end_time, include_start, include_end)
if include_start and include_end:
inclusive = "both"
elif not include_start and not include_end:
inclusive = "neither"
elif include_start and not include_end:
inclusive = "left"
elif not include_start and include_end:
inclusive = "right"
return pdf.between_time(start_time, end_time, inclusive=inclusive)

psdf = self.to_frame()[[]]
id_column_name = verify_temp_column_name(psdf, "__id_column__")
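
pandas 2.0 removed the include_start/include_end booleans from DataFrame.between_time in favor of a single inclusive keyword, which is why the helper above translates the two flags. A standalone sketch of the same mapping against plain pandas (the between_time_compat wrapper name is invented for illustration):

    import pandas as pd

    def between_time_compat(pdf, start, end, include_start=True, include_end=True):
        # Translate the two legacy booleans into the single `inclusive` keyword
        # that pandas 2.0 requires for DataFrame.between_time.
        inclusive = {
            (True, True): "both",
            (True, False): "left",
            (False, True): "right",
            (False, False): "neither",
        }[(include_start, include_end)]
        return pdf.between_time(start, end, inclusive=inclusive)

    pdf = pd.DataFrame({"x": range(3)}, index=pd.date_range("2000-01-01", periods=3, freq="T"))
    between_time_compat(pdf, "00:01", "00:02", include_end=False)  # keeps only the 00:01 row
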
python/pyspark/pandas/tests/computation/test_cov.py (51 changes: 9 additions & 42 deletions)
@@ -28,10 +28,6 @@


class FrameCovMixin:
@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43809): Enable DataFrameSlowTests.test_cov for pandas 2.0.0.",
)
def test_cov(self):
# SPARK-36396: Implement DataFrame.cov

@@ -66,12 +62,8 @@ def test_cov(self):
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))

# extension dtype
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
boolean_dtypes = ["boolean", "bool"]
else:
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "float"]
boolean_dtypes = ["boolean", "bool"]
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
boolean_dtypes = ["boolean", "bool"]

sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in numeric_dtypes]
sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype in boolean_dtypes]
@@ -81,44 +73,19 @@ def test_cov(self):
pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + ["decimal"]
psdf = ps.from_pandas(pdf)

if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4))
else:
test_types = [
"Int8",
"Int16",
"Int32",
"Int64",
"float",
"boolean",
"bool",
]
expected = pd.DataFrame(
data=[
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
],
index=test_types,
columns=test_types,
)
self.assert_eq(expected, psdf.cov(), almost=True)
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov(), almost=True)

# string column
pdf = pd.DataFrame(
[(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
columns=["a", "b", "c", "d"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov(), almost=True)
self.assert_eq(
pdf.cov(numeric_only=True, min_periods=4), psdf.cov(min_periods=4), almost=True
)
self.assert_eq(pdf.cov(numeric_only=True, min_periods=5), psdf.cov(min_periods=5))

# nan
np.random.seed(42)
@@ -132,7 +99,7 @@ def test_cov(self):
# return empty DataFrame
pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")], columns=["a", "b"])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov())
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov())


class FrameCovTests(FrameCovMixin, ComparisonTestBase, SQLTestUtils):
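
The pandas side of these assertions now passes numeric_only=True because pandas 2.0 changed the numeric_only default of DataFrame.cov to False, so non-numeric columns are no longer silently dropped, while pandas-on-Spark's cov still only considers numeric columns. A rough illustration against plain pandas (the frame mirrors the string-column case in the test above):

    import pandas as pd

    pdf = pd.DataFrame(
        [(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
        columns=["a", "b", "c", "d"],
    )
    # On pandas >= 2.0, pdf.cov() no longer drops the string column "c" on its own,
    # so the tests compare psdf.cov() against pdf.cov(numeric_only=True).
    pdf.cov(numeric_only=True)
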
python/pyspark/pandas/tests/data_type_ops/test_date_ops.py (20 changes: 8 additions & 12 deletions)
@@ -63,25 +63,25 @@ def test_add(self):
for psser in self.pssers:
self.assertRaises(TypeError, lambda: self.psser + psser)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43571): Enable DateOpsTests.test_sub for pandas 2.0.0.",
)
def test_sub(self):
self.assertRaises(TypeError, lambda: self.psser - "x")
self.assertRaises(TypeError, lambda: self.psser - 1)
self.assert_eq(
(self.pser - self.some_date).dt.days,
(self.pser - self.some_date).apply(lambda x: x.days),
self.psser - self.some_date,
)
pdf, psdf = self.pdf, self.psdf
for col in self.df_cols:
if col == "date":
self.assert_eq((pdf["date"] - pdf[col]).dt.days, psdf["date"] - psdf[col])
self.assert_eq(
(pdf["date"] - pdf[col]).apply(lambda x: x.days), psdf["date"] - psdf[col]
)
else:
self.assertRaises(TypeError, lambda: psdf["date"] - psdf[col])
pdf, psdf = self.date_pdf, self.date_psdf
self.assert_eq((pdf["this"] - pdf["that"]).dt.days, psdf["this"] - psdf["that"])
self.assert_eq(
(pdf["this"] - pdf["that"]).apply(lambda x: x.days), psdf["this"] - psdf["that"]
)

def test_mul(self):
self.assertRaises(TypeError, lambda: self.psser * "x")
@@ -128,15 +128,11 @@ def test_radd(self):
self.assertRaises(TypeError, lambda: 1 + self.psser)
self.assertRaises(TypeError, lambda: self.some_date + self.psser)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43570): Enable DateOpsTests.test_rsub for pandas 2.0.0.",
)
def test_rsub(self):
self.assertRaises(TypeError, lambda: "x" - self.psser)
self.assertRaises(TypeError, lambda: 1 - self.psser)
self.assert_eq(
(self.some_date - self.pser).dt.days,
(self.some_date - self.pser).apply(lambda x: x.days),
self.some_date - self.psser,
)
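
The expected values in these tests move from (… - …).dt.days to .apply(lambda x: x.days): with pandas 2.x, subtracting datetime.date values held in object-dtype columns appears to yield an object Series of datetime.timedelta, on which the .dt accessor is not available, so the day counts are pulled out element-wise instead. A small sketch of that pattern (the sample dates are invented):

    import datetime
    import pandas as pd

    pser = pd.Series([datetime.date(1994, 1, 31), datetime.date(1994, 2, 1)])
    some_date = datetime.date(1994, 1, 1)

    diff = pser - some_date              # object dtype holding datetime.timedelta values
    days = diff.apply(lambda x: x.days)  # pull out day counts without the .dt accessor
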

python/pyspark/pandas/tests/groupby/test_aggregate.py (12 changes: 2 additions & 10 deletions)
@@ -40,10 +40,6 @@ def pdf(self):
def psdf(self):
return ps.from_pandas(self.pdf)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-44289): Enable GroupbyAggregateTests.test_aggregate for pandas 2.0.0.",
)
def test_aggregate(self):
pdf = pd.DataFrame(
{"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [0.362, 0.227, 1.267, -0.562]}
@@ -173,12 +169,8 @@ def sort(df):
stats_psdf = psdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
stats_pdf = pdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
self.assert_eq(
stats_psdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
stats_pdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
stats_psdf.reset_index(drop=True),
stats_pdf.reset_index(drop=True),
)

def test_aggregate_func_str_list(self):
python/pyspark/pandas/tests/groupby/test_apply_func.py (11 changes: 3 additions & 8 deletions)
@@ -42,10 +42,6 @@ def pdf(self):
def psdf(self):
return ps.from_pandas(self.pdf)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43708): Enable GroupByTests.test_apply " "for pandas 2.0.0.",
)
def test_apply(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
@@ -87,14 +83,17 @@ def test_apply(self):
self.assert_eq(
psdf.groupby(psdf.b // 5).apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5).apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5)["a"].apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)[["a"]].apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5)[["a"]].apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)[["a"]].apply(len).sort_index(),
@@ -139,10 +138,6 @@ def test_apply(self):
pdf.groupby([("x", "a"), ("x", "b")]).apply(len).sort_index(),
)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43706): Enable GroupByTests.test_apply_without_shortcut " "for pandas 2.0.0.",
)
def test_apply_without_shortcut(self):
with option_context("compute.shortcut_limit", 0):
self.test_apply()
python/pyspark/pandas/tests/groupby/test_groupby.py (7 changes: 3 additions & 4 deletions)
@@ -769,10 +769,6 @@ def test_unique(self):
for act, exp in zip(actual, expect):
self.assertTrue(sorted(act) == sorted(exp))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43444): Enable GroupBySlowTests.test_value_counts for pandas 2.0.0.",
)
def test_value_counts(self):
pdf = pd.DataFrame(
{"A": [np.nan, 2, 2, 3, 3, 3], "B": [1, 1, 2, 3, 3, np.nan]}, columns=["A", "B"]
@@ -785,6 +781,7 @@ def test_value_counts(self):
self.assert_eq(
psdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
pdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby("A", dropna=False)["B"].value_counts(dropna=False).sort_index(),
@@ -804,6 +801,7 @@ def test_value_counts(self):
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=False, dropna=False)
.sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby("A")["B"]
@@ -812,6 +810,7 @@ def test_value_counts(self):
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=True, dropna=False)
.sort_index(),
almost=True,
)
self.assert_eq(
psdf.B.rename().groupby(psdf.A).value_counts().sort_index(),
python/pyspark/pandas/tests/indexes/test_base.py (24 changes: 0 additions & 24 deletions)
@@ -1577,10 +1577,6 @@ def test_asof(self):
psmidx = ps.MultiIndex.from_tuples([("a", "a"), ("a", "b"), ("a", "c")])
self.assertRaises(NotImplementedError, lambda: psmidx.asof(("a", "b")))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43608): Enable IndexesTests.test_union for pandas 2.0.0.",
)
def test_union(self):
# Index
pidx1 = pd.Index([1, 2, 3, 4])
@@ -1593,13 +1589,6 @@ def test_union(self):
self.assert_eq(psidx1.union(psidx2), pidx1.union(pidx2))
self.assert_eq(psidx2.union(psidx1), pidx2.union(pidx1))
self.assert_eq(psidx1.union(psidx3), pidx1.union(pidx3))
# Deprecated case, but adding to track if pandas stop supporting union
# as a set operation. It should work fine until stop supporting anyway.
# No longer supported from pandas 2.0.0.
if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
self.assert_eq(psidx1 | psidx2, ps.Index([3, 4], dtype="int64"))
else:
self.assert_eq(pidx1 | pidx2, psidx1 | psidx2)

self.assert_eq(psidx1.union([3, 4, 5, 6]), pidx1.union([3, 4, 5, 6]), almost=True)
self.assert_eq(psidx2.union([1, 2, 3, 4]), pidx2.union([1, 2, 3, 4]), almost=True)
@@ -1904,10 +1893,6 @@ def test_hasnans(self):
psmidx = ps.Index([("a", 1), ("b", 2)])
self.assertRaises(NotImplementedError, lambda: psmidx.hasnans())

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43607): Enable IndexesTests.test_intersection for pandas 2.0.0.",
)
def test_intersection(self):
pidx = pd.Index([1, 2, 3, 4], name="Koalas")
psidx = ps.from_pandas(pidx)
@@ -1919,15 +1904,6 @@ def test_intersection(self):
self.assert_eq(
(pidx + 1).intersection(pidx_other), (psidx + 1).intersection(psidx_other).sort_values()
)
# Deprecated case, but adding to track if pandas stop supporting intersection
# as a set operation. It should work fine until stop supporting anyway.
# No longer supported from pandas 2.0.0.
if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
self.assert_eq(
(psidx & psidx_other).sort_values(), ps.Index([3, 1, 7, 1], dtype="int64")
)
else:
self.assert_eq(pidx & pidx_other, (psidx & psidx_other).sort_values())

pidx_other_different_name = pd.Index([3, 4, 5, 6], name="Databricks")
psidx_other_different_name = ps.from_pandas(pidx_other_different_name)
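
The deleted branches in this file exercised | and & directly on Index objects; as the removed comments note, pandas 2.0 stops treating those operators as set operations, so only the explicit union()/intersection() calls remain under test. A short sketch of the explicit API the tests keep (values are illustrative):

    import pandas as pd

    idx1 = pd.Index([1, 2, 3, 4])
    idx2 = pd.Index([3, 4, 5, 6])

    idx1.union(idx2)         # Index([1, 2, 3, 4, 5, 6], dtype='int64')
    idx1.intersection(idx2)  # Index([3, 4], dtype='int64')
    # idx1 | idx2 is no longer a set union on pandas >= 2.0.
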
python/pyspark/pandas/tests/indexes/test_datetime.py (5 changes: 0 additions & 5 deletions)
@@ -161,11 +161,6 @@ def test_strftime(self):
psidx.strftime(date_format="%B %d, %Y"), pidx.strftime(date_format="%B %d, %Y")
)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43644): Enable DatetimeIndexTests.test_indexer_between_time "
"for pandas 2.0.0.",
)
def test_indexer_between_time(self):
for psidx, pidx in self.idx_pairs:
self.assert_eq(
python/pyspark/pandas/tests/indexes/test_reindex.py (9 changes: 4 additions & 5 deletions)
@@ -39,10 +39,6 @@ def df_pair(self):
psdf = ps.from_pandas(pdf)
return pdf, psdf

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43811): Enable DataFrameTests.test_reindex for pandas 2.0.0.",
)
def test_reindex(self):
index = pd.Index(["A", "B", "C", "D", "E"])
columns = pd.Index(["numbers"])
@@ -64,9 +60,12 @@ def test_reindex(self):
psdf.reindex(["A", "B", "C"], columns=["numbers", "2", "3"]).sort_index(),
)

# We manually test this due to the bug in pandas.
expected_result = ps.DataFrame([1.0, 2.0, 3.0], index=ps.Index(["A", "B", "C"]))
expected_result.columns = pd.Index(["numbers"], name="cols")
self.assert_eq(
pdf.reindex(["A", "B", "C"], index=["numbers", "2", "3"]).sort_index(),
psdf.reindex(["A", "B", "C"], index=["numbers", "2", "3"]).sort_index(),
expected_result,
)

self.assert_eq(