
Ctrl-C during compute/map/submit/etc. doesn't stop sending data to scheduler #6572

Open

Description

@gjoseph92

Interrupting a `compute` call (or many other client operations) does not stop the client from sending the graph to the scheduler.

If I accidentally make a huge, bad graph and call `compute` on it:

import dask
import pandas as pd
from dask import delayed

df = pd.read_csv(
    "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/taxis.csv"
)

@delayed
def calculate_metric(df: pd.DataFrame, i: int):
    return (df.tip + i).mean()

results = [calculate_metric(df, i) for i in range(1000)]  # `df` is embedded in every task

dask.compute(results)

Nothing happens for a long time. I see that I'm uploading lots of data to the cluster:
[Screenshot, 2022-06-13: Activity Monitor showing python3 sending a large amount of data]

I realize what I did was bad practice.
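For scale, a quick back-of-the-envelope check (my own sketch, not part of the original repro) shows why:

import pickle

# The dataframe pickles to roughly 3 MB, and it's embedded in all 1000 tasks,
# so the serialized graph carries a few gigabytes of redundant payload.
payload = len(pickle.dumps(df))
print(f"~{payload / 1e6:.1f} MB per task, ~{payload * 1000 / 1e9:.1f} GB total")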

I click the stop button in my Jupyter notebook. I see a KeyboardInterrupt traceback, which makes me think it's been stopped.

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
File <timed exec>:1, in <module>

File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/distributed/client.py:3000, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2998         should_rejoin = False
   2999 try:
-> 3000     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3001 finally:
   3002     for f in futures.values():

File ~/Library/Caches/pypoetry/virtualenvs/workshop-3-dask-anti-patterns-vZD_dh8A-py3.9/lib/python3.9/site-packages/distributed/client.py:2174, in Client.gather(self, futures, errors, direct, asynchronous)
   2172 else:
   2173     local_worker = None
-> 2174 return self.sync(
   2175     self._gather,
   2176     futures,
   2177     errors=errors,
   2178     direct=direct,
...
--> 316         gotit = waiter.acquire(True, timeout)
    317     else:
    318         gotit = waiter.acquire(False)
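The tail of the traceback (`waiter.acquire`) hints at why Ctrl-C doesn't help: the interrupt is raised in the foreground thread, which is just blocked waiting for results, while the actual sending happens on distributed's event-loop thread. A minimal model of that situation (my own sketch, not distributed's actual code):

import threading
import time

def background_upload():
    # Stands in for the event-loop thread streaming the graph over the comm.
    for chunk in range(10):
        time.sleep(1)  # stands in for a network write
        print(f"sent chunk {chunk}")

t = threading.Thread(target=background_upload)
t.start()
try:
    t.join()  # blocks like `waiter.acquire` in the traceback above
except KeyboardInterrupt:
    # Only this waiting thread is interrupted; the background send keeps going.
    print("interrupted, but the background thread is still sending")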

I update my code to not embed the ~3 MB dataframe in every task, but to reference it by a single key instead. Wrapping the dataframe in `delayed` gives it its own key in the graph, so it should only need to be uploaded once:

delayed_df_value = delayed(df)
results = [calculate_metric(delayed_df_value, i) for i in range(1000)]

dask.compute(results)

Yet this still takes forever. I look back at Activity Monitor and see that python3 is still sending data!

It's still uploading my old giant graph, and that's hogging all the network bandwidth, so my new computation can't even be submitted.
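The only workaround I can think of (a hedged sketch, untested; `scheduler_address` is a placeholder) is to tear down the client connection entirely, since closing the comm should abort the in-flight transfer:

from distributed import Client

client = Client(scheduler_address)
try:
    dask.compute(results)
except KeyboardInterrupt:
    client.close()                      # drops the comm mid-upload
    client = Client(scheduler_address)  # reconnect with a fresh connection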
