Add fail_hard decorator for worker methods #6210

Merged
merged 18 commits into from
Apr 29, 2022
34 changes: 34 additions & 0 deletions distributed/worker.py
@@ -157,6 +157,37 @@
DEFAULT_STARTUP_INFORMATION: dict[str, Callable[[Worker], Any]] = {}


def fail_hard(method):
    """
    Decorator to close the worker if this method encounters an exception
    """
    if iscoroutinefunction(method):

        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                return await method(self, *args, **kwargs)
            except Exception as e:
Collaborator

Suggested change
except Exception as e:
except BaseException as e:

This would fix #5958 if you also wrapped execute (you should wrap execute either way I think).

The fact that BaseExceptions in callbacks aren't propagated by Tornado is pretty crazy. If we're going to add manual support for propagating exceptions like this, I don't see why we'd let BaseExceptions be ignored.

Member

I'm OK with catching BaseException for tasks, i.e. in apply_function et al., to fix #5958.

However, I would be worried about closing workers upon an asyncio.CancelledError. While I don't think we're using cancellation in many places right now, this would be very confusing behavior if that ever changes.
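
The cancellation concern can be checked directly: since Python 3.8, `asyncio.CancelledError` derives from `BaseException` rather than `Exception`, so a handler widened to `except BaseException` would also fire on cancellation. Below is a minimal sketch, not the code merged in this PR, of one possible way to catch `BaseException` while letting cancellation pass through without closing. `fail_hard_sketch`, `FakeWorker`, and `FatalSignal` are hypothetical names used only for illustration, and `close()` here takes no arguments, unlike the real worker method.

```python
import asyncio
import functools


class FatalSignal(BaseException):
    """Hypothetical non-Exception error, standing in for e.g. SystemExit."""


def fail_hard_sketch(method):
    """Hypothetical variant: close on any BaseException except cancellation."""

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        try:
            return await method(self, *args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is not treated as a worker failure; propagate as-is.
            raise
        except BaseException:
            await self.close()
            raise

    return wrapper


class FakeWorker:
    """Stand-in for a worker; it only records whether close() was called."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

    @fail_hard_sketch
    async def cancelled(self):
        raise asyncio.CancelledError()

    @fail_hard_sketch
    async def fatal(self):
        raise FatalSignal()


async def run(coro):
    """Await coro and return the type of the exception it raised, if any."""
    try:
        await coro
    except BaseException as e:
        return type(e)
    return None
```

With this ordering of `except` clauses, `FakeWorker.cancelled()` re-raises without closing, while `FakeWorker.fatal()` closes the worker before re-raising.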

                logger.exception(e)
                # TODO: send event to scheduler
                await self.close(nanny=False, executor_wait=False)
                raise

    else:

        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                self.loop.add_callback(self.close, nanny=False, executor_wait=False)
Collaborator

What if self.close encounters an error? :)

Member Author

Then we probably wouldn't close? I'm not sure that we can guarantee against this case. Do you have a recommendation?
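
One conceivable answer, offered only as a sketch and not as what this PR does, is to guard the close call itself and fall back to a last-resort hook if it raises. In the example below, `fail_hard_with_fallback`, `BrokenCloseWorker`, and `force_exit` are hypothetical names. For simplicity, `close()` is called synchronously, whereas the decorator in this diff schedules it on the event loop with `self.loop.add_callback`.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def fail_hard_with_fallback(method):
    """Hypothetical sync variant: if close() fails, call a last-resort hook."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except Exception as e:
            logger.exception(e)
            try:
                self.close()
            except Exception:
                logger.exception("close() failed; falling back to force_exit()")
                # Assumption: force_exit() is a hook that does not rely on a
                # clean shutdown, e.g. one that terminates the process.
                self.force_exit()
            # Re-raise the original exception from the wrapped method.
            raise

    return wrapper


class BrokenCloseWorker:
    """Toy worker whose close() always fails."""

    def __init__(self):
        self.forced = False

    def close(self):
        raise RuntimeError("close failed")

    def force_exit(self):
        # A real worker might exit the process here; the toy records the call.
        self.forced = True

    @fail_hard_with_fallback
    def step(self):
        raise ValueError("bad state")
```

The caller still sees the original `ValueError` from `step()`; the failure inside `close()` is logged and handled by the fallback rather than masking the root cause.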

                raise

    return wrapper


class Worker(ServerNode):
    """Worker node in a Dask distributed cluster

@@ -2642,6 +2673,7 @@ def transition(
        self._handle_instructions(instructions)
        self.transitions(recs, stimulus_id=stimulus_id)

    @fail_hard
    def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:
        """Process transitions until none are left

@@ -3020,6 +3052,7 @@ def _update_metrics_received_data(
        self.counters["transfer-count"].add(len(data))
        self.incoming_count += 1

    @fail_hard
    @log_errors
    async def gather_dep(
        self,
@@ -4063,6 +4096,7 @@ def validate_task(self, ts):
                f"Invalid TaskState encountered for {ts!r}.\nStory:\n{self.story(ts)}\n"
            ) from e

    @fail_hard
    def validate_state(self):
        if self.status not in Status.ANY_RUNNING:
            return