Description
I'm attempting to use the p2p shuffle implementation (using the branch proposed for merge in #7326) to shuffle a ~1TB dataset.
The data exists on disk as ~300 parquet files (each expanding to around 2 GiB in size, with 23 columns), and I'm trying to shuffle into around 300 output partitions and write the result to parquet. The key column is a string (although I can convert it to int or datetime if that would help); the other columns are a mix of string, int, and float.
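For reference, the key conversion I mention would look roughly like this (a sketch only, assuming the string values parse cleanly; whether it would actually help the shuffle is part of my question):

```python
import dask.dataframe as dd

# Sketch: cast the string key column to a fixed-width dtype before shuffling.
ddf = dd.read_parquet(".../input", split_row_groups=False)
ddf["key"] = dd.to_datetime(ddf["key"])  # or: ddf["key"].astype("int64")
```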
This is on a machine with 1TB of RAM and 40 cores. I run it like so:
```python
from pathlib import Path

import dask.dataframe as dd
from distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=40)
    client = Client(cluster)

    inputdir = Path(".../input")
    outputdir = Path(".../output-shuffled/")

    ddf = dd.read_parquet(inputdir, split_row_groups=False)
    ddf = ddf.shuffle("key", shuffle="p2p")
    ddf.to_parquet(outputdir / "store_sales")
```

This progresses quite well for a while, with peak memory usage hitting ~600GB. At some point, though, some workers reach 95% of their memory limit and are then killed by the nanny.
Am I configuring things wrong? Do I need to switch on anything else? Or should I not be expecting this to work right now?
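In case it helps pin down the configuration question, this is roughly how I understand the per-worker memory limits could be made explicit (a sketch only; the worker count, thread count, and `memory_limit` value below are illustrative, not my current settings):

```python
from distributed import Client, LocalCluster

if __name__ == "__main__":
    # Illustrative values only: fewer, larger workers with an explicit
    # per-worker memory cap, instead of relying on the default even split
    # of total system memory across 40 workers.
    cluster = LocalCluster(n_workers=20, threads_per_worker=2, memory_limit="40GB")
    client = Client(cluster)
```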