Skip to content

Commit

Permalink
apply suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Oct 11, 2023
1 parent 746fdbd commit fbfc276
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 45 deletions.
10 changes: 5 additions & 5 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,12 @@ def deconstruct_call_queue(call_queue):
was_iterable = True
if not isinstance(value, (list, tuple)):
was_iterable = False
value = [value]
value = (value,)
unfolded_queue.extend(value)
value_lengths.append({"len": len(value), "was_iterable": was_iterable})
kw_value_lengths.append(value_lengths)

return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue
return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue


def reconstruct_call_queue(
Expand Down Expand Up @@ -385,9 +385,9 @@ def take_n_items(n):
kw_keys = take_n_items(kw_key_lengths[i])
kwargs = {}
value_lengths = kw_value_lengths[i]
for j, key in enumerate(kw_keys):
vals = take_n_items(value_lengths[j]["len"])
if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]:
for key, value_length in zip(kw_keys, value_lengths):
vals = take_n_items(value_length["len"])
if value_length["len"] == 1 and not value_length["was_iterable"]:
vals = vals[0]
kwargs[key] = vals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,43 +70,6 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
)
)

@staticmethod
def _apply_call_queue(call_queue, data):
"""
Execute call queue over the given `data`.
Parameters
----------
call_queue : list[list[func, args, kwargs], ...]
data : ray.ObjectRef
Returns
-------
ray.ObjectRef of pandas.DataFrame
The resulting pandas DataFrame.
ray.ObjectRef of int
The number of rows of the resulting pandas DataFrame.
ray.ObjectRef of int
The number of columns of the resulting pandas DataFrame.
ray.ObjectRef of str
The node IP address of the worker process.
"""
(
num_funcs,
arg_lengths,
kw_key_lengths,
kw_value_lengths,
unfolded_queue,
) = deconstruct_call_queue(call_queue)
return _apply_list_of_funcs.remote(
data,
num_funcs,
arg_lengths,
kw_key_lengths,
kw_value_lengths,
*unfolded_queue,
)

def apply(self, func, *args, **kwargs):
"""
Apply a function to the object wrapped by this partition.
Expand Down Expand Up @@ -138,7 +101,9 @@ def apply(self, func, *args, **kwargs):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
result, length, width, ip = self._apply_call_queue(call_queue, data)
result, length, width, ip = _apply_list_of_funcs.remote(
data, *deconstruct_call_queue(call_queue)
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down Expand Up @@ -169,7 +134,7 @@ def drain_call_queue(self):
new_length,
new_width,
self._ip_cache,
) = self._apply_call_queue(call_queue, data)
) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue))
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down
2 changes: 1 addition & 1 deletion modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ def assert_materialized(obj):
arg_lengths,
kw_key_lengths,
kw_value_lengths,
queue,
*queue,
) = deconstruct_call_queue(call_queue)
queue = materialize_queue(*queue)
reconstructed_queue = reconstruct_call_queue(
Expand Down

0 comments on commit fbfc276

Please sign in to comment.