Skip to content

feat: add update and align methods to dataframe #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,75 @@ def _kurt_from_moments_and_count(
kurt_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
)
return block, kurt_id


def align(
    left_block: blocks.Block,
    right_block: blocks.Block,
    join: str = "outer",
    axis: typing.Union[str, int, None] = None,
) -> typing.Tuple[blocks.Block, blocks.Block]:
    """Align two blocks on their columns, their rows, or both.

    Args:
        left_block: Block on the left side of the alignment.
        right_block: Block on the right side of the alignment.
        join: Join type forwarded to ``align_columns`` and ``align_rows``.
        axis: Axis to align on. ``None`` aligns both axes; any other value
            is normalized with ``core.utils.get_axis_number`` (1 aligns
            columns, 0 aligns rows).

    Returns:
        The ``(left, right)`` pair of aligned blocks.
    """
    if axis is None:
        axis_n = None
    else:
        axis_n = core.utils.get_axis_number(axis)

    wants_columns = axis_n is None or axis_n == 1
    wants_rows = axis_n is None or axis_n == 0

    # Columns are handled before rows: the other order would likely create
    # extra joins.
    if wants_columns:
        left_block, right_block = align_columns(left_block, right_block, join=join)
    if wants_rows:
        left_block, right_block = align_rows(left_block, right_block, join=join)
    return left_block, right_block


def align_rows(
    left_block: blocks.Block,
    right_block: blocks.Block,
    join: str = "outer",
):
    """Align two blocks on their row index by joining the indexes.

    Args:
        left_block: Block whose index is the left side of the join.
        right_block: Block whose index is the right side of the join.
        join: Join type passed as ``how`` to the index join.

    Returns:
        A ``(left, right)`` pair of blocks, both selected from the joined
        block: the first holds the left block's value columns, the second
        the right block's value columns.
    """
    joined_index, column_lookups = left_block.index.join(
        right_block.index, how=join
    )
    lookup_left, lookup_right = column_lookups

    # Translate each side's original column ids into ids in the joined block.
    left_ids = list(map(lookup_left, left_block.value_columns))
    right_ids = list(map(lookup_right, right_block.value_columns))

    joined_block = joined_index._block
    return (
        joined_block.select_columns(left_ids),
        joined_block.select_columns(right_ids),
    )


def align_columns(
    left_block: blocks.Block,
    right_block: blocks.Block,
    join: str = "outer",
) -> typing.Tuple[blocks.Block, blocks.Block]:
    """Align two blocks so both expose the same column labels in the same order.

    The column labels of the two blocks are joined. For every joined label
    that one side lacks, a constant column built from ``None`` is added to
    that side, using the dtype and label of the matching column on the
    other side.

    Args:
        left_block: Block on the left side of the alignment.
        right_block: Block on the right side of the alignment.
        join: Join type passed as ``how`` to the column-label join.

    Returns:
        A ``(left, right)`` pair of blocks whose selected columns follow the
        order of the joined labels.
    """
    columns, lcol_indexer, rcol_indexer = left_block.column_labels.join(
        right_block.column_labels, how=join, return_indexers=True
    )
    # A ``None`` indexer is treated as "positions already line up with the
    # joined labels", so fall back to the identity mapping.
    identity = range(len(columns))
    left_positions = lcol_indexer if lcol_indexer is not None else identity
    right_positions = rcol_indexer if rcol_indexer is not None else identity

    # The indexers refer to column positions of the blocks as passed in, so
    # every positional lookup below goes through these snapshots rather than
    # through the blocks that gain constant columns during the loop.
    original_left_block = left_block
    original_right_block = right_block

    left_column_ids = []
    right_column_ids = []
    for left_index, right_index in zip(left_positions, right_positions):
        # A negative position marks a label missing on that side.
        if left_index >= 0:
            left_col_id = original_left_block.value_columns[left_index]
        else:
            left_block, left_col_id = left_block.create_constant(
                None,
                dtype=original_right_block.dtypes[right_index],
                label=original_right_block.column_labels[right_index],
            )
        left_column_ids.append(left_col_id)

        if right_index >= 0:
            right_col_id = original_right_block.value_columns[right_index]
        else:
            right_block, right_col_id = right_block.create_constant(
                None,
                dtype=original_left_block.dtypes[left_index],
                label=original_left_block.column_labels[left_index],
            )
        right_column_ids.append(right_col_id)

    left_final = left_block.select_columns(left_column_ids)
    right_final = right_block.select_columns(right_column_ids)
    return left_final, right_final
108 changes: 66 additions & 42 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,55 @@ def rpow(

__rpow__ = rpow

def align(
    self,
    other: typing.Union[DataFrame, bigframes.series.Series],
    join: str = "outer",
    axis: typing.Union[str, int, None] = None,
) -> typing.Tuple[
    typing.Union[DataFrame, bigframes.series.Series],
    typing.Union[DataFrame, bigframes.series.Series],
]:
    """Align this DataFrame with ``other`` using the given join method.

    Args:
        other: DataFrame or Series to align with.
        join: Join type forwarded to ``block_ops.align``.
        axis: Axis to align on, or ``None`` for both.

    Returns:
        A tuple of the aligned DataFrame and the aligned ``other`` (rebuilt
        with ``other``'s own class).

    Raises:
        NotImplementedError: If ``other`` is a Series and ``axis`` resolves
            to 1.
    """
    # Compare against None explicitly: axis=0 is a valid axis but is falsy.
    axis_n = utils.get_axis_number(axis) if axis is not None else None
    if axis_n == 1 and isinstance(other, bigframes.series.Series):
        raise NotImplementedError(
            f"align with series and axis=1 not supported. {constants.FEEDBACK_LINK}"
        )
    left_block, right_block = block_ops.align(
        self._block, other._block, join=join, axis=axis
    )
    return DataFrame(left_block), other.__class__(right_block)

def update(self, other, join: str = "left", overwrite=True, filter_func=None):
    """Update this DataFrame in place with values taken from ``other``.

    Args:
        other: A DataFrame, or an object passed to ``DataFrame(...)`` to
            build one.
        join: Must be ``"left"``.
        overwrite: When no ``filter_func`` is given: if true, take every
            non-NA value from ``other``; if false, only fill positions
            that are NA in this DataFrame.
        filter_func: Optional callable applied to each left-hand series.
            When given, a value is replaced where ``other`` is non-NA and
            the callable returns true; ``overwrite`` is not consulted.

    Raises:
        ValueError: If ``join`` is anything other than ``"left"``.
    """
    if not isinstance(other, DataFrame):
        other = DataFrame(other)
    if join != "left":
        raise ValueError("Only 'left' join supported for update")

    def update_func(
        left: bigframes.series.Series, right: bigframes.series.Series
    ) -> bigframes.series.Series:
        # Work out where the right-hand value should replace the left one.
        if filter_func is not None:
            # Will always take other if possible
            take_right = right.notna() & filter_func(left)
        elif overwrite:
            take_right = right.notna()
        else:
            take_right = left.isna()
        return left.mask(take_right, right)

    combined = self.combine(other, update_func, how=join)
    self._set_block(combined._block)

def combine(
self,
other: DataFrame,
Expand All @@ -753,56 +802,31 @@ def combine(
],
fill_value=None,
overwrite: bool = True,
*,
how: str = "outer",
) -> DataFrame:
# Join rows
joined_index, (get_column_left, get_column_right) = self._block.index.join(
other._block.index, how="outer"
)
columns, lcol_indexer, rcol_indexer = self.columns.join(
other.columns, how="outer", return_indexers=True
)
l_aligned, r_aligned = block_ops.align(self._block, other._block, join=how)

column_indices = zip(
lcol_indexer if (lcol_indexer is not None) else range(len(columns)),
rcol_indexer if (lcol_indexer is not None) else range(len(columns)),
other_missing_labels = self._block.column_labels.difference(
other._block.column_labels
)

block = joined_index._block
l_frame = DataFrame(l_aligned)
r_frame = DataFrame(r_aligned)
results = []
for left_index, right_index in column_indices:
if left_index >= 0 and right_index >= 0: # -1 indices indicate missing
left_col_id = get_column_left(self._block.value_columns[left_index])
right_col_id = get_column_right(other._block.value_columns[right_index])
left_series = bigframes.series.Series(block.select_column(left_col_id))
right_series = bigframes.series.Series(
block.select_column(right_col_id)
)
for (label, lseries), (_, rseries) in zip(l_frame.items(), r_frame.items()):
if not ((label in other_missing_labels) and not overwrite):
if fill_value is not None:
left_series = left_series.fillna(fill_value)
right_series = right_series.fillna(fill_value)
results.append(func(left_series, right_series))
elif left_index >= 0:
# Does not exist in other
if overwrite:
dtype = self.dtypes[left_index]
block, null_col_id = block.create_constant(None, dtype=dtype)
result = bigframes.series.Series(block.select_column(null_col_id))
results.append(result)
result = func(
lseries.fillna(fill_value), rseries.fillna(fill_value)
)
else:
left_col_id = get_column_left(self._block.value_columns[left_index])
result = bigframes.series.Series(block.select_column(left_col_id))
if fill_value is not None:
result = result.fillna(fill_value)
results.append(result)
elif right_index >= 0:
right_col_id = get_column_right(other._block.value_columns[right_index])
result = bigframes.series.Series(block.select_column(right_col_id))
if fill_value is not None:
result = result.fillna(fill_value)
results.append(result)
result = func(lseries, rseries)
else:
# Should not be possible
raise ValueError("No right or left index.")
result = (
lseries.fillna(fill_value) if fill_value is not None else lseries
)
results.append(result)

if all([isinstance(val, bigframes.series.Series) for val in results]):
import bigframes.core.reshape as rs
Expand Down
76 changes: 71 additions & 5 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,77 @@ def test_combine(
pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
    ("overwrite", "filter_func"),
    [
        pytest.param(True, None, id="default"),
        pytest.param(False, None, id="overwritefalse"),
        pytest.param(True, lambda x: x.isna() | (x % 2 == 0), id="customfilter"),
    ],
)
def test_df_update(overwrite, filter_func):
    """Check that DataFrame.update matches pandas on partially overlapping frames."""
    if pd.__version__.startswith("1."):
        pytest.skip("dtype handled differently in pandas 1.x.")

    # The frames share column "a" and index labels 1, 2 and 4.
    target_data = {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}
    source_data = {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]}
    target_index = pandas.Index([1, 2, 3, 4], dtype="Int64")
    source_index = pandas.Index([1, 2, 4, 5], dtype="Int64")

    pd_df1 = pandas.DataFrame(target_data, dtype="Int64", index=target_index)
    pd_df2 = pandas.DataFrame(source_data, dtype="Int64", index=source_index)
    bf_df1 = dataframe.DataFrame(pd_df1)
    bf_df2 = dataframe.DataFrame(pd_df2)

    update_kwargs = {"overwrite": overwrite, "filter_func": filter_func}
    bf_df1.update(bf_df2, **update_kwargs)
    pd_df1.update(pd_df2, **update_kwargs)

    pd.testing.assert_frame_equal(bf_df1.to_pandas(), pd_df1)


@pytest.mark.parametrize(
    ("join", "axis"),
    [
        ("outer", None),
        ("outer", 0),
        ("outer", 1),
        ("left", 0),
        ("right", 1),
        ("inner", None),
        ("inner", 1),
    ],
)
def test_df_align(join, axis):
    """Check that DataFrame.align matches pandas for several join/axis pairs."""
    # The frames share column "a" and index labels 1, 2 and 4.
    first_data = {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}
    second_data = {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]}
    first_index = pandas.Index([1, 2, 3, 4], dtype="Int64")
    second_index = pandas.Index([1, 2, 4, 5], dtype="Int64")

    pd_df1 = pandas.DataFrame(first_data, dtype="Int64", index=first_index)
    pd_df2 = pandas.DataFrame(second_data, dtype="Int64", index=second_index)
    bf_df1 = dataframe.DataFrame(pd_df1)
    bf_df2 = dataframe.DataFrame(pd_df2)

    bf_result1, bf_result2 = bf_df1.align(bf_df2, join=join, axis=axis)
    pd_result1, pd_result2 = pd_df1.align(pd_df2, join=join, axis=axis)

    # Don't check dtype as pandas does unnecessary float conversion
    for bf_result, pd_result in ((bf_result1, pd_result1), (bf_result2, pd_result2)):
        pd.testing.assert_frame_equal(
            bf_result.to_pandas(), pd_result, check_dtype=False
        )


def test_combine_first(
scalars_df_index,
scalars_df_2_index,
Expand All @@ -1232,11 +1303,6 @@ def test_combine_first(
pd_df_b.columns = ["b", "a", "d"]
pd_result = pd_df_a.combine_first(pd_df_b)

print("pandas")
print(pd_result.to_string())
print("bigframes")
print(bf_result.to_string())

# Some dtype inconsistency for all-NULL columns
pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)

Expand Down
62 changes: 62 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,35 @@ def drop(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def align(
self,
other,
join="outer",
axis=None,
) -> tuple:
"""
Align two objects on their axes with the specified join method.

Join method is specified for each axis Index.

Args:
other (DataFrame or Series):
The object to align with.
join ({'outer', 'inner', 'left', 'right'}, default 'outer'):
Type of alignment to be performed.
left: use only keys from left frame, preserve key order.
right: use only keys from right frame, preserve key order.
outer: use union of keys from both frames, sort keys lexicographically.
inner: use intersection of keys from both frames,
preserve the order of the left keys.

axis (allowed axis of the other object, default None):
Align on index (0), columns (1), or both (None).

Returns:
tuple of (DataFrame, type of other): Aligned objects.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def rename(
self,
*,
Expand Down Expand Up @@ -1265,6 +1294,39 @@ def combine_first(self, other) -> DataFrame:
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def update(
self, other, join: str = "left", overwrite: bool = True, filter_func=None
) -> None:
"""
Modify in place using non-NA values from another DataFrame.

Aligns on indices. There is no return value.

Args:
other (DataFrame, or object coercible into a DataFrame):
Should have at least one matching index/column label
with the original DataFrame. If a Series is passed,
its name attribute must be set, and that will be
used as the column name to align with the original DataFrame.
join ({'left'}, default 'left'):
Only left join is implemented, keeping the index and columns of the
original object.
overwrite (bool, default True):
How to handle non-NA values for overlapping keys:
True: overwrite original DataFrame's values
with values from `other`.
False: only update values that are NA in
the original DataFrame.

filter_func (callable(1d-array) -> bool 1d-array, optional):
Can choose to replace values other than NA. Return True for values
that should be updated.

Returns:
None: This method directly changes calling object.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

# ----------------------------------------------------------------------
# Data reshaping

Expand Down