Scheduler worker reconnect drops messages #6341
Conversation
There are still many other things in `add_worker` that aren't tested and should be.
```python
    key,
    "memory",
    stimulus_id,
    worker=address,
    nbytes=nbytes[key],
    typename=types[key],
)
recommendations, client_msgs, worker_msgs = t
```
This is @fjetter's key fix. Notice we were overwriting `client_msgs` and `worker_msgs` on each iteration.
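A minimal sketch of the corrected pattern, with made-up names and message payloads (this is not the scheduler's actual code): each transition inside the loop produces its own batch of per-recipient messages, which need to be merged into the accumulators rather than assigned over them.

```python
def merge_msgs(into: dict, new: dict) -> None:
    # Extend each recipient's message list instead of replacing it.
    for recipient, msgs in new.items():
        into.setdefault(recipient, []).extend(msgs)

recommendations: dict = {}
client_msgs: dict = {}
worker_msgs: dict = {}

# Pretend the reconnecting worker reported three keys in memory, and each
# transition produced one notification for the client waiting on that key.
for key in ["x", "y", "z"]:
    r = {}
    c = {"client-1": [{"op": "key-in-memory", "key": key}]}  # illustrative payload
    w = {}
    # Buggy pattern: `client_msgs, worker_msgs = c, w` keeps only the last key.
    recommendations.update(r)
    merge_msgs(client_msgs, c)
    merge_msgs(worker_msgs, w)

assert [m["key"] for m in client_msgs["client-1"]] == ["x", "y", "z"]
```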
```python
async def test_add_worker(s, a, b):
    w = Worker(s.address, nthreads=3)
    w.data["x-5"] = 6
    w.data["y"] = 1
```
This is the test that probably should have caught the regression, but it didn't, because `Scheduler.add_worker` only cares about `nbytes`. By just adding data, but not the corresponding `TaskState`s, `nbytes` was empty in the worker's reconnection message:
distributed/distributed/worker.py, lines 1110 to 1117 at e23f0ea:
```python
nbytes={
    ts.key: ts.get_nbytes()
    for ts in self.tasks.values()
    # Only if the task is in memory this is a sensible
    # result since otherwise it simply submits the
    # default value
    if ts.state == "memory"
},
```
The `keys` field of this message is unused by the scheduler and should probably be removed; it should be redundant to `nbytes` anyway.
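To illustrate why the old test setup produced an empty `nbytes`, here is a toy stand-in (not the real `Worker`/`TaskState` classes): values placed directly in `data` never show up, because the comprehension quoted above only looks at tasks whose state is `"memory"`.

```python
from dataclasses import dataclass, field

@dataclass
class FakeTaskState:
    key: str
    state: str = "memory"
    nbytes: int = 8

    def get_nbytes(self) -> int:
        return self.nbytes

@dataclass
class FakeWorker:
    data: dict = field(default_factory=dict)
    tasks: dict = field(default_factory=dict)

    def reconnect_nbytes(self) -> dict:
        # Mirrors the comprehension quoted above: only tasks in memory count.
        return {
            ts.key: ts.get_nbytes()
            for ts in self.tasks.values()
            if ts.state == "memory"
        }

w = FakeWorker()
w.data["x-5"] = 6                          # data alone, like the old test...
assert w.reconnect_nbytes() == {}          # ...gives the scheduler nothing to transition

w.tasks["x-5"] = FakeTaskState("x-5")      # with a task actually in memory...
assert w.reconnect_nbytes() == {"x-5": 8}  # ...the key is reported with its size
```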
```python
assert w.ip in s.host_info
assert s.host_info[w.ip]["addresses"] == {a.address, b.address, w.address}
```
The previous test wasn't really testing much besides this. Whether `x-5` would come from normal compute on worker `a`, or from memory on the new worker, was a race condition, and was untested anyway.
```python
assert w.ip in s.host_info
assert s.host_info[w.ip]["addresses"] == {a.address, w.address}
```
```python
assert await c.gather([anywhere, l1, l2]) == [1, 2, 3]
```
Note that this updated test fails on `main` (it times out waiting here, because the client message saying `l1` and `l2` are done is overwritten and never sent).
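Roughly why that wait hangs, as a self-contained sketch (client name and message shape are made up): reassigning the outgoing client messages inside the loop means only the last key's completion notice survives, so a client gathering the earlier keys never hears that they finished.

```python
waiting_on = {"l1", "l2", "anywhere"}  # keys the client is gathering

client_msgs = {}
for key in ["l1", "l2", "anywhere"]:
    # Overwrite instead of merge: earlier iterations' messages are lost.
    client_msgs = {"client-1": [{"op": "key-in-memory", "key": key}]}

delivered = {m["key"] for m in client_msgs.get("client-1", [])}
# Only the last key's notice would ever be sent; the gather above blocks forever.
assert waiting_on - delivered == {"l1", "l2"}
```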
My updated test is failing locally as well. Need to look into this more next week.
This should no longer be relevant after #6361.
When a worker connects, it sends a list of data it already has. On the scheduler side, we were handling each item and transitioning it to memory, but on each iteration we overwrote the list of messages to send to the client and worker. So if a worker reconnected with more than one key in memory, we'd drop critical instructions to send to clients and the worker.

Progress towards #6228, #5480.
Pulled out from #6272/#6329
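For context, a rough sketch of the reconnection payload discussed here. The `keys`, `nbytes`, and `types` fields come up in the review above; the example values, and any shape beyond those fields, are assumptions for illustration.

```python
# Hypothetical reconnection payload for a worker holding two keys in memory.
reconnect_msg = {
    "keys": ["x-5", "y"],                 # unused by the scheduler (see review above)
    "nbytes": {"x-5": 48, "y": 32},       # drives the transitions to "memory"
    "types": {"x-5": "int", "y": "int"},  # passed to the transition as typename=...
}
```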
`pre-commit run --all-files`
cc @crusaderky @fjetter @mrocklin