-
-
Notifications
You must be signed in to change notification settings - Fork 717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler worker reconnect drops messages #6341
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -370,25 +370,38 @@ async def test_clear_events_client_removal(c, s, a, b): | |||||||||||||||||
assert time() < start + 2 | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
@gen_cluster() | ||||||||||||||||||
async def test_add_worker(s, a, b): | ||||||||||||||||||
w = Worker(s.address, nthreads=3) | ||||||||||||||||||
w.data["x-5"] = 6 | ||||||||||||||||||
w.data["y"] = 1 | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the test that probably should have caught the regression, but it didn't because distributed/distributed/worker.py Lines 1110 to 1117 in e23f0ea
The |
||||||||||||||||||
@gen_cluster(nthreads=[("", 1)], client=True) | ||||||||||||||||||
async def test_add_worker(c, s, a): | ||||||||||||||||||
lock = Lock() | ||||||||||||||||||
|
||||||||||||||||||
dsk = {("x-%d" % i): (inc, i) for i in range(10)} | ||||||||||||||||||
s.update_graph( | ||||||||||||||||||
tasks=valmap(dumps_task, dsk), | ||||||||||||||||||
keys=list(dsk), | ||||||||||||||||||
client="client", | ||||||||||||||||||
dependencies={k: set() for k in dsk}, | ||||||||||||||||||
) | ||||||||||||||||||
s.validate_state() | ||||||||||||||||||
await w | ||||||||||||||||||
s.validate_state() | ||||||||||||||||||
async with lock: | ||||||||||||||||||
anywhere = c.submit(inc, 0, key="l-0") | ||||||||||||||||||
l1 = c.submit(lock.acquire, key="l-1") | ||||||||||||||||||
l2 = c.submit(lock.acquire, key="l-2") | ||||||||||||||||||
|
||||||||||||||||||
while not (sum(t.state == "processing" for t in s.tasks.values()) == 3): | ||||||||||||||||||
await asyncio.sleep(0.01) | ||||||||||||||||||
|
||||||||||||||||||
# Simulate a worker joining with necessary and unnecessary data. | ||||||||||||||||||
w = Worker(s.address, nthreads=1) | ||||||||||||||||||
w.update_data({"l-1": 2, "l-2": 3, "x": -1, "y": -2}) | ||||||||||||||||||
# `update_data` queues messages to send; we want to purely test `add_worker` logic | ||||||||||||||||||
w.batched_stream.buffer.clear() | ||||||||||||||||||
|
||||||||||||||||||
s.validate_state() | ||||||||||||||||||
await w | ||||||||||||||||||
s.validate_state() | ||||||||||||||||||
|
||||||||||||||||||
while not len(s.workers) == 2: | ||||||||||||||||||
await asyncio.sleep(0.01) | ||||||||||||||||||
|
||||||||||||||||||
assert w.ip in s.host_info | ||||||||||||||||||
assert s.host_info[w.ip]["addresses"] == {a.address, w.address} | ||||||||||||||||||
|
||||||||||||||||||
assert await c.gather([anywhere, l1, l2]) == [1, 2, 3] | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this updated test fails on |
||||||||||||||||||
assert "x" not in w.data | ||||||||||||||||||
assert "y" not in w.data | ||||||||||||||||||
|
||||||||||||||||||
assert w.ip in s.host_info | ||||||||||||||||||
assert s.host_info[w.ip]["addresses"] == {a.address, b.address, w.address} | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous test wasn't really testing much besides this. Whether |
||||||||||||||||||
await w.close() | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is @fjetter's key fix. Notice we were overwriting
client_msgs, worker_msgs
on each iteration.