Local Cluster Unable to Handle Smaller-than-Memory Parquet File #8383
There are a couple of problems here that are introduced by the very small instance type. While the file itself appears to be small, deserializing it requires quite a bit of memory overhead due to the various encodings and compression within the parquet dataset. You should always account for an overhead factor of two to three when deserializing a parquet file.

The other "problem" is that you are introducing artificial data fragmentation by splitting the data up into many small chunks, which also incurs memory overhead.

Last but not least, by operating on such a small instance you very easily hit the spilling thresholds (defined by default at 60% / 70%; see distributed/distributed.yaml, lines 155 to 156 at 05ba316).
The configuration here is set with defaults for a common standard deployment in mind, where task sizes are typically much smaller than the memory size. In your case, loading the initial task requires so much memory that these default values no longer make sense. If you insist on running on such a small VM, I strongly recommend adjusting those values.
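As a rough illustration (my sketch, not from the original comment; the fractions and memory_limit below are placeholders, not recommendations), those thresholds map to the distributed.worker.memory.* settings and can be overridden before starting the cluster:

```python
# Sketch: override the worker memory thresholds before creating a LocalCluster.
# The specific values here are illustrative only.
import dask
from dask.distributed import Client, LocalCluster

dask.config.set({
    "distributed.worker.memory.target": 0.5,      # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.6,       # spill based on process memory
    "distributed.worker.memory.pause": 0.8,       # pause new tasks above this fraction
    "distributed.worker.memory.terminate": 0.95,  # restart the worker above this fraction
})

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=2, memory_limit="7GB")
    client = Client(cluster)
```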
@fjetter Thank you for the detailed feedback! The memory overhead makes sense, but it's still shocking given that the file is 288.6 MB on a fresh system with 8 GB of RAM (6.8 GB free). Adding those settings still ended in a crash (full log here: error.log). It ran for 40 minutes before I killed it; it restarted the worker 4 times and then killed the whole process, as is tradition. The process with the high unmanaged memory is one of the Python dask processes. It feels like there's something weird going on with spilling the data while setting the index, because when I DON'T set the index, it works flawlessly.
I guess I thought that setting the index would be important for future operations that could parallelize across the index, but that might be too costly. Should I split each agent (my index) into different parquet files?
Code: local_cluster.py (it's pretty much unchanged from the above code).

I guess my outstanding questions are:
1. Where is the high unmanaged memory coming from?
2. How should I structure my data, and is setting an index worth it here?
3. What's the right way to set the index on "agent", and should I split the file per agent?
Unmanaged memory is often caused by the function you are running, spilling, network comms, etc. It is still memory used by the dask process, but it is not stored data; it is dynamically allocated memory. Documentation about this is available here.
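One way to see this in practice (a small sketch of mine, not from the thread, using a hypothetical scheduler address) is to compare the OS-level RSS of each worker process with the managed memory shown on the dashboard:

```python
# Sketch: report each worker's resident set size (RSS) as seen by the OS.
# The scheduler address below is hypothetical.
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")

def worker_rss():
    # psutil ships with distributed; RSS minus the task data Dask tracks
    # is roughly the "unmanaged" memory shown on the dashboard.
    import psutil
    return psutil.Process().memory_info().rss

print(client.run(worker_rss))
```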
I cannot answer this without having more context about your use case. If you are looking for guidance on how to structure your data, I recommend opening a thread at https://dask.discourse.group/ with more details about your problem. Whether setting an index is worth it depends on what you want to do.
Setting an index on agents is straightforward, and I would recommend doing it in a single step: this will load your file and set the index on the single file without any distributed compute logic (a rough sketch is below). I don't sufficiently understand why you'd want to split your file in the first place; this makes everything much more complex. If you insist on that split, you can still do it afterwards.

Generally speaking, if you notice that the 8 GB instance is having memory issues, I strongly recommend going to a larger instance type instead of attaching a lot of disk to it. Most of the time it is much cheaper to use more memory and have computations finish faster than to be slow while using disk. Disk should ideally only be used to buffer temporary spikes and avoid out-of-memory errors, not to manage your data size.
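The snippet originally embedded in this comment was not captured here; a plausible single-machine sketch of the approach it describes (assuming pandas with s3fs installed, the "agent" column mentioned later in the thread, and a hypothetical output path) might be:

```python
# Sketch: load the file with plain pandas, set the index, write it back out.
# No distributed compute logic involved; paths and column names are assumptions.
import pandas as pd

df = pd.read_parquet(
    "s3://ari-public-test-data/test1",   # public test file mentioned in the thread
    storage_options={"anon": True},      # assumption: anonymous S3 access works
)
df = df.set_index("agent")
df.to_parquet("test1_indexed.parquet")   # hypothetical output path
```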
Thank you for the detailed response! Sorry for being vague: the project I'm working on has 4.9B rows in a DB (AWS Redshift) that track where robots drive. While you can theoretically do all of the data analysis you need in SQL, I'm trying to set up a platform to do it in Python with Dask to make it easier to apply machine learning techniques. I'm expecting the data to grow in the future, so I'm trying to figure out how to deal with datasets that are significantly larger than the available memory.

The columns are "agent", "x", "y", "time". I'm looking to partition on "agent" because all of our processing is split by agent. My plan is to solve the problem on one small machine, then on several small machines, and then move on to more real-world sizes; that's why I've been so reluctant to simply use a larger machine.

Looking at my file (s3://ari-public-test-data/test1), it has 32 row groups of the standard size (1048576 rows). I'll open up a discussion on the group, thank you.

That said, I think there's something weird going on with the runaway memory. I'm not running any extra processes, and I can see that the process consuming all the memory belongs to the parent dask process. Is there something specific I can show to prove that?
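(For reference, the row-group layout can be checked with pyarrow; this is my own sketch, not part of the thread, assuming a hypothetical local copy of the S3 object.)

```python
# Sketch: inspect the row-group layout of a parquet file with pyarrow.
import pyarrow.parquet as pq

pf = pq.ParquetFile("test1.parquet")     # hypothetical local copy of the file
meta = pf.metadata
print("row groups:", meta.num_row_groups)
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes")
```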
Describe the issue:
I am trying to read in a parquet file (288.6 MB as parquet, 1017 MB in memory) while using a dask LocalCluster. It is split into 10,000 partitions of about 104 KB each.
Using an m5.large AWS EC2 instance (8 GB RAM, 512 GB disk) and a local dask cluster, dask crashes when trying to load a reasonably sized parquet file (288.6 MB as parquet, 1017 MB in memory according to memory_usage()). There is nothing else running on this instance.

Minimal Complete Verifiable Example:
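The MCVE code block did not survive in this copy of the issue. Based on details mentioned elsewhere in the thread (a LocalCluster on the m5.large, reading s3://ari-public-test-data/test1, setting the index on "agent"), a rough reconstruction might look like the following; the specifics are assumptions, not the reporter's actual code:

```python
# Rough reconstruction of the reported setup; details are assumed.
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster()              # on the m5.large: 2 vCPUs, 8 GB RAM
    client = Client(cluster)

    df = dd.read_parquet(
        "s3://ari-public-test-data/test1",
        storage_options={"anon": True},   # assumption: anonymous access works
    )
    # The issue mentions the data ending up in ~10,000 small partitions;
    # how that split was produced is not shown here.
    df = df.set_index("agent")            # the step reported to trigger the crash
    df = df.persist()
```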
Error:
Anything else we need to know?:
This is running on a single AWS EC2 instance (m5.large), which has 8 GB RAM, 512 GB disk space, and 2 vCPUs. The S3 file is publicly accessible, so anyone can test against it.
Environment:
m5.large