Add fail_hard decorator for worker methods #6210
Conversation
We add a decorator that we can apply to a worker method that closes the worker if an exception is encountered in that method. This is still a work in progress, and is just up here as a proof of concept.
Any other methods that we think aren't covered here, but are common causes of issues
This should probably be on everything that gets called via loop.add_callback. In worker.py, I see that that's:
- self.batched_stream.send
- self.handle_scheduler
- self.heartbeat
- batched_send_connect
- gather_dep
- self.close
- 😬 maybe_transition_long_running
- client.run (we can skip this one?)
I'd also like to see this in core.py, but that's a bigger story:
- set_thread_ident
- start_pcs
- listener.stop
- async handlers for handle_stream (xref "Ensure exceptions in handlers are handled equally for sync and async" #4734)
- async every_cycle functions
It's interesting that transitions isn't on that list, because it just gets called indirectly, likely via handle_scheduler, gather_dep, etc. If we just catch the concurrency entrypoints, we wouldn't actually need to wrap these internal functions like transitions or validate_state.
In fact, I wonder if doing that could make things even worse. For example, a transitions error stemming from a compute-task op will propagate up to handle_scheduler. handle_scheduler will already log and reraise the error... but then try to reconnect to the scheduler (or close!).
If we add a @fail_hard to transitions, this code path will still exist. But while handle_scheduler is trying to reconnect or await self.close, a separate self.close callback will already be running in the background. So we're trying to reconnect and close at the same time. Which one is even proper behavior? (A while ago we discovered this reconnection logic is also broken, #5480, so it's extra unclear what should happen.) But brittle as it is, reconnection usually works right now. If there's also a self.close running in the background, I bet it won't. And what if the attempt to reconnect causes the self.close to also throw an error? Now the worker is in an even more broken state, and it's still not closed.
I assume this isn't specific to handle_scheduler, but that many other places could have similar race conditions.
I do want to see more structure to our concurrency. I do think errors happening at all these places you've annotated should cause the worker to close. But I'm worried that, with how tangled things are, adding more unstructured concurrency, well-meaning and reasonable as it seems, could sometimes make things even worse.
distributed/worker.py (outdated diff)
except Exception as e:
    logger.exception(e)
    # TODO: send event to scheduler
    self.loop.add_callback(self.close, nanny=False, executor_wait=False)
What if self.close encounters an error? :)
Then we probably wouldn't close? I'm not sure that we can guarantee against this case. Do you have some recommendation?
async def wrapper(self, *args, **kwargs):
    try:
        return await method(self, *args, **kwargs)
    except Exception as e:
Suggested change:
-    except Exception as e:
+    except BaseException as e:
This would fix #5958 if you also wrapped execute (you should wrap execute either way, I think).

The fact that BaseExceptions in callbacks aren't propagated by Tornado is pretty crazy. If we're going to add manual support for propagating exceptions like this, I don't see why we'd let BaseExceptions be ignored.
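For illustration, here is a minimal, standalone sketch (not distributed code; the helper names are made up) of why that distinction matters: something like SystemExit raised inside a task is a BaseException, so an "except Exception" handler never sees it.

# Hypothetical helpers, only to illustrate Exception vs BaseException.
def run_task_catch_exception(fn):
    try:
        return fn()
    except Exception as e:  # misses SystemExit, KeyboardInterrupt, CancelledError, ...
        return ("error", e)

def run_task_catch_base_exception(fn):
    try:
        return fn()
    except BaseException as e:  # also captures SystemExit and friends
        return ("error", e)

def bad_task():
    raise SystemExit(1)

# run_task_catch_exception(bad_task)       -> SystemExit escapes the handler entirely
# run_task_catch_base_exception(bad_task)  -> ("error", SystemExit(1))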
I'm OK with catching BaseException for tasks, i.e. in apply_function et al., to fix #5958.

However, I would be worried about closing workers upon an asyncio.CancelledError. While I don't think we're using cancellation in many places right now, this would be very confusing behavior if that ever changes.
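A hedged sketch of the compromise being described here (illustrative only, not the code in this PR): catch BaseException so that things like SystemExit aren't dropped, but re-raise asyncio.CancelledError so ordinary cancellation doesn't tear the worker down. The shutdown path mirrors the add_callback(self.close, ...) call used above.

import asyncio
import functools
import logging

logger = logging.getLogger(__name__)

def fail_hard_except_cancelled(method):
    """Illustrative variant of the proposed decorator (async methods only)."""

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        try:
            return await method(self, *args, **kwargs)
        except asyncio.CancelledError:
            raise  # cancellation is not a worker failure
        except BaseException as e:
            logger.exception(e)
            # Same shutdown path as the PR: schedule the worker to close.
            self.loop.add_callback(self.close, nanny=False, executor_wait=False)
            raise

    return wrapper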
@gjoseph92 to be clear, are you suggesting that we shouldn't make this change? Or are you just bringing up the general concern that we're not certain about this change, but still think that something in this direction probably makes sense to do?
I think we might be mixing up a few things. I agree that dealing with all ...

Putting this stuff into core.py / Server is also an interesting approach, but I think I would prefer a more structured approach there that thinks about how we deal with asyncio overall. The same is true for PeriodicCallbacks. That all relates, obviously, to #6201.

I consider this specific change a "very crude exception handler for state validation exceptions". For this I would consider it useful to cover ...
I don't think we should make the change currently in this PR. I think the sorts of race conditions I pointed out have the potential to make things even worse.
I think we can come up with something that works though. The real problem we're trying to solve is that Tornado drops exceptions from callbacks. Let's focus on that problem. A few ideas:
Or we could go in a different direction, and set the worker event loop's exception handler. We keep saying "Tornado drops exceptions", but actually it's not Tornado's fault. Tornado is just using asyncio, and asyncio drops exceptions by default. But we could customize that behavior, and have it shut down the worker. This is the most correct and reliable thing to do, since it'll automatically apply to everything. But it also wouldn't be possible to do with ...
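A rough sketch of that idea, under the assumption that we can reach the worker's underlying asyncio loop; the function and parameter names here are illustrative, not an existing distributed API.

import asyncio
import logging

logger = logging.getLogger(__name__)

def install_fail_hard_exception_handler(loop: asyncio.AbstractEventLoop, close_worker) -> None:
    """Close the worker whenever asyncio would otherwise just log and drop an exception."""

    def handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        # `context` may or may not carry an "exception"; the "message" key is always present.
        logger.critical(
            "Unhandled exception in worker event loop: %s",
            context.get("message"),
            exc_info=context.get("exception"),
        )
        # Don't block inside the handler; schedule the shutdown coroutine instead.
        loop.create_task(close_worker())

    loop.set_exception_handler(handler)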
I think I agree with @fjetter here (or at least my interpretation of what @fjetter is saying) which is that I support avoiding add_callback more generally, but that I think that we can make incremental progress here. I think that it comes to this statement:
Yes, I agree that this is true. However, I also think that it is unlikely. In general I'm also trying to get something in by next release so that we can have a stop-gap measure for current state machine issues (and an easy way to provide future stop-gap solutions) while we spend time investigating. Given that, I'm also open to other approaches. Let me go through some of the options above with the "let's get something in in the next day or two" timeline in mind.
To option 1, which was "just use this on ...":
OK, for now I've removed fail_hard on anything non-asynchronous. I don't actually necessarily think that this is the right approach long-term, but I'm fine starting here, and I think that it's ok to start with something agreeable and leave discussion for later.
Removing WIP title.
I've been playing with this with ...
Hrm, nevermind. It's not capturing something failing in ensure_communicating yet (which should be captured, because it's in handle_stream/handle_scheduler). 👀
Yeah, there is still a bit of work to do here. It's fun to dive into this though. I'm hopeful that we can get the chaos testing to be much more sensitive (with this and other recent scheduler-event signals) and hopefully also expose other things that have been going on at the same time.
@@ -1199,6 +1226,7 @@ async def heartbeat(self):
         finally:
             self.heartbeat_active = False
 
+    @fail_hard
Once the decorator is here, what's the point of having this block anymore?
distributed/worker.py, lines 1201 to 1210 in 198522b:

finally:
    if self.reconnect and self.status in Status.ANY_RUNNING:
        logger.info("Connection to scheduler broken. Reconnecting...")
        self.loop.add_callback(self.heartbeat)
    else:
        logger.info(
            "Connection to scheduler broken. Closing without reporting. Status: %s",
            self.status,
        )
        await self.close(report=False)
The decorator is going to close the worker as soon as the finally block completes. So what's the point of trying to reconnect if we're going to close either way? These seem like opposing behaviors.

I think we want to either remove the try/except entirely from handle_scheduler (because all we want to do on error is close the worker, and @fail_hard will do that for us anyway), or not use @fail_hard here, if we do in fact still want to reconnect in the face of errors.
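For concreteness, a sketch of that first option, as a fragment of the worker class. It assumes @fail_hard closes the worker on any exception; this is not what the PR currently does.

@fail_hard
async def handle_scheduler(self, comm):
    # No try/except or reconnect logic here: if handle_stream raises,
    # @fail_hard logs the error and closes the worker, and nothing else
    # races against that shutdown.
    await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
    # If the stream ends without an error (e.g. the scheduler closed the
    # comm), close cleanly rather than trying to reconnect.
    await self.close(report=False)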
First off, there's this: distributed/core.py, lines 636 to 638 in 198522b.

Also, sometimes ... (distributed/worker.py, line 3100 in 198522b), which then calls ensure_communicating (distributed/worker.py, lines 3142 to 3145 in 198522b).

Same with ... Additionally, ... (distributed/worker.py, lines 3640 to 3646 in 198522b).
I think the fact that _handle_instructions currently only processes SendMessageToScheduler and Execute means that ensure_communicating won't be called as part of a done_callback on an asyncio task. (If that were the case, the problem would be that though _handle_stimulus_from_task does check the task's result to capture any exception the task may have raised, if it does raise an unexpected exception, that exception will just be propagated to the ether of the event loop (because _handle_stimulus_from_task is just a callback).)
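A hedged sketch of what guarding such a done-callback could look like (a class fragment; the method and handler names are illustrative, not the actual _handle_stimulus_from_task implementation): don't let the exception fall through to the event loop; log it and trigger the same shutdown path the PR uses.

def _handle_stimulus_from_task_guarded(self, task: asyncio.Task) -> None:
    try:
        stim = task.result()  # re-raises anything the task itself raised
        self._handle_stimulus(stim)  # illustrative handler name
    except Exception:
        logger.exception("Unhandled error in task done-callback")
        # Without this, asyncio would only hand the exception to its (default,
        # log-only) exception handler and the worker would keep running broken.
        self.loop.add_callback(self.close, nanny=False, executor_wait=False)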
But I think all that code-tracing above makes it clear that it's really hard, and tedious, to find every entrypoint to these functions we care about and add a @fail_hard to each one.
I'm not too concerned about ...

try:
    return await method(self, *args, **kwargs)
except Exception:
    ...
    self.loop.add_callback(self.close, nanny=False, executor_wait=False)
    # ^ What if self.close fails in the callback? Then we're letting the worker continue
    # to run in a broken state. To even reach this place in the code, things that we thought
    # were "unlikely" have already happened. This is our ejector seat: nobody pulls the
    # ejector seat unless the airplane's already seriously broken. We need to make sure
    # ejection still happens, even if something goes wrong in the process.
    raise
    # ^ Why re-raise? We're shutting down. We are taking the very final action here of
    # closing the worker if an error happens. What other error-handling logic could possibly
    # need to run? Whatever error-handling logic used to happen before this PR, it's
    # irrelevant now, because the worker will be dead in the next few seconds.
I guess I'm confused by this concern, because regardless of how it's implemented, this large change is what this PR is doing. Hopefully my comment about whether ...
Anyway, this is how I'd feel most comfortable implementing it:

async def _force_close(self):
    try:
        await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30)
    except (Exception, BaseException):  # <-- include BaseException here or not??
        # Worker is in a very broken state if closing fails. We need to shut down immediately,
        # to ensure things don't get even worse and this worker potentially deadlocks the cluster.
        logger.critical(
            "Error trying to close worker in response to broken internal state. "
            "Forcibly exiting worker NOW",
            exc_info=True,
        )
        # use `os._exit` instead of `sys.exit` because of uncertainty
        # around propagating `SystemExit` from asyncio callbacks
        os._exit(1)


def fail_hard(method):
    """
    Decorator to close the worker if this method encounters an exception.
    """
    if iscoroutinefunction(method):

        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                return await method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                await _force_close(self)
                # Could re-raise here if we wanted, but what's the point? We're closing.
                # Main argument would be if the caller expects a return value, it'll
                # fail anyway if we return None.

    else:

        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                self.loop.add_callback(_force_close, self)
                # ^ Normally, you'd never know if an `add_callback` is actually going to succeed.
                # But using `_force_close`, we can be confident the worker will get closed no matter what.
                # Could re-raise here if we wanted, but what's the point? We're closing.

    return wrapper

Then we could use ... I do realize one good reason for re-raising the exception, though: if we don't, the caller might immediately error anyway because we returned None instead of a real value. I still don't like letting other error handlers do unknown things while we're trying to close, but ...
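For context, applying it would look roughly like this. This is just a fragment; the set of methods shown is the ones discussed in this thread, and the signatures are elided.

class Worker:
    @fail_hard
    async def handle_scheduler(self, comm):
        ...

    @fail_hard
    async def gather_dep(self, *args, **kwargs):
        ...

    @fail_hard
    async def heartbeat(self):
        ...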
Unit Test Results: 16 files (+4), 16 suites (+4), 7h 38m 22s ⏱️ (+2h 7m 27s). For more details on these failures, see this check. Results for commit 50c31ef; comparison against base commit b837003.
I would like to repeat what I said yesterday, because I am surprised that so much of the conversation here was about ...

I do not think this PR is a good approach to structured concurrency. It is true that we are missing any proper means of dealing with exceptions. For the record, I tried asyncio exception handlers a long time ago already and it doesn't work, because when using tornado there are no unhandled exceptions: Tornado basically wraps all callbacks with a log_error and doesn't reraise.

Think pseudo code:

try:
    some_code()
except StateCorruptionDetected:
    handle_state_corruption()

Even with structured concurrency or an asyncio exception handler, we'd need to talk about "What happens to an unhandled state corruption error, assuming we can detect it?" From what I heard so far, the requirements for ...
Regarding what this body should look like, I like the approach in #6210 (comment) using ...

I don't see a reason why it should be included, but at the same time I don't see a lot of harm.

To the second question of where to handle this exception (task groups or whatever would be nice, but we don't have them), I think it's the same problem as for all other exception handlers: try to be as targeted with exception handling as possible. The places where I see the possibility for us to detect these state violation errors are ...

Once we have a better approach to exception handling overall we may pull this to an outer layer, but for now these are the functions that matter. I think all concerns about race conditions are moot if we're calling sys._exit.
Also set the chaos_rechunk test to use sys.exit; this is to focus us on situations where another worker fails, rather than this worker.
OK, I've used the formulation recommended above by @gjoseph92 but modified to ...
Every 20 runs or so on my laptop I'm now getting the following error, which is great news!
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../mambaforge/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:1096: in coro
    await end_cluster(s, workers)
distributed/utils_test.py:937: in end_cluster
    check_invalid_worker_transitions(s)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

s = <Scheduler 'tcp://127.0.0.1:39109', workers: 0, cores: 0, tasks: 0>

    def check_invalid_worker_transitions(s: Scheduler) -> None:
        if not s.events.get("invalid-worker-transition"):
            return
        for timestamp, msg in s.events["invalid-worker-transition"]:
            worker = msg.pop("worker")
            print("Worker:", worker)
            print(InvalidTransition(**msg))
>       raise ValueError(
            "Invalid worker transitions found", len(s.events["invalid-worker-transition"])
        )
E       ValueError: ('Invalid worker transitions found', 2)
Hrm, so this is interesting:

@fail_hard
async def handle_scheduler(self, comm):
    try:
        await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
    except Exception as e:
        logger.exception(e)
        raise
    finally:
        if self.reconnect and self.status in Status.ANY_RUNNING:
            logger.info("Connection to scheduler broken. Reconnecting...")
            self.loop.add_callback(self.heartbeat)
        else:
            logger.info(
                "Connection to scheduler broken. Closing without reporting. Status: %s",
                self.status,
            )
            await self.close(report=False)

Currently handle_scheduler tries to reconnect on an exception. This makes some sense: comm errors are reasonable, and reconnection in that case is reasonable. However, it might be cleaner if we just bail and let the Nanny handle restart+reconnect on a failure. This would be a change in current behavior, but maybe ok.
I should note that the error above isn't actually the error generated from these changes (the gather_dep errors are more rare) but by some of the other changes, like using ...

If we merge this, this test is likely to start lighting up on CI. I think that I'm ok with that very short term. This seems like good pain to feel.
Yeah, that's what I was asking about in https://github.com/dask/distributed/pull/6210/files#r859199361.
I'm in favor of this, mostly because we've seen bugs with reconnection in the past: #5480, #5481, #5457.
Yeah, I think that the question here is "should ...". Short term, my guess is that we just want to cancel on everything and restart the worker. Longer term, I wouldn't be surprised if we want workers to be a little bit more resilient than this, but I also wouldn't be surprised if it goes the other direction.
@fjetter as far as state machine failures go, I think that a lot of what I was seeing before were failures when we were shutting down. These are genuine and we should care about them, but recently I've shifted the chaos test to just use sys.exit. As a result, if you're still looking at state machine issues I'd like to focus you on the failure in #6210 (comment), which comes up consistently with this test, while the others don't. This one seems to happen to workers when other workers fail, which seems like something that we should care about much more.
We've removed this functionality for now
OK, so there is a behavior change here around reconnection. If we're ok with that then I think that this is good to go. I'm not feeling a huge rush to get this in, but wouldn't mind getting it in this release.
To share my thoughts on that calculus: this does identify and resolve a state machine transition failure (and presumably resolves others as well). Because of this, I'd very much like to get it in. I'm a little nervous about the reconnect change, but not that nervous. As I think more about this, I'm more in favor of merging before the release. @gjoseph92 if you have time to look this over again I would welcome a 👍 or 👎
I'm on board with removing application-layer reconnects and think we should replace it with a more robust network layer. I think there is tons of code we can clean up if we remove this feature, e.g. distributed/scheduler.py, lines 3751 to 3784 in b837003.
However, I am nervous about this too and I would like to not rush this. I would definitely prefer a dedicated PR. Is this absolutely required to get the fail hard in?
I'll see if I can separate. I'll probably allow CommClosedErrors and OSErrors to pass through.
All is well now. The behavior is still there, and it's a bit cleaner now.