BUG: Sort values on dataframe with empty partitions raises ValueError: Replace has to be set to True when upsampling the population frac > 1. #5552

Closed
3 tasks done
RehanSD opened this issue Jan 18, 2023 · 2 comments · Fixed by #5553
Labels
bug 🦗 Something isn't working P0 Highest priority tasks requiring immediate fix

Comments

@RehanSD
Collaborator

RehanSD commented Jan 18, 2023

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

import modin.pandas as pd
# import pandas as pd
import pandas_market_calendars as mcal
import ray

ENTITIES = ['037833100', '594918104', '023135106', '30231G102', '478160104', '30303M102', '369604103', '00206R102',
            '46625H100', '742718109', '02079K305', '949746101', '931142103', '92343V104', '717081103', '084670702',
            '166764100', '191216100', '458140100', '58933Y105', '68389X105', '060505104', '20030N101', '437076102',
            '92826C839', '17275R102', '718172109', '713448108', '254687106', '459200101', '172967424', '91324P102',
            '031162100', '02209S103', '57636Q104', '88579Y101', '500754106', '806857108', '375558103', '00287Y109',
            '580135101', '126650100', '747525103', '110122108', '931427108', '532457108', '438516106', '913017109',
            '151020104', '097023105', '855244109', '907818108', '654106103', '911312106', '902973304', '741503403',
            '16119P108', '539830109', '882508104', '761713106', 'Y09827109', '38141G104', '609207105', '09062X103',
            '22160K105', '828806109', '194162103', '548661107', '026874784', '002824100', '617446448', '09247X101',
            '260543103', '025816109', '883556102', '887317303', '263534109', '65339F101', '293792107', '26441C204',
            '674599105', '235851102', '842587107', '00724F101', '26875P101', '37045V100', '79466L302', '872540109',
            '49456B101', '20825C104', '59156R108', '70450Y103', '149123101', '25746U109', '345370860', '03027X100',
            '369550108', '517834107', '61166W101', '693475105']


def main():
    nyse = mcal.get_calendar('NYSE')

    trade_date_range = nyse.valid_days(start_date='2016-12-20', end_date='2017-01-10')
    trade_date_range.name = "pricing_date"

    pricing_index = pd.MultiIndex.from_product([trade_date_range, ENTITIES], names=["pricing_date", "entity"])

    df = pd.DataFrame(index=pricing_index)

    # filtered_df = df.loc['2017-01-09'].index.get_level_values("entity")
    filtered_df = df._default_to_pandas(lambda x: x.loc['2017-01-09']).index.get_level_values("entity")
    dates_df = pd.DataFrame(df.index.get_level_values("pricing_date")).drop_duplicates().sort_values(by="pricing_date")
    # KeyError: "pricing_date" (not true in native Pandas) if we make the above call use _default_to_pandas
    # TypeError: '>=' not supported between instances of 'str' and 'int' if we leave the above as is
    print(dates_df.iloc[len(dates_df) // 2]["pricing_date"])


if __name__ == '__main__':
    ray.init()
    main()

Issue Description

As discovered in #3620, if a dataframe ends up with empty partitions, sorting it raises an error, because the sort tries to sample from an empty dataframe.

Expected Behavior

sort_values should complete without raising an error.

Error Logs

---------------------------------------------------------------------------
RayTaskError(ValueError)                  Traceback (most recent call last)
Cell In[1], line 41
     39 if __name__ == '__main__':
     40     ray.init()
---> 41     main()

Cell In[1], line 33, in main()
     31 # filtered_df = df.loc['2017-01-09'].index.get_level_values("entity")
     32 filtered_df = df._default_to_pandas(lambda x: x.loc['2017-01-09']).index.get_level_values("entity")
---> 33 dates_df = pd.DataFrame(df.index.get_level_values("pricing_date")).drop_duplicates().sort_values(by="pricing_date")
     34 # KeyError: "pricing_date" (not true in native Pandas) if we make the above call use _default_to_pandas
     35 # TypeError: '>=' not supported between instances of 'str' and 'int' if we leave the above as is
     36 print(dates_df.iloc[len(dates_df) // 2]["pricing_date"])

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/pandas/base.py:2919, in BasePandasDataset.sort_values(self, by, axis, ascending, inplace, kind, na_position, ignore_index, key)
   2917 ascending = validate_ascending(ascending)
   2918 if axis == 0:
-> 2919     result = self._query_compiler.sort_rows_by_column_values(
   2920         by,
   2921         ascending=ascending,
   2922         kind=kind,
   2923         na_position=na_position,
   2924         ignore_index=ignore_index,
   2925         key=key,
   2926     )
   2927 else:
   2928     result = self._query_compiler.sort_columns_by_row_values(
   2929         by,
   2930         ascending=ascending,
   (...)
   2934         key=key,
   2935     )

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/storage_formats/pandas/query_compiler.py:3383, in PandasQueryCompiler.sort_rows_by_column_values(self, columns, ascending, **kwargs)
   3380 def sort_rows_by_column_values(self, columns, ascending=True, **kwargs):
   3381     # Our algebra sort is only implemented for Engines that support virtual partitioning.
   3382     if Engine.get() in ["Ray", "Dask", "Unidist"]:
-> 3383         new_modin_frame = self._modin_frame.sort_by(
   3384             0, columns, ascending=ascending, **kwargs
   3385         )
   3386         return self.__constructor__(new_modin_frame)
   3387     ignore_index = kwargs.get("ignore_index", False)

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:126, in lazy_metadata_decorator.<locals>.decorator.<locals>.run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    124     elif apply_axis == "rows":
    125         obj._propagate_index_objs(axis=0)
--> 126 result = f(self, *args, **kwargs)
    127 if apply_axis is None and not transpose:
    128     result._deferred_index = self._deferred_index

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:2073, in PandasDataframe.sort_by(self, axis, columns, ascending, **kwargs)
   2071         index = i
   2072         break
-> 2073 new_partitions = self._partition_mgr_cls.shuffle_partitions(
   2074     self._partitions,
   2075     index,
   2076     shuffling_functions,
   2077     sort_function,
   2078 )
   2079 new_axes = self.axes
   2080 new_lengths = [None, None]

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py:1553, in PandasDataframePartitionManager.shuffle_partitions(cls, partitions, index, shuffle_functions, final_shuffle_func)
   1551 samples = [partition.apply(sample_func) for partition in masked_partitions]
   1552 # Get each sample to pass in to the pivot function
-> 1553 samples = cls.get_objects_from_partitions(samples)
   1554 pivots = shuffle_functions.pivot_function(samples)
   1555 # Convert our list of block partitions to row partitions. We need to create full-axis
   1556 # row partitions since we need to send the whole partition to the split step as otherwise
   1557 # we wouldn't know how to split the block partitions that don't contain the shuffling key.

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py:117, in PandasOnRayDataframePartitionManager.get_objects_from_partitions(cls, partitions)
    113         partitions[idx] = part.force_materialization()
    114 assert all(
    115     [len(partition.list_of_blocks) == 1 for partition in partitions]
    116 ), "Implementation assumes that each partition contains a signle block."
--> 117 return RayWrapper.materialize(
    118     [partition.list_of_blocks[0] for partition in partitions]
    119 )

File ~/software_sources/modin/modin/core/execution/ray/common/engine_wrapper.py:92, in RayWrapper.materialize(cls, obj_id)
     77 @classmethod
     78 def materialize(cls, obj_id):
     79     """
     80     Get the value of object from the Plasma store.
     81
   (...)
     90         Whatever was identified by `obj_id`.
     91     """
---> 92     return ray.get(obj_id)

File ~/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    103     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104         return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/ray/_private/worker.py:2289, in get(object_refs, timeout)
   2287     worker.core_worker.dump_object_store_memory_usage()
   2288 if isinstance(value, RayTaskError):
-> 2289     raise value.as_instanceof_cause()
   2290 else:
   2291     raise value

RayTaskError(ValueError): ray::_apply_func() (pid=83892, ip=127.0.0.1)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/dataframe/utils.py", line 61, in sample_fn
    return pick_samples_for_quantiles(
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/dataframe/utils.py", line 153, in pick_samples_for_quantiles
    return df.sample(frac=probability).to_numpy()
  File "/Users/maheshvashishtha/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/pandas/core/generic.py", line 5765, in sample
    size = sample.process_sampling_size(n, frac, replace)
  File "/Users/maheshvashishtha/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/pandas/core/sample.py", line 103, in process_sampling_size
    raise ValueError(
ValueError: Replace has to be set to `True` when upsampling the population `frac` > 1.

During handling of the above exception, another exception occurred:

ray::_apply_func() (pid=83892, ip=127.0.0.1)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 398, in _apply_func
    result = func(partition.copy(), *args, **kwargs)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/dataframe/utils.py", line 61, in sample_fn
    return pick_samples_for_quantiles(
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/dataframe/utils.py", line 153, in pick_samples_for_quantiles
    return df.sample(frac=probability).to_numpy()
  File "/Users/maheshvashishtha/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/pandas/core/generic.py", line 5765, in sample
    size = sample.process_sampling_size(n, frac, replace)
  File "/Users/maheshvashishtha/opt/anaconda3/envs/ponder-product-testing/lib/python3.10/site-packages/pandas/core/sample.py", line 103, in process_sampling_size
    raise ValueError(
ValueError: Replace has to be set to `True` when upsampling the population `frac` > 1.

Installed Versions

Replace this line with the output of pd.show_versions()

@RehanSD RehanSD added bug 🦗 Something isn't working Triage 🩹 Issues that need triage labels Jan 18, 2023
@RehanSD
Collaborator Author

RehanSD commented Jan 18, 2023

Did some digging: this happens when the dataframe is over-partitioned and we attempt a sort, because we end up trying to oversample the data.
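For readers following along, the oversampling failure can be reproduced in plain pandas, without Modin or Ray. The sketch below is only an illustration: `safe_sample` is a hypothetical helper, not the code in Modin's `pick_samples_for_quantiles` and not necessarily the fix merged in #5553. It assumes the sampling fraction computed for a very small partition can exceed 1, which is what the message in the traceback indicates.

```python
import pandas as pd


def safe_sample(df: pd.DataFrame, frac: float) -> pd.DataFrame:
    """Hypothetical guard (not Modin's implementation): never upsample."""
    if len(df) == 0:
        # Nothing to sample from an empty partition.
        return df
    # Clamp the fraction so pandas is never asked for more rows than exist.
    return df.sample(frac=min(frac, 1.0))


# A tiny stand-in for a nearly empty partition.
small = pd.DataFrame(
    {"pricing_date": pd.to_datetime(["2017-01-09", "2017-01-10"])}
)

# Asking pandas for more than 100% of the rows without replacement fails
# with the same ValueError that appears in the traceback above.
try:
    small.sample(frac=1.5)
    error_message = None
except ValueError as err:
    error_message = str(err)

print(error_message)

# With the guard, both a tiny and an empty partition are handled.
guarded = safe_sample(small, frac=1.5)
empty = safe_sample(small.iloc[0:0], frac=1.5)
print(len(guarded), len(empty))
```

Clamping the fraction is only one possible way to avoid the error; passing `replace=True` or skipping empty partitions before sampling would be alternatives with different trade-offs for how the pivots are chosen.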

@RehanSD
Collaborator Author

RehanSD commented Jan 18, 2023

@anmyachev @YarShev

RehanSD added a commit to RehanSD/modin that referenced this issue Jan 18, 2023
Signed-off-by: Rehan Durrani <rehan@ponder.io>
@vnlitvinov vnlitvinov added P0 Highest priority tasks requiring immediate fix and removed Triage 🩹 Issues that need triage labels Jan 19, 2023
@mvashishtha mvashishtha changed the title from "BUG: Sort values on dataframe with empty partitions leads to errors." to "BUG: Sort values on dataframe with empty partitions raises ValueError: Replace has to be set to True when upsampling the population frac > 1." on Mar 6, 2023
mvashishtha pushed a commit that referenced this issue Mar 9, 2023
Signed-off-by: Rehan Durrani <rehan@ponder.io>

2 participants