
Local Cluster Unable to Handle Smaller-than-Memory Parquet File #8383

Open
seydar opened this issue Dec 1, 2023 · 4 comments
Labels
discussion Discussing a topic with no specific actions yet

Comments

@seydar

seydar commented Dec 1, 2023

Describe the issue:

I am trying to read in a parquet file (288.6 MB as parquet, 1017 MB in memory) while using a dask LocalCluster. It is split into 10,000 partitions of roughly 104 KB each.

Using an m5.large AWS EC2 instance (8 GB RAM, 512 GB disk) and a local dask cluster, dask crashes when trying to load a reasonable parquet file (288.6 MB as parquet, 1017 MB when in memory according to memory_usage()). There is nothing else running on this instance.

Minimal Complete Verifiable Example:

import dask.dataframe as dd
from   dask.distributed import Client

if __name__ == "__main__":
    client = Client()

    manifest_files = ['s3://ari-public-test-data/test1'] # this is publicly accessible

    try:
        print("loading data")
        df    = dd.read_parquet(manifest_files)

        print("partitioning")
        divisions   = list(range(0, 10001))
        df          = df.set_index('agent', divisions=divisions)

        print("persisting")
        dp = df.persist()

        print("memory usage")
        print(dp.memory_usage().compute())

        print("count()")
        print(dp.count().compute())

        print()
        print("done!!!")

    finally:
        print("closing client")

Error:

[ubuntu@ip-10-0-15-130: ~] time python3 local_cluster.py
loading data
partitioning
persisting
memory usage
2023-12-01 19:42:11,723 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:34075 (pid=3313) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:11,836 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:21,763 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46241 (pid=3310) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:21,866 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:29,624 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:42711 (pid=3342) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:29,725 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:37,263 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:35713 (pid=3358) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:37,837 - distributed.nanny - WARNING - Restarting worker
closing client
Traceback (most recent call last):
  File "/home/ubuntu/local_cluster.py", line 21, in <module>
    print(dp.memory_usage().compute())
  File "/home/ubuntu/.local/lib/python3.10/site-packages/dask/base.py", line 342, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/dask/base.py", line 628, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/client.py", line 2244, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: Attempted to run task ('assign-46e32864dd546867a697dca0d415c584', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:35713. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

real	0m52.494s
user	0m35.338s
sys	0m14.703s

Anything else we need to know?:

This is running on a single AWS EC2 instance (m5.large), which has 8 GB RAM, 512 GB disk space, and 2 vCPUs.

The S3 file is publicly accessible, so anyone can test against it.

Environment:

  • Dask version: 2023.11.0
  • Pyarrow version: 14.0.1
  • Python version: 3.10.12
  • Operating System: Ubuntu 22.04.3 LTS
  • Install method (conda, pip, source): pip
  • Instance type: m5.large
  • Cloud: AWS
@seydar seydar changed the title Local Cluster Unable to Handle Reasonable Parquet File Local Cluster Unable to Handle Smaller-than-Memory Parquet File Dec 1, 2023
@fjetter
Member

fjetter commented Dec 13, 2023

There are a couple of problems here that are introduced by the very small instance type.

While the file itself appears to be small, deserializing it requires quite a bit of memory overhead due to the various encodings and compression within the parquet dataset. You should account for an overhead factor of two to three when deserializing a parquet file. This is the domain of pyarrow and not something we can truly help with. To work around this, you can start your cluster with Client(n_workers=1), which will start a LocalCluster with one worker and two threads instead of two workers with one thread each. The benefit is that the single worker is allowed to use all the memory instead of having to share it. Alternatively, you could set an explicit memory_limit as an argument, or disable the limit entirely with memory_limit=None.
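For concreteness, a minimal sketch of those options (the "7GB" value below is purely illustrative, not a recommendation):

from dask.distributed import Client

# One worker with two threads, so the single worker can use all of the machine's memory
client = Client(n_workers=1)

# ...or set an explicit per-worker memory limit (the value here is only an example)
# client = Client(n_workers=1, memory_limit="7GB")

# ...or disable the memory limit entirely, so the nanny never restarts the worker on memory usage
# client = Client(n_workers=1, memory_limit=None)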

The other "problem" is that you are introducing an artificial data fragmentation by splitting up the data into many small chunks. This also incurs memory overhead during the set_index operation. Testing this myself, it looks like you are better off with setting shuffle='tasks' in the set_index method (cc @hendrikmakait P2P for a single partition is a bit nonsense. We should special case this)

Last but not least, by operating on such a small instance you very easily hit the spilling thresholds, which default to 60% / 70%:

target: 0.60 # fraction of managed memory where we start spilling to disk
spill: 0.70 # fraction of process memory where we start spilling to disk

Spilling also requires us to copy data (to serialize it and write it to disk), which in your case could be exactly what kills the worker.
These defaults are chosen with a common standard deployment in mind, where task sizes are typically much smaller than the memory size. In your case, loading the initial task requires so much memory that the default values no longer make sense. If you insist on running on such a small VM, I strongly recommend adjusting those values.

@fjetter fjetter added discussion Discussing a topic with no specific actions yet and removed needs triage labels Dec 13, 2023
@seydar
Author

seydar commented Jan 9, 2024

@fjetter Thank you for the detailed feedback!

The memory overhead makes sense, but it's still shocking given that the file is 288.6 MB on a fresh system with 8 GB of RAM (6.8 GB free).

Adding n_workers=1 results in a HUGE change: the program now actually runs for more than 54 seconds, but now we see a runaway process eating up unmanaged memory (which is strange, because I would expect that process to be managed by Dask):

[ubuntu@ip-10-0-15-130: ~] time python3 local_cluster.py
loading data
partitioning
persisting
memory usage
2024-01-09 14:27:09,302 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 5.32 GiB -- Worker memory limit: 7.46 GiB
2024-01-09 14:27:09,958 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 6.02 GiB -- Worker memory limit: 7.46 GiB
2024-01-09 14:27:19,936 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 6.82 GiB -- Worker memory limit: 7.46 GiB
<snip many instances of the unmanaged memory warning and the high memory usage warning>

(Full log here: error.log)

It ran for 40 minutes before I killed it; it restarted the worker 4 times and then killed the whole process, as is tradition.

The process with the high unmanaged memory is one spawned by the parent Python process, so I believe it's coming from distributed.


It feels like there's something weird going on with spilling the data while setting the index, because when I DON'T set the index, it works flawlessly.

[ubuntu@ip-10-0-15-130: ~] time python3 local_cluster.py
loading data
persisting
memory usage
Index          128
agent    266666664
time     266666664
x        266666664
y        266666664
dtype: int64
count()
time     33333333
x        33333333
y        33333333
agent    33333333
dtype: int64

done!!!
closing client

real	0m23.358s
user	0m7.708s
sys	0m3.730s

I guess I thought that setting the index would be important for future operations that could parallelize across the index, but that might be too costly. Should I split each agent (my index) out into its own parquet file?


And when I set the index and include shuffle='tasks', it works... sorta. It somehow creates an index for every individual row as opposed to the divisions that I tried to set (sounds like my code is off), and it also takes significantly longer:

[ubuntu@ip-10-0-15-130: ~] time python3 local_cluster.py
loading data
partitioning
persisting
memory usage
Index    266666664
time     266666664
x        266666664
y        266666664
dtype: int64
count()
time    33333333
x       33333333
y       33333333
dtype: int64

done!!!
closing client

real	3m13.257s
user	3m31.867s
sys	0m19.796s

Code: local_cluster.py (it's pretty much unchanged from the above code)


I guess my outstanding questions are:

  1. What is going on with the spilling and the runaway unmanaged memory when setting an index?
  2. How do I set my indices by agent?

@fjetter
Member

fjetter commented Jan 10, 2024

What is going on with the spilling and the runaway unmanaged memory when setting an index?

Unmanaged memory is often caused by the function you are running, by spilling, by network comms, etc. It is still memory used by the dask process, but it is not stored task data; it is dynamically allocated memory. Documentation about this is available here.

I guess I thought that setting the index would be important for future operations that could parallelize across the index, but that might be too costly. Should I split each agent (my index) into different parquet files?

I cannot answer this without having more context about your use cases. If you are looking for guidance about how to structure your data, I recommend opening a thread in https://dask.discourse.group/ with more details about your problem.

Whether setting an index is worth it depends on what you want to do.

How do I set my indices by agent?

Setting an index on agents is straightforward, and I would recommend doing ddf.persist().set_index('agent').persist()

This will load your file and set the index on that single file without any distributed compute logic. I don't sufficiently understand why you'd want to split your file into so many partitions in the first place; it makes everything much more complex. If you insist on that split, you can call repartition on the indexed dataframe afterwards. This should avoid shuffling.
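Put together with the original script, that suggestion would look roughly like this (a sketch; the repartition call is optional and its partition count is just an example):

import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=1)

ddf = dd.read_parquet(['s3://ari-public-test-data/test1'])
ddf = ddf.persist()                     # load the single file first
ddf = ddf.set_index('agent').persist()  # index the already-loaded data, no distributed shuffle
# Only if you really want many output partitions afterwards:
# ddf = ddf.repartition(npartitions=100)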

Generally speaking, if you notice that the 8 GB instance is having memory issues, I strongly recommend moving to a larger instance type instead of attaching a lot of disk to it. Most of the time it is much cheaper to use more memory and have computations finish faster than to run slowly while using disk. Disk should ideally only be used to buffer temporary spikes and avoid out-of-memory errors, not to manage your data size.
Using more instances might also be an option if this happens and you have a lot of partitions. There are various cloud deployment solutions (see here) that do this for you. Some are open source, like Dask Cloud Provider, while others are fully managed services like Coiled (it has a free tier; disclaimer: I work for Coiled).

@seydar
Author

seydar commented Feb 23, 2024

Thank you for the detailed response!

Sorry for being vague: the project I'm working on has 4.9B rows in a DB (AWS Redshift) that track where robots drive, and while you can theoretically do all of the data analysis you need in SQL, I'm trying to set up a platform to do it in Python with Dask to make it easier to apply machine learning techniques. I'm expecting the data to grow in the future, so I am trying to figure out how to deal with datasets that are significantly larger than the available memory.

The columns are "agent", "x", "y", "time". I'm looking to partition based on "agent" because all of our processing is split by agent.

My plan is to solve it with one small machine, then solve it with several small machines, and then move on to more real-world sizes; that's why I've been so reluctant to simply use a larger machine.

Looking at my file (s3://ari-public-test-data/test1), it's got 32 row groups of the standard size (1048576 rows).
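For reference, one way to check the row groups is with pyarrow and anonymous S3 access, since the file is public (this is just a sketch):

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem(anon=True)  # the bucket is publicly readable
with fs.open("ari-public-test-data/test1", "rb") as f:
    md = pq.ParquetFile(f).metadata
    print(md.num_row_groups)         # 32 row groups, as noted above
    print(md.row_group(0).num_rows)  # 1048576 rows in the first group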

I'll open up a discussion on the group, thank you.


That said, I think there's something weird going on with the runaway memory. I'm not running any extra processes, and I can see that the process that's consuming all the memory belongs to the parent dask process. Is there something specific I can show to prove that?
