Description
Interrupting a `compute` call (or many other blocking client calls) does not stop the client from trying to send the futures to the scheduler.

If I accidentally build a huge, badly structured graph and call `compute` on it:
```python
import pandas as pd
import dask
from dask import delayed

# A distributed Client is already connected in this session.
df = pd.read_csv(
    "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/taxis.csv"
)

@delayed
def calculate_metric(df: pd.DataFrame, i: int):
    return (df.tip + i).mean()

results = [calculate_metric(df, i) for i in range(1000)]  # `df` is embedded in every task
dask.compute(results)
```
Nothing happens for a long time, and I can see that I'm uploading lots of data to the cluster.
I realize what I did was bad practice.
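For context, a rough back-of-the-envelope check (not part of my original code, just to illustrate the scale) suggests why: each task carries its own serialized copy of the dataframe.

```python
import pickle

# Rough estimate only: every one of the 1000 tasks embeds its own copy of `df`,
# so a lower bound on what gets shipped to the scheduler is ~1000x the pickled size.
per_copy = len(pickle.dumps(df))
print(f"~{per_copy / 1e6:.1f} MB per task, ~{per_copy * 1000 / 1e9:.1f} GB in the graph")
```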
I click the stop button in my Jupyter notebook. I see a `KeyboardInterrupt` traceback, which makes me think the computation has been stopped.
```
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
File <timed exec>:1, in <module>
File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597 keys.append(x.__dask_keys__())
598 postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/distributed/client.py:3000, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2998 should_rejoin = False
2999 try:
-> 3000 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3001 finally:
3002 for f in futures.values():
File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/distributed/client.py:2174, in Client.gather(self, futures, errors, direct, asynchronous)
2172 else:
2173 local_worker = None
-> 2174 return self.sync(
2175 self._gather,
2176 futures,
2177 errors=errors,
2178 direct=direct,
...
--> 316 gotit = waiter.acquire(True, timeout)
317 else:
318 gotit = waiter.acquire(False)
```
I update my code so it doesn't embed the ~3 MB dataframe in every task, but instead references it by a single key; that way it should only need to be uploaded once:
```python
# Wrap the dataframe once so every task references it by key instead of by value.
delayed_df_value = delayed(df)
results = [calculate_metric(delayed_df_value, i) for i in range(1000)]
dask.compute(results)
```
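(As an aside, I believe another way to avoid re-uploading the data would be to scatter it to the cluster once and pass the resulting future into the tasks; a sketch, assuming the existing `client` handle:)

```python
# Sketch: ship the dataframe to the workers once, then reference it by its future.
df_future = client.scatter(df)
results = [calculate_metric(df_future, i) for i in range(1000)]
dask.compute(results)
```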
Yet the resubmitted computation still takes forever. I look back at Activity Monitor and see that `python3` is still sending data!

It's still uploading my old giant graph, and that's hogging all the network bandwidth, so my new computation can't even get submitted.
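The workaround I'm planning to try (an assumption on my part, not something I've verified is the intended escape hatch) is to tear down the client connection entirely, on the theory that dropping the connection to the scheduler stops the in-flight transfer, and then reconnect:

```python
from distributed import Client

# Assumption: closing the client drops its connection to the scheduler and,
# with it, the transfer of the old, giant graph.
client.close()

# Reconnect before resubmitting the fixed computation.
# `scheduler_address` is a placeholder for whatever address I originally connected to.
client = Client(scheduler_address)
```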