137 changes: 97 additions & 40 deletions modin/data_management/data_manager.py
@@ -137,11 +137,12 @@ def _join_index_objects(self, axis, other_index, how, sort=True):
return self.index.join(other_index, how=how, sort=sort)

def concat(self, axis, other, **kwargs):
ignore_index = kwargs.get("ignore_index", False)
if axis == 0:
if isinstance(other, list):
return self._append_list_of_managers(other, kwargs["ignore_index"])
return self._append_list_of_managers(other, ignore_index)
else:
return self._append_data_manager(other, kwargs["ignore_index"])
return self._append_data_manager(other, ignore_index)
else:
if isinstance(other, list):
return self._join_list_of_managers(other, **kwargs)
@@ -180,9 +181,17 @@ def _append_list_of_managers(self, others, ignore_index):
return cls(new_data, new_index, joined_columns)

def _join_data_manager(self, other, **kwargs):
assert isinstance(other, type(self)), \
"This method is for data manager objects only"
cls = type(self)

joined_index = self._join_index_objects(1, other.index, kwargs["how"], sort=kwargs["sort"])
# Fall back to join's default values (in practice these kwargs should always be passed)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
lsuffix = kwargs.get("lsuffix", "")
rsuffix = kwargs.get("rsuffix", "")

joined_index = self._join_index_objects(1, other.index, how, sort=sort)

to_join = other.reindex(0, joined_index).data
new_self = self.reindex(0, joined_index).data
@@ -193,14 +202,30 @@ def _join_data_manager(self, other, **kwargs):
# suffixes.
self_proxy = pandas.DataFrame(columns=self.columns)
other_proxy = pandas.DataFrame(columns=other.columns)
new_columns = self_proxy.join(other_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns
new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns

return cls(new_data, joined_index, new_columns)

def _join_list_of_managers(self, others, **kwargs):
assert isinstance(others, list), \
"This method is for lists of DataManager objects only"
assert all(isinstance(other, type(self)) for other in others), \
"Different Manager objects are being used. This is not allowed"
cls = type(self)

# Fall back to join's default values (in practice these kwargs should always be passed)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
lsuffix = kwargs.get("lsuffix", "")
rsuffix = kwargs.get("rsuffix", "")

joined_index = self._join_index_objects(1, [other.index for other in others], kwargs["how"], sort=kwargs["sort"])
joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort)

to_join = [other.reindex(0, joined_index).data for other in others]
new_self = self.reindex(0, joined_index).data
@@ -211,7 +236,7 @@ def _join_list_of_managers(self, others, **kwargs):
# suffixes.
self_proxy = pandas.DataFrame(columns=self.columns)
others_proxy = [pandas.DataFrame(columns=other.columns) for other in others]
new_columns = self_proxy.join(others_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns
new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns

return cls(new_data, joined_index, new_columns)
# END Append/Concat/Join (Not Merge)
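
Note: the empty-proxy join used above computes the suffixed column labels without moving any data: two frames carrying only column metadata join instantly, yet produce exactly the columns a real join would. A minimal standalone sketch (column names hypothetical):

    import pandas

    # Empty frames carry column metadata only, so this join is essentially free.
    self_proxy = pandas.DataFrame(columns=["id", "value"])
    other_proxy = pandas.DataFrame(columns=["value", "extra"])

    new_columns = self_proxy.join(other_proxy, lsuffix="_x", rsuffix="_y").columns
    print(list(new_columns))  # ['id', 'value_x', 'value_y', 'extra']
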
@@ -231,7 +256,7 @@ def inter_manager_operations(self, other, how_to_join, func):
new_columns = self._join_index_objects(0, other.columns, how_to_join, sort=False)

reindexed_other = other.reindex(0, joined_index).data
reindexed_self = other.reindex(0, joined_index).data
reindexed_self = self.reindex(0, joined_index).data

# There is an interesting serialization anomaly that happens if we do
# not use the columns in `inter_data_op_builder` from here (e.g. if we
@@ -297,7 +322,7 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs):

def reset_index(self, **kwargs):
cls = type(self)
drop = kwargs["drop"]
drop = kwargs.get("drop", False)
new_index = pandas.RangeIndex(len(self.index))

if not drop:
@@ -356,31 +381,41 @@ def full_reduce(self, axis, map_func, reduce_func=None):
return result

def count(self, **kwargs):
axis = kwargs.get("axis", 0)
map_func = self._prepare_method(pandas.DataFrame.count, **kwargs)
reduce_func = self._prepare_method(pandas.DataFrame.sum, **kwargs)
return self.full_reduce(kwargs["axis"], map_func, reduce_func)
return self.full_reduce(axis, map_func, reduce_func)

def max(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.max, **kwargs)
return self.full_reduce(kwargs["axis"], func)
return self.full_reduce(axis, func)

def mean(self, **kwargs):
axis = kwargs["axis"]
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
length = len(self.index) if not axis else len(self.columns)

return self.sum(**kwargs) / length

def min(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.min, **kwargs)
return self.full_reduce(kwargs["axis"], func)
return self.full_reduce(axis, func)

def prod(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.prod, **kwargs)
return self.full_reduce(kwargs["axis"], func)
return self.full_reduce(axis, func)

def sum(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.sum, **kwargs)
return self.full_reduce(kwargs["axis"], func)
return self.full_reduce(axis, func)
# END Full Reduce operations

# Map partitions operations
@@ -448,12 +483,14 @@ def _post_process_idx_ops(self, axis, intermediate_result):
return result

def all(self, **kwargs):
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.all, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def any(self, **kwargs):
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.any, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def idxmax(self, **kwargs):

@@ -463,11 +500,12 @@ def idxmax_builder(df, **kwargs):
df.index = pandas.RangeIndex(len(df.index))
return df.idxmax(**kwargs)

axis = kwargs.get("axis", 0)
func = self._prepare_method(idxmax_builder, **kwargs)
max_result = self.full_axis_reduce(func, kwargs["axis"])
max_result = self.full_axis_reduce(func, axis)
# Because our internal partitions don't track the external index, we
# have to do a conversion.
return self._post_process_idx_ops(kwargs["axis"], max_result)
return self._post_process_idx_ops(axis, max_result)

def idxmin(self, **kwargs):

@@ -477,11 +515,12 @@ def idxmin_builder(df, **kwargs):
df.index = pandas.RangeIndex(len(df.index))
return df.idxmin(**kwargs)

axis = kwargs.get("axis", 0)
func = self._prepare_method(idxmin_builder, **kwargs)
min_result = self.full_axis_reduce(func, kwargs["axis"])
min_result = self.full_axis_reduce(func, axis)
# Because our internal partitions don't track the external index, we
# have to do a conversion.
return self._post_process_idx_ops(kwargs["axis"], min_result)
return self._post_process_idx_ops(axis, min_result)
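
Note: because idxmax_builder/idxmin_builder reset each partition to a positional RangeIndex, the reduce step yields positions rather than labels, and _post_process_idx_ops maps them back through the externally visible index. A toy version of that conversion for a single column (values hypothetical):

    import pandas

    external_index = pandas.Index(["x", "y", "z"])
    column = pandas.Series([10, 30, 20])  # partitions only ever see positions 0..2

    position = column.idxmax()        # 1: a position, not a label
    label = external_index[position]  # 'y': what the caller should see
    print(position, label)
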

def first_valid_index(self):

@@ -512,30 +551,43 @@ def last_valid_index_builder(df):
return self.index[first_result.max()]

def median(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.median, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def nunique(self, **kwargs):
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.nunique, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def skew(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.skew, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def std(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.std, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def var(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.var, **kwargs)
return self.full_axis_reduce(func, kwargs["axis"])
return self.full_axis_reduce(func, axis)

def quantile_for_single_value(self, **kwargs):
axis = kwargs.get("axis", 0)
q = kwargs.get("q", 0.5)
assert type(q) is float

func = self._prepare_method(pandas.DataFrame.quantile, **kwargs)

result = self.full_axis_reduce(func, kwargs["axis"])
result.name = kwargs["q"]
result = self.full_axis_reduce(func, axis)
result.name = q
return result
# END Column/Row partitions reduce operations

@@ -566,19 +618,23 @@ def query_builder(df):

def quantile_for_list_of_values(self, **kwargs):
cls = type(self)
q = kwargs["q"]
axis = kwargs.get("axis", 0)
q = kwargs.get("q", 0.5)
assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list))

func = self._prepare_method(pandas.DataFrame.quantile, **kwargs)

q_index = pandas.Float64Index(q)

new_data = self.map_across_full_axis(kwargs["axis"], func)
new_columns = self.columns if not kwargs["axis"] else self.index
new_data = self.map_across_full_axis(axis, func)
new_columns = self.columns if not axis else self.index
return cls(new_data, q_index, new_columns)

def _cumulative_builder(self, func, **kwargs):
cls = type(self)
axis = kwargs.get("axis", 0)
func = self._prepare_method(func, **kwargs)
new_data = self.map_across_full_axis(kwargs["axis"], func)
new_data = self.map_across_full_axis(axis, func)
return cls(new_data, self.index, self.columns)

def cumsum(self, **kwargs):
@@ -594,8 +650,10 @@ def cumprod(self, **kwargs):
return self._cumulative_builder(pandas.DataFrame.cumprod, **kwargs)

def dropna(self, **kwargs):
axis = kwargs["axis"]
subset = kwargs["subset"]
axis = kwargs.get("axis", 0)
subset = kwargs.get("subset", None)
thresh = kwargs.get("thresh", None)
how = kwargs.get("how", "any")
# We need to subset the axis that we care about with `subset`. This
# will be used to determine the number of values that are NA.
if subset is not None:
@@ -611,13 +669,12 @@ def dropna(self, **kwargs):
# We are building this dictionary first to determine which columns
# and rows to drop. This way we do not drop some columns before we
# know which rows need to be dropped.
if kwargs["thresh"] is not None:
if thresh is not None:
# Count the number of NA values and specify which are higher than
# thresh.
thresh = kwargs["thresh"]
drop_values = {ax ^ 1: compute_na.isna().sum(axis=ax ^ 1) > thresh for ax in axis}
else:
drop_values = {ax ^ 1: getattr(compute_na.isna(), kwargs["how"])(axis=ax ^ 1) for ax in axis}
drop_values = {ax ^ 1: getattr(compute_na.isna(), how)(axis=ax ^ 1) for ax in axis}

if 0 not in drop_values:
drop_values[0] = None
@@ -636,7 +693,7 @@ def mode(self, **kwargs):
def mode(self, **kwargs):
cls = type(self)

axis = kwargs["axis"]
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.mode, **kwargs)
new_data = self.map_across_full_axis(axis, func)

@@ -657,8 +714,8 @@ def mode(self, **kwargs):
def fillna(self, **kwargs):
cls = type(self)

axis = kwargs["axis"]
value = kwargs["value"]
axis = kwargs.get("axis", 0)
value = kwargs.get("value", None)

if isinstance(value, dict):
return
@@ -681,7 +738,7 @@ def describe(self, **kwargs):
def rank(self, **kwargs):
cls = type(self)

axis = kwargs["axis"]
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.rank, **kwargs)
new_data = self.map_across_full_axis(axis, func)
if axis:
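
Note: dict.get takes its fallback as a positional argument; a spelling like kwargs.get("axis", default=0) would raise TypeError: get() takes no keyword arguments, which is why the lookups in this file read kwargs.get("axis", 0). A minimal sketch of the convention, with count as an arbitrary stand-in:

    import pandas

    def count(df, **kwargs):
        # dict.get takes the fallback positionally; default=0 as a keyword
        # would raise TypeError.
        axis = kwargs.get("axis", 0)
        return df.count(axis=axis)

    frame = pandas.DataFrame({"a": [1, None], "b": [3, 4]})
    print(count(frame))          # per-column non-NA counts (axis falls back to 0)
    print(count(frame, axis=1))  # an explicit axis overrides the fallback
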
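Note: the ax ^ 1 indexing in dropna flips the axis: labels dropped along axis ax are judged by NA counts taken along the opposite axis. A standalone sketch with a plain pandas frame (values hypothetical):

    import pandas

    df = pandas.DataFrame({"a": [1.0, None, 3.0], "b": [None, None, None]})

    # XOR with 1 flips 0 <-> 1: deciding which rows (axis 0) to drop needs
    # per-row NA counts, i.e. a reduction across columns (axis 1), and vice versa.
    for ax in (0, 1):
        na_counts = df.isna().sum(axis=ax ^ 1)
        print("drop axis", ax, "->", na_counts.to_dict())
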
11 changes: 7 additions & 4 deletions modin/data_management/partitioning/partition_collections.py
@@ -359,8 +359,9 @@ def get_indices(self, axis=0, old_blocks=None):
A Pandas Index object.
"""
if axis == 0:
func = self.preprocess_func(lambda df: df.index)
# We grab the first column of blocks and extract the indices
new_indices = [idx.apply(lambda df: df.index).get() for idx in self.partitions.T[0]]
new_indices = [idx.apply(func).get() for idx in self.partitions.T[0]]
# This is important because sometimes we have resized the data. The new
# sizes will not be valid if we are trying to compute the index on a
# new object that has a different length.
@@ -369,7 +370,8 @@ def get_indices(self, axis=0, old_blocks=None):
else:
cumulative_block_lengths = np.array(self.block_lengths).cumsum()
else:
new_indices = [idx.apply(lambda df: df.columns).get() for idx in self.partitions[0]]
func = self.preprocess_func(lambda df: df.columns)
new_indices = [idx.apply(func).get() for idx in self.partitions[0]]

if old_blocks is not None:
cumulative_block_lengths = np.array(old_blocks.block_widths).cumsum()
@@ -545,6 +547,7 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep
indices = [indices]

partitions_dict = self._get_dict_of_block_index(axis, indices)
preprocessed_func = self.preprocess_func(func)

# Since we might be keeping the remaining blocks that are not modified,
# we have to also keep the block_partitions object in the correct
@@ -558,10 +561,10 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep

if not keep_remaining:
# See notes in `apply_func_to_select_indices`
result = np.array([partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in partitions_dict])
result = np.array([partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in partitions_dict])
else:
# See notes in `apply_func_to_select_indices`
result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))])
result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))])

return cls(result.T) if not axis else cls(result)

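
Note: both changes in this file hoist function preparation out of the per-partition loop: preprocess_func runs once and its result is reused for every partition. In the Ray backend this plausibly amounts to serializing the function a single time and sharing a handle instead of re-shipping the closure per partition (an assumption about preprocess_func's internals); a rough pure-Python sketch, with pickle standing in for the object store:

    import pickle

    def extract_index(df):
        # stand-in for the function applied to each partition
        return df.index

    num_partitions = 16

    # Unpreprocessed: the function is serialized once per partition.
    payloads = [pickle.dumps(extract_index) for _ in range(num_partitions)]

    # Preprocessed: serialize once and hand every partition the same handle;
    # with Ray this would be an object-store ID rather than raw bytes.
    shared = pickle.dumps(extract_index)
    payloads = [shared] * num_partitions
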
6 changes: 5 additions & 1 deletion modin/data_management/partitioning/remote_partition.py
@@ -2,6 +2,7 @@
from __future__ import division
from __future__ import print_function

import pandas
import ray


@@ -167,7 +168,10 @@ def to_pandas(self):
Returns:
A Pandas DataFrame.
"""
return self.get()
dataframe = self.get()
assert isinstance(dataframe, pandas.DataFrame)

return dataframe

@classmethod
def put(cls, obj):
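
Note: on the to_pandas assertion, pandas.DataFrame() constructs an empty frame, so type(dataframe) is pandas.DataFrame() compares a class to an instance and always fails; the corrected line compares with isinstance, which also admits DataFrame subclasses. A minimal illustration:

    import pandas

    df = pandas.DataFrame({"a": [1, 2]})

    # Buggy: the right-hand side is an instance, so this identity test is
    # always False and the assertion always fires.
    # assert type(df) is pandas.DataFrame()

    # Correct: test against the class itself.
    assert isinstance(df, pandas.DataFrame)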