Skip to content

Commit b07ec3c

Browse files
Rewriting the partitioning
Adding some partitioning updates Updating remote Continuing backend rewrite progress Adding idxmax/min Adding head/tail/repr for data_manager Fixing transpose Updating remote with bugfixes for transpose/repr Fixing a number of operations Fix quantile Adding more functionality, __getitem__ Fixing more tests Fixing drop, passing tests Add insert to new structure Cleaning up unneeded imports Updating remote Fix minor bug Updating remote Add sort_index Minor refactor of code. Cleaning some up Continuing logic migration Add some docs Adding more docs Add more method level documentation. Adding documentation and cleaning up Retructuring partitioning files for simplicity Adding more docs, cleaning up docs Fix performance bug, more cleanup
1 parent ae9f397 commit b07ec3c

File tree

13 files changed

+2264
-1558
lines changed

13 files changed

+2264
-1558
lines changed

modin/data_management/__init__.py

Whitespace-only changes.

modin/data_management/data_manager.py

Lines changed: 863 additions & 0 deletions
Large diffs are not rendered by default.

modin/data_management/partitioning/__init__.py

Whitespace-only changes.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
from __future__ import absolute_import
2+
from __future__ import division
3+
from __future__ import print_function
4+
5+
import pandas
6+
import ray
7+
8+
from .remote_partition import RayRemotePartition
9+
10+
11+
class AxisPartition(object):
12+
"""This abstract class represents the Parent class for any
13+
`ColumnPartition` or `RowPartition` class. This class is intended to
14+
simplify the way that operations are performed
15+
16+
Note 0: The procedures that use this class and its methods assume that
17+
they have some global knowledge about the entire axis. This may
18+
require the implementation to use concatenation or append on the
19+
list of block partitions in this object.
20+
21+
Note 1: The `BlockPartitions` object that controls these objects
22+
(through the API exposed here) has an invariant that requires that
23+
this object is never returned from a function. It assumes that
24+
there will always be `RemotePartition` object stored and structures
25+
itself accordingly.
26+
27+
The only abstract method needed to implement is the `apply` method.
28+
"""
29+
def apply(self, func, num_splits=None, other_axis_partition=None, **kwargs):
30+
"""Applies a function to a full axis.
31+
32+
Note: The procedures that invoke this method assume full axis
33+
knowledge. Implement this method accordingly.
34+
35+
Important: You must return a list of `RemotePartition` objects from
36+
this method. See Note 1 for this class above for more information.
37+
38+
Args:
39+
func: The function to apply. This will be preprocessed according to
40+
the corresponding `RemotePartitions` object.
41+
num_splits: The number of objects to return, the number of splits
42+
for the resulting object. It is up to this method to choose the
43+
splitting at this time.
44+
other_axis_partition: Another `AxisPartition` object to be applied
45+
to func. This is for operations that are between datasets.
46+
47+
Returns:
48+
A list of `RemotePartition` objects.
49+
"""
50+
raise NotImplementedError("Must be implemented in children classes")
51+
52+
53+
class RayAxisPartition(AxisPartition):
54+
55+
def __init__(self, list_of_blocks):
56+
# Unwrap from RemotePartition object for ease of use
57+
self.list_of_blocks = [obj.oid for obj in list_of_blocks]
58+
59+
def apply(self, func, num_splits=None, other_axis_partition=None, **kwargs):
60+
"""Applies func to the object in the plasma store.
61+
62+
See notes in Parent class about this method.
63+
64+
Args:
65+
func: The function to apply.
66+
num_splits: The number of times to split the result object.
67+
other_axis_partition: Another `RayAxisPartition` object to apply to
68+
func with this one.
69+
70+
Returns:
71+
A list of `RayRemotePartition` objects.
72+
"""
73+
if num_splits is None:
74+
num_splits = len(self.list_of_blocks)
75+
76+
if other_axis_partition is not None:
77+
return [RayRemotePartition(obj) for obj in deploy_ray_func_between_two_axis_partitions._submit(args=(self.axis, func, num_splits, len(self.list_of_blocks), kwargs) + tuple(self.list_of_blocks + other_axis_partition.list_of_blocks), num_return_vals=num_splits)]
78+
79+
return [RayRemotePartition(obj) for obj in deploy_ray_axis_func._submit(args=(self.axis, func, num_splits, kwargs, *self.list_of_blocks), num_return_vals=num_splits)]
80+
81+
82+
class RayColumnPartition(RayAxisPartition):
83+
"""The column partition implementation for Ray. All of the implementation
84+
for this class is in the parent class, and this class defines the axis
85+
to perform the computation over.
86+
"""
87+
axis = 0
88+
89+
90+
class RayRowPartition(RayAxisPartition):
91+
"""The row partition implementation for Ray. All of the implementation
92+
for this class is in the parent class, and this class defines the axis
93+
to perform the computation over.
94+
"""
95+
axis = 1
96+
97+
98+
def split_result_of_axis_func_pandas(axis, num_splits, result):
99+
"""Split the Pandas result evenly based on the provided number of splits.
100+
101+
Args:
102+
axis: The axis to split across.
103+
num_splits: The number of even splits to create.
104+
result: The result of the computation. This should be a Pandas
105+
DataFrame.
106+
107+
Returns:
108+
A list of Pandas DataFrames.
109+
"""
110+
# We do this to restore block partitioning
111+
if axis == 0 or type(result) is pandas.Series:
112+
# We do this to avoid zeros and having an extremely large last partition
113+
chunksize = len(result) // num_splits if len(result) % num_splits == 0 else len(result) // num_splits + 1
114+
return [result.iloc[chunksize * i: chunksize * (i + 1)] for i in range(num_splits)]
115+
else:
116+
# See note above about avoiding zeros.
117+
chunksize = len(result.columns) // num_splits if len(result.columns) % num_splits == 0 else len(result.columns) // num_splits + 1
118+
return [result.iloc[:, chunksize * i: chunksize * (i + 1)] for i in range(num_splits)]
119+
120+
121+
@ray.remote
122+
def deploy_ray_axis_func(axis, func, num_splits, kwargs, *partitions):
123+
"""Deploy a function along a full axis in Ray.
124+
125+
Args:
126+
axis: The axis to perform the function along.
127+
func: The function to perform.
128+
num_splits: The number of splits to return
129+
(see `split_result_of_axis_func_pandas`)
130+
kwargs: A dictionary of keyword arguments.
131+
partitions: All partitions that make up the full axis (row or column)
132+
133+
Returns:
134+
A list of Pandas DataFrames.
135+
"""
136+
dataframe = pandas.concat(partitions, axis=axis, copy=False)
137+
result = func(dataframe, **kwargs)
138+
return split_result_of_axis_func_pandas(axis, num_splits, result)
139+
140+
141+
@ray.remote
142+
def deploy_ray_func_between_two_axis_partitions(axis, func, num_splits, len_of_left, kwargs, *partitions):
143+
"""Deploy a function along a full axis between two data sets in Ray.
144+
145+
Args:
146+
axis: The axis to perform the function along.
147+
func: The function to perform.
148+
num_splits: The number of splits to return
149+
(see `split_result_of_axis_func_pandas`).
150+
len_of_left: The number of values in `partitions` that belong to the
151+
left data set.
152+
kwargs: A dictionary of keyword arguments.
153+
partitions: All partitions that make up the full axis (row or column)
154+
for both data sets.
155+
156+
Returns:
157+
A list of Pandas DataFrames.
158+
"""
159+
lt_frame = pandas.concat(list(partitions[:len_of_left]), axis=axis, copy=False)
160+
rt_frame = pandas.concat(list(partitions[len_of_left:]), axis=axis, copy=False)
161+
162+
result = func(lt_frame, rt_frame, **kwargs)
163+
return split_result_of_axis_func_pandas(axis, num_splits, result)

0 commit comments

Comments
 (0)