Describe the issue:
I am trying to read a parquet file (288.6 MB as parquet, 1017 MB in memory according to memory_usage()) while using a dask LocalCluster. The data is split into 10,000 partitions of roughly 104 KB each. On an m5.large AWS EC2 instance (8 GB RAM, 512 GB disk) with a local dask cluster, dask crashes while loading this reasonably sized file. There is nothing else running on the instance.
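For reference, Client() with no arguments starts a LocalCluster whose workers split the machine's memory between them, so on a 2 vCPU / 8 GB box each worker only gets a few GB. The following is a hedged sketch of what an explicit configuration could look like; the worker count and memory_limit values are illustrative assumptions, not what this report actually used:

from dask.distributed import Client, LocalCluster

# Illustrative only: size the local cluster explicitly instead of relying on Client() defaults.
cluster = LocalCluster(
    n_workers=1,            # assumption: one worker so the data is not split across processes
    threads_per_worker=2,
    memory_limit="7GB",     # assumption: leave roughly 1 GB for the OS and scheduler
)
client = Client(cluster)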
Minimal Complete Verifiable Example:
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    client = Client()

    manifest_files = ['s3://ari-public-test-data/test1']  # this is publicly accessible

    try:
        print("loading data")
        df = dd.read_parquet(manifest_files)

        print("partitioning")
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)

        print("persisting")
        dp = df.persist()

        print("memory usage")
        print(dp.memory_usage().compute())

        print("count()")
        print(dp.count().compute())

        print()
        print("done!!!")
    finally:
        print("closing client")
        client.close()
Error:
[ubuntu@ip-10-0-15-130: ~] time python3 local_cluster.py
loading data
partitioning
persisting
memory usage
2023-12-01 19:42:11,723 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:34075 (pid=3313) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:11,836 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:21,763 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:46241 (pid=3310) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:21,866 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:29,624 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:42711 (pid=3342) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:29,725 - distributed.nanny - WARNING - Restarting worker
2023-12-01 19:42:37,263 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:35713 (pid=3358) exceeded 95% memory budget. Restarting...
2023-12-01 19:42:37,837 - distributed.nanny - WARNING - Restarting worker
closing client
Traceback (most recent call last):
  File "/home/ubuntu/local_cluster.py", line 21, in <module>
    print(dp.memory_usage().compute())
  File "/home/ubuntu/.local/lib/python3.10/site-packages/dask/base.py", line 342, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/dask/base.py", line 628, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/client.py", line 2244, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: Attempted to run task ('assign-46e32864dd546867a697dca0d415c584', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:35713. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
real 0m52.494s
user 0m35.338s
sys 0m14.703s
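The "exceeded 95% memory budget" warnings above come from distributed's worker memory management thresholds. As a hedged sketch (not something this report changed), those thresholds can be inspected or overridden through dask's configuration:

import dask
import distributed  # registers the distributed.* configuration defaults

# The fraction at which the nanny restarts a worker; 0.95 by default,
# which matches the "exceeded 95% memory budget" messages above.
print(dask.config.get("distributed.worker.memory.terminate"))

# Assumption for experimentation only: disable the hard restart
# (workers may then swap or be OOM-killed by the OS instead).
dask.config.set({"distributed.worker.memory.terminate": False})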
Anything else we need to know?:
This is running on a single AWS EC2 instance (m5.large), which has 8 GB of RAM, 512 GB of disk space, and 2 vCPUs.
The S3 file is publicly accessible, so anyone can test against it.
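For anyone reproducing this without AWS credentials configured, a minimal sketch of reading the public bucket anonymously (assuming s3fs is installed; storage_options is forwarded to the underlying filesystem):

import dask.dataframe as dd

# Assumption: anonymous access to the public test bucket.
df = dd.read_parquet(
    "s3://ari-public-test-data/test1",
    storage_options={"anon": True},
)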
Environment:
- Dask version: 2023.11.0
- Pyarrow version: 14.0.1
- Python version: 3.10.12
- Operating System: Ubuntu 22.04.3 LTS
- Install method (conda, pip, source): pip
- Instance type: m5.large
- Cloud: AWS