Skip to content

Commit

Permalink
[DataFrame] Adding insert, set_axis, set_index, reset_index and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-petersohn authored and robertnishihara committed Feb 26, 2018
1 parent c2ad800 commit 1fa59f1
Show file tree
Hide file tree
Showing 2 changed files with 384 additions and 30 deletions.
271 changes: 265 additions & 6 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
from __future__ import print_function

import pandas as pd
from pandas.api.types import is_scalar
from pandas.util._validators import validate_bool_kwarg
from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas.compat import lzip

import warnings
import numpy as np
import ray
import itertools
Expand Down Expand Up @@ -792,7 +800,52 @@ def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
raise NotImplementedError("Not Yet implemented.")

def insert(self, loc, column, value, allow_duplicates=False):
    """Insert column into DataFrame at specified location.

    The column is added in place: every row partition in ``self._df`` is
    replaced by the result of inserting its slice of ``value``, and
    ``self.columns`` is updated to include the new label.

    Args:
        loc (int): Insertion index. Must verify 0 <= loc <= len(columns).
        column (hashable object): Label of the inserted column.
        value (int, Series, or array-like): The values to insert. A scalar
            (including a string) is repeated once per row.
        allow_duplicates (bool): Whether to allow duplicate column names.

    Raises:
        ValueError: If the number of values does not match the number of
            rows, if loc is out of range, or if column already exists and
            allow_duplicates is False.
    """
    n_rows = len(self.index)

    # Strings have a length but must still be broadcast like any other
    # scalar, so test is_scalar before falling back to the len() probe.
    if is_scalar(value):
        value = [value] * n_rows
    else:
        try:
            len(value)
        except TypeError:
            # Unsized, non-scalar object: repeat it for every row.
            value = [value] * n_rows

    if len(value) != n_rows:
        raise ValueError(
            "Column length provided does not match DataFrame length.")
    if loc < 0 or loc > len(self.columns):
        raise ValueError(
            "Location provided must be higher than 0 and lower than the "
            "number of columns.")
    if not allow_duplicates and column in self.columns:
        raise ValueError(
            "Column {} already exists in DataFrame.".format(column))

    # Cut value into consecutive chunks whose sizes follow the per-partition
    # row counts stored in self._lengths.
    ends = np.cumsum(self._lengths)
    starts = [0] + list(ends[:-1])
    partitions = [value[start:end] for start, end in zip(starts, ends)]

    # Because insert is always inplace, we have to create this temp fn.
    def _insert(_df, _loc, _column, _part, _allow_duplicates):
        _df.insert(_loc, _column, _part, _allow_duplicates)
        return _df

    self._df = [
        _deploy_func.remote(_insert,
                            part_df,
                            loc,
                            column,
                            partitions[i],
                            allow_duplicates)
        for i, part_df in enumerate(self._df)]

    self.columns = self.columns.insert(loc, column)

def interpolate(self, method='linear', axis=0, limit=None, inplace=False,
limit_direction='forward', downcast=None, **kwargs):
Expand Down Expand Up @@ -1047,6 +1100,8 @@ def pop(self, item):
popped = to_pandas(self._map_partitions(
lambda df: df.pop(item)))
self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
self.columns = [col for col in self.columns if col != item]

return popped

def pow(self, other, axis='columns', level=None, fill_value=None):
Expand Down Expand Up @@ -1111,7 +1166,103 @@ def resample(self, rule, how=None, axis=0, fill_method=None, closed=None,

def reset_index(self, level=None, drop=False, inplace=False, col_level=0,
                col_fill=''):
    """Reset this index to default and create column from current index.

    Args:
        level: Only remove the given levels from the index. Removes all
            levels by default
        drop: Do not try to insert index into dataframe columns. This
            resets the index to the default integer index.
        inplace: Modify the DataFrame in place (do not create a new object)
        col_level : If the columns have multiple levels, determines which
            level the labels are inserted into. By default it is inserted
            into the first level.
        col_fill: If the columns have multiple levels, determines how the
            other levels are named. If None then the index name is
            repeated.

    Returns:
        A new DataFrame if inplace is False, None otherwise.

    Raises:
        ValueError: If col_fill is None and an index name cannot be
            expanded to the number of column levels.
    """
    inplace = validate_bool_kwarg(inplace, 'inplace')
    new_obj = self if inplace else self.copy()

    def _maybe_casted_values(index, labels=None):
        """Return the values of index (selected by labels) as an array."""
        # NOTE(review): `asobject` is an older pandas API - confirm it
        # exists in the pandas version this project pins.
        if isinstance(index, pd.PeriodIndex):
            values = index.asobject.values
        elif isinstance(index, pd.DatetimeIndex) and index.tz is not None:
            values = index
        else:
            values = index.values
            if values.dtype == np.object_:
                values = lib.maybe_convert_objects(values)

        # if we have the labels, extract the values with a mask
        if labels is not None:
            mask = labels == -1

            # we can have situations where the whole mask is -1,
            # meaning there is nothing found in labels, so make all nan's
            if mask.all():
                values = np.empty(len(mask))
                values.fill(np.nan)
            else:
                values = values.take(labels)
                if mask.any():
                    values, _ = maybe_upcast_putmask(values, mask, np.nan)
        return values

    new_index = new_obj._default_index().index
    if level is not None:
        if not isinstance(level, (tuple, list)):
            level = [level]
        level = [self.index._get_level_number(lev) for lev in level]
        # Only part of a MultiIndex is removed: keep the remaining levels.
        if (isinstance(self.index, pd.MultiIndex) and
                len(level) < self.index.nlevels):
            new_index = self.index.droplevel(level)

    if not drop:
        if isinstance(self.index, pd.MultiIndex):
            names = [n if n is not None else ('level_%d' % i)
                     for (i, n) in enumerate(self.index.names)]
            to_insert = lzip(self.index.levels, self.index.labels)
        else:
            default = 'index' if 'index' not in self else 'level_0'
            names = ([default] if self.index.name is None
                     else [self.index.name])
            to_insert = ((self.index, None),)

        multi_col = isinstance(self.columns, pd.MultiIndex)
        # Every column is inserted at position 0, so walk the levels in
        # reverse to leave them in their original order.
        for i, (lev, lab) in reversed(list(enumerate(to_insert))):
            if not (level is None or i in level):
                continue
            name = names[i]
            if multi_col:
                col_name = (list(name) if isinstance(name, tuple)
                            else [name])
                if col_fill is None:
                    if len(col_name) not in (1, self.columns.nlevels):
                        raise ValueError("col_fill=None is incompatible "
                                         "with incomplete column name "
                                         "{}".format(name))
                    col_fill = col_name[0]

                lev_num = self.columns._get_level_number(col_level)
                name_lst = [col_fill] * lev_num + col_name
                missing = self.columns.nlevels - len(name_lst)
                name_lst += [col_fill] * missing
                name = tuple(name_lst)
            # to ndarray and maybe infer different dtype
            level_values = _maybe_casted_values(lev, lab)
            new_obj.insert(0, name, level_values)

    new_obj.index = new_index
    if not inplace:
        return new_obj

def rfloordiv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError("Not Yet implemented.")
Expand Down Expand Up @@ -1155,11 +1306,116 @@ def sem(self, axis=None, skipna=None, level=None, ddof=1,
raise NotImplementedError("Not Yet implemented.")

def set_axis(self, labels, axis=0, inplace=None):
    """Assign desired index to given axis.

    Args:
        labels (pd.Index or list-like): The Index to assign.
        axis (string or int): The axis to reassign.
        inplace (bool): Whether to make these modifications inplace. When
            left as None a FutureWarning is emitted and the operation is
            performed inplace.

    Returns:
        If inplace is False, returns a new DataFrame, otherwise None.
    """
    if is_scalar(labels):
        # Legacy call form set_axis(axis, labels): warn and swap.
        warnings.warn(
            'set_axis now takes "labels" as first argument, and '
            '"axis" as named parameter. The old form, with "axis" as '
            'first parameter and "labels" as second, is still supported '
            'but will be deprecated in a future version of pandas.',
            FutureWarning, stacklevel=2)
        labels, axis = axis, labels

    if inplace is None:
        warnings.warn(
            'set_axis currently defaults to operating inplace.\nThis '
            'will change in a future version of pandas, use '
            'inplace=True to avoid this warning.',
            FutureWarning, stacklevel=2)
        inplace = True

    if not inplace:
        obj = self.copy()
        obj.set_axis(labels, axis=axis, inplace=True)
        return obj

    # Resolve the axis to its attribute name and assign through that
    # attribute so the regular setter logic runs.
    setattr(self, self._index._get_axis_name(axis), labels)

def set_index(self, keys, drop=True, append=False, inplace=False,
              verify_integrity=False):
    """Set the DataFrame index using one or more existing columns.

    Args:
        keys: column label or list of column labels / arrays.
        drop (boolean): Delete columns to be used as the new index.
        append (boolean): Whether to append columns to existing index.
        inplace (boolean): Modify the DataFrame in place.
        verify_integrity (boolean): Check the new index for duplicates.
            Otherwise defer the check until necessary. Setting to False
            will improve the performance of this method

    Returns:
        If inplace is set to false returns a new DataFrame, otherwise None.

    Raises:
        ValueError: If verify_integrity is True and the new index contains
            duplicate keys.
    """
    inplace = validate_bool_kwarg(inplace, 'inplace')
    if not isinstance(keys, list):
        keys = [keys]

    frame = self if inplace else self.copy()

    arrays = []
    names = []
    if append:
        # Keep the levels of the current index in front of the new keys.
        names = list(self.index.names)
        if isinstance(self.index, pd.MultiIndex):
            for i in range(self.index.nlevels):
                arrays.append(self.index._get_level_values(i))
        else:
            arrays.append(self.index)

    to_remove = []
    for col in keys:
        if isinstance(col, pd.MultiIndex):
            # append all but the last column so we don't have to modify
            # the end of this loop
            for n in range(col.nlevels - 1):
                arrays.append(col._get_level_values(n))

            level = col._get_level_values(col.nlevels - 1)
            names.extend(col.names)
        elif isinstance(col, pd.Series):
            level = col._values
            names.append(col.name)
        elif isinstance(col, pd.Index):
            level = col
            names.append(col.name)
        elif isinstance(col, (list, np.ndarray)):
            level = col
            names.append(None)
        else:
            # Anything else is treated as the label of an existing column.
            level = frame[col]._values
            names.append(col)
            # Record each label once so a key that is passed twice is not
            # deleted twice below.
            if drop and col not in to_remove:
                to_remove.append(col)
        arrays.append(level)

    index = _ensure_index_from_sequences(arrays, names)

    if verify_integrity and not index.is_unique:
        duplicates = index[index.duplicated()].unique()
        raise ValueError(
            'Index has duplicate keys: {}'.format(duplicates))

    for c in to_remove:
        del frame[c]

    # clear up memory usage
    index._cleanup()

    frame.index = index

    if not inplace:
        return frame

def set_value(self, index, col, value, takeable=False):
raise NotImplementedError("Not Yet implemented.")
Expand Down Expand Up @@ -1416,7 +1672,7 @@ def __iter__(self):
raise NotImplementedError("Not Yet implemented.")

def __contains__(self, key):
    """Return whether key is one of this DataFrame's column labels.

    Args:
        key: The column label to look for.

    Returns:
        True if key is found in self.columns, False otherwise.
    """
    return key in self.columns

def __nonzero__(self):
raise NotImplementedError("Not Yet implemented.")
Expand Down Expand Up @@ -1715,4 +1971,7 @@ def to_pandas(df):
Returns:
A new pandas DataFrame.
"""
return pd.concat(ray.get(df._df))
pd_df = pd.concat(ray.get(df._df))
pd_df.index = df.index
pd_df.columns = df.columns
return pd_df
Loading

0 comments on commit 1fa59f1

Please sign in to comment.