[DataFrame] Implement to_csv #2014

Merged · 15 commits · May 17, 2018
4 changes: 2 additions & 2 deletions python/ray/dataframe/__init__.py
@@ -9,7 +9,7 @@
test, qcut, match, Panel, date_range, Index, MultiIndex,
CategoricalIndex, Series, bdate_range, DatetimeIndex,
Timedelta, Timestamp, to_timedelta, set_eng_float_format,
set_option, NaT)
set_option, NaT, PeriodIndex, Categorical)
import threading

pd_version = pd.__version__
@@ -49,7 +49,7 @@ def get_npartitions():
"match", "to_datetime", "get_dummies", "Panel", "date_range", "Index",
"MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta",
"set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta",
"Timestamp", "NaT"
"Timestamp", "NaT", "PeriodIndex", "Categorical"
]

try:
110 changes: 89 additions & 21 deletions python/ray/dataframe/dataframe.py
@@ -61,7 +61,8 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
data (numpy ndarray (structured or homogeneous) or dict):
Dict can contain Series, arrays, constants, or list-like
objects.
index (pandas.Index or list): The row index for this dataframe.
index (pandas.Index, list, ObjectID): The row index for this
dataframe.
columns (pandas.Index): The column names for this dataframe, in
pandas Index object.
dtype: Data type to force. Only a single dtype is allowed.
@@ -1845,7 +1846,8 @@ def eval_helper(df):
columns = columns_copy.columns

if inplace:
self._update_inplace(row_partitions=new_rows, columns=columns)
self._update_inplace(row_partitions=new_rows, columns=columns,
index=self.index)
else:
return DataFrame(columns=columns, row_partitions=new_rows)

@@ -2330,18 +2332,31 @@ def insert(self, loc, column, value, allow_duplicates=False):
# Deploy insert function to specific column partition, and replace that
# column
def insert_col_part(df):
df.insert(index_within_partition, column, value, allow_duplicates)
if isinstance(value, pd.Series) and \
isinstance(value.dtype,
pd.core.dtypes.dtypes.DatetimeTZDtype):
# Need to set index to index of this dtype or inserted values
# become NaT
df.index = value
df.insert(index_within_partition, column,
value, allow_duplicates)
df.index = pd.RangeIndex(0, len(df))
else:
df.insert(index_within_partition, column,
value, allow_duplicates)
return df

new_obj = _deploy_func.remote(insert_col_part,
self._col_partitions[partition])

new_cols = [self._col_partitions[i]
if i != partition
else new_obj
for i in range(len(self._col_partitions))]
new_col_names = self.columns.insert(loc, column)

self._update_inplace(col_partitions=new_cols, columns=new_col_names)
self._update_inplace(col_partitions=new_cols, columns=new_col_names,
index=self.index)

def interpolate(self, method='linear', axis=0, limit=None, inplace=False,
limit_direction='forward', downcast=None, **kwargs):
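Editor's note: the DatetimeTZDtype branch above temporarily swaps the partition's index before calling `insert` and restores the RangeIndex afterwards; without that, pandas aligns the tz-aware Series on the index and any labels that do not match come back as NaT. A minimal sketch in plain pandas of the alignment behaviour being avoided (hypothetical data, not the PR's partition code):

```python
import pandas as pd

# The only point of these values is that the Series index differs from
# the frame's RangeIndex.
values = pd.Series(pd.date_range("2018-05-01", periods=3, tz="US/Eastern"),
                   index=[10, 11, 12])

df = pd.DataFrame({"a": [1, 2, 3]})        # RangeIndex 0..2
df.insert(1, "bad", values)                # aligned on index -> a column of NaT

df2 = pd.DataFrame({"a": [1, 2, 3]})
df2.index = values.index                   # adopt the Series' index first
df2.insert(1, "ok", values)                # values land in place
df2.index = pd.RangeIndex(0, len(df2))     # restore the partition's RangeIndex
```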
@@ -3242,7 +3257,7 @@ def query_helper(df):
self._row_partitions)

if inplace:
self._update_inplace(row_partitions=new_rows)
self._update_inplace(row_partitions=new_rows, index=self.index)
else:
return DataFrame(row_partitions=new_rows,
col_metadata=self._col_metadata)
@@ -4201,23 +4216,72 @@ def to_clipboard(self, excel=None, sep=None, **kwargs):
port_frame = to_pandas(self)
port_frame.to_clipboard(excel, sep, **kwargs)

def to_csv(self, path_or_buf=None, sep=',', na_rep='', float_format=None,
def to_csv(self, path_or_buf=None, sep=",", na_rep="", float_format=None,
columns=None, header=True, index=True, index_label=None,
mode='w', encoding=None, compression=None, quoting=None,
quotechar='"', line_terminator='\n', chunksize=None,
mode="w", encoding=None, compression=None, quoting=None,
quotechar='"', line_terminator="\n", chunksize=None,
tupleize_cols=None, date_format=None, doublequote=True,
escapechar=None, decimal='.'):
escapechar=None, decimal="."):

kwargs = dict(
path_or_buf=path_or_buf, sep=sep, na_rep=na_rep,
float_format=float_format, columns=columns, header=header,
index=index, index_label=index_label, mode=mode,
encoding=encoding, compression=compression, quoting=quoting,
quotechar=quotechar, line_terminator=line_terminator,
chunksize=chunksize, tupleize_cols=tupleize_cols,
date_format=date_format, doublequote=doublequote,
escapechar=escapechar, decimal=decimal
)

if compression is not None:
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
return to_pandas(self).to_csv(**kwargs)

if tupleize_cols is not None:
warnings.warn("The 'tupleize_cols' parameter is deprecated and "
"will be removed in a future version",
FutureWarning, stacklevel=2)
else:
tupleize_cols = False

warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
remote_kwargs_id = ray.put(dict(kwargs, path_or_buf=None))
columns_id = ray.put(self.columns)

port_frame = to_pandas(self)
port_frame.to_csv(path_or_buf, sep, na_rep, float_format,
columns, header, index, index_label,
mode, encoding, compression, quoting,
quotechar, line_terminator, chunksize,
tupleize_cols, date_format, doublequote,
escapechar, decimal)
def get_csv_str(df, index, columns, header, kwargs):
df.index = index
df.columns = columns
kwargs["header"] = header
return df.to_csv(**kwargs)

idxs = [0] + np.cumsum(self._row_metadata._lengths).tolist()
idx_args = [self.index[idxs[i]:idxs[i+1]]
for i in range(len(self._row_partitions))]
csv_str_ids = _map_partitions(
get_csv_str, self._row_partitions, idx_args,
[columns_id] * len(self._row_partitions),
[header] + [False] * (len(self._row_partitions) - 1),
[remote_kwargs_id] * len(self._row_partitions))

if path_or_buf is None:
buf = io.StringIO()
elif isinstance(path_or_buf, str):
buf = open(path_or_buf, mode)
else:
buf = path_or_buf

for csv_str_id in csv_str_ids:
buf.write(ray.get(csv_str_id))
buf.flush()

result = None
if path_or_buf is None:
result = buf.getvalue()
buf.close()
elif isinstance(path_or_buf, str):
buf.close()
return result

def to_dense(self):
raise NotImplementedError(
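Editor's note: a usage sketch of the rewritten `to_csv` above (assuming a running Ray cluster). Each row partition is rendered to a CSV string remotely, the header is emitted only for the first partition, and the chunks are written out in partition order to the returned string, the opened file, or the supplied buffer.

```python
import ray
import ray.dataframe as rdf

ray.init()

df = rdf.DataFrame({"a": range(6), "b": list("abcdef")})

csv_text = df.to_csv()   # path_or_buf=None: the concatenated CSV comes back as a str
df.to_csv("out.csv")     # a path: chunks are streamed into the file opened with `mode`
```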
@@ -4668,9 +4732,13 @@ def _getitem_array(self, key):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]
column_indices = {item: i for i, item in enumerate(self.columns)}
indices_for_rows = [column_indices[column] for column in columns]

def get_columns_partition(df):
result = df.__getitem__(indices_for_rows)
result.columns = pd.RangeIndex(0, len(result.columns))
return result

new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
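Editor's note: the label lookup above replaces a per-request scan of `self.columns` with a single label-to-position map, and it also keeps the positions in the order the caller asked for rather than in frame order. A minimal sketch outside of Ray, with hypothetical labels:

```python
all_columns = ["a", "b", "c", "d"]
requested = ["c", "a"]

column_indices = {label: i for i, label in enumerate(all_columns)}
positions = [column_indices[label] for label in requested]
# positions == [2, 0]; the old comprehension walked all_columns and would
# have produced [0, 2] regardless of the requested order.
```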
5 changes: 4 additions & 1 deletion python/ray/dataframe/index_metadata.py
@@ -232,7 +232,10 @@ def insert(self, key, loc=None, partition=None,
# Determine which partition to place it in, and where in that partition
if loc is not None:
cum_lens = np.cumsum(self._lengths)
partition = np.digitize(loc, cum_lens[:-1])
if len(cum_lens) > 1:
partition = np.digitize(loc, cum_lens[:-1], right=True)
else:
partition = 0
if partition >= len(cum_lens):
if loc > cum_lens[-1]:
raise IndexError("index {0} is out of bounds".format(loc))
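Editor's note: the `right=True` change above matters when an insert location lands exactly on a partition boundary. With the cumulative lengths as bins, `right=True` assigns a boundary location to the end of the earlier partition instead of the start of the next one, and the explicit single-partition guard avoids calling `np.digitize` with an empty bin list at all. A small worked example with hypothetical partition lengths:

```python
import numpy as np

lengths = [3, 3, 4]                        # hypothetical per-partition row counts
cum_lens = np.cumsum(lengths)              # array([ 3,  6, 10])

loc = 3                                    # insert right at the first boundary
print(np.digitize(loc, cum_lens[:-1]))               # 1 -> start of partition 1
print(np.digitize(loc, cum_lens[:-1], right=True))   # 0 -> end of partition 0
```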
79 changes: 64 additions & 15 deletions python/ray/dataframe/io.py
@@ -10,6 +10,8 @@

from pyarrow.parquet import ParquetFile
import pandas as pd
from pandas.io.common import _infer_compression # don't depend on internal API


from .dataframe import ray, DataFrame
from . import get_npartitions
@@ -82,21 +84,24 @@ def _split_df(pd_df, chunksize):


# CSV
def _compute_offset(fn, npartitions):
def _compute_offset(fn, npartitions, ignore_first_line=False):
"""
Calculate the byte offsets for splitting a CSV file.
Return a list of (start, end) tuples where each end falls on a \n or EOF.
"""
total_bytes = os.path.getsize(fn)
chunksize = total_bytes // npartitions
bio = open(fn, 'rb')
if ignore_first_line:
start = len(bio.readline())
chunksize = (total_bytes - start) // npartitions
else:
start = 0
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1

bio = open(fn, 'rb')

offsets = []
start = 0
while start <= total_bytes:
while start < total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
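Editor's note: the offset computation splits the file into roughly equal byte ranges and then extends each range to the next newline, so no row is ever split across two partitions; when the header line is parsed separately, the scan simply starts after it. A simplified, runnable sketch of the same idea (no header handling, hypothetical helper name):

```python
import os

def byte_ranges(path, npartitions):
    total = os.path.getsize(path)
    chunk = max(total // npartitions, 1)
    ranges, start = [], 0
    with open(path, "rb") as f:
        while start < total:
            f.seek(start + chunk)      # jump roughly one chunk ahead...
            f.readline()               # ...then advance to the next newline / EOF
            end = min(f.tell(), total)
            ranges.append((start, end))
            start = end
    return ranges
```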
@@ -121,15 +126,26 @@ def _infer_column(first_line, kwargs={}):


@ray.remote
def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}):
def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078

bio = open(fn, 'rb')
bio.seek(start)
to_read = header + bio.read(end - start)
bio.close()
return pd.read_csv(BytesIO(to_read), **kwargs)
pd_df = pd.read_csv(BytesIO(to_read), **kwargs)
index = pd_df.index
# Partitions must have RangeIndex
pd_df.index = pd.RangeIndex(0, len(pd_df))
return pd_df, index


@ray.remote
def get_index(*partition_indices):
return partition_indices[0].append(partition_indices[1:])


def read_csv(filepath,
def read_csv(filepath_or_buffer,
sep=',',
delimiter=None,
header='infer',
@@ -247,21 +263,54 @@ def read_csv(filepath,
memory_map=memory_map,
float_precision=float_precision)

offsets = _compute_offset(filepath, get_npartitions())
# Default to Pandas read_csv for non-serializable objects
if not isinstance(filepath_or_buffer, str) or \
_infer_compression(filepath_or_buffer, compression) is not None:

warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)

pd_obj = pd.read_csv(filepath_or_buffer, **kwargs)
if isinstance(pd_obj, pd.DataFrame):
return from_pandas(pd_obj, get_npartitions())

return pd_obj

filepath = filepath_or_buffer

# TODO: handle case where header is a list of lines
first_line = _get_firstline(filepath)
columns = _infer_column(first_line, kwargs=kwargs)
if header is None or (header == "infer" and names is not None):
first_line = b""
ignore_first_line = False
else:
ignore_first_line = True

offsets = _compute_offset(filepath, get_npartitions(),
ignore_first_line=ignore_first_line)

# Serialize objects to speed up later use in remote tasks
first_line_id = ray.put(first_line)
kwargs_id = ray.put(kwargs)

df_obj_ids = []
index_obj_ids = []
for start, end in offsets:
if start != 0:
df = _read_csv_with_offset.remote(
filepath, start, end, header=first_line, kwargs=kwargs)
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id, first_line_id),
num_return_vals=2)
else:
df = _read_csv_with_offset.remote(
filepath, start, end, kwargs=kwargs)
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id),
num_return_vals=2)
df_obj_ids.append(df)
return DataFrame(row_partitions=df_obj_ids, columns=columns)
index_obj_ids.append(index)

index = get_index.remote(*index_obj_ids) if index_col is not None else None

return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index)


def read_json(path_or_buf=None,
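Editor's note: each remote reader above now hands back two objects — the parsed partition (re-indexed to a RangeIndex) and the partition's original index, which `get_index` stitches back together only when `index_col` is given. The split into two ObjectIDs uses the same internal `._submit(..., num_return_vals=2)` hook the PR calls; a toy sketch of that pattern (hypothetical function, Ray-internal API of this era):

```python
import ray

ray.init()

@ray.remote
def parse_chunk(rows):
    frame = [(i, r) for i, r in enumerate(rows)]   # stand-in for the parsed partition
    index = [i for i, _ in frame]                  # stand-in for its original index
    return frame, index

# One remote call, two ObjectIDs back.
frame_id, index_id = parse_chunk._submit(args=(["x", "y", "z"],),
                                         num_return_vals=2)
print(ray.get(frame_id), ray.get(index_id))
```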
4 changes: 2 additions & 2 deletions python/ray/dataframe/utils.py
@@ -226,8 +226,8 @@ def _map_partitions(func, partitions, *argslists):
for part in partitions]
else:
assert(all([len(args) == len(partitions) for args in argslists]))
return [_deploy_func.remote(func, part, *args)
for part, args in zip(partitions, *argslists)]
return [_deploy_func.remote(func, *args)
for args in zip(partitions, *argslists)]


@ray.remote
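Editor's note: the `_map_partitions` change makes per-partition argument lists work. The old comprehension unpacked each zipped tuple into `part, args`, which only behaves when there is exactly one extra argument list; the new form forwards the whole tuple, so the partition arrives as the first positional argument followed by its own slice of every argument list — exactly how `to_csv` above passes per-partition indexes and headers plus shared kwargs. A plain-Python sketch of the zip shape, with hypothetical values:

```python
partitions = ["part0", "part1", "part2"]
idx_args = ["idx0", "idx1", "idx2"]
headers = [True, False, False]

for args in zip(partitions, idx_args, headers):
    print(args)
# ('part0', 'idx0', True)
# ('part1', 'idx1', False)
# ('part2', 'idx2', False)
# _deploy_func.remote(func, *args) therefore runs func(partition, idx, header)
# remotely for each partition.
```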