P2P shuffle is slow with string dtypes #7880

Open
Tracked by #8043
mrocklin opened this issue Jun 2, 2023 · 4 comments


@mrocklin
Member

mrocklin commented Jun 2, 2023

import coiled
import dask.dataframe as dd
from dask.distributed import wait

cluster = coiled.Cluster(
    n_workers=30,
    worker_cpu=4,
    region="us-east-2",  # start workers close to data to minimize costs
    arm=True,
)

client = cluster.get_client()
# this takes 1m21s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="tasks").persist()
_ = wait(df)
# this takes 2m12s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)
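A minimal sketch of how wall-clock timings like the ones in the comments above could be collected. The `timed` and `format_duration` helpers are hypothetical names introduced for illustration, and the `time.sleep` call is only a stand-in for the `set_index(...).persist()` plus `wait(df)` steps; this is not necessarily how the numbers above were measured.

```python
import time
from contextlib import contextmanager


def format_duration(seconds: float) -> str:
    """Render a duration in the "1m21s" style used in the comments above."""
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes}m{secs:02d}s" if minutes else f"{secs}s"


@contextmanager
def timed(label: str, results: dict):
    """Hypothetical helper: store the wall-clock time of the block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


results = {}
with timed("placeholder", results):
    # Stand-in for: df.set_index(..., shuffle=...).persist() followed by wait(df)
    time.sleep(0.01)

print({label: format_duration(s) for label, s in results.items()})
```

Wrapping each shuffle variant in its own `with timed(...)` block would keep the two measurements comparable, since both would then include the Parquet read and the blocking `wait`.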

GIL contention is very high during the p2p shuffle (and also during the tasks shuffle), and CPU usage is above 100%. This suggests that creating and deleting lots of Python objects may be slowing us down considerably.

cc @hendrikmakait @jrbourbeau

@hendrikmakait hendrikmakait self-assigned this Jun 5, 2023
@hendrikmakait
Member

I'm wondering how best to address this. In your example, the string columns are of type string[python], so converting back to that type feels like the right thing to do to me, even if it's overly expensive. Maybe the right approach would be to convert string[python] to string[pyarrow] during a P2P shuffle if dataframe.convert-string is set, and to raise a warning if we encounter a string[python] column and it's not set?

@mrocklin
Member Author

mrocklin commented Jun 5, 2023

Sorry, I had the config option set. Dtypes coming in were string[pyarrow].

@mrocklin
Member Author

mrocklin commented Jun 5, 2023

import dask

dask.config.set({"dataframe.convert-string": True})  # use PyArrow strings by default

@fjetter
Member

fjetter commented Nov 16, 2023

Brief update about this in case somebody stumbles over it.

With current main, we're mostly at the same performance as tasks. However, when disabling disk entirely (dask.config.set({"distributed.p2p.disk": False})), we're at 10s, so disk appears to slow us down much more than one would naively assume (#8323 is an attempt to address this).

| Method | Duration |
| --- | --- |
| tasks | 1min 8s |
| p2p | 1min 15s |
| p2p (w/out disk) | 10s |
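To put the durations above side by side, the following sketch converts them to seconds and expresses each one relative to the tasks shuffle. The `to_seconds` helper is a hypothetical name introduced here; the three input strings are copied from the table, and nothing else is measured.

```python
import re

# Durations as reported in the table above.
durations = {
    "tasks": "1min 8s",
    "p2p": "1min 15s",
    "p2p (w/out disk)": "10s",
}


def to_seconds(text: str) -> int:
    """Hypothetical helper: parse strings such as "1min 8s" or "10s" into seconds."""
    match = re.fullmatch(r"(?:(\d+)min\s*)?(\d+)s", text.strip())
    if match is None:
        raise ValueError(f"unrecognised duration: {text!r}")
    minutes, seconds = match.groups()
    return int(minutes or 0) * 60 + int(seconds)


seconds = {method: to_seconds(text) for method, text in durations.items()}
baseline = seconds["tasks"]

for method, value in seconds.items():
    # Ratio below 1 means faster than the tasks shuffle, above 1 means slower.
    print(f"{method}: {value}s ({value / baseline:.2f}x the tasks duration)")
```

In seconds, the three rows are 68, 75, and 10, so p2p with disk is roughly 10% slower than tasks, while p2p without disk takes about a seventh of the tasks duration.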
