Missing input data for a task should fail worker, not the task #6142

Open
gjoseph92 opened this issue Apr 15, 2022 · 0 comments
Labels
stability Issue or feature related to cluster stability (e.g. deadlock)

Comments

@gjoseph92
Collaborator

If an input to a task is not actually in self.data, this currently manifests as the task failing (as though it's a user error). This in fact indicates a serious worker state issue, and should probably cause the entire worker to shut down.

See #6125 (comment) for a traceback of how an invalid transition led to a task executing before all of its inputs were in memory. The df.compute() call in client code failed as though it were a normal error, with the message KeyError: "('split-shuffle-1-b4961b03aa9e8bec7c581d2dc337f717', 10, (3, 9))".
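
For context, the failing lookup is conceptually just a dict access. A simplified sketch, not the actual implementation (pack_data substitutes dependency values into the task's arguments):

def _prepare_args_for_execution(self, ts, args, kwargs):
    # Each dependency's value is read straight out of the worker's
    # in-memory data dict, so an input that never arrived surfaces
    # as a plain KeyError carrying that input's key.
    data = {}
    for dep in ts.dependencies:
        data[dep.key] = self.data[dep.key]  # KeyError if the input is missing
    return pack_data(args, data), pack_data(kwargs, data)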

The problem is this overly generous try/except in Worker.execute, which treats any exception raised in the try block as a problem with the task itself, including exceptions from _prepare_args_for_execution, the step that looks up the input keys:

try:
    if self.validate:
        assert not ts.waiting_for_data
        assert ts.state == "executing"
        assert ts.run_spec is not None

    function, args, kwargs = await self._maybe_deserialize_task(  # type: ignore
        ts, stimulus_id=stimulus_id
    )

    # Looks up the task's input keys in self.data; a missing input
    # raises KeyError here.
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)

    try:
        executor = ts.annotations["executor"]  # type: ignore
    except (TypeError, KeyError):
        executor = "default"
    try:
        e = self.executors[executor]
    except KeyError:
        raise ValueError(
            f"Invalid executor {executor!r}; "
            f"expected one of: {sorted(self.executors)}"
        )

    self.active_keys.add(key)
    try:
        ts.start_time = time()
        if iscoroutinefunction(function):
            result = await apply_function_async(
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
        elif "ThreadPoolExecutor" in str(type(e)):
            result = await self.loop.run_in_executor(
                e,
                apply_function,
                function,
                args2,
                kwargs2,
                self.execution_state,
                key,
                self.active_threads,
                self.active_threads_lock,
                self.scheduler_delay,
            )
        else:
            result = await self.loop.run_in_executor(
                e,
                apply_function_simple,
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
    finally:
        self.active_keys.discard(key)

    self.threads[key] = result["thread"]

    if result["op"] == "task-finished":
        if self.digests is not None:
            self.digests["task-duration"].add(result["stop"] - result["start"])
        return ExecuteSuccessEvent(
            key=key,
            value=result["result"],
            start=result["start"],
            stop=result["stop"],
            nbytes=result["nbytes"],
            type=result["type"],
            stimulus_id=stimulus_id,
        )

    if isinstance(result["actual-exception"], Reschedule):
        return RescheduleEvent(key=ts.key, stimulus_id=stimulus_id)

    logger.warning(
        "Compute Failed\n"
        "Key: %s\n"
        "Function: %s\n"
        "args: %s\n"
        "kwargs: %s\n"
        "Exception: %r\n",
        key,
        str(funcname(function))[:1000],
        convert_args_to_str(args2, max_len=1000),
        convert_kwargs_to_str(kwargs2, max_len=1000),
        result["exception_text"],
    )
    return ExecuteFailureEvent(
        key=key,
        start=result["start"],
        stop=result["stop"],
        exception=result["exception"],
        traceback=result["traceback"],
        exception_text=result["exception_text"],
        traceback_text=result["traceback_text"],
        stimulus_id=stimulus_id,
    )
except Exception as exc:
    # Any exception from the block above, including a missing input,
    # ends up here and is reported as a failure of the task.
    logger.error("Exception during execution of task %s.", key, exc_info=True)
    msg = error_message(exc)
    return ExecuteFailureEvent(
        key=key,
        start=None,
        stop=None,
        exception=msg["exception"],
        traceback=msg["traceback"],
        exception_text=msg["exception_text"],
        traceback_text=msg["traceback_text"],
        stimulus_id=stimulus_id,
    )

Most problems that could happen there would be user error, but not all of them. We should probably be more selective about which exceptions get reported as task failures.
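
One possible shape for that (a sketch only; InvalidWorkerStateError is a made-up name, not an existing distributed API): raise a dedicated exception type for internal state problems and let it escape the broad handler instead of being turned into an ExecuteFailureEvent:

class InvalidWorkerStateError(Exception):
    """Hypothetical: a task's inputs are unexpectedly absent from self.data."""

# Inside _prepare_args_for_execution (sketch):
for dep in ts.dependencies:
    try:
        data[dep.key] = self.data[dep.key]
    except KeyError:
        raise InvalidWorkerStateError(
            f"Input {dep.key!r} for task {ts.key!r} not in self.data"
        )

# Inside Worker.execute (sketch): let the new error escape the broad handler.
try:
    ...  # prepare args and run the task as before
except InvalidWorkerStateError:
    raise  # propagate: fail the worker, not the task
except Exception as exc:
    ...  # existing path: report the exception as a task failure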

@fjetter added the stability label on May 16, 2022