Commit
Merge branch 'master' of https://github.com/ray-project/ray
* 'master' of https://github.com/ray-project/ray:
  [rllib] Fix broken link in docs (ray-project#1967)
  [DataFrame] Sample implement (ray-project#1954)
  [DataFrame] Implement Inter-DataFrame operations (ray-project#1937)
  remove UniqueIDHasher (ray-project#1957)
  [rllib] Add DDPG documentation, rename DDPG2 <=> DDPG (ray-project#1946)
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
Showing 84 changed files with 3,577 additions and 2,210 deletions.
@@ -1,90 +1,133 @@

Before:

import pandas as pd
import numpy as np
from .dataframe import DataFrame as rdf
from .utils import (
    from_pandas,
    _deploy_func)
from functools import reduce


def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
           keys=None, levels=None, names=None, verify_integrity=False,
           copy=True):

    def _concat(frame1, frame2):
        # Check type on objects
        # Case 1: Both are Pandas DF
        if isinstance(frame1, pd.DataFrame) and \
                isinstance(frame2, pd.DataFrame):

            return pd.concat((frame1, frame2), axis, join, join_axes,
                             ignore_index, keys, levels, names,
                             verify_integrity, copy)

        if not (isinstance(frame1, rdf) and
                isinstance(frame2, rdf)) and join == 'inner':
            raise NotImplementedError(
                "Obj as dicts not implemented. To contribute to "
                "Pandas on Ray, please visit github.com/ray-project/ray."
            )

        # Case 2: Both are different types
        if isinstance(frame1, pd.DataFrame):
            frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1)
        if isinstance(frame2, pd.DataFrame):
            frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1)

        # Case 3: Both are Ray DF
        if isinstance(frame1, rdf) and \
                isinstance(frame2, rdf):

            new_columns = frame1.columns.join(frame2.columns, how=join)

            def _reindex_helper(pdf, old_columns, join):
                pdf.columns = old_columns
                if join == 'outer':
                    pdf = pdf.reindex(columns=new_columns)
                else:
                    pdf = pdf[new_columns]
                pdf.columns = pd.RangeIndex(len(new_columns))

                return pdf

            f1_columns, f2_columns = frame1.columns, frame2.columns
            new_f1 = [_deploy_func.remote(
                lambda p: _reindex_helper(p, f1_columns, join), part)
                for part in frame1._row_partitions]
            new_f2 = [_deploy_func.remote(
                lambda p: _reindex_helper(p, f2_columns, join), part)
                for part in frame2._row_partitions]

            return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
                       index=frame1.index.append(frame2.index))

    # (TODO) Group all the pandas dataframes

    if isinstance(objs, dict):
        raise NotImplementedError(
            "Obj as dicts not implemented. To contribute to "
            "Pandas on Ray, please visit github.com/ray-project/ray."
        )

    axis = pd.DataFrame()._get_axis_number(axis)
    if axis == 1:
        raise NotImplementedError(
            "Concat not implemented for axis=1. To contribute to "
            "Pandas on Ray, please visit github.com/ray-project/ray."
        )

    all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs])
    if all_pd:
        result = pd.concat(objs, axis, join, join_axes,
                           ignore_index, keys, levels, names,
                           verify_integrity, copy)
    else:
        result = reduce(_concat, objs)

    if isinstance(result, pd.DataFrame):
        return from_pandas(result, len(result) / 2**16 + 1)

    return result

After:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas
import numpy as np

from .dataframe import DataFrame
from .utils import _reindex_helper


def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
           keys=None, levels=None, names=None, verify_integrity=False,
           copy=True):

    if keys is not None:
        objs = [objs[k] for k in keys]
    else:
        objs = list(objs)

    if len(objs) == 0:
        raise ValueError("No objects to concatenate")

    objs = [obj for obj in objs if obj is not None]

    if len(objs) == 0:
        raise ValueError("All objects passed were None")

    try:
        type_check = next(obj for obj in objs
                          if not isinstance(obj, (pandas.Series,
                                                  pandas.DataFrame,
                                                  DataFrame)))
    except StopIteration:
        type_check = None
    if type_check is not None:
        raise ValueError("cannot concatenate object of type \"{0}\"; only "
                         "pandas.Series, pandas.DataFrame, "
                         "and ray.dataframe.DataFrame objs are "
                         "valid", type(type_check))

    all_series = all([isinstance(obj, pandas.Series)
                      for obj in objs])
    if all_series:
        return pandas.concat(objs, axis, join, join_axes,
                             ignore_index, keys, levels, names,
                             verify_integrity, copy)

    if isinstance(objs, dict):
        raise NotImplementedError(
            "Obj as dicts not implemented. To contribute to "
            "Pandas on Ray, please visit github.com/ray-project/ray.")

    axis = pandas.DataFrame()._get_axis_number(axis)

    if join not in ['inner', 'outer']:
        raise ValueError("Only can inner (intersect) or outer (union) join the"
                         " other axis")

    # We need this in a list because we use it later.
    all_index, all_columns = list(zip(*[(obj.index, obj.columns)
                                        for obj in objs]))

    def series_to_df(series, columns):
        df = pandas.DataFrame(series)
        df.columns = columns
        return DataFrame(df)

    # Pandas puts all of the Series in a single column named 0. This is
    # true regardless of the existence of another column named 0 in the
    # concat.
    if axis == 0:
        objs = [series_to_df(obj, [0])
                if isinstance(obj, pandas.Series) else obj for obj in objs]
    else:
        # Pandas starts the count at 0 so this will increment the names as
        # long as there's a new nameless Series being added.
        def name_incrementer(i):
            val = i[0]
            i[0] += 1
            return val

        i = [0]
        objs = [series_to_df(obj, obj.name if obj.name is not None
                             else name_incrementer(i))
                if isinstance(obj, pandas.Series) else obj for obj in objs]

    # Using concat on the columns and index is fast because they're empty,
    # and it forces the error checking. It also puts the columns in the
    # correct order for us.
    final_index = \
        pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
                      axis=axis, join=join, join_axes=join_axes,
                      ignore_index=ignore_index, keys=keys, levels=levels,
                      names=names, verify_integrity=verify_integrity,
                      copy=False).index
    final_columns = \
        pandas.concat([pandas.DataFrame(columns=col)
                       for col in all_columns],
                      axis=axis, join=join, join_axes=join_axes,
                      ignore_index=ignore_index, keys=keys, levels=levels,
                      names=names, verify_integrity=verify_integrity,
                      copy=False).columns

    # Put all of the DataFrames into Ray format
    # TODO just partition the DataFrames instead of building a new Ray DF.
    objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
                                               pandas.Series)) else obj
            for obj in objs]

    # Here we reuse all_columns/index so we don't have to materialize objects
    # from remote memory built in the previous line. In the future, we won't
    # be building new DataFrames, rather just partitioning the DataFrames.
    if axis == 0:
        new_blocks = np.array([_reindex_helper._submit(
            args=tuple([all_columns[i], final_columns, axis,
                        len(objs[0]._block_partitions)] + part.tolist()),
            num_return_vals=len(objs[0]._block_partitions))
            for i in range(len(objs))
            for part in objs[i]._block_partitions])
    else:
        # Transposing the columns is necessary because the remote task treats
        # everything like rows and returns in row-major format. Luckily, this
        # operation is cheap in numpy.
        new_blocks = np.array([_reindex_helper._submit(
            args=tuple([all_index[i], final_index, axis,
                        len(objs[0]._block_partitions.T)] + part.tolist()),
            num_return_vals=len(objs[0]._block_partitions.T))
            for i in range(len(objs))
            for part in objs[i]._block_partitions.T]).T

    return DataFrame(block_partitions=new_blocks,
                     columns=final_columns,
                     index=final_index)
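
For context, here is a minimal usage sketch of the new concat. The import path and the ray.init() call are assumptions based on how Pandas on Ray was typically used at the time, not part of this diff, and the exact module export may differ:

import pandas
import ray
import ray.dataframe as rpd  # assumed import path for Pandas on Ray

ray.init()  # assumed: Ray must be running before DataFrames are partitioned

left = pandas.DataFrame({'a': [1, 2], 'b': [3, 4]})
right = pandas.DataFrame({'a': [5, 6], 'c': [7, 8]})

# Row-wise concat with an outer join on the columns: the result has
# columns ['a', 'b', 'c'], with NaN where an input frame lacks a column.
result = rpd.concat([left, right], axis=0, join='outer')

# The axis=1 Series-naming rule that name_incrementer mirrors, shown in
# plain pandas: nameless Series get consecutive integer column names.
cols = pandas.concat([pandas.Series([1, 2]),
                      pandas.Series([3, 4], name='x')], axis=1).columns
print(list(cols))  # [0, 'x']

Note the design choice visible in the diff: instead of reducing over pairwise merges of whole frames as before, the new code computes the final index and columns up front with cheap empty-frame concats, then reindexes the block partitions remotely.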