Remove worker reconnect #6361

Merged: 21 commits, merged into dask:main on May 20, 2022
Conversation

@gjoseph92 (Collaborator)

When a worker disconnects from the scheduler, close it immediately instead of trying to reconnect.

Also prohibit workers from joining if they have data in memory, as an alternative to #6341.

Just seeing what CI does for now, to figure out which tests to change/remove.

Closes #6350

  • Tests added / passed
  • Passes pre-commit run --all-files
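For orientation, the sketch below illustrates the two behaviors described above using toy stand-ins. ToyScheduler, ToyWorker, and every name in it are hypothetical; this is not the dask/distributed API or this PR's diff.

class ToyScheduler:
    """Toy stand-in for the scheduler side of the change."""

    def __init__(self) -> None:
        self.workers: set[str] = set()

    def add_worker(self, address: str, nbytes_in_memory: int) -> dict:
        # New rule: a worker that already holds data in memory may not join.
        # Its keys were reassigned when it was removed, so accepting it would
        # reintroduce state the scheduler no longer tracks.
        if nbytes_in_memory:
            return {"status": "error", "message": "worker has data in memory"}
        self.workers.add(address)
        return {"status": "OK"}


class ToyWorker:
    """Toy stand-in for the worker side of the change."""

    def __init__(self, address: str) -> None:
        self.address = address
        self.closed = False

    def on_scheduler_connection_lost(self) -> None:
        # Old behavior: loop and try to re-register with the scheduler.
        # New behavior: close immediately; a Nanny (if present) can decide
        # whether to start a fresh, empty worker process.
        self.closed = True


if __name__ == "__main__":
    s = ToyScheduler()
    print(s.add_worker("tcp://10.0.0.1:1234", nbytes_in_memory=0))     # accepted
    print(s.add_worker("tcp://10.0.0.2:1234", nbytes_in_memory=2048))  # rejected

    w = ToyWorker("tcp://10.0.0.1:1234")
    w.on_scheduler_connection_lost()
    print(w.closed)  # True: the worker shuts down instead of reconnecting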

github-actions bot commented May 18, 2022

Unit Test Results

    15 files ±0      15 suites ±0      7h 7m 11s ⏱️ -5m 36s
 2 795 tests  -6     2 714 passed  -7     79 skipped ±0    2 failed +1
20 725 runs  -45    19 803 passed -45    920 skipped -1    2 failed +1

For more details on these failures, see this check.

Results for commit 305a23f. ± Comparison against base commit ff94776.

♻️ This comment has been updated with latest results.

@gjoseph92 (Collaborator, Author)

Everything is green except one flaky test_stress_scatter_death failure (#6305), and a single run of distributed/tests/test_client.py::test_restart_timeout_is_logged on Windows that is leaking a thread:

E               AssertionError: (<Thread(AsyncProcess Dask Worker process (from Nanny) watch process join, started daemon 5744)>, ['  File "C:\\Minico...rocessing\\popen_spawn_win32.py", line 108, in wait
E                 \tres = _winapi.WaitForSingleObject(int(self._handle), msecs)
E                 '])
E               assert False

That feels flaky to me, but I guess it's possible this is related?

In distributed/scheduler.py:

@@ -5058,43 +5026,21 @@ async def gather(self, keys, serializers=None):
                 )
             result = {"status": "error", "keys": missing_keys}
             with log_errors():
-                # Remove suspicious workers from the scheduler but allow them to
-                # reconnect.
+                # Remove suspicious workers from the scheduler and shut them down.
                 await asyncio.gather(
                     *(
                         self.remove_worker(
@gjoseph92 (Collaborator, Author)

This is maybe heavy-handed. retire_workers would probably be less disruptive, but slower and a little more complex. I don't love losing all the other keys on a worker just because it's missing one.
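For concreteness, here is a rough sketch of the trade-off discussed above. handle_missing_keys is a hypothetical helper, and the keyword arguments to remove_worker / retire_workers are simplified assumptions rather than the exact Scheduler API.

import asyncio


async def handle_missing_keys(scheduler, missing_workers: set[str]) -> None:
    # Approach taken in the hunk above: forcibly remove (and shut down) each
    # suspicious worker. Simple and decisive, but every other key that worker
    # held in memory is lost and must be recomputed.
    await asyncio.gather(
        *(
            scheduler.remove_worker(address=addr, close=True)
            for addr in missing_workers
        )
    )

    # Alternative raised in the comment: retire the workers instead, which
    # would first migrate their remaining in-memory data to other workers --
    # less disruptive, but slower and a little more complex.
    # await scheduler.retire_workers(workers=list(missing_workers))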

Member

I don't mind being heavy-handed, especially if things are in a wonky state.

@mrocklin (Member) left a comment

Incomplete review, but I'm stuck in meetings for the next bit

@mrocklin (Member)

In general I'm ok with what's here

@gjoseph92 (Collaborator, Author) left a comment

One thing we discussed offline was whether the Nanny should restart the worker in cases when the connection to the scheduler is broken unexpectedly. The answer is probably yes, but as this PR stands, Worker.close will always tell the Nanny it's closing gracefully, preventing restart:

if nanny and self.nanny:
    with self.rpc(self.nanny) as r:
        await r.close_gracefully()

As a follow-up PR, we should add an option to Worker.close to control this behavior.

@mrocklin (Member)

> One thing we discussed offline was whether the Nanny should restart the worker in cases when the connection to the scheduler is broken unexpectedly. The answer is probably yes, but as this PR stands, Worker.close will always tell the Nanny it's closing gracefully, preventing restart:
>
> As a follow-up PR, we should add an option to Worker.close to control this behavior

I may not be understanding the statement above, but we already have the nanny= option to Worker.close, right? That's the code snippet that you've highlighted. I think the question is whether there is a specific call to Worker.close where we want to pass nanny=False. Agreed?

@gjoseph92 (Collaborator, Author)

> we already have the nanny= option to Worker.close

Great point, I just missed that. Yes, we can set nanny=False in these cases here. I'll do that.
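A minimal sketch of what that could look like, assuming Worker.close keeps its existing nanny= keyword; the call site below is hypothetical and only illustrates the intent.

async def on_scheduler_stream_closed(worker) -> None:
    # The scheduler connection broke unexpectedly: close the worker without
    # telling the Nanny to stand down (nanny=False), leaving the Nanny free
    # to start a fresh, empty worker process.
    await worker.close(nanny=False)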

@gjoseph92 (Collaborator, Author)

@mrocklin actually, I think maybe we should do this in a follow-up PR. It's pretty straightforward, but involves a little bit of thinking and a few more tests (the Nanny is going to try to report the worker closure to the scheduler at the same time the worker's connection is breaking because it is shutting down).

Easy enough, but I'd rather focus on the core change here and not increase the scope.

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request May 19, 2022
`reconnect=True` (previous default) is now the only option. This is not a necessary change to make. It just simplifies things to not have it. See discussion in dask#6361 (comment).
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request May 19, 2022
Unnecessary after dask#6361 is merged
was leaking file descriptors, and needed an event loop
gjoseph92 marked this pull request as ready for review May 20, 2022 02:02
@gjoseph92 (Collaborator, Author)

@fjetter @mrocklin I believe this is ready for final review. Follow-ups identified in #6384.

mrocklin merged commit f669f06 into dask:main May 20, 2022
@mrocklin (Member)

Thanks @gjoseph92. This is in.

Successfully merging this pull request may close these issues.

Remove worker reconnection logic