From fbfc276f2ec1d321c2c60be1b4e08c3bc37a7f32 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 14:57:20 +0000 Subject: [PATCH] apply suggestions Signed-off-by: Dmitry Chigarev --- modin/core/execution/ray/common/utils.py | 10 ++--- .../pandas_on_ray/partitioning/partition.py | 43 ++----------------- .../storage_formats/pandas/test_internals.py | 2 +- 3 files changed, 10 insertions(+), 45 deletions(-) diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index cace16c9f28..bca48f97473 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -335,12 +335,12 @@ def deconstruct_call_queue(call_queue): was_iterable = True if not isinstance(value, (list, tuple)): was_iterable = False - value = [value] + value = (value,) unfolded_queue.extend(value) value_lengths.append({"len": len(value), "was_iterable": was_iterable}) kw_value_lengths.append(value_lengths) - return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue def reconstruct_call_queue( @@ -385,9 +385,9 @@ def take_n_items(n): kw_keys = take_n_items(kw_key_lengths[i]) kwargs = {} value_lengths = kw_value_lengths[i] - for j, key in enumerate(kw_keys): - vals = take_n_items(value_lengths[j]["len"]) - if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]: + for key, value_length in zip(kw_keys, value_lengths): + vals = take_n_items(value_length["len"]) + if value_length["len"] == 1 and not value_length["was_iterable"]: vals = vals[0] kwargs[key] = vals diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index c7404bd4dc9..339f6bf64fc 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -70,43 +70,6 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None): ) ) - @staticmethod - def _apply_call_queue(call_queue, data): - """ - Execute call queue over the given `data`. - - Parameters - ---------- - call_queue : list[list[func, args, kwargs], ...] - data : ray.ObjectRef - - Returns - ------- - ray.ObjectRef of pandas.DataFrame - The resulting pandas DataFrame. - ray.ObjectRef of int - The number of rows of the resulting pandas DataFrame. - ray.ObjectRef of int - The number of columns of the resulting pandas DataFrame. - ray.ObjectRef of str - The node IP address of the worker process. - """ - ( - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - unfolded_queue, - ) = deconstruct_call_queue(call_queue) - return _apply_list_of_funcs.remote( - data, - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - *unfolded_queue, - ) - def apply(self, func, *args, **kwargs): """ Apply a function to the object wrapped by this partition. @@ -138,7 +101,9 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = self._apply_call_queue(call_queue, data) + result, length, width, ip = _apply_list_of_funcs.remote( + data, *deconstruct_call_queue(call_queue) + ) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -169,7 +134,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = self._apply_call_queue(call_queue, data) + ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index d7b1242ba75..71310f3ee5d 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1502,7 +1502,7 @@ def assert_materialized(obj): arg_lengths, kw_key_lengths, kw_value_lengths, - queue, + *queue, ) = deconstruct_call_queue(call_queue) queue = materialize_queue(*queue) reconstructed_queue = reconstruct_call_queue(