Skip to content

Commit

Permalink
[DataFrame] Implements df.update (ray-project#1997)
Browse files Browse the repository at this point in the history
* Working on fixing update

* Fixing update implementation

* Adding test

* Addressing comments
  • Loading branch information
SaladRaider authored and devin-petersohn committed May 7, 2018
1 parent 12da021 commit 1f82a46
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 21 deletions.
65 changes: 47 additions & 18 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4415,9 +4415,34 @@ def unstack(self, level=-1, fill_value=None):

def update(self, other, join='left', overwrite=True, filter_func=None,
raise_conflict=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Modify DataFrame in place using non-NA values from other.
Args:
other: DataFrame, or object coercible into a DataFrame
join: {'left'}, default 'left'
overwrite: If True then overwrite values for common keys in frame
filter_func: Can choose to replace values other than NA.
raise_conflict: If True, will raise an error if the DataFrame and
other both contain data in the same place.
Returns:
None
"""
if raise_conflict:
raise NotImplementedError(
"raise_conflict parameter not yet supported. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if not isinstance(other, DataFrame):
other = DataFrame(other)

def update_helper(x, y):
x.update(y, join, overwrite, filter_func, False)
return x

self._inter_df_op_helper(update_helper, other, join, 0, None,
inplace=True)

def var(self, axis=None, skipna=None, level=None, ddof=1,
numeric_only=None, **kwargs):
Expand Down Expand Up @@ -4971,36 +4996,40 @@ def _operator_helper(self, func, other, axis, level, *args):
if isinstance(other, DataFrame):
return self._inter_df_op_helper(
lambda x, y: func(x, y, axis, level, *args),
other, axis, level)
other, "outer", axis, level)
else:
return self._single_df_op_helper(
lambda df: func(df, other, axis, level, *args),
other, axis, level)

def _inter_df_op_helper(self, func, other, axis, level):
def _inter_df_op_helper(self, func, other, how, axis, level,
inplace=False):
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")
axis = pd.DataFrame()._get_axis_number(axis)

# Adding two DataFrames causes an outer join.
if isinstance(other, DataFrame):
new_column_index = self.columns.join(other.columns, how="outer")
new_index = self.index.join(other.index, how="outer")
copartitions = self._copartition(other, new_index)

new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0]), None] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])
new_column_index = self.columns.join(other.columns, how=how)
new_index = self.index.join(other.index, how=how)
copartitions = self._copartition(other, new_index)

new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0]), None] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])

if not inplace:
# TODO join the Index Metadata objects together for performance.
return DataFrame(block_partitions=new_blocks,
columns=new_column_index,
index=new_index)
else:
self._update_inplace(block_partitions=new_blocks,
columns=new_column_index,
index=new_index)

def _single_df_op_helper(self, func, other, axis, level):
if level is not None:
Expand Down
17 changes: 14 additions & 3 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3030,10 +3030,21 @@ def test_unstack():


def test_update():
ray_df = create_test_dataframe()
df = rdf.DataFrame([[1.5, np.nan, 3.],
[1.5, np.nan, 3.],
[1.5, np.nan, 3],
[1.5, np.nan, 3]])

with pytest.raises(NotImplementedError):
ray_df.update(None)
other = rdf.DataFrame([[3.6, 2., np.nan],
[np.nan, np.nan, 7]], index=[1, 3])

df.update(other)

expected = rdf.DataFrame([[1.5, np.nan, 3],
[3.6, 2, 3],
[1.5, np.nan, 3],
[1.5, np.nan, 7.]])
assert ray_df_equals(df, expected)


@pytest.fixture
Expand Down

0 comments on commit 1f82a46

Please sign in to comment.