Skip to content

Commit 7cfc85c

Browse files
FEAT-#2342: Add axis partitions API (#2515)
Signed-off-by: Igoshev, Yaroslav <yaroslav.igoshev@intel.com> Co-authored-by: Devin Petersohn <devin.petersohn@gmail.com>
1 parent 477c5f6 commit 7cfc85c

File tree

8 files changed

+235
-33
lines changed

8 files changed

+235
-33
lines changed

modin/api/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to Modin Development Team under one or more contributor license agreements.
2+
# See the NOTICE file distributed with this work for additional information regarding
3+
# copyright ownership. The Modin Development Team licenses this file to you under the
4+
# Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
# compliance with the License. You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software distributed under
10+
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific language
12+
# governing permissions and limitations under the License.
13+
14+
from .partition_api import unwrap_partitions
15+
16+
__all__ = ["unwrap_partitions"]

modin/api/partition_api.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Licensed to Modin Development Team under one or more contributor license agreements.
2+
# See the NOTICE file distributed with this work for additional information regarding
3+
# copyright ownership. The Modin Development Team licenses this file to you under the
4+
# Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
# compliance with the License. You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software distributed under
10+
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific language
12+
# governing permissions and limitations under the License.
13+
14+
15+
def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
16+
"""
17+
Unwrap partitions of the `api_layer_object`.
18+
19+
Parameters
20+
----------
21+
api_layer_object : DataFrame or Series
22+
The API layer object.
23+
axis : None, 0 or 1. Default is None
24+
The axis to unwrap partitions for (0 - row partitions, 1 - column partitions).
25+
If axis is None, all the partitions of the API layer object are unwrapped.
26+
bind_ip : boolean. Default is False
27+
Whether to bind node ip address to each partition or not.
28+
29+
Returns
30+
-------
31+
list
32+
A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object`
33+
if Ray/Dask is used as an engine.
34+
35+
Notes
36+
-----
37+
In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses
38+
and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine.
39+
"""
40+
if not hasattr(api_layer_object, "_query_compiler"):
41+
raise ValueError(
42+
f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
43+
)
44+
45+
if axis is None:
46+
47+
def _unwrap_partitions(oid):
48+
if bind_ip:
49+
return [
50+
(partition.ip, getattr(partition, oid))
51+
for row in api_layer_object._query_compiler._modin_frame._partitions
52+
for partition in row
53+
]
54+
else:
55+
return [
56+
getattr(partition, oid)
57+
for row in api_layer_object._query_compiler._modin_frame._partitions
58+
for partition in row
59+
]
60+
61+
actual_engine = type(
62+
api_layer_object._query_compiler._modin_frame._partitions[0][0]
63+
).__name__
64+
if actual_engine in ("PandasOnRayFramePartition",):
65+
return _unwrap_partitions("oid")
66+
elif actual_engine in ("PandasOnDaskFramePartition",):
67+
return _unwrap_partitions("future")
68+
raise ValueError(
69+
f"Do not know how to unwrap '{actual_engine}' underlying partitions"
70+
)
71+
else:
72+
partitions = (
73+
api_layer_object._query_compiler._modin_frame._frame_mgr_cls.axis_partition(
74+
api_layer_object._query_compiler._modin_frame._partitions, axis ^ 1
75+
)
76+
)
77+
return [
78+
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
79+
for part in partitions
80+
]

modin/engines/base/frame/axis_partition.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,56 @@ def shuffle(self, func, lengths, **kwargs):
9595
def _wrap_partitions(self, partitions):
9696
return [self.partition_type(obj) for obj in partitions]
9797

98+
def coalesce(self, bind_ip=False):
99+
"""
100+
Coalesce the axis partitions into a single partition.
101+
102+
Parameters
103+
----------
104+
bind_ip : boolean, default False
105+
Whether to bind node ip address to a single partition or not.
106+
107+
Returns
108+
-------
109+
BaseFrameAxisPartition
110+
An axis partition containing only a single coalesced partition.
111+
"""
112+
coalesced = self.apply(lambda x: x, num_splits=1, maintain_partitioning=False)
113+
return type(self)(coalesced, bind_ip=bind_ip)
114+
115+
def unwrap(self, squeeze=False, bind_ip=False):
116+
"""
117+
Unwrap partitions from axis partition.
118+
119+
Parameters
120+
----------
121+
squeeze : boolean, default False
122+
The flag used to unwrap only one partition.
123+
bind_ip : boolean, default False
124+
Whether to bind node ip address to each partition or not.
125+
126+
Returns
127+
-------
128+
list
129+
List of partitions from axis partition.
130+
131+
Notes
132+
-----
133+
In case bind_ip=True, list containing tuples of Ray.ObjectRef/Dask.Future
134+
to node ip addresses and unwrapped partitions, respectively, is returned
135+
if Ray/Dask is used as an engine.
136+
"""
137+
if squeeze and len(self.list_of_blocks) == 1:
138+
if bind_ip:
139+
return self.list_of_ips[0], self.list_of_blocks[0]
140+
else:
141+
return self.list_of_blocks[0]
142+
else:
143+
if bind_ip:
144+
return list(zip(self.list_of_ips, self.list_of_blocks))
145+
else:
146+
return self.list_of_blocks
147+
98148

99149
class PandasFrameAxisPartition(BaseFrameAxisPartition):
100150
"""An abstract class is created to simplify and consolidate the code for AxisPartitions that run pandas.

modin/engines/dask/pandas_on_dask/frame/axis_partition.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616

1717
from distributed.client import get_client
1818
from distributed import Future
19+
from distributed.utils import get_ip
20+
import pandas
1921

2022

2123
class PandasOnDaskFrameAxisPartition(PandasFrameAxisPartition):
22-
def __init__(self, list_of_blocks):
24+
def __init__(self, list_of_blocks, bind_ip=False):
2325
# Unwrap from BaseFramePartition object for ease of use
2426
for obj in list_of_blocks:
2527
obj.drain_call_queue()
2628
self.list_of_blocks = [obj.future for obj in list_of_blocks]
29+
if bind_ip:
30+
self.list_of_ips = [obj.ip for obj in list_of_blocks]
2731

2832
partition_type = PandasOnDaskFramePartition
2933
instance_type = Future
@@ -34,6 +38,7 @@ def deploy_axis_func(
3438
):
3539
client = get_client()
3640
axis_result = client.submit(
41+
deploy_dask_func,
3742
PandasFrameAxisPartition.deploy_axis_func,
3843
axis,
3944
func,
@@ -51,7 +56,7 @@ def deploy_axis_func(
5156
# get futures for each.
5257
return [
5358
client.submit(lambda l: l[i], axis_result, pure=False)
54-
for i in range(result_num_splits)
59+
for i in range(result_num_splits * 4)
5560
]
5661

5762
@classmethod
@@ -60,6 +65,7 @@ def deploy_func_between_two_axis_partitions(
6065
):
6166
client = get_client()
6267
axis_result = client.submit(
68+
deploy_dask_func,
6369
PandasFrameAxisPartition.deploy_func_between_two_axis_partitions,
6470
axis,
6571
func,
@@ -74,7 +80,13 @@ def deploy_func_between_two_axis_partitions(
7480
# get futures for each.
7581
return [
7682
client.submit(lambda l: l[i], axis_result, pure=False)
77-
for i in range(num_splits)
83+
for i in range(num_splits * 4)
84+
]
85+
86+
def _wrap_partitions(self, partitions):
87+
return [
88+
self.partition_type(future, length, width, ip)
89+
for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
7890
]
7991

8092

@@ -94,3 +106,26 @@ class PandasOnDaskFrameRowPartition(PandasOnDaskFrameAxisPartition):
94106
"""
95107

96108
axis = 1
109+
110+
111+
def deploy_dask_func(func, *args):
112+
"""
113+
Run a function on a remote partition.
114+
115+
Parameters
116+
----------
117+
func : callable
118+
The function to run.
119+
120+
Returns
121+
-------
122+
The result of the function `func`.
123+
"""
124+
result = func(*args)
125+
ip = get_ip()
126+
if isinstance(result, pandas.DataFrame):
127+
return result, len(result), len(result.columns), ip
128+
elif all(isinstance(r, pandas.DataFrame) for r in result):
129+
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
130+
else:
131+
return [i for r in result for i in [r, None, None, ip]]

modin/engines/dask/pandas_on_dask/frame/partition.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from modin.data_management.utils import length_fn_pandas, width_fn_pandas
1818

1919
from distributed.client import get_client
20+
from distributed.utils import get_ip
2021
import cloudpickle as pkl
2122

2223

@@ -25,7 +26,7 @@ def apply_list_of_funcs(funcs, df):
2526
if isinstance(func, bytes):
2627
func = pkl.loads(func)
2728
df = func(df, **kwargs)
28-
return df
29+
return df, get_ip()
2930

3031

3132
class PandasOnDaskFramePartition(BaseFramePartition):
@@ -40,13 +41,14 @@ class PandasOnDaskFramePartition(BaseFramePartition):
4041
subclasses. There is no logic for updating inplace.
4142
"""
4243

43-
def __init__(self, future, length=None, width=None, call_queue=None):
44+
def __init__(self, future, length=None, width=None, ip=None, call_queue=None):
4445
self.future = future
4546
if call_queue is None:
4647
call_queue = []
4748
self.call_queue = call_queue
4849
self._length_cache = length
4950
self._width_cache = width
51+
self.ip = ip
5052

5153
def get(self):
5254
"""Flushes the call_queue and returns the data.
@@ -81,7 +83,8 @@ def apply(self, func, **kwargs):
8183
future = get_client().submit(
8284
apply_list_of_funcs, call_queue, self.future, pure=False
8385
)
84-
return PandasOnDaskFramePartition(future)
86+
futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
87+
return PandasOnDaskFramePartition(futures[0], ip=futures[1])
8588

8689
def add_to_apply_calls(self, func, **kwargs):
8790
return PandasOnDaskFramePartition(
@@ -91,7 +94,9 @@ def add_to_apply_calls(self, func, **kwargs):
9194
def drain_call_queue(self):
9295
if len(self.call_queue) == 0:
9396
return
94-
self.future = self.apply(lambda x: x).future
97+
new_partition = self.apply(lambda x: x)
98+
self.future = new_partition.future
99+
self.ip = new_partition.ip
95100
self.call_queue = []
96101

97102
def mask(self, row_indices, col_indices):

modin/engines/ray/pandas_on_ray/frame/axis_partition.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
from .partition import PandasOnRayFramePartition
1818

1919
import ray
20+
from ray.services import get_node_ip_address
2021

2122

2223
class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
23-
def __init__(self, list_of_blocks):
24+
def __init__(self, list_of_blocks, bind_ip=False):
2425
# Unwrap from BaseFramePartition object for ease of use
2526
for obj in list_of_blocks:
2627
obj.drain_call_queue()
2728
self.list_of_blocks = [obj.oid for obj in list_of_blocks]
29+
if bind_ip:
30+
self.list_of_ips = [obj.ip for obj in list_of_blocks]
2831

2932
partition_type = PandasOnRayFramePartition
3033
instance_type = ray.ObjectID
@@ -44,7 +47,7 @@ def deploy_axis_func(
4447
maintain_partitioning,
4548
)
4649
+ tuple(partitions),
47-
num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
50+
num_returns=num_splits * 4 if lengths is None else len(lengths) * 4,
4851
)
4952

5053
@classmethod
@@ -62,13 +65,13 @@ def deploy_func_between_two_axis_partitions(
6265
kwargs,
6366
)
6467
+ tuple(partitions),
65-
num_returns=num_splits * 3,
68+
num_returns=num_splits * 4,
6669
)
6770

6871
def _wrap_partitions(self, partitions):
6972
return [
70-
self.partition_type(partitions[i], partitions[i + 1], partitions[i + 2])
71-
for i in range(0, len(partitions), 3)
73+
self.partition_type(object_id, length, width, ip)
74+
for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
7275
]
7376

7477

@@ -92,20 +95,27 @@ class PandasOnRayFrameRowPartition(PandasOnRayFrameAxisPartition):
9295

9396
@ray.remote
9497
def deploy_ray_func(func, *args): # pragma: no cover
95-
"""Run a function on a remote partition.
96-
97-
Note: Ray functions are not detected by codecov (thus pragma: no cover)
98+
"""
99+
Run a function on a remote partition.
98100
99-
Args:
100-
func: The function to run.
101+
Parameters
102+
----------
103+
func : callable
104+
The function to run.
101105
102-
Returns:
106+
Returns
107+
-------
103108
The result of the function `func`.
109+
110+
Notes
111+
-----
112+
Ray functions are not detected by codecov (thus pragma: no cover)
104113
"""
105114
result = func(*args)
115+
ip = get_node_ip_address()
106116
if isinstance(result, pandas.DataFrame):
107-
return result, len(result), len(result.columns)
117+
return result, len(result), len(result.columns), ip
108118
elif all(isinstance(r, pandas.DataFrame) for r in result):
109-
return [i for r in result for i in [r, len(r), len(r.columns)]]
119+
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
110120
else:
111-
return [i for r in result for i in [r, None, None]]
121+
return [i for r in result for i in [r, None, None, ip]]

0 commit comments

Comments
 (0)