[DataFrame] Fix equals and make it more efficient (ray-project#2186)
* Fixing equals

* Adding test fix

* Working on fix for equals and drop

* Fix equals and fix tests to use ray.dataframe.equals

* Addressing comments
devin-petersohn authored and pschafhalter committed Jun 4, 2018
1 parent a5d888e commit b56c8ed
Showing 3 changed files with 56 additions and 45 deletions.
85 changes: 43 additions & 42 deletions python/ray/dataframe/dataframe.py
@@ -23,6 +23,7 @@

import warnings
import numpy as np
+from numpy.testing import assert_equal
import ray
import itertools
import io
@@ -286,7 +287,7 @@ def back(df, n):
for _ in range(len(self.index))])
col_dots.index = self.index
col_dots.name = "..."
-x = pd.concat([front, col_dots, back], axis=1)
+x = pd.concat([front, col_dots, back], axis=1, copy=False)

# If less than 60 rows, x is already in the correct format.
if len(self._row_metadata) < 60:
@@ -473,7 +474,8 @@ def dtypes(self):
if isinstance(self._dtypes_cache, list) and \
isinstance(self._dtypes_cache[0],
ray.ObjectID):
-self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache))
+self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache),
+                               copy=False)
self._dtypes_cache.index = self.columns

return self._dtypes_cache
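
The recurring change in this file is adding `copy=False` to `pd.concat`. A minimal sketch of the effect, assuming a pandas version of this era where `copy` defaults to `True`; the frames below are hypothetical stand-ins for partitions:

```python
import numpy as np
import pandas as pd

# Hypothetical frames standing in for two column partitions.
left = pd.DataFrame(np.arange(6).reshape(3, 2), columns=["a", "b"])
right = pd.DataFrame(np.arange(6, 12).reshape(3, 2), columns=["c", "d"])

# copy=False asks pandas not to duplicate the underlying blocks when it
# can avoid it, which matters when partitions are stitched back together
# on every access of cached properties like dtypes.
combined = pd.concat([left, right], axis=1, copy=False)
print(combined.shape)  # (3, 4)
```
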
@@ -1050,7 +1052,7 @@ def agg_helper(df, arg, index, columns, *args, **kwargs):
# be returned immediately
is_series = ray.get(is_series)
if all(is_series):
-new_series = pd.concat(ray.get(new_parts))
+new_series = pd.concat(ray.get(new_parts), copy=False)
new_series.index = self.columns if axis == 0 else self.index
return new_series
# This error is thrown when some of the partitions return Series and
@@ -1201,7 +1203,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
else [func[key]]),
self._col_partitions[part])
for (part, ind), key in part_ind_tuples]
-return pd.concat(ray.get(result), axis=1)
+return pd.concat(ray.get(result), axis=1, copy=False)
else:
result = [_deploy_func.remote(
lambda df: df.iloc[:, ind].apply(func[key]),
@@ -1599,17 +1601,12 @@ def drop(self, labels=None, axis=0, index=None, columns=None, level=None,
"""Return new object with labels in requested axis removed.
Args:
labels: Index or column labels to drop.
axis: Whether to drop labels from the index (0 / 'index') or
columns (1 / 'columns').
index, columns: Alternative to specifying axis (labels, axis=1 is
equivalent to columns=labels).
level: For MultiIndex
inplace: If True, do operation inplace and return None.
errors: If 'ignore', suppress error and existing labels are
dropped.
Returns:
@@ -1757,33 +1754,22 @@ def equals(self, other):
Returns:
Boolean: True if equal, otherwise False
"""
-        # TODO(kunalgosar): Implement Copartition and use to implement equals
-        def helper(df, index, other_series):
-            return df.iloc[index['index_within_partition']] \
-                .equals(other_series)
-
-        results = []
-        other_partition = None
-        other_df = None
-        # TODO: Make the appropriate coord df accessor methods for this fxn
-        for i, idx in other._row_metadata._coord_df.iterrows():
-            if idx['partition'] != other_partition:
-                other_df = ray.get(other._row_partitions[idx['partition']])
-                other_partition = idx['partition']
-            # TODO: group series here into full df partitions to reduce
-            # the number of remote calls to helper
-            other_series = other_df.iloc[idx['index_within_partition']]
-            curr_index = self._row_metadata._coord_df.loc[i]
-            curr_df = self._row_partitions[int(curr_index['partition'])]
-            results.append(_deploy_func.remote(helper,
-                                               curr_df,
-                                               curr_index,
-                                               other_series))
-
-        for r in results:
-            if not ray.get(r):
-                return False
-        return True

+        if not self.index.equals(other.index) or not \
+                self.columns.equals(other.columns):
+            return False
+
+        # We copartition because we don't know what the DataFrames look like
+        # before this. Empty partitions can give problems with
+        # _match_partitioning (see _match_partitioning).
+        new_zipped_parts = self._copartition(other, self.index)
+
+        equals_partitions = [_equals_helper.remote(left, right)
+                             for left, right in new_zipped_parts]
+
+        # Short-circuit with next() so we stop at the first mismatched
+        # partition instead of ray.get-ing every result.
+        return next((False for eq in equals_partitions if not ray.get(eq)),
+                    True)
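
The `next(...)` call above is a short-circuit: the generator resolves one partition at a time with `ray.get` and stops at the first mismatch instead of fetching every result up front. The same pattern in isolation, with a plain list standing in for the remote results:

```python
# Hypothetical stand-ins for per-partition equality results.
checks = [True, True, False, True]

# The generator yields False only for a failed check; next() returns the
# first such False, or the default True if every check passes.
result = next((False for ok in checks if not ok), True)
print(result)  # False -- iteration stopped at the third element
```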

def eval(self, expr, inplace=False, **kwargs):
"""Evaluate a Python expression as a string using various backends.
@@ -1848,7 +1834,7 @@ def eval_helper(df):
result_type = ray.get(_deploy_func.remote(lambda df: type(df),
new_rows[0]))
if result_type is pd.Series:
-new_series = pd.concat(ray.get(new_rows), axis=0)
+new_series = pd.concat(ray.get(new_rows), axis=0, copy=False)
new_series.index = self.index
return new_series

@@ -4104,7 +4090,8 @@ def sort_values(self, by, axis=0, ascending=True, inplace=False,
for row, idx in index_builder:
row.index = [str(idx)]

-broadcast_values = pd.concat([row for row, idx in index_builder])
+broadcast_values = pd.concat([row for row, idx in index_builder],
+                             copy=False)

# We are converting the by to string here so that we don't have a
# collision with the RangeIndex on the inner frame. It is cheap and
@@ -5365,20 +5352,20 @@ def _merge_columns(left_columns, right_columns, *args):
def _where_helper(left, cond, other, left_columns, cond_columns,
other_columns, *args):

-left = pd.concat(ray.get(left.tolist()), axis=1)
+left = pd.concat(ray.get(left.tolist()), axis=1, copy=False)
# We have to reset the index and columns here because we are coming
# from blocks and the axes are set according to the blocks. We have
# already correctly copartitioned everything, so there's no
# correctness problems with doing this.
left.reset_index(inplace=True, drop=True)
left.columns = left_columns

-cond = pd.concat(ray.get(cond.tolist()), axis=1)
+cond = pd.concat(ray.get(cond.tolist()), axis=1, copy=False)
cond.reset_index(inplace=True, drop=True)
cond.columns = cond_columns

if isinstance(other, np.ndarray):
-other = pd.concat(ray.get(other.tolist()), axis=1)
+other = pd.concat(ray.get(other.tolist()), axis=1, copy=False)
other.reset_index(inplace=True, drop=True)
other.columns = other_columns
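
The reset-and-relabel steps above are easier to see in isolation: frames rebuilt from blocks carry block-local axes, so the code discards them and reinstates the logical labels. A toy version with illustrative names:

```python
import pandas as pd

# Two blocks with their own local axes, as if they came from partitions.
block_a = pd.DataFrame({0: [1, 2]})
block_b = pd.DataFrame({0: [3, 4]})

frame = pd.concat([block_a, block_b], axis=1, copy=False)
frame.reset_index(drop=True, inplace=True)  # drop block-local row labels
frame.columns = ["x", "y"]                  # reinstate logical column names
print(frame)
```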

@@ -5388,7 +5375,7 @@ def _where_helper(left, cond, other, left_columns, cond_columns,
@ray.remote
def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value,
limit, tolerance, *df):
-df = pd.concat(df, axis=axis ^ 1)
+df = pd.concat(df, axis=axis ^ 1, copy=False)
if axis == 1:
df.index = old_index
else:
@@ -5398,3 +5385,17 @@ def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value,
method=method, fill_value=fill_value,
limit=limit, tolerance=tolerance)
return create_blocks_helper(df, npartitions, axis)
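
The `axis ^ 1` in `reindex_helper` is an XOR trick that flips the concat axis, since blocks partitioned along one axis are reassembled along the other:

```python
# XOR with 1 swaps the two pandas axis codes.
for axis in (0, 1):
    print(axis, "->", axis ^ 1)  # 0 -> 1, 1 -> 0
```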


+@ray.remote
+def _equals_helper(left, right):
+    right = pd.concat(ray.get(right.tolist()), axis=1, copy=False)
+    left = pd.concat(ray.get(left.tolist()), axis=1, copy=False)
+    # Since we know that the index and columns match, we can just check the
+    # values. We can't use np.array_equal here because it doesn't recognize
+    # np.nan as equal to another np.nan.
+    try:
+        assert_equal(left.values, right.values)
+    except AssertionError:
+        return False
+    return True
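
The comment in `_equals_helper` is the crux of the fix: `np.array_equal` reports NaN as unequal to NaN, while `numpy.testing.assert_equal` treats NaNs in matching positions as equal. A small demonstration:

```python
import numpy as np
from numpy.testing import assert_equal

a = np.array([1.0, np.nan])
b = np.array([1.0, np.nan])

print(np.array_equal(a, b))  # False: NaN != NaN under elementwise comparison

try:
    assert_equal(a, b)  # passes: aligned NaNs compare equal here
    print(True)
except AssertionError:
    print(False)
```
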
6 changes: 3 additions & 3 deletions python/ray/dataframe/test/test_dataframe.py
@@ -17,13 +17,13 @@ def ray_df_equals_pandas(ray_df, pandas_df):


@pytest.fixture
-def ray_series_equals_pandas(ray_df, pandas_df):
-    return ray_df.equals(pandas_df)
+def ray_series_equals_pandas(ray_series, pandas_series):
+    return ray_series.equals(pandas_series)


@pytest.fixture
def ray_df_equals(ray_df1, ray_df2):
-    return to_pandas(ray_df1).equals(to_pandas(ray_df2))
+    return ray_df1.equals(ray_df2)


10 changes: 10 additions & 0 deletions python/ray/dataframe/utils.py
@@ -426,6 +426,16 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx,
def _match_partitioning(column_partition, lengths, index):
"""Match the number of rows on each partition. Used in df.merge().
+    NOTE: This function can cause problems when there are empty column
+        partitions.
+
+    The way this function is intended to be used is as follows: Align the
+    right partitioning with the left. The left will remain unchanged. Then,
+    you are free to perform actions on a per-partition basis with the
+    partitioning.
+
+    The index objects must already be identical for this to work correctly.
Args:
column_partition: The column partition to change.
lengths: The lengths of each row partition to match to.
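
A rough sketch of what "matching" the partitioning means, under the identical-index assumption stated above; `match_partitioning_sketch` is a hypothetical helper, not the function in this diff:

```python
import numpy as np
import pandas as pd

def match_partitioning_sketch(df, lengths):
    # Split df's rows into chunks whose sizes mirror the left frame's
    # row-partition lengths.
    bounds = np.cumsum([0] + list(lengths))
    return [df.iloc[start:end] for start, end in zip(bounds[:-1], bounds[1:])]

df = pd.DataFrame({"x": range(5)})
parts = match_partitioning_sketch(df, [3, 2])
print([len(p) for p in parts])  # [3, 2]
```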
