feat: df.join lsuffix and rsuffix support #1857

Open · wants to merge 19 commits into base: main
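This PR adds pandas-style lsuffix/rsuffix handling to DataFrame.join. For reference, the following is the pandas behavior the change mirrors (a small illustrative example with made-up data, not taken from the PR):

import pandas as pd

left = pd.DataFrame({"string_col": ["a", "b"], "int64_col": [1, 2]})
right = pd.DataFrame({"int64_col": [10, 20], "float64_col": [0.1, 0.2]})

# With overlapping labels and no suffixes, pandas raises:
#   ValueError: columns overlap but no suffix specified: Index(['int64_col'], dtype='object')
joined = left.join(right, lsuffix="_l", rsuffix="_r")
print(joined.columns.tolist())  # ['string_col', 'int64_col_l', 'int64_col_r', 'float64_col']

Only the overlapping label receives a suffix; unique labels are left untouched.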
160 changes: 126 additions & 34 deletions bigframes/dataframe.py
@@ -3485,70 +3485,138 @@ def join(
*,
on: Optional[str] = None,
how: str = "left",
lsuffix: str = "",
rsuffix: str = "",
) -> DataFrame:
if isinstance(other, bigframes.series.Series):
other = other.to_frame()

left, right = self, other

if not left.columns.intersection(right.columns).empty:
raise NotImplementedError(
f"Deduping column names is not implemented. {constants.FEEDBACK_LINK}"
)
col_intersection = left.columns.intersection(right.columns)

if not col_intersection.empty:
if lsuffix == rsuffix == "":
raise ValueError(
f"columns overlap but no suffix specified: {col_intersection}"
)

if how == "cross":
if on is not None:
raise ValueError("'on' is not supported for cross join.")
result_block = left._block.merge(
right._block,
left_join_ids=[],
right_join_ids=[],
suffixes=("", ""),
suffixes=(lsuffix, rsuffix),
how="cross",
sort=True,
)
return DataFrame(result_block)

# Join left columns with right index
if on is not None:
Collaborator:
Nit: This if block is getting pretty long. Might be time for a helper function.

Collaborator Author:
Added _join_on_key function.

if left._has_index and (on in left.index.names):
if on in left.columns:
raise ValueError(
f"'{on}' is both an index level and a column label, which is ambiguous."
)
else:
raise NotImplementedError(
f"Joining on index level '{on}' is not yet supported. {constants.FEEDBACK_LINK}"
)
if (left.columns == on).sum() > 1:
raise ValueError(f"The column label '{on}' is not unique.")

if other._block.index.nlevels != 1:
raise ValueError(
"Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported."
)
# Switch left index with on column
left_columns = left.columns
left_idx_original_names = left.index.names if left._has_index else ()
left_idx_names_in_cols = [
f"bigframes_left_idx_name_{i}"
for i in range(len(left_idx_original_names))
]
if left._has_index:
left.index.names = left_idx_names_in_cols
left = left.reset_index(drop=False)
left = left.set_index(on)

# Join on index and switch back
combined_df = left._perform_join_by_index(right, how=how)
combined_df.index.name = on
combined_df = combined_df.reset_index(drop=False)
combined_df = combined_df.set_index(left_idx_names_in_cols)

# To be consistent with Pandas
if combined_df._has_index:
combined_df.index.names = (
left_idx_original_names
if how in ("inner", "left")
else ([None] * len(combined_df.index.names))
)

# Reorder columns
combined_df = combined_df[list(left_columns) + list(right.columns)]
return combined_df
return self._join_on_key(
other,
on=on,
how=how,
lsuffix=lsuffix,
rsuffix=rsuffix,
should_duplicate_on_key=(on in col_intersection),
)

# Join left index with right index
if left._block.index.nlevels != right._block.index.nlevels:
raise ValueError("Index to join on must have the same number of levels.")

return left._perform_join_by_index(right, how=how)
return left._perform_join_by_index(right, how=how)._add_join_suffix(
left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix
)

def _join_on_key(
self,
other: DataFrame,
on: str,
how: str,
lsuffix: str,
rsuffix: str,
should_duplicate_on_key: bool,
) -> DataFrame:
left, right = self, other
# Replace all column names with unique names for reordering.
left_col_original_names = left.columns
on_col_name = "bigframes_left_col_on"
dup_on_col_name = "bigframes_left_col_on_dup"
left_col_temp_names = [
f"bigframes_left_col_name_{i}" if col_name != on else on_col_name
for i, col_name in enumerate(left_col_original_names)
]
left.columns = pandas.Index(left_col_temp_names)
# If the 'on' column also exists in the right df, duplicate it
# and make the copy the first column
if should_duplicate_on_key:
left[dup_on_col_name] = left[on_col_name]
on_col_name = dup_on_col_name
left_col_temp_names = [on_col_name] + left_col_temp_names
left = left[left_col_temp_names]

# Switch left index with on column
left_idx_original_names = left.index.names if left._has_index else ()
left_idx_names_in_cols = [
f"bigframes_left_idx_name_{i}" for i in range(len(left_idx_original_names))
]
if left._has_index:
left.index.names = left_idx_names_in_cols
left = left.reset_index(drop=False)
left = left.set_index(on_col_name)

right_col_original_names = right.columns
right_col_temp_names = [
f"bigframes_right_col_name_{i}"
for i in range(len(right_col_original_names))
]
right.columns = pandas.Index(right_col_temp_names)

# Join on index and switch back
combined_df = left._perform_join_by_index(right, how=how)
combined_df.index.name = on_col_name
combined_df = combined_df.reset_index(drop=False)
combined_df = combined_df.set_index(left_idx_names_in_cols)

# To be consistent with Pandas
if combined_df._has_index:
combined_df.index.names = (
left_idx_original_names
if how in ("inner", "left")
else ([None] * len(combined_df.index.names))
)

# Reorder columns
combined_df = combined_df[left_col_temp_names + right_col_temp_names]
return combined_df._add_join_suffix(
left_col_original_names,
right_col_original_names,
lsuffix=lsuffix,
rsuffix=rsuffix,
extra_col=on if on_col_name == dup_on_col_name else None,
)
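
For intuition, here is a minimal plain-pandas sketch (made-up data, not from the diff) of the index round-trip that _join_on_key performs: park the original index under temporary column names, promote the join key to the index, join by index, then restore the original index.

import pandas as pd

left = pd.DataFrame({"key": [1, 2, 3], "a": ["x", "y", "z"]})
right = pd.DataFrame({"b": [10.0, 20.0]}, index=[1, 3])

idx_temp = [f"bigframes_left_idx_name_{i}" for i in range(left.index.nlevels)]
left.index.names = idx_temp
staged = left.reset_index(drop=False).set_index("key")

combined = staged.join(right, how="left")  # the actual join happens on the promoted key
combined = combined.reset_index(drop=False).set_index(idx_temp)
combined.index.names = [None] * len(idx_temp)  # the original RangeIndex had no name
print(combined)  # columns: key, a, b -- the join key survives as a regular column

The bigframes_left_idx_name_* placeholders exist only so the parked index cannot collide with user column labels while it sits in the column space.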

def _perform_join_by_index(
self,
@@ -3562,6 +3630,30 @@ def _perform_join_by_index(
)
return DataFrame(block)

def _add_join_suffix(
self,
left_columns,
right_columns,
lsuffix: str = "",
rsuffix: str = "",
extra_col: typing.Optional[str] = None,
):
col_intersection = left_columns.intersection(right_columns)
final_col_names = [] if extra_col is None else [extra_col]
for col_name in left_columns:
if col_name in col_intersection:
final_col_names.append(f"{col_name}{lsuffix}")
else:
final_col_names.append(col_name)

for col_name in right_columns:
if col_name in col_intersection:
final_col_names.append(f"{col_name}{rsuffix}")
else:
final_col_names.append(col_name)
self.columns = pandas.Index(final_col_names)
return self
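
The suffix rule itself is simple enough to show standalone (hypothetical labels, mirroring the helper above): only labels that appear in both frames receive a suffix.

import pandas as pd

left_columns = pd.Index(["string_col", "int64_col"])
right_columns = pd.Index(["int64_col", "float64_col"])
overlap = left_columns.intersection(right_columns)

final = [f"{c}_l" if c in overlap else c for c in left_columns]
final += [f"{c}_r" if c in overlap else c for c in right_columns]
print(final)  # ['string_col', 'int64_col_l', 'int64_col_r', 'float64_col']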

@validations.requires_ordering()
def rolling(
self,
99 changes: 93 additions & 6 deletions tests/system/small/test_dataframe.py
@@ -2899,12 +2899,99 @@ def test_join_different_table(
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)


def test_join_duplicate_columns_raises_not_implemented(scalars_dfs):
scalars_df, _ = scalars_dfs
df_a = scalars_df[["string_col", "float64_col"]]
df_b = scalars_df[["float64_col"]]
with pytest.raises(NotImplementedError):
df_a.join(df_b, how="outer").to_pandas()
@all_joins
def test_join_different_table_with_duplicate_column_name(
scalars_df_index, scalars_pandas_df_index, how
):
bf_df_a = scalars_df_index[["string_col", "int64_col", "int64_too"]].rename(
columns={"int64_too": "int64_col"}
)
bf_df_b = scalars_df_index.dropna()[
["string_col", "int64_col", "int64_too"]
].rename(columns={"int64_too": "int64_col"})
bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas()
pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename(
columns={"int64_too": "int64_col"}
)
pd_df_b = scalars_pandas_df_index.dropna()[
["string_col", "int64_col", "int64_too"]
].rename(columns={"int64_too": "int64_col"})
pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r")

pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)


@all_joins
def test_join_param_on_with_duplicate_column_name_not_on_col(
scalars_df_index, scalars_pandas_df_index, how
):
# This test is for duplicate column names, but the 'on' column is not duplicated.
if how == "cross":
return
bf_df_a = scalars_df_index[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
bf_df_b = scalars_df_index.dropna()[
["string_col", "datetime_col", "timestamp_col"]
].rename(columns={"timestamp_col": "datetime_col"})
bf_result = bf_df_a.join(
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
).to_pandas()
pd_df_a = scalars_pandas_df_index[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
pd_df_b = scalars_pandas_df_index.dropna()[
["string_col", "datetime_col", "timestamp_col"]
].rename(columns={"timestamp_col": "datetime_col"})
pd_result = pd_df_a.join(
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
)
pd.testing.assert_frame_equal(
bf_result.sort_index(),
pd_result.sort_index(),
check_like=True,
check_index_type=False,
check_names=False,
)
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns)


@pytest.mark.skipif(
pandas.__version__.startswith("1."), reason="bad left join in pandas 1.x"
)
@all_joins
def test_join_param_on_with_duplicate_column_name_on_col(
scalars_df_index, scalars_pandas_df_index, how
):
# This test is for duplicate column names, and the 'on' column is duplicated.
if how == "cross":
return
bf_df_a = scalars_df_index[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
bf_df_b = scalars_df_index.dropna()[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
bf_result = bf_df_a.join(
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
).to_pandas()
pd_df_a = scalars_pandas_df_index[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
pd_df_b = scalars_pandas_df_index.dropna()[
["string_col", "datetime_col", "timestamp_col", "int64_too"]
].rename(columns={"timestamp_col": "datetime_col"})
pd_result = pd_df_a.join(
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
)
pd.testing.assert_frame_equal(
bf_result.sort_index(),
pd_result.sort_index(),
check_like=True,
check_index_type=False,
check_names=False,
)
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns)


@all_joins
36 changes: 32 additions & 4 deletions tests/unit/test_dataframe_polars.py
@@ -2444,12 +2444,40 @@ def test_join_different_table(
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)


def test_join_duplicate_columns_raises_not_implemented(scalars_dfs):
@all_joins
def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how):
if how == "cross":
return
bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename(
columns={"int64_col": "string_col"}
)
bf_df_b = scalars_df_index.dropna()["string_col"]
with pytest.raises(
ValueError, match="The column label 'string_col' is not unique."
):
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r")


def test_join_duplicate_columns_raises_value_error(scalars_dfs):
scalars_df, _ = scalars_dfs
df_a = scalars_df[["string_col", "float64_col"]]
df_b = scalars_df[["float64_col"]]
with pytest.raises(NotImplementedError):
df_a.join(df_b, how="outer").to_pandas()
with pytest.raises(ValueError, match="columns overlap but no suffix specified"):
df_a.join(df_b, how="outer")


@all_joins
def test_join_param_on_duplicate_with_index_raises_value_error(scalars_df_index, how):
if how == "cross":
return
Comment on lines +2471 to +2472

Collaborator:
I think it'd be worth adding a test that ValueError is not raised for this condition with a cross join.

Collaborator Author:
Cross join actually raises another error; match added.

bf_df_a = scalars_df_index[["string_col"]]
bf_df_a.index.name = "string_col"
bf_df_b = scalars_df_index.dropna()["string_col"]
with pytest.raises(
ValueError,
match="'string_col' is both an index level and a column label, which is ambiguous.",
):
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r")
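
Following up on the exchange above, a hedged sketch of the cross-join counterpart (hypothetical test, not part of this diff; it reuses the file's existing pytest import and scalars_df_index fixture): per the join() implementation, how="cross" rejects the on argument before any ambiguity check runs, so the assertion would target that error instead.

def test_join_param_on_with_cross_join_raises_value_error(scalars_df_index):
    bf_df_a = scalars_df_index[["string_col"]]
    bf_df_a.index.name = "string_col"
    bf_df_b = scalars_df_index.dropna()["string_col"]
    with pytest.raises(ValueError, match="'on' is not supported for cross join."):
        bf_df_a.join(bf_df_b, on="string_col", how="cross", lsuffix="_l", rsuffix="_r")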


@all_joins
@@ -2461,7 +2489,7 @@ def test_join_param_on(scalars_dfs, how):
bf_df_b = bf_df[["float64_col"]]

if how == "cross":
with pytest.raises(ValueError):
with pytest.raises(ValueError, match="'on' is not supported for cross join."):
bf_df_a.join(bf_df_b, on="rowindex_2", how=how)
else:
bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas()