BUG: df.astype alters df so that df.resample(...).aggregate(...) fails with KeyError #5572

Open
3 tasks done
ahallermed opened this issue Jan 23, 2023 · 8 comments
Labels
bug 🦗 Something isn't working External Pull requests and issues from people who do not regularly contribute to modin P1 Important tasks that we should complete soon

Comments

@ahallermed
Contributor

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

import modin.pandas as pd
import numpy as np
import pandas
import ray

ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})

pandas_module = pd
df = pandas_module.DataFrame({
    "a": [1] * 5,
    # without second value column 'b', the error does not occur
    "b": [1] * 5,
    "time":
    np.array([
        "2001-01-02 04:56:00",
        "2001-01-02 04:57:00",
        "2001-01-02 04:58:00",
        "2001-01-02 04:59:00",
        "2001-01-02 05:00:00",
    ],
             dtype="datetime64[ns]"),
})
df
# %%
# Without changing the dtype, no error occurs
df["a"] = df["a"].astype("float64")

# %%
df = df.set_index("time")
resampler = df.resample(rule="5min")
# %%
aggregation = {"b": "last"}
resampler.aggregate(aggregation)

Issue Description

TL;DR: Using astype to change a column's dtype makes a subsequent resample aggregation fail with a KeyError.

After changing the dtype of one of a df's columns (which can be very important for huge dfs; in this example the change is to float64, but it could be any dtype), a subsequent aggregation over time raises a KeyError saying that column 'b', the column being aggregated, is not found. Note that 'b' is not the column whose dtype was changed.
This is, of course, a minimal example, but it shows the issue clearly.


WORKAROUND

Converting the df to pandas and back to modin.pandas avoids the issue. However, the round-tripped df compares equal to the original df, which makes it hard for me to find any difference between them.

Expected Behavior

The aggregation step should not raise a KeyError; vanilla pandas runs the same code without error.
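For comparison, here is a sketch of the same steps run with plain pandas, i.e. with `pandas` swapped in for `pandas_module` as the reproducer allows. Under that assumption the aggregation succeeds and returns one row per 5-minute bin.

```python
import numpy as np
import pandas

# Same data as the reproducer, built with plain pandas instead of modin.pandas
df = pandas.DataFrame({
    "a": [1] * 5,
    "b": [1] * 5,
    "time": np.array([
        "2001-01-02 04:56:00",
        "2001-01-02 04:57:00",
        "2001-01-02 04:58:00",
        "2001-01-02 04:59:00",
        "2001-01-02 05:00:00",
    ], dtype="datetime64[ns]"),
})
# The dtype change that triggers the KeyError under Modin
df["a"] = df["a"].astype("float64")

# Same resample + dict aggregation as in the reproducer
result = df.set_index("time").resample(rule="5min").aggregate({"b": "last"})
print(result)
```

With the default bin alignment, the first four timestamps fall into the 04:55 bin and the last one into the 05:00 bin.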

Error Logs

2023-01-23 10:28:47,570	ERROR worker.py:92 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::deploy_ray_func() (pid=295406, ip=192.168.178.41)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 560, in deploy_ray_func
    result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 181, in deploy_axis_func
    result = func(dataframe, *f_args, **f_kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 1577, in _tree_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 882, in map_func
    val = op(resampled_val, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/resample.py", line 352, in aggregate
    result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 169, in agg
    return self.agg_dict_like()
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 478, in agg_dict_like
    arg = self.normalize_dictlike_arg("agg", selected_obj, arg)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 601, in normalize_dictlike_arg
    raise KeyError(f"Column(s) {cols_sorted} do not exist")
KeyError: "Column(s) ['b'] do not exist"
2023-01-23 10:28:47,573	ERROR worker.py:92 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_apply_func() (pid=295406, ip=192.168.178.41)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=295406, ip=192.168.178.41)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 560, in deploy_ray_func
    result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 181, in deploy_axis_func
    result = func(dataframe, *f_args, **f_kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 1577, in _tree_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 882, in map_func
    val = op(resampled_val, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/resample.py", line 352, in aggregate
    result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 169, in agg
    return self.agg_dict_like()
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 478, in agg_dict_like
    arg = self.normalize_dictlike_arg("agg", selected_obj, arg)
  File "/home/andreas/miniconda3/envs/elise/lib/python3.8/site-packages/pandas/core/apply.py", line 601, in normalize_dictlike_arg
    raise KeyError(f"Column(s) {cols_sorted} do not exist")
KeyError: "Column(s) ['b'] do not exist"

Installed Versions

On master's latest commit

INSTALLED VERSIONS
------------------
commit : 3c99791
python : 3.8.10.final.0
python-bits : 64
OS : Linux
OS-release : 5.15.0-10057-tuxedo
Version : #63~20.04.1tux3 SMP Thu Jan 12 13:09:01 UTC 2023
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies

modin : 0.18.0+50.g3c997914
ray : 2.1.0
dask : 2023.1.0
distributed : 2023.1.0
hdk : None

pandas dependencies

pandas : 1.5.3
numpy : 1.23.4
pytz : 2022.6
dateutil : 2.8.2
setuptools : 65.5.0
pip : 21.1.3
Cython : None
pytest : 7.2.0
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.6.0
pandas_datareader: None
bs4 : 4.11.1
bottleneck : None
brotli : None
fastparquet : None
fsspec : 2022.11.0
gcsfs : None
matplotlib : 3.5.3
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 10.0.1
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : 1.9.3
snappy : None
sqlalchemy : 1.3.24
tables : None
tabulate : 0.9.0
xarray : None
xlrd : None
xlwt : None
zstandard : None
tzdata : None


With modin 0.17.1

INSTALLED VERSIONS
------------------
commit : 7f801ad
python : 3.8.10.final.0
python-bits : 64
OS : Linux
OS-release : 5.15.0-10057-tuxedo
Version : #63~20.04.1tux3 SMP Thu Jan 12 13:09:01 UTC 2023
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies

modin : 0.17.1
ray : 1.12.1
dask : None
distributed : None
hdk : None

pandas dependencies

pandas : 1.5.2
numpy : 1.23.4
pytz : 2022.7
dateutil : 2.8.2
setuptools : 65.6.3
pip : 22.3.1
Cython : None
pytest : 7.2.0
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.9.2
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.6.0
pandas_datareader: None
bs4 : 4.11.1
bottleneck : None
brotli : None
fastparquet : None
fsspec : 2022.11.0
gcsfs : None
matplotlib : 3.5.3
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : None
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : 1.9.3
snappy : None
sqlalchemy : 1.3.24
tables : None
tabulate : 0.9.0
xarray : None
xlrd : None
xlwt : None
zstandard : None
tzdata : None

ahallermed added the bug 🦗 (Something isn't working) and Triage 🩹 (Issues that need triage) labels on Jan 23, 2023
vnlitvinov added the P1 (Important tasks that we should complete soon) and External (Pull requests and issues from people who do not regularly contribute to modin) labels and removed the Triage 🩹 (Issues that need triage) label on Feb 6, 2023
@vnlitvinov
Collaborator

This minimal example reproduces the issue even in PandasOnPython mode, but works fine on pure pandas.

@Garra1980
Collaborator

Hmm, I don't see that error on modin master...

@ahallermed
Contributor Author

@Garra1980 I still get the error, even with the latest commit 21ab814e2f9fd9e4874a036e2fa8e53208638614 from yesterday.

Do you have different dependencies installed?

@Garra1980
Collaborator

I do see the issue with 0.18, but the test works when I switch to an environment with:
modin 0.18.0+70.ga4504a0e pypi_0 pypi
pyarrow 11.0.0 pypi_0 pypi
python 3.9.16 h2782a2a_0_cpython conda-forge
ray 2.2.0 pypi_0 pypi
pandas 1.5.3 pypi_0 pypi
numpy 1.24.2 py39h7360e5f_0 conda-forge

@ahallermed
Contributor Author

@Garra1980 Thank you for the package list.
I created a fresh environment with these packages and found that when running the code in the VS Code interactive window, the same error still occurred, but when running it in a terminal, nothing happened.

Therefore, please adjust the last line of the example code to:

print(resampler.aggregate(aggregation))

I can still reproduce the error with this adjustment. Please check whether that is still the case for you.

I think it may have something to do with lazy evaluation?

@Garra1980
Collaborator

Thanks, now I see it. Yes, lazy execution does the trick here. I did print at the very beginning of my error checking, but I thought print(resampler) would be enough, and apparently it is not.
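The thread attributes the script-versus-interactive difference to lazy execution. Assuming Modin submits the per-partition work as asynchronous tasks and only surfaces a task's error when its result is materialized (for example by print(...) or by an interactive window displaying the value), the effect resembles how the standard library's concurrent.futures defers exceptions to whoever asks for the result. This is an analogy, not Modin code; `aggregate_partition` is a hypothetical stand-in for the remote aggregation.

```python
from concurrent.futures import ThreadPoolExecutor


def aggregate_partition(columns, agg):
    # Hypothetical stand-in for a remote task: performs the same check
    # pandas does for dict aggregations and fails on missing columns
    missing = sorted(set(agg) - set(columns))
    if missing:
        raise KeyError(f"Column(s) {missing} do not exist")
    return {col: "aggregated" for col in agg}


with ThreadPoolExecutor() as pool:
    # Submitting the work does not raise, even though the task will fail
    future = pool.submit(aggregate_partition, ["a"], {"b": "last"})
    # The error is only observed when the result is requested
    error = future.exception()

print(type(error).__name__, error)
```

Calling `future.result()` instead of `future.exception()` would re-raise the KeyError at that point, which is roughly what printing the Modin result does in the report.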

@pyrito
Collaborator

pyrito commented Feb 15, 2023

I dug into this a bit, and I think this might be a partitioning issue. When we do the astype, we end up creating two partitions, one holding column a and one holding column b. You can verify this by running:

In [42]: df._query_compiler._modin_frame._partitions
Out[42]: 
array([[<modin.core.execution.ray.implementations.pandas_on_ray.partitioning.partition.PandasOnRayDataframePartition object at 0x1c29b1fa0>,
        <modin.core.execution.ray.implementations.pandas_on_ray.partitioning.partition.PandasOnRayDataframePartition object at 0x1c29b1e80>]],
      dtype=object)

In [43]: df._query_compiler._modin_frame._partitions[0][0].get()
Out[43]: 
                       a
time                    
2001-01-02 04:56:00  1.0
2001-01-02 04:57:00  1.0
2001-01-02 04:58:00  1.0
2001-01-02 04:59:00  1.0
2001-01-02 05:00:00  1.0

In [44]: df._query_compiler._modin_frame._partitions[0][1].get()
Out[44]: 
                     b
time                  
2001-01-02 04:56:00  1
2001-01-02 04:57:00  1
2001-01-02 04:58:00  1
2001-01-02 04:59:00  1
2001-01-02 05:00:00  1

The resample aggregate seems to apply the function to each partition separately, and it is obviously not able to find column 'b' in the first partition, hence the KeyError. Without the astype, we still have one partition, and the resample finishes properly. I'm not sure what the right solution is for this problem.

cc: @modin-project/modin-core
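The failure mode described above can be imitated with plain pandas by splitting the frame column-wise by hand and applying the full aggregation dict to each piece. This is a sketch under that assumption: `partitions` is a hypothetical list of pandas frames standing in for the two Modin partitions shown in the session, not Modin's partition objects.

```python
import pandas

# Same five timestamps as the reproducer, already used as the index
index = pandas.date_range("2001-01-02 04:56:00", periods=5, freq="min", name="time")
# State after the astype: 'a' is float64, 'b' is still int64
full = pandas.DataFrame({"a": [1.0] * 5, "b": [1] * 5}, index=index)

# Hypothetical column-wise split mirroring the two partitions above
partitions = [full[["a"]], full[["b"]]]

aggregation = {"b": "last"}
outcomes = {}
for part in partitions:
    label = ",".join(part.columns)
    try:
        # The whole dict is applied to every partition
        part.resample(rule="5min").aggregate(aggregation)
        outcomes[label] = "ok"
    except KeyError as exc:
        outcomes[label] = f"KeyError: {exc}"

print(outcomes)
```

Only the piece holding column a fails, with the same "Column(s) ['b'] do not exist" message seen in the logs.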

@mvashishtha
Collaborator

@pyrito thanks for posting that explanation. So resample aggregate parallelizes over column-wise partitions, but that doesn't work for an aggregation that is column-dependent, such as a dict keyed by column name. I think there's a fairly simple fix: 1) add a special case here for aggregate that filters out the columns that are not in the partition, and 2) validate the column keys at the API layer.
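A minimal sketch of the two-part fix proposed above, written against plain pandas so it runs standalone. `validate_agg_keys` and `resample_aggregate` are hypothetical helpers, and a list of column-wise pandas frames stands in for Modin's partitions; an actual change would live inside Modin's query compiler, which is not reproduced here.

```python
import pandas


def validate_agg_keys(columns, func):
    """Step 2 (API layer): fail fast on keys that no partition holds."""
    missing = sorted(set(func) - set(columns))
    if missing:
        # Same wording pandas uses for this check
        raise KeyError(f"Column(s) {missing} do not exist")


def resample_aggregate(partitions, rule, func):
    """Hypothetical driver over column-wise partitions (plain pandas frames)."""
    all_columns = [col for part in partitions for col in part.columns]
    validate_agg_keys(all_columns, func)
    pieces = []
    for part in partitions:
        # Step 1 (per partition): keep only the keys this partition holds
        local = {col: how for col, how in func.items() if col in part.columns}
        if local:  # partitions with none of the requested columns are skipped
            pieces.append(part.resample(rule=rule).aggregate(local))
    # Restore the column order of the user's dict
    return pandas.concat(pieces, axis=1)[list(func)]


index = pandas.date_range("2001-01-02 04:56:00", periods=5, freq="min", name="time")
full = pandas.DataFrame({"a": [1.0] * 5, "b": [1] * 5}, index=index)
partitions = [full[["a"]], full[["b"]]]

result = resample_aggregate(partitions, "5min", {"b": "last"})
expected = full.resample(rule="5min").aggregate({"b": "last"})
print(result.equals(expected))
```

Validating up front turns a bad key into the same KeyError pandas raises, on the caller's side rather than inside a worker task, and filtering per partition lets the partition holding only column a be skipped instead of failing.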
