Add fail_hard decorator for worker methods #6210
@@ -157,6 +157,37 @@
DEFAULT_STARTUP_INFORMATION: dict[str, Callable[[Worker], Any]] = {}


def fail_hard(method):
    """
    Decorator to close the worker if this method encounters an exception
    """
    if iscoroutinefunction(method):

        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                return await method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                await self.close(nanny=False, executor_wait=False)
                raise

    else:

        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                self.loop.add_callback(self.close, nanny=False, executor_wait=False)
What if
Then we probably wouldn't close? I'm not sure that we can guarantee against this case. Do you have a recommendation?
                raise

    return wrapper


class Worker(ServerNode):
    """Worker node in a Dask distributed cluster

@@ -2642,6 +2673,7 @@ def transition(
        self._handle_instructions(instructions)
        self.transitions(recs, stimulus_id=stimulus_id)

    @fail_hard
    def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:
        """Process transitions until none are left

@@ -3020,6 +3052,7 @@ def _update_metrics_received_data(
        self.counters["transfer-count"].add(len(data))
        self.incoming_count += 1

    @fail_hard
    @log_errors
    async def gather_dep(
        self,
@@ -4063,6 +4096,7 @@ def validate_task(self, ts):
                f"Invalid TaskState encountered for {ts!r}.\nStory:\n{self.story(ts)}\n"
            ) from e

    @fail_hard
    def validate_state(self):
        if self.status not in Status.ANY_RUNNING:
            return

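For readers who want to try the decorator's behavior outside a real worker, here is a minimal, self-contained sketch. It restates `fail_hard` in simplified form (logging omitted, `iscoroutinefunction` taken from the standard library's `inspect` module, which is an assumption about the import). `FakeLoop` and `FakeWorker` are hypothetical stand-ins invented for illustration and are not part of distributed.

```python
import asyncio
import functools
from inspect import iscoroutinefunction


def fail_hard(method):
    """Close the owner if the wrapped method raises (simplified from the diff)."""
    if iscoroutinefunction(method):

        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                return await method(self, *args, **kwargs)
            except Exception:
                await self.close(nanny=False, executor_wait=False)
                raise

    else:

        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception:
                # close() is a coroutine, so a sync method can only schedule it
                self.loop.add_callback(self.close, nanny=False, executor_wait=False)
                raise

    return wrapper


class FakeLoop:
    """Hypothetical stand-in for an event loop: records scheduled callbacks."""

    def __init__(self):
        self.callbacks = []

    def add_callback(self, func, *args, **kwargs):
        self.callbacks.append((func, args, kwargs))


class FakeWorker:
    """Hypothetical stand-in for Worker, just enough to exercise fail_hard."""

    def __init__(self):
        self.loop = FakeLoop()
        self.closed = False

    async def close(self, nanny=True, executor_wait=True):
        self.closed = True

    @fail_hard
    def sync_method(self):
        raise ValueError("sync failure")

    @fail_hard
    async def async_method(self):
        raise ValueError("async failure")


def run_sync_case():
    worker = FakeWorker()
    try:
        worker.sync_method()
    except ValueError:
        pass
    # close() has been scheduled on the loop but has not run yet
    return worker.closed, len(worker.loop.callbacks)


def run_async_case():
    worker = FakeWorker()

    async def main():
        try:
            await worker.async_method()
        except ValueError:
            pass

    asyncio.run(main())
    return worker.closed
```

In this sketch the async wrapper awaits `close` before re-raising, while the sync wrapper only schedules it, so the exception reaches the caller before the close has actually run.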
This would fix #5958 if you also wrapped `execute` (you should wrap `execute` either way, I think).

The fact that BaseExceptions in callbacks aren't propagated by Tornado is pretty crazy. If we're going to add manual support for propagating exceptions like this, I don't see why we'd let BaseExceptions be ignored.

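To make the `BaseException` point concrete, here is a small sketch of which handlers catch what. `TaskAbort` and `handled_by` are hypothetical names used only for illustration. The point is that an `except Exception` clause, as used in `fail_hard`, lets anything deriving directly from `BaseException` pass through untouched.

```python
class TaskAbort(BaseException):
    """Hypothetical exception deriving directly from BaseException."""


def handled_by(handler_type, exc):
    """Return True if an `except handler_type` clause catches `exc`."""
    try:
        try:
            raise exc
        except handler_type:
            return True
    except BaseException:
        # The inner handler did not match, so the exception escaped it
        return False
```

For example, `handled_by(Exception, TaskAbort())` is false while `handled_by(BaseException, TaskAbort())` is true. The same holds for built-ins such as `KeyboardInterrupt` and `SystemExit`.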
I'm OK with catching BaseException for tasks, i.e. in `apply_function` et al., to fix #5958.

However, I would be worried about closing workers upon an `asyncio.CancelledError`. While I don't think we're using cancellation in many places right now, this would be very confusing behavior if that ever changes.
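One way to reconcile the two positions could look like the sketch below: catch `BaseException`, but let cancellation propagate without closing. This is only an illustration, not what the PR implements. `Closer`, `Abort`, and `guarded` are hypothetical names, and the sketch assumes Python 3.8 or later, where `asyncio.CancelledError` derives from `BaseException` rather than `Exception`.

```python
import asyncio


class Abort(BaseException):
    """Hypothetical failure deriving directly from BaseException."""


class Closer:
    """Hypothetical stand-in with an async close(), for illustration only."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def guarded(owner, coro_func):
    """Run coro_func; close owner on any failure except cancellation."""
    try:
        return await coro_func()
    except asyncio.CancelledError:
        # Cancellation is not treated as a failure: propagate without closing
        raise
    except BaseException:
        await owner.close()
        raise


async def cancelled_case():
    owner = Closer()

    async def sleeper():
        await asyncio.sleep(10)

    task = asyncio.ensure_future(guarded(owner, sleeper))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return owner.closed


async def failing_case():
    owner = Closer()

    async def boom():
        raise Abort()

    try:
        await guarded(owner, boom)
    except Abort:
        pass
    return owner.closed
```

Here a cancelled task leaves the owner open, while any other `BaseException` closes it before re-raising.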