
Modin on Ray cluster with object spilling: process was killed when loading parquet file #6639

Open
2 of 3 tasks
zaichang opened this issue Oct 9, 2023 · 22 comments
Labels
bug 🦗 Something isn't working · External: Pull requests and issues from people who do not regularly contribute to modin · Memory 💾 Issues related to memory · P2: Minor bugs or low-priority feature requests · Ray ⚡ Issues related to the Ray engine

Comments

@zaichang

zaichang commented Oct 9, 2023

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

AWS_ACCESS_KEY_ID=""
AWS_SECRET_ACCESS_KEY=""
AWS_SESSION_TOKEN=""

import modin.pandas as pd
pd.show_versions()

df = pd.read_parquet('s3://my-bucket/10GB.parquet',
        storage_options={
            "key": AWS_ACCESS_KEY_ID,
            "secret": AWS_SECRET_ACCESS_KEY,
            "token": AWS_SESSION_TOKEN,
        },
    )

Issue Description

After enabling object spilling, we ran a Ray cluster of 4 nodes with various amounts of instance RAM (as detailed below) and attempted to read a 10GB parquet file, only to find that workers get killed and often the kernel also dies, without the read_parquet operation ever completing.

As a side note: given the out-of-the-box claim that "By default, Modin leverages out-of-core methods to handle datasets that don’t fit in memory for both Ray and Dask engines" (https://modin.readthedocs.io/en/latest/getting_started/why_modin/out_of_core.html), it was strange to see a note on how to enable object spilling in multi-node clusters (https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html#cluster-mode) without noting that it is actually disabled by default.

Our test used Ray as the engine and tried a few configurations; only the cluster with 64GB instances succeeded in loading the 10GB file:

Instances    Instance RAM    Total Cluster RAM     Result
---------------------------------------------------------
4            8GB             32GB                  Killed
4            16GB            64GB                  Killed
4            32GB            128GB                 Killed
4            64GB            256GB                 Worked

Expected Behavior

With object spilling enabled, I expect the cluster to complete reading the large parquet file even if the dataset does not fit in an instance's memory, and was surprised to find that even 32GB per instance did not work.

Error Logs

Replace this line with the error backtrace (if applicable).

Installed Versions

INSTALLED VERSIONS

commit : 4c01f64
python : 3.9.18.final.0
python-bits : 64
OS : Linux
OS-release : 5.15.0-1031-aws
Version : #35-Ubuntu SMP Fri Feb 10 02:07:18 UTC 2023
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : C.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies

modin : 0.24.1
ray : 2.7.0
dask : None
distributed : None
hdk : None

pandas dependencies

pandas : 2.1.1
numpy : 1.26.0
pytz : 2023.3.post1
dateutil : 2.8.2
setuptools : 68.2.2
pip : 23.2.1
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.16.1
pandas_datareader : None
bs4 : 4.12.2
bottleneck : None
dataframe-api-compat: None
fastparquet : 2023.4.0
fsspec : 2023.4.0
gcsfs : None
matplotlib : 3.7.1
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 12.0.0
pyreadstat : None
pyxlsb : None
s3fs : 0.4.2
scipy : 1.10.1
sqlalchemy : None
tables : None
tabulate : 0.9.0
xarray : None
xlrd : None
zstandard : 0.21.0
tzdata : 2023.3
qtpy : None
pyqt5 : None

@zaichang added the "bug 🦗 Something isn't working" and "Triage 🩹 Issues that need triage" labels on Oct 9, 2023
@noloerino added the "Memory 💾 Issues related to memory", "Ray ⚡ Issues related to the Ray engine", and "P2 Minor bugs or low-priority feature requests" labels and removed the "Triage 🩹 Issues that need triage" label on Oct 10, 2023
@noloerino
Collaborator

Would any @modin-project/modin-core members have some insight here?

@anmyachev added the "External: Pull requests and issues from people who do not regularly contribute to modin" label on Oct 12, 2023
@anmyachev
Collaborator

@zaichang thanks for the contribution.

Did you see any warnings like "Defaulting to pandas implementation"?

@zaichang
Author

zaichang commented Oct 16, 2023

@anmyachev I checked my logs and yes there were these lines:

UserWarning: Defaulting to pandas implementation.
Please refer to https://modin.readthedocs.io/en/stable/supported_apis/defaulting_to_pandas.html for explanation.
Reason: Parquet options that are not currently supported

It's confusing, though, because the only option I passed in is storage_options, whose support is implied by https://modin.readthedocs.io/en/stable/supported_apis/io_supported.html

read_parquet | P | Parameters besides filters and storage_options passed via **kwargs are not supported. use_nullable_dtypes == True is not supported.

Besides, if S3 storage_options are not supported, I would expect it to fail before reading any data rather than with an out-of-memory error.

@anmyachev
Collaborator

@zaichang it's strange, because storage_options are supported. Maybe these are old logs?

@anmyachev
Collaborator

It was strange to see a note on how to enable object spilling in multi-node clusters (https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html#cluster-mode) without noting that it is actually disabled by default.

@zaichang you are right. We will correct the documentation.

@anmyachev
Collaborator

With object spilling enabled, I expect the cluster to complete reading the large parquet file even if the dataset does not fit in an instance's memory, and was surprised to find that even 32GB per instance did not work.

@zaichang "The Ray object store allocates 30% of host memory to the shared memory (/dev/shm, unless you specify --object-store-memory).". When Modin starts a Ray local cluster itself, it uses 60% of the total memory. I advise you to also increase the size of the shared memory via --object-store-memory. This should help in the case of 32 GB for one host.

@zaichang
Author

@anmyachev Thank you very much for getting back to me. I'll try adding --object-store-memory and report back.

The logs that mentioned defaulting to pandas implementation were definitely from that test run, but I'll double check with the above test. On a more general note if I have to use a 32GB instance to read a 10GB parquet file, wouldn't that mean we actually can't work with datasets that don't fit in memory for a Ray cluster?

Is the situation any different with a Dask cluster?

@anmyachev
Collaborator

On a more general note if I have to use a 32GB instance to read a 10GB parquet file, wouldn't that mean we actually can't work with datasets that don't fit in memory for a Ray cluster?

@zaichang Most likely, Ray's spilling will not help when the size of the dataframe is close to the total size of RAM and there are operations in the workload that completely materialize the dataframe in one process. This mechanism works much better when there are many small objects whose total memory consumption exceeds the available memory, or when there are no operations in the workload that are performed entirely in pandas.

Is the situation any different with a Dask cluster?

I'm not sure, but it seems the situation there cannot be better than with Ray, because object spilling kicks in when there is an attempt to write data to an object store that does not have enough memory. Here the problem is insufficient available memory (RAM) at the time the object is materialized in the process.

@seydar
Contributor

seydar commented Nov 28, 2023

@zaichang I'm running into a similar issue with Ray, Modin, Dask, and Dask Distributed. I've found that these problems only occur with parquet files (a single 52 GB CSV file poses no problem), so I'm wondering if there's something weird with pyarrow. I'm looking into getting modin to work with fastparquet, but so far it doesn't quite work out-of-the-box.

I'd love to chat more about potential solutions and potential sources of the problem — my email is in my Github profile.

@chriscalip

2023-12-29: Also experiencing out-of-core not working for CSV and parquet on 32 GB RAM using modin[dask], modin[ray], and modin[unidist] (MPI backend).

@anmyachev
Collaborator

@anmyachev I checked my logs and yes there were these lines:

UserWarning: Defaulting to pandas implementation.
Please refer to https://modin.readthedocs.io/en/stable/supported_apis/defaulting_to_pandas.html for explanation.
Reason: Parquet options that are not currently supported

@zaichang One thing that still bothers me is that when reading parquet it says that parquet options are not supported. Are you still seeing this message? Does the reproducer include all the parameters you used for read_parquet?

@seydar It is very interesting that such a large CSV file works but the parquet file does not. If your setup is different from the one in this issue, could you create a separate issue? We would investigate it.

@chriscalip Could you also create a separate issue with all the details that you are able to provide for us?

@chriscalip

Just to note, to be clear:

  1. modin-ray does not work with large csv file bigger than RAM.
  2. modin-ray does not work with large parquet bigger than RAM.
  3. modin-ray does not work with large SQL result bigger than RAM.

@zaichang
Author

zaichang commented Feb 6, 2024

@anmyachev Thanks for checking in. I recently re-checked my test and indeed I am not seeing the read_parquet unsupported options message any more. Internally we had unrelated code that caused this behaviour by overriding the factory via a different fsspec backend.

I've corrected the above in my tests, but the main issue as described in this ticket is still reproducible on a 16GB machine trying to read either a 10GB parquet file or CSV. @chriscalip you need to give Modin+Ray enough headroom when ingesting data; a rough heuristic is that you most likely need more than double the RAM of your dataset size.

Dependencies used (on 2024-01-30)

	modin               : 0.24.1
	ray                 : 2.7.0
	s3fs                : 2023.12.2

Log

>>> df = pd.read_parquet('s3://my-bucket/data/10GB.parquet',
...         storage_options={
...             "key": AWS_ACCESS_KEY_ID,
...             "secret": AWS_SECRET_ACCESS_KEY,
...             "token": AWS_SESSION_TOKEN,
...         },
...     )
2024-01-30 00:05:21,586 WARNING worker.py:2058 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 2f6bf2d0fb91197755b067b75679c283e922ad4402000000 Worker ID: b929a90b47a71f4aaaebe8379913371c1098eda456ba3ac21d19e3ec Node ID: ea88f3a85a2fb13476918391f91c321b29357bbf63f4125b595ad774 Worker IP address: ec2-3-80-103-25.compute-1.amazonaws.com Worker port: 10022 Worker PID: 22633 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
.
.
.
(raylet, ip=10.0.3.103) [2024-01-30 00:12:41,840 E 12751 12751] (raylet) node_manager.cc:3007: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: ea88f3a85a2fb13476918391f91c321b29357bbf63f4125b595ad774, IP: ec2-3-80-103-25.compute-1.amazonaws.com) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip ec2-3-80-103-25.compute-1.amazonaws.com`
(raylet, ip=10.0.3.103) 
(raylet, ip=10.0.3.103) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/conda/lib/python3.9/site-packages/modin/utils.py", line 484, in wrapped
    return func(*params.args, **params.kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/pandas/io.py", line 299, in read_parquet
    query_compiler=FactoryDispatcher.read_parquet(
  File "/opt/conda/lib/python3.9/site-packages/modin/core/execution/dispatching/factories/dispatcher.py", line 195, in read_parquet
    return cls.get_factory()._read_parquet(**kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/core/execution/dispatching/factories/factories.py", line 212, in _read_parquet
    return cls.io_cls.read_parquet(**kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/core/io/file_dispatcher.py", line 158, in read
    query_compiler = cls._read(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/core/io/column_stores/parquet_dispatcher.py", line 791, in _read
    return cls.build_query_compiler(
  File "/opt/conda/lib/python3.9/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/modin/core/io/column_stores/parquet_dispatcher.py", line 672, in build_query_compiler
    row_lengths = [part.length() for part in remote_parts.T[0]]
  File "/opt/conda/lib/python3.9/site-packages/modin/core/io/column_stores/parquet_dispatcher.py", line 672, in <listcomp>
    row_lengths = [part.length() for part in remote_parts.T[0]]
  File "/opt/conda/lib/python3.9/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 277, in length
    self._length_cache = RayWrapper.materialize(self._length_cache)
  File "/opt/conda/lib/python3.9/site-packages/modin/core/execution/ray/common/engine_wrapper.py", line 92, in materialize
    return ray.get(obj_id)
  File "/opt/conda/lib/python3.9/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/ray/_private/worker.py", line 2549, in get
    raise value
ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory.

@seydar
Contributor

seydar commented Feb 6, 2024

@zaichang It looks like you and I are working on similar problems. I'd love to chat more if you're interested in trading notes; my email is ari [at] aribrown [period] com.

@anmyachev
Collaborator

@anmyachev Thanks for checking in. I recently re-checked my test and indeed I am not seeing the read_parquet unsupported options message any more. Internally we had unrelated code that caused this behaviour by overriding the factory via a different fsspec backend.

I've corrected the above in my tests, but the main issue as described in this ticket is still reproducible on a 16GB machine trying to read either a 10GB parquet file or CSV. @chriscalip you need to give Modin+Ray enough headroom when ingesting data; a rough heuristic is that you most likely need more than double the RAM of your dataset size.

Dependencies used (on 2024-01-30)

	modin               : 0.24.1
	ray                 : 2.7.0
	s3fs                : 2023.12.2

@zaichang thanks for the information!

Sometimes objects in memory take up much more space than on disk. I'm wondering if this is the case. Could you check this using pure pandas and the https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.memory_usage.html#pandas.DataFrame.memory_usage method?
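
For example, a minimal check of that with pure pandas might look like this (assuming one machine has enough memory to hold the whole frame; credentials omitted for brevity):

import pandas

df = pandas.read_parquet("s3://my-bucket/10GB.parquet")
# deep=True also counts object-dtype (e.g. string) columns
in_memory_gb = df.memory_usage(deep=True).sum() / 1e9
print(f"~{in_memory_gb:.1f} GB in memory vs ~10 GB on disk")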

BTW, do you use the Ray dashboard? If so, could you look at the load distribution between the nodes? It would be interesting to see whether all nodes are involved and how the load is distributed right before the memory error.

@Liquidmasl

Liquidmasl commented Jul 3, 2024

Hey guys, I am on the hunt for a solution to our OOM problems and stumbled over this thread.
I am more confused than before.

@chriscalip you need to give Modin+Ray enough headroom when ingesting data; a rough heuristic is that you most likely need more than double the RAM of your dataset size.

how does this not defeat the purpose?

From the docs:

Not only does Modin let you work with datasets that are too large to fit in memory, we can perform various operations on them without worrying about memory constraints.

Modin solves this problem by spilling over to disk, in other words, it uses your disk as an overflow for memory so that you can work with datasets that are too large to fit in memory.

We work with point clouds with hundreds of millions of rows, so a file might be 300GB. With your heuristic of needing twice the memory (600GB of RAM), we could load everything into memory and still have 300GB left. If we have everything in memory we could just use pandas, right?
What am I missing here?
Can I ingest the data in 100 parts and concat them together to get around the ingest issue? (like they kinda do in the docs?)

@chriscalip

@Liquidmasl I concur.

@YarShev
Collaborator

YarShev commented Jul 4, 2024

While Modin on Ray is able to handle out-of-memory datasets, that gets tricky when loading data from a file. The underlying execution engine (Ray in this case) is not aware of data that hasn't been put into its object store yet, so it doesn't know what data to spill and when. Once the data is in the object store, the engine can spill a portion of it to disk if there is not enough room for processing. You should take the size of the data into account when loading it from a file. If there are not enough resources in your system to load the data, consider loading it in portions or moving to a machine that has enough resources to handle your data size.

Can I ingest the data in 100 parts and concat them together to get around the ingest issue? (like they kinda do in the docs?)

This approach should work.
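
A rough sketch of that portion-by-portion pattern for a parquet source, using pyarrow record batches fed into Modin frames (the batch size and file name are illustrative only):

import modin.pandas as pd
import pyarrow.parquet as pq

pf = pq.ParquetFile("10GB.parquet")
parts = []
for batch in pf.iter_batches(batch_size=5_000_000):   # a few million rows at a time
    parts.append(pd.DataFrame(batch.to_pandas()))     # each portion becomes its own Modin frame

df = pd.concat(parts, ignore_index=True)              # then stitch the portions together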

@seydar
Contributor

seydar commented Jul 8, 2024

Ingesting the data in 100 parts is tough to do right now, because you'd have to use pyarrow to read in the file and then split it, and pyarrow suffers from the same bug IME.

Maybe you could manually read in the row groups and then split it? (Not sure which library would support this)
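
pyarrow does support reading individual row groups directly; a hedged sketch of that idea (in contrast to the fixed-size batches above, this follows the file's own row-group boundaries):

import modin.pandas as pd
import pyarrow.parquet as pq

pf = pq.ParquetFile("10GB.parquet")
frames = [
    pd.DataFrame(pf.read_row_group(i).to_pandas())  # one physical row group at a time
    for i in range(pf.num_row_groups)
]
df = pd.concat(frames, ignore_index=True)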

@chriscalip

I suggest that while this bug is active, the claim "Out-of-memory data with Modin" (https://modin.readthedocs.io/en/stable/getting_started/why_modin/out_of_core.html) should be removed from the docs. Modin is currently unable to process data beyond the host computer's available memory.

@Liquidmasl

Liquidmasl commented Aug 5, 2024

This approach should work.

So I tried that. I read in a PLY file 100 million rows at a time, storing the chunks in their own Modin dataframes, which I put into a list.
This step works fine, actually a lot better than I thought.

But when I try to concatenate them using pd.concat(dfs, ignore_index=True), my RAM gradually fills up, then the page file fills up, until everything freezes. And I mean literally everything.
After 2 hours I force-shut down my PC because the faces of the people in the meeting I was in were still frozen on my screen from 2 hours ago, the mouse still didn't move, and I could not get to the task manager.

I don't know what's going wrong; it's quite difficult to figure out as well because everything stops working.

Is there anything I am doing wrong? I kinda really need to get this to work D:

EDIT:
I just tried again with fewer rows per segment and it went through in absolutely no time at all.
I am currently investigating whether it just worked or something burned and crashed without me noticing.

EditEdit:
I did have a typo, but after fixing it and reducing the number of rows per part, it did load and concatenate successfully.
A raylet died when saving to parquet though, but only the first 3 times I tried. When I finally got the Ray dashboard to run, it... just saved successfully.

EditEditEdit:

Summary and return to the original issue of the post:

So there is some weird stuff happening here.
How to make the saving work:

  1. Load the dataset in chunks
  2. Turn said data into a bunch of Modin dataframes, stored in a list
  3. Concatenate the dataframes into one big dataframe

This works, remarkably fast even!
Now, saving as parquet is where it gets weird. I have a hard time figuring out when what happened; it seems the behaviour is not very consistent at all.

import modin.pandas as pd

large_df.to_parquet(path)

Leads to a dead raylet. Unsure why; possibly a memory issue.

import ray
import modin.pandas as pd

large_df.to_parquet(path)

Leads to... sometimes

  • a bluescreen
  • filling up RAM and page file until the PC is frozen for good
  • filling up RAM and page file, then releasing RAM and page file but not actually finishing, just stuck in stasis (the PC works fine though)
import ray
ray.init()
import modin.pandas as pd

large_df.to_parquet(path)

Leads to two initialized Ray instances, but saving works (takes forever though)! (RAM is generally chill the whole time)

But now that I managed to save the parquet file...

Back to .parquet loading

I have saved the same data twice:

  • with pandas' native to_parquet(), resulting in one single .parquet file (used a different PC with 500GB RAM)
  • with modin.pandas' to_parquet(), resulting in a .parquet folder with a bunch of .parquet files

Reading the single file with Modin's read_parquet() finishes in about a second (obviously not loading anything into memory).
Reading the .parquet folder, though, explodes the RAM until I get an OOM error.

This is obviously an issue.
Am I missing something?
Why would it cost so much RAM to load a bunch of .parquet files? Especially since it costs basically nothing when reading just 1 file.
Do I have to manually iterate over the parquet files and concat the result again?

I will transfer this to its own issue as soon as I am sure my examples are correct; currently still testing (everything takes forever).

@Liquidmasl

Liquidmasl commented Aug 6, 2024

I suggest that while this bug is active, the claim "Out-of-memory data with Modin" (https://modin.readthedocs.io/en/stable/getting_started/why_modin/out_of_core.html) should be removed from the docs. Modin is currently unable to process data beyond the host computer's available memory.

Ingesting the data in 100 parts is tough to do right now, because you'd have to use pyarrow to read in the file and then split it, and pyarrow suffers from the same bug IME.

Maybe you could manually read in the row groups and then split it? (Not sure which library would support this)

I did manage to load a dataset now.
After many hours of confusion.

To make any of this work:
Do not initialize Ray through Modin; do it manually and set _memory and object_store_memory.
See: #7361
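
A hedged sketch of that manual initialization (the byte values are examples only, the file path is a placeholder, and #7361 has the discussion of these parameters):

import ray
import modin.pandas as pd

ray.init(
    _memory=16_000_000_000,             # memory made available to Ray workers, in bytes (example value)
    object_store_memory=8_000_000_000,  # size of the plasma object store, in bytes (example value)
)

df = pd.read_parquet("big_dataset.parquet")  # Modin reuses the manually configured Ray instance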

Loading the initial dataset in batches, turning those batches into Modin dataframes, and concatenating them worked.

After saving that to a bunch of parquet files, I did not manage to load them anymore:

Modin's read_parquet() does not work, as it just fills up RAM and page file until the computer dies or it crashes.
Ray's read_parquet() was also not successful for me; the computer did not die, but a raylet died when the object store was full (it did not spill).

What DID work was to just... iterate over the .parquet files in the .parquet folder, load them one by one into their own dataframes, and then again concat them together at the end, similar to how I initially loaded the dataset.
Does that suck? Well, kinda, but at least I can save and load my dataset now.
It also still needs a bunch of RAM.
I think this could be done better if we could set the number of partitions in to_parquet() like we can in Ray's implementation with number_of_rows_per_file, because larger .parquet part files lead to larger RAM needs when loading them.

I just found that a partitioning column can be used on to_parquet() and then Ray's read_parquet() can load it again.
I guess because the separate .parquet files are small enough, Ray doesn't choke. Not only does it work without crashing, it's incredibly fast as well!
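
A sketch of that round trip as I understand it (tile_id is a made-up partition column, the tiny frame only stands in for the large concatenated point cloud, partition_cols is the pandas-style argument, and ray.data.read_parquet is Ray Data's reader):

import ray
import modin.pandas as pd

# stand-in for the large concatenated point-cloud frame from the earlier steps
large_df = pd.DataFrame({"x": [0.1, 0.2, 0.3], "y": [1.0, 2.0, 3.0], "tile_id": [0, 0, 1]})

large_df.to_parquet("points.parquet", partition_cols=["tile_id"])  # one subfolder of small files per tile_id
ds = ray.data.read_parquet("points.parquet")                       # Ray Data reads the partitioned folder back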
