```python
import coiled
import dask.dataframe as dd
from dask.distributed import wait

cluster = coiled.Cluster(
    n_workers=30,
    worker_cpu=4,
    region="us-east-2",  # start workers close to data to minimize costs
    arm=True,
)
client = cluster.get_client()

# this takes 1m21s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="tasks").persist()
_ = wait(df)

# this takes 2m12s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)
```
GIL contention is very high during the p2p shuffle (also during tasks) and CPU usage is at 100+%, which suggests that the creation and deletion of many Python objects may be slowing us down considerably.
I'm wondering how best to address this. In your example, the string columns are of type string[python], so converting back to that type feels like the right thing to do, even if it's overly expensive. Maybe the right approach would be to convert string[python] to string[arrow] during a P2P shuffle if dataframe.convert-string is set, and to raise a warning if we encounter a string[python] column while it's not set?
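For illustration, a minimal sketch of opting in to Arrow-backed strings via the dataframe.convert-string option mentioned above (the displayed dtype and exact behavior depend on the dask version):

```python
import dask
import dask.dataframe as dd

# Opt in to Arrow-backed strings so string columns are loaded as Arrow
# strings instead of string[python], avoiding per-row Python objects.
dask.config.set({"dataframe.convert-string": True})

df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
print(df.dtypes)  # string columns should now show an Arrow-backed string dtype
```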
Brief update about this in case somebody stumbles over it.
With current main we're mostly at the same performance as tasks. However, when disabling disk entirely (dask.config.set({"distributed.p2p.disk": False})) we're at 10s, so disk appears to slow us down much more than one would naively assume (there's an attempt to address this in #8323).
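A rough sketch of that comparison, reusing the config key quoted above (whether the setting takes effect depends on where the config is read in your distributed version):

```python
import dask
import dask.dataframe as dd
from dask.distributed import wait

# Disable writing shuffle partitions to disk during the p2p shuffle
# (False disables disk for the key referenced above).
dask.config.set({"distributed.p2p.disk": False})

df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)
```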
cc @hendrikmakait @jrbourbeau