[QST]: p2p shuffle on large datasets #7380

Open
wence- opened this issue Dec 8, 2022 · 21 comments

@wence-
Contributor

wence- commented Dec 8, 2022

I'm attempting to use the p2p shuffle implementation (using the branch proposed for merge in #7326) to shuffle a ~1TB dataset.
The data exists on disk as ~300 parquet files (each of which expands to around 2 GiB [edited] in memory, with 23 columns), and I'm trying to shuffle into around 300 output partitions and write the result to parquet. The key column is a string (although I can convert it to int or datetime if that would help); the other columns are a mix of string, int, and float.

This is on a machine with 1TB RAM, and 40 cores. I run like so:

from pathlib import Path

import dask.dataframe as dd
from distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=40)
    client = Client(cluster)
    inputdir = Path(".../input")
    outputdir = Path(".../output-shuffled/")
    ddf = dd.read_parquet(inputdir, split_row_groups=False)

    ddf = ddf.shuffle('key', shuffle="p2p")

    ddf.to_parquet(outputdir / "store_sales")

This progresses quite well for a while, with peak memory usage hitting ~600GB. At some point, though, some workers reach 95% of their memory limits and are then killed by the nanny.

Am I configuring things wrong? Do I need to switch on anything else? Or should I not be expecting this to work right now?

@fjetter
Member

fjetter commented Dec 8, 2022

There is a "hidden dashboard" at <dashboard-address>/shuffle which exposes some instrumentation. I'd be interested to see what it shows. If you are actually seeing memory peaks caused by the shuffle, our buffers are not flushing fast enough, and this should be visible there.
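
To find that page from a script, one option is to derive it from `client.dashboard_link`, which points at the dashboard's status page. This is a minimal sketch assuming the link ends in a page name (such as `/status`) and that `/shuffle` sits next to it; `shuffle_dashboard_url` is a hypothetical helper, not part of distributed.

```python
from urllib.parse import urlsplit, urlunsplit


def shuffle_dashboard_url(dashboard_link: str) -> str:
    """Swap the last path segment of a dashboard link for "shuffle".

    Assumption: the link ends with a page name, e.g.
    "http://127.0.0.1:8787/status". Keeping the rest of the path means a
    proxied dashboard (".../proxy/8787/status") keeps its prefix.
    """
    parts = urlsplit(dashboard_link)
    prefix = parts.path.rsplit("/", 1)[0]  # drop the trailing page name
    return urlunsplit((parts.scheme, parts.netloc, prefix + "/shuffle", "", ""))


print(shuffle_dashboard_url("http://127.0.0.1:8787/status"))
```

In an interactive session this would be called as `shuffle_dashboard_url(client.dashboard_link)`.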

Otherwise, a peak at the very end could mean this is the to_parquet serialization, which likely runs in parallel. In your setup, every worker should have 1 CPU and 25GB of RAM, right? I'm wondering how large an output partition actually is. Is it possible that the distribution of key is so severely skewed that there are partitions of significant size s.t. a worker may run OOM?
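
As a rough check on the skew question, here is the arithmetic for this setup under stated assumptions: RAM is split evenly across the 40 workers (as estimated above), the nanny restarts a worker at 95% of its limit (as the log in this thread shows), the data is ~300 partitions of ~2 GiB each once in memory, and rows are roughly equal in size. This is a back-of-the-envelope sketch, not a measurement.

```python
# Back-of-the-envelope memory budget for the setup described in this issue.
GB = 10**9
GiB = 2**30

total_ram = 1000 * GB           # "1TB RAM" (assumed decimal terabyte)
n_workers = 40
terminate_fraction = 0.95       # "exceeded 95% memory budget. Restarting..."

# Assumption: memory is divided evenly between workers.
per_worker_limit = total_ram / n_workers
kill_threshold = per_worker_limit * terminate_fraction

in_memory_data = 300 * 2 * GiB               # ~300 partitions of ~2 GiB each
mean_output_partition = in_memory_data / 300  # size if keys were uniform

# A key holding fraction f of all rows lands entirely in one output
# partition, so that partition is at least f * in_memory_data bytes.
# This is an upper bound on f: other resident data and temporary copies
# lower the real limit.
max_hot_key_fraction = kill_threshold / in_memory_data

print(f"per-worker limit:      {per_worker_limit / GB:.2f} GB")
print(f"restart threshold:     {kill_threshold / GB:.2f} GB")
print(f"mean output partition: {mean_output_partition / GiB:.2f} GiB")
print(f"max share of one key:  {max_hot_key_fraction:.1%}")
```

Under these assumptions, a single key covering more than roughly 3.7% of all rows would on its own push one output partition past the restart threshold; temporary copies made while assembling the output likely shrink that margin further.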

@fjetter
Member

fjetter commented Dec 8, 2022

The key/partition distribution can be calculated with

ddf = ddf.shuffle('key', shuffle="p2p")
len_per_partition = ddf.map_partitions(len).compute()

which just counts the length of every partition. If you can run this without hitting an OOM, it's likely the parquet writing that is causing you trouble.
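
Once `len_per_partition` has been computed, a quick way to read it is to compare the largest partition against the median. The helper below is a hypothetical sketch; the 4x outlier threshold is an arbitrary choice, and the sample numbers are made up.

```python
import statistics


def skew_report(lengths, factor=4.0):
    """Summarize per-partition row counts.

    Compares against the median rather than the mean, so one huge
    partition does not inflate the baseline it is judged against.
    """
    lengths = list(lengths)
    median = statistics.median(lengths)
    biggest = max(lengths)
    ratio = biggest / median if median else float("inf")
    # Indices of partitions more than `factor` times the median size.
    outliers = [i for i, n in enumerate(lengths) if n > factor * median]
    return {"median": median, "max_over_median": ratio, "outliers": outliers}


print(skew_report([100, 120, 90, 110, 2000]))
```

With the pandas Series from the snippet above, the call would be `skew_report(len_per_partition.tolist())`.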

Lastly, in which "stage" are the workers dropping? Still during transfer (keys: shuffle-transfer-foo) or during unpack (keys: shuffle-p2p-bar)?
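
When reading worker logs, the failing task's key name tells you the stage. A minimal sketch, assuming only the `shuffle-transfer-` and `shuffle-p2p-` prefixes named above; `shuffle_stage` is hypothetical and also accepts the stringified tuple keys that appear in log lines.

```python
import ast


def shuffle_stage(key):
    """Return "transfer", "unpack", or None for a task key.

    `key` may be a tuple such as ('shuffle-p2p-<token>', 370), a plain
    string, or the string form of a tuple key as printed in worker logs.
    """
    if isinstance(key, str) and key.startswith("("):
        key = ast.literal_eval(key)  # "('name', 370)" -> ('name', 370)
    name = key[0] if isinstance(key, tuple) else key
    if name.startswith("shuffle-transfer-"):
        return "transfer"
    if name.startswith("shuffle-p2p-"):
        return "unpack"
    return None


print(shuffle_stage("('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 2524)"))
```

For the log in this thread, `('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 370)` maps to the unpack stage, which agrees with the `shuffle_unpack` function reported for that key.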

fjetter added the shuffle label Dec 8, 2022
@wence-
Contributor Author

wence- commented Dec 8, 2022

> Is it possible that the distribution of key is so severely skewed that there are partitions of significant size s.t. a worker may run OOM?

There is one key that is much more prevalent than the others, so yes, this could be a cause. I'm checking whether the shuffle + map_partitions completes.
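
Why one dominant key matters: hash partitioning sends every row with the same key value to the same output partition, so increasing the number of output partitions cannot split it. The sketch below uses `zlib.crc32` purely as a stand-in for dask's own pandas-based hashing, with made-up keys.

```python
import zlib
from collections import Counter


def output_partition(key: str, npartitions: int) -> int:
    # Stand-in hash: any deterministic hash has the property that matters
    # here, namely that equal keys always map to the same partition.
    return zlib.crc32(key.encode("utf-8")) % npartitions


def rows_per_partition(keys, npartitions):
    """Count how many rows each output partition would receive."""
    return Counter(output_partition(k, npartitions) for k in keys)


# Made-up data: one key covers half the rows, the rest are all distinct.
keys = ["hot"] * 50_000 + [f"k{i}" for i in range(50_000)]

for n in (300, 600, 1200):
    counts = rows_per_partition(keys, n)
    # The partition holding "hot" never drops below 50,000 rows.
    print(n, max(counts.values()))
```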

@wence-
Contributor Author

wence- commented Dec 8, 2022

Actually, it looks like I don't get as far as the shuffle proper at all: a worker gets killed while the assign tasks are still processing. Is the problem here that the entire dataset is being persisted in memory?

If I run with split_row_groups=True then I do start the shuffle proper, so that may be the issue.

With split_row_groups=True and the default disconnect timeout raised from 30s, the shuffle seems to progress without a large memory footprint on any of the workers (peaking at around 3-3.5GB/worker).

@wence-
Contributor Author

wence- commented Dec 8, 2022

Eventually that approach died in the map_partitions call (or somewhere just before it, when the shuffle was finishing and trying to bring the files back into memory [this is a guess]). This might well be due to the one large partition (which probably doesn't fit in 25GB).

Log
2022-12-08 09:02:07,054 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:02:08,475 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:02:15,253 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:02:18,151 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:02:27,590 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:02:29,278 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-08 09:06:04,567 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:36829 (pid=64157) exceeded 95% memory budget. Restarting...
2022-12-08 09:06:13,492 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:36829'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:36829'
2022-12-08 09:06:13,541 - distributed.nanny - WARNING - Restarting worker
2022-12-08 09:06:13,693 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:13,694 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:36829'

2022-12-08 09:06:13,957 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 370)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 370, None)
kwargs:    {}
Exception: "AssertionError('Shuffle worker restrictions misbehaving')"

Traceback (most recent call last):
  File "/.../doodles/shuffle-poc/shuffle-cpu-p2p.py", line 13, in <module>
    final_partition_sizes = ddf.map_partitions(len).compute()
  File "/.../lib/python3.9/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/.../lib/python3.9/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/client.py", line 3122, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/.../lib/python3.9/site-packages/distributed/client.py", line 2291, in gather
    return self.sync(
  File "/.../lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/.../lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
    raise exc.with_traceback(tb)
  File "/.../lib/python3.9/site-packages/distributed/utils.py", line 379, in f
    result = yield future
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/.../lib/python3.9/site-packages/distributed/client.py", line 2154, in _gather
    raise exception.with_traceback(traceback)
  File "/.../lib/python3.9/site-packages/distributed/shuffle/_shuffle.py", line 52, in shuffle_unpack
    return _get_worker_extension().get_output_partition(id, output_partition)
  File "/.../lib/python3.9/site-packages/distributed/shuffle/_shuffle_extension.py", line 510, in get_output_partition
    assert shuffle_id in self.shuffles, "Shuffle worker restrictions misbehaving"
AssertionError: Shuffle worker restrictions misbehaving
2022-12-08 09:06:15,239 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:42867 -> tcp://127.0.0.1:34869
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/worker.py", line 1756, in get_data
    response = await comm.read(deserializers=serializers)
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1:42867 remote=tcp://127.0.0.1:44938>: Stream is closed
2022-12-08 09:06:15,374 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:16,442 - distributed.worker - ERROR - Exception during execution of task ('len-8eea6c270bcecac2047e8e7606d0fc80', 2524).
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/worker.py", line 2351, in _prepare_args_for_execution
    data[k] = self.data[k]
  File "/.../lib/python3.9/site-packages/zict/buffer.py", line 108, in __getitem__
    raise KeyError(key)
KeyError: "('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 2524)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/worker.py", line 2233, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
  File "/.../lib/python3.9/site-packages/distributed/worker.py", line 2355, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
KeyError: "('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 2524)"
2022-12-08 09:06:16,850 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:16,850 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:16,852 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:16,969 - distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:43725 -> tcp://127.0.0.1:32905
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/worker.py", line 1756, in get_data
    response = await comm.read(deserializers=serializers)
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/.../lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://127.0.0.1:43725 remote=tcp://127.0.0.1:48500>: Stream is closed
2022-12-08 09:06:17,050 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:17,050 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:17,052 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:17,251 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:17,252 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:17,253 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:17,452 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:17,453 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:17,454 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:17,652 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:17,653 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:17,654 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:17,853 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:17,854 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:17,855 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:18,054 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:18,054 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:18,055 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:18,255 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:18,255 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:18,256 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:18,456 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:18,456 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:18,457 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'
2022-12-08 09:06:18,466 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2022-12-08 09:06:18,466 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2022-12-08 09:06:18,467 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2022-12-08 09:06:18,468 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2022-12-08 09:06:18,468 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2022-12-08 09:06:18,468 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2022-12-08 09:06:18,469 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2022-12-08 09:06:18,469 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2022-12-08 09:06:18,469 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2022-12-08 09:06:18,469 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2022-12-08 09:06:18,469 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2022-12-08 09:06:18,470 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2022-12-08 09:06:18,470 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2022-12-08 09:06:18,470 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2022-12-08 09:06:18,470 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2022-12-08 09:06:18,472 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2022-12-08 09:06:18,657 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-08 09:06:18,657 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "/.../lib/python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "/.../lib/python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

2022-12-08 09:06:18,658 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:34781'
Traceback (most recent call last):
  File "/.../lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:34781'

@fjetter
Member

fjetter commented Dec 9, 2022

The actual exception is a bit hidden in the log above:

2022-12-08 09:06:13,957 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 370)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 370, None)
kwargs:    {}
Exception: "AssertionError('Shuffle worker restrictions misbehaving')"

cc @hendrikmakait
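Because the real error is buried among hundreds of repeated dashboard `KeyError` tracebacks, it can help to pull the `Compute Failed` records out of a saved log programmatically. The following is a minimal sketch that assumes the plain-text layout shown above (a `Compute Failed` line followed by `Key:`, `Function:`, `args:`, `kwargs:` and `Exception:` lines). The helper name `compute_failures` is hypothetical and not part of distributed.

```python
import re
from typing import Dict, List, Optional

# Field lines that follow a "Compute Failed" warning in the worker log
# (layout assumed from the excerpt above).
_FIELD = re.compile(r"^(Key|Function|args|kwargs|Exception):\s*(.*)$")


def compute_failures(log_text: str) -> List[Dict[str, str]]:
    """Hypothetical helper: collect every 'Compute Failed' record in a log."""
    records: List[Dict[str, str]] = []
    current: Optional[Dict[str, str]] = None
    for line in log_text.splitlines():
        if "distributed.worker - WARNING - Compute Failed" in line:
            # Start a new record; its fields follow on the next lines.
            current = {}
            records.append(current)
            continue
        if current is None:
            # Outside a record: skip dashboard/bokeh noise and everything else.
            continue
        match = _FIELD.match(line.strip())
        if match:
            current[match.group(1)] = match.group(2)
        else:
            # Any non-field line ends the current record.
            current = None
    return records
```

For example, `compute_failures(open("worker.log").read())` would return one dictionary per failed task, whose `"Exception"` entry holds the message that is otherwise easy to miss.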

@wence-
Contributor Author

wence- commented Dec 9, 2022

My guess is that this is caused by a single large partition that dask is trying to materialise on one worker, which blows through that worker's memory budget. Since I know which partition this is, I can split it manually and, rather than calling DataFrame.shuffle directly, call rearrange_by_column with my newly computed partitioning column. I'll give that a go and see how far I get.
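The workaround above needs a column of output-partition numbers to hand to `rearrange_by_column`. A minimal stdlib-only sketch of how such a column could be computed, assuming a known set of heavy ("hot") keys and a hypothetical helper `output_partitions`: ordinary keys are hashed so that equal keys always land in the same partition, while each hot key is spread round-robin over several consecutive partitions. It uses `zlib.crc32` in place of dask's own hashing, so its partition ids will not match those produced by `DataFrame.shuffle`.

```python
import zlib
from typing import Hashable, Iterable, List, Mapping


def output_partitions(
    keys: Iterable[Hashable],
    npartitions: int,
    hot_key_splits: Mapping[Hashable, int],
) -> List[int]:
    """Hypothetical helper: map each key to a partition id in [0, npartitions).

    Ordinary keys are hashed (CRC32 of their string form), so all rows with
    the same key share a partition. Each key in ``hot_key_splits`` is instead
    spread round-robin over that many consecutive partitions.
    """
    counters = {key: 0 for key in hot_key_splits}
    result: List[int] = []
    for key in keys:
        base = zlib.crc32(str(key).encode()) % npartitions
        splits = hot_key_splits.get(key)
        if splits:
            # Rotate through `splits` partitions starting at the key's hash slot.
            offset = counters[key] % splits
            counters[key] += 1
            result.append((base + offset) % npartitions)
        else:
            result.append(base)
    return result
```

In a dask pipeline, logic like this would presumably be applied per input partition (for example with `map_partitions`), assigned as a new integer column, and that column's name passed to `dask.dataframe.shuffle.rearrange_by_column`. The trade-off is that rows of a split hot key are no longer co-located, so any downstream step that assumes one partition per key has to account for that.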

@fjetter
Member

fjetter commented Dec 9, 2022

That may well be due to a memory error, but in that case we would expect a different exception. The exception you got should not be possible with #7326.

Can you tell us which commit you were using?

@wence-
Contributor Author

wence- commented Dec 9, 2022

Sorry, I screwed up and was running on c9f75b1. Let me try again, this time with 3c90f50.

@wence-
Contributor Author

wence- commented Dec 9, 2022

Here's an updated log: the same kind of failure, but a slightly different traceback.

Log
Python 3.9.15 | packaged by conda-forge | (main, Nov 22 2022, 08:45:29) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.7.0 -- An enhanced Interactive Python. Type '?' for help.
Dask version: 2022.12.0
Distributed version: 2022.12.0+93.g3c90f508
2022-12-09 02:17:02,817 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-4xif3l_a', purging
2022-12-09 02:17:05,748 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-gzct06gz', purging
2022-12-09 02:17:07,582 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-xhvnyh9q', purging
2022-12-09 02:17:10,862 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-7s7qbn13', purging
2022-12-09 02:17:13,966 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-16rzm02x', purging
2022-12-09 02:17:14,587 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-qyufaj8f', purging
2022-12-09 02:17:18,198 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-ohtovouo', purging
2022-12-09 02:17:21,173 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-ro8jh0we', purging
2022-12-09 02:17:24,167 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-4ks8rk4s', purging
2022-12-09 02:17:28,394 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-nbc_el50', purging
2022-12-09 02:17:32,127 - distributed.diskutils - INFO - Found stale lock file and directory '.../tmp/dask-worker-space/worker-y26kr8g2', purging
...//python3.9/contextlib.py:126: UserWarning: Creating scratch directories is taking a surprisingly long time. (30.98s) This is often due to running workers on a network file system. Consider specifying a local-directory to point workers to write scratch data to a local disk.
  next(self.gen)
2022-12-09 02:48:55,092 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-09 02:48:56,388 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-09 02:48:58,266 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-09 02:49:03,829 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2022-12-09 02:49:39,181 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:37605 (pid=52494) exceeded 95% memory budget. Restarting...
2022-12-09 02:49:53,112 - distributed.nanny - WARNING - Restarting worker
2022-12-09 02:49:53,345 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:37605'
Traceback (most recent call last):
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'
2022-12-09 02:49:53,349 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-09 02:49:53,350 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "...//python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "...//python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "...//python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'

2022-12-09 02:49:53,386 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:37605'
Traceback (most recent call last):
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'
2022-12-09 02:49:53,551 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-09 02:49:53,551 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "...//python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "...//python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "...//python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'

2022-12-09 02:49:56,745 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:37605'
Traceback (most recent call last):
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'
2022-12-09 02:49:56,809 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-09 02:49:56,809 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "...//python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "...//python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "...//python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'

2022-12-09 02:49:56,840 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:37605'
Traceback (most recent call last):
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'
2022-12-09 02:49:57,013 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-09 02:49:57,014 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "...//python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "...//python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "...//python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'

2022-12-09 02:49:57,037 - distributed.dashboard.components.scheduler - ERROR - 'tcp://127.0.0.1:37605'
Traceback (most recent call last):
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'
2022-12-09 02:49:57,160 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 190)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 190, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,162 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 124)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 124, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,173 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 197)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 197, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,175 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 170)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 170, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,194 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 213)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 213, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,199 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 139)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 139, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,213 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 216)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 216, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,213 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 131)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 131, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,241 - bokeh.util.tornado - ERROR - Error thrown from periodic callback:
2022-12-09 02:49:57,242 - bokeh.util.tornado - ERROR - Traceback (most recent call last):
  File "...//python3.9/site-packages/tornado/gen.py", line 526, in callback
    result_list.append(f.result())
  File "...//python3.9/site-packages/bokeh/server/session.py", line 95, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "...//python3.9/site-packages/bokeh/server/session.py", line 229, in with_document_locked
    return func(*args, **kwargs)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 450, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 408, in invoke_with_curdoc
    return f()
  File "...//python3.9/site-packages/bokeh/document/callbacks.py", line 449, in invoke
    return f(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 41, in <lambda>
    doc.add_periodic_callback(lambda: update(ref), interval)
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/__init__.py", line 49, in update
    comp.update()
  File "...//python3.9/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "...//python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3960, in update
    if self.scheduler.workers[worker].last_seen < now - 5:
KeyError: 'tcp://127.0.0.1:37605'

2022-12-09 02:49:57,246 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 183)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 183, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,246 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 168)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 168, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,274 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 126)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 126, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,278 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 196)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 196, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,300 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 115)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 115, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,305 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 173)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 173, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,327 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 195)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 195, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,336 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 187)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 187, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,541 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 165)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 165, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,547 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 113)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 113, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,782 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 143)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 143, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:57,782 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 218)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 218, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:58,282 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 1220)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 1220, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:58,429 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 863)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 863, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:58,954 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 54)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 54, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:59,308 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 392)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 392, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

2022-12-09 02:49:59,335 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 1215)
Function:  shuffle_unpack
args:      ('06e936888527f1380d9e3c9f5f1027b4', 1215, None)
kwargs:    {}
Exception: "RuntimeError('shuffle_unpack failed during shuffle 06e936888527f1380d9e3c9f5f1027b4')"

@fjetter
Member

fjetter commented Dec 9, 2022

The dashboard errors are more or less expected. We haven't fixed the dashboard for the case where a worker dies. A worker is definitely dying during the shuffle, and we haven't implemented restart logic for that yet. See #7353

@fjetter
Member

fjetter commented Dec 9, 2022

Is the problem here that the entire dataset is being persisted into memory

The entire dataset doesn't need to fit into memory. A single partition has to fit, of course (plus a little headroom in case pandas/arrow decides to copy data). If you are already failing at a task before the actual shuffle, you might have a partition that is too large.
I would then recommend running the same example with fewer workers, so that the memory limit of each individual worker is larger.

You can also run a sanity check to ensure there is nothing super weird going on, e.g.

import dask.dataframe as dd

ddf = dd.read_parquet(inputdir, split_row_groups=False)
len(ddf)

This loads the entire dataset once and returns the row count. If this already fails, your machine is too small, or rather, your workers are too small.

The data exists on disk as ~300 parquet files (that each expand to around 300MB in size

... what I'm trying to say is that maybe there is one bad file that expands to 20-30GB. IDK.

@fjetter
Member

fjetter commented Dec 9, 2022

I assume this is not a public dataset, is it?

@wence-
Contributor Author

wence- commented Dec 9, 2022

I assume this is not a public dataset, is it?

Unfortunately not, AFAIK.

@wence-
Contributor Author

wence- commented Dec 9, 2022

Is the problem here that the entire dataset is being persisted into memory

The entire dataset doesn't need to fit into memory. A single partition has to fit, of course (plus a little headroom in case pandas/arrow decides to copy data). If you are already failing at a task before the actual shuffle, you might have a partition that is too large. I would then recommend running the same example with fewer workers, so that the memory limit of each individual worker is larger.

You can also run a sanity check to ensure there is nothing super weird going on, e.g.

ddf = dd.read_parquet(inputdir, split_row_groups=False)
len(ddf)

This loads the entire dataset once and returns the row count. If this already fails, your machine is too small ;)

The data exists on disk as ~300 parquet files (that each expand to around 300MB in size

... what I'm trying to say is that maybe there is one bad file that expands to 20-30GB. IDK.

I know the input files all expand to sane sizes (I have tested this) [I've updated the initial comment: it's always in the 2-4GiB range], so parquet-wise the inputs are sane. My reading of the errors, and perhaps you disagree, is that one (or maybe a few) output partitions are large (to the point that one output partition doesn't fit in a single worker's memory). I will try with fewer workers to see if raising the individual memory limit helps.

@fjetter
Member

fjetter commented Dec 9, 2022

I know the input files all expand to sane sizes (I have tested this) [I've updated the initial comment: it's always in the 2-4GiB range], so parquet-wise the inputs are sane. My reading of the errors, and perhaps you disagree, is that one (or maybe a few) output partitions are large (to the point that one output partition doesn't fit in a single worker's memory). I will try with fewer workers to see if raising the individual memory limit helps.

Possibly. One important thing to note is that, at the moment, the entire dataset has to fit on disk, though not in memory. Maybe your disk is too small? I am surprised you do not see a more informative exception. We might have been a bit too zealous with stripping "irrelevant" traces in #7326.
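A quick pre-flight check for the disk-space question is to compare the free space under the directory the workers write scratch data to with a rough estimate of the shuffled data's size. The sketch below is a hypothetical standard-library helper; using the dataset's in-memory size (roughly 1TB here) as the amount of disk needed is an assumption, since the on-disk footprint of the shuffle's spilled data may differ.

```python
import shutil
import tempfile
from typing import Optional


def has_scratch_space(needed_bytes: int, path: Optional[str] = None) -> bool:
    """Hypothetical helper: True if `path` has at least `needed_bytes` free.

    `path` should be the workers' scratch directory; falling back to the
    system temp directory here is only an assumption.
    """
    target = path or tempfile.gettempdir()
    free = shutil.disk_usage(target).free
    print(f"{target}: {free / 2**30:,.1f} GiB free, need ~{needed_bytes / 2**30:,.1f} GiB")
    return free >= needed_bytes


if __name__ == "__main__":
    # ~1TB, the approximate in-memory size of the dataset in this issue
    has_scratch_space(10**12)
```

If the check fails, pointing the workers at a larger local disk, for example via `LocalCluster(..., local_directory=...)`, is one option.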

@hendrikmakait
Member

We might have been a bit too zealous with stripping "irrelevant" traces in #7326

It sure looks like it. The way I read these logs, a worker gets restarted because it is running out of memory, which causes the scheduler extension to fail the shuffle. Once that failure is propagated to the workers, it triggers a cascade of errors in the shuffle_unpack tasks; those are the errors we see in the logs.

@wence-
Contributor Author

wence- commented Dec 15, 2022

I tried overcommitting memory in the workers and the run nearly completed, but I think one worker (which needed to read 49GiB from disk) was removed because it failed to heartbeat for 5 minutes:

Dask version: 2022.12.0
Distributed version: 2022.12.0+95.gbc317e20
2022-12-15 09:23:02,459 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://127.0.0.1:36461', name: 19, status: running, memory: 119, processing: 105>

Remind me how I raise that limit; I think I should then be able to get this to complete. Is it DASK_DISTRIBUTED__SCHEDULER__WORKER__TTL=3600s?

@mrocklin
Member

I assume this is not a public dataset, is it?

Unfortunately not, AFAIK.

I'm curious: would you be comfortable creating a mimic if this functionality existed? dask/dask#9766

@wence-
Contributor Author

wence- commented Dec 16, 2022

I'm curious: would you be comfortable creating a mimic if this functionality existed? dask/dask#9766

In principle I have no objection to doing this, but I would need to read the EULA for the data carefully to know exactly what that would need to cover.

In this particular instance, I have a hunch about what is causing things to fall over, so I think I can construct, by hand, a synthetic dataset that will provoke the issue.

@wence-
Contributor Author

wence- commented Dec 16, 2022

To follow up here, I was able to get the script below to run to completion.

This was on a machine with 40 physical cores and 1TB of RAM.

I needed to set:

export DASK_DISTRIBUTED__SCHEDULER__WORKER__TTL=3600s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s

(They probably didn't need to be that high, but belt-and-braces.)
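The same settings can also be applied from Python with `dask.config.set`, provided this happens before the cluster is created. The sketch below additionally checks the environment-variable spelling with `dask.config.collect_env`: in dask's convention a double underscore marks a nesting level, so the hyphenated key `distributed.scheduler.worker-ttl` corresponds to `DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL` (single underscore before `TTL`). Going by that convention, the `WORKER__TTL` spelling lands under a nested `worker.ttl` key instead, which suggests the TTL override may not have taken effect in this run; treat that as something to verify rather than a conclusion.

```python
import dask
import dask.config

# The overrides used in this run, keyed by their dotted config names.
settings = {
    "distributed.scheduler.worker-ttl": "3600s",  # heartbeat window before a worker is removed
    "distributed.comm.timeouts.connect": "3600s",
    "distributed.comm.timeouts.tcp": "3600s",
}

# Programmatic route: create LocalCluster/Client inside this block so that the
# scheduler sees the values when it starts.
with dask.config.set(settings):
    ttl_inside = dask.config.get("distributed.scheduler.worker-ttl")

# Environment-variable route: collect_env parses a mapping the same way dask
# parses os.environ ("__" becomes a nesting level, names are lower-cased).
single = dask.config.collect_env({"DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL": "3600s"})
double = dask.config.collect_env({"DASK_DISTRIBUTED__SCHEDULER__WORKER__TTL": "3600s"})

ttl_single = dask.config.get("distributed.scheduler.worker-ttl", None, config=single)
ttl_double = dask.config.get("distributed.scheduler.worker-ttl", None, config=double)

print(ttl_inside, ttl_single, ttl_double)  # 3600s 3600s None
```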

I also needed to overcommit the memory limit for each worker to 100GiB (20 workers × 100GiB = 2000GiB, roughly twice the machine's 1TB of RAM).

The reason for this, and for the previous failures, is that this dataset has a very skewed distribution for the shuffle key. In particular, there is a single key value that corresponds to around 5% of the total rows (this leads to one worker peaking at 80GiB memory usage when performing the len calculation, while all the others sit comfortably around 4GiB).

The dataset has 2879987999 total rows, and the largest output partition has 132603535 rows.

In this particular instance, I know that downstream I don't need to merge the dataset on this key (it's just a pre-sorting step), so, given prior knowledge of the skewed key distribution, I could write code to construct a better partitioning key manually. I wonder to what extent that could be automated. One could imagine extending the interface so that the user can provide a prior on the key distribution, which the shuffling mechanism could then use to make sensible decisions.
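One common way to hand-construct such a key is "salting": rows belonging to a known hot key get an extra suffix so that they spread over several output partitions, while every other key keeps a single suffix and stays co-located. This is only valid when, as here, nothing downstream needs all rows of the hot key in one partition. The helper below is a hypothetical sketch; the output column name, the `#` separator, and the round-robin salt are illustrative choices, not anything dask provides.

```python
import numpy as np
import pandas as pd


def add_salted_key(df: pd.DataFrame, key: str, hot_keys, n_salt: int,
                   out: str = "shuffle_key_salted") -> pd.DataFrame:
    """Hypothetical helper: append a salt to the shuffle key.

    Rows whose key is in `hot_keys` get salts 0..n_salt-1 round-robin;
    all other rows get salt 0, so their keys remain co-located.
    """
    df = df.copy()
    hot = df[key].isin(hot_keys).to_numpy()
    hot_rank = np.cumsum(hot) - 1          # 0, 1, 2, ... over the hot rows only
    salt = np.where(hot, hot_rank % n_salt, 0)
    df[out] = df[key].astype(str) + "#" + pd.Series(salt, index=df.index).astype(str)
    return df

# With dask (not executed here; the hot key and salt count are placeholders):
#   ddf = ddf.map_partitions(add_salted_key, "shuffle_key", ["<hot key>"], 64)
#   ddf = ddf.shuffle("shuffle_key_salted", shuffle="p2p")
```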

In any case, having figured out the issues, I can, if it is interesting, construct a synthetic dataset that would allow you to test things too (I think one can also replicate the problem at a smaller scale by doing the same thing with tighter worker memory limits).
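A synthetic reproducer along those lines could look like the following sketch. The column names, sizes, and the `make_skewed_frame` helper are hypothetical; the one property copied from the real data is a single key holding ~5% of the rows. Scaling `n_rows` up and the per-worker `memory_limit` down should, by the reasoning above, make the hot key's output partition the first to hit the limit.

```python
import numpy as np
import pandas as pd


def make_skewed_frame(n_rows: int, n_keys: int, hot_fraction: float,
                      seed: int = 0) -> pd.DataFrame:
    """Hypothetical helper: key "key-0" holds `hot_fraction` of the rows and the
    remaining rows are spread uniformly over keys 1..n_keys-1."""
    rng = np.random.default_rng(seed)
    n_hot = int(n_rows * hot_fraction)
    keys = np.concatenate([
        np.zeros(n_hot, dtype=np.int64),
        rng.integers(1, n_keys, size=n_rows - n_hot),  # never 0
    ])
    rng.shuffle(keys)  # scatter the hot key across all input partitions
    return pd.DataFrame({
        "shuffle_key": "key-" + pd.Series(keys).astype(str),
        "value": rng.random(n_rows),
    })

# Possible use with dask (not executed here; sizes and limits are illustrative):
#   ddf = dd.from_pandas(make_skewed_frame(10_000_000, 100_000, 0.05), npartitions=100)
#   with LocalCluster(n_workers=4, memory_limit="2GiB") as cluster, Client(cluster):
#       ddf.shuffle("shuffle_key", shuffle="p2p").map_partitions(len).compute()
```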

from pathlib import Path

import dask
import dask.dataframe as dd
import distributed
from distributed import Client, LocalCluster

if __name__ == "__main__":

    print("Dask version:", dask.__version__)
    print("Distributed version:", distributed.__version__)
    # 20 workers x 100GiB each deliberately overcommits the 1TB of physical RAM
    cluster = LocalCluster(n_workers=20, memory_limit="100GiB")
    client = Client(cluster)
    inputdir = Path(".../input/")
    outputdir = Path(".../shuffled/")
    # One input partition per parquet row group
    ddf = dd.read_parquet(inputdir, split_row_groups=True)
    ddf = ddf.shuffle('shuffle_key', shuffle="p2p")
    # Run the shuffle and collect the row count of every output partition
    final_partition_sizes = ddf.map_partitions(len).compute()
    print(f"Num out partitions = {len(final_partition_sizes)}")
    print(final_partition_sizes.max(), final_partition_sizes.min())
    print(final_partition_sizes)
Complete log (not fully error/warning-free)
Dask version: 2022.12.0
Distributed version: 2022.12.0+95.gbc317e20
.../lib/python3.9/contextlib.py:126: UserWarning: Creating scratch directories is taking a surprisingly long time. (6.11s) This is often due to running workers on a network file system. Consider specifying a local-directory to point workers to write scratch data to a local disk.
  next(self.gen)
'NoneType' object has no attribute 'add_next_tick_callback'
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
    return await func(*args, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/dashboard/components/shared.py", line 315, in cb
    self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
AttributeError: 'NoneType' object has no attribute 'add_next_tick_callback'
2022-12-16 03:54:12,792 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7afefda070>>, <Task finished name='Task-4397943' coro=<ProfileTimePlot.trigger_update.<locals>.cb() done, defined at .../lib/python3.9/site-packages/distributed/utils.py:740> exception=AttributeError("'NoneType' object has no attribute 'add_next_tick_callback'")>)
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File ".../lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File ".../lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
    return await func(*args, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/dashboard/components/shared.py", line 315, in cb
    self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
AttributeError: 'NoneType' object has no attribute 'add_next_tick_callback'
'NoneType' object has no attribute 'add_next_tick_callback'
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
    return await func(*args, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/dashboard/components/shared.py", line 315, in cb
    self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
AttributeError: 'NoneType' object has no attribute 'add_next_tick_callback'
2022-12-16 03:54:21,762 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7afefda070>>, <Task finished name='Task-4412619' coro=<ProfileTimePlot.trigger_update.<locals>.cb() done, defined at .../lib/python3.9/site-packages/distributed/utils.py:740> exception=AttributeError("'NoneType' object has no attribute 'add_next_tick_callback'")>)
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File ".../lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File ".../lib/python3.9/site-packages/distributed/utils.py", line 742, in wrapper
    return await func(*args, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/dashboard/components/shared.py", line 315, in cb
    self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
AttributeError: 'NoneType' object has no attribute 'add_next_tick_callback'
Num out partitions = 4367
132603535 0
0        869862
1       1986682
2        868497
3             0
4             0
         ...   
4362          0
4363          0
4364          0
4365     867148
4366          0
Length: 4367, dtype: int64
2022-12-16 03:55:45,969 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/worker.py", line 1213, in heartbeat
    response = await retry_operation(
  File ".../lib/python3.9/site-packages/distributed/utils_comm.py", line 400, in retry_operation
    return await retry(
  File ".../lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry
    return await coro()
  File ".../lib/python3.9/site-packages/distributed/core.py", line 1210, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/core.py", line 975, in send_recv
    response = await comm.read(deserializers=deserializers)
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:47992 remote=tcp://127.0.0.1:35821>: Stream is closed
2022-12-16 03:55:45,972 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/distributed/worker.py", line 1213, in heartbeat
    response = await retry_operation(
  File ".../lib/python3.9/site-packages/distributed/utils_comm.py", line 400, in retry_operation
    return await retry(
  File ".../lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry
    return await coro()
  File ".../lib/python3.9/site-packages/distributed/core.py", line 1210, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File ".../lib/python3.9/site-packages/distributed/core.py", line 975, in send_recv
    response = await comm.read(deserializers=deserializers)
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File ".../lib/python3.9/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:47998 remote=tcp://127.0.0.1:35821>: Stream is closed

@quasiben quasiben moved this to In Progress in cuDF/Dask/Numba/UCX Jan 6, 2023
@quasiben quasiben moved this to Todo in cuDF/Dask/Numba/UCX Jan 6, 2023
@shwina shwina moved this from In Progress to TODO in cuDF/Dask/Numba/UCX Jan 19, 2023