[QST]: p2p shuffle on large datasets #7380

@wence-

Description

I'm attempting to use the p2p shuffle implementation (using the branch proposed for merge in #7326) to shuffle an ~1TB dataset.
The data exists on disk as ~300 parquet files (each expanding to around 2GiB in memory, with 23 columns), and I'm trying to shuffle into around 300 output partitions and write the result back to parquet. The key column is a string (although I can convert it to int or datetime if that would help); the other columns are a mix of string, int, and float.

This is on a machine with 1TB of RAM and 40 cores. I run it like so:

from pathlib import Path

import dask.dataframe as dd
from distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=40)
    client = Client(cluster)
    inputdir = Path(".../input")
    outputdir = Path(".../output-shuffled/")
    # Read each parquet file as a single partition
    ddf = dd.read_parquet(inputdir, split_row_groups=False)

    # Shuffle on the string key column using the p2p implementation
    ddf = ddf.shuffle('key', shuffle="p2p")

    ddf.to_parquet(outputdir / "store_sales")

This progresses quite well for a while, with peak memory usage hitting ~600GB. At some point, though, some workers reach 95% of their memory limit and are then killed by the nanny.

Am I configuring things wrong? Do I need to switch on anything else? Or should I not be expecting this to work right now?
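For reference, the only knobs I can think of tweaking are an explicit per-worker memory limit and the worker memory thresholds that the nanny acts on. A minimal sketch of that configuration follows; the 24GiB limit and the threshold fractions are illustrative values, not ones used in the run above (which used the defaults).

import dask
from distributed import Client, LocalCluster

# Illustrative thresholds; the run described above used the defaults.
dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start spilling to disk
    "distributed.worker.memory.spill": 0.7,       # spill more aggressively
    "distributed.worker.memory.pause": 0.8,       # pause new task execution
    "distributed.worker.memory.terminate": 0.95,  # nanny kills the worker
})

if __name__ == "__main__":
    # 40 workers x 24GiB stays comfortably under the 1TB of host RAM.
    cluster = LocalCluster(n_workers=40, memory_limit="24GiB")
    client = Client(cluster)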
