Description
Describe the issue:
We are using distributed to run a process on HTCondor. Our cluster is somewhat unreliable, and hence our jobs occasionally die for reasons we can't determine. We have therefore created a set-up that resubmits jobs which fail. However, sometimes we get the following error in our process:
Traceback (most recent call last):
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/htcondor.py", line 265, in _dask_map
    result = task.result(timeout=1)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/client.py", line 322, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/utils.py", line 436, in sync
    return result
UnboundLocalError: local variable 'result' referenced before assignment
This gets repeated many times and stops the overall manager process. I think the root cause may be an issue with our cluster (since it is only intermittent), but could you please look into this UnboundLocalError, which I think is spurious?
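From the traceback it looks as if result is only assigned on the success path inside sync, so whatever actually went wrong (a timeout, a lost worker, ...) ends up being masked by the UnboundLocalError. The following is a minimal, purely illustrative sketch of that kind of pattern; it is not the actual distributed source, and all names in it are made up:

import threading
import time


def sync_sketch(compute, timeout=None):
    # Illustrative only: "result" is bound solely on the success path,
    # but "return result" runs unconditionally afterwards.
    done = threading.Event()
    outcome = {}

    def run():
        try:
            outcome["value"] = compute()
        except Exception as exc:
            outcome["error"] = exc
        finally:
            done.set()

    threading.Thread(target=run, daemon=True).start()
    done.wait(timeout)  # may return before the worker thread finishes

    if "error" in outcome:
        raise outcome["error"]
    if "value" in outcome:
        result = outcome["value"]
    # If the wait timed out, neither branch above ran and the next line
    # raises UnboundLocalError, hiding the underlying timeout/cluster problem.
    return result


try:
    sync_sketch(lambda: time.sleep(5) or 42, timeout=0.1)
except UnboundLocalError as err:
    print("masked failure:", err)  # same symptom as in the traceback above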
Example:
Unfortunately, due to the intermittent nature of the issue, I haven't managed to create a minimal reproducible example. This is the event loop we use to gather the results:
tasks = client.map(function, *iterables, pure=False, key=key)
tasks_to_itemidx = dict(zip(tasks, range(len(tasks))))
tasks = dask.distributed.as_completed(tasks)
task_failures = {}
for task in tasks:
    try:
        # This call should return immediately but sometimes Dask gets
        # stuck here. Unknown why. Specify timeout to circumvent.
        result = task.result(timeout=1)
    except asyncio.exceptions.TimeoutError:
        # Retry but silence the error
        tasks.add(self._dask_resubmit_failed_task(
            function, task, tasks_to_itemidx, iterables, key))
    except Exception as e:
        logger.exception(e)
        failures = task_failures.get(task, 0)
        if self.retries is not None and failures >= self.retries:
            raise
        logger.info(
            f"Task failed {failures} times and will be retried")
        new_task = self._dask_resubmit_failed_task(
            function, task, tasks_to_itemidx, iterables, key)
        task_failures[new_task] = failures + 1
    else:
        if result is None:
            logger.error("Task returned 'None' (usually due to dask "
                         "killing this worker).")
            failures = task_failures.get(task, 0)
            if self.retries is not None and failures >= self.retries:
                raise RuntimeError(
                    "Number of retries was exceeded by a task returning "
                    "'None'. This is usually due to dask killing a "
                    "worker for exceeding memory usage.")
            logger.info(
                f"Task failed {failures} times and will be retried")
            new_task = self._dask_resubmit_failed_task(
                function, task, tasks_to_itemidx, iterables, key)
            task.cancel()
            task_failures[new_task] = failures + 1
        else:
            yield result
    del tasks_to_itemidx[task]
    if task in task_failures:
        del task_failures[task]
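A possible interim workaround for us would be to translate the spurious UnboundLocalError into the timeout path that the loop above already handles. This is only an untested sketch; safe_result is a hypothetical helper, not part of pepper or distributed:

import asyncio

from distributed import Future


def safe_result(task: Future, timeout: float = 1):
    # Hypothetical helper: fetch a future's result, but resurface the
    # spurious UnboundLocalError from Client.sync as a TimeoutError so
    # the existing retry branch treats it like any other stuck task.
    try:
        return task.result(timeout=timeout)
    except UnboundLocalError as exc:
        # Assumption: the UnboundLocalError masks an internal timeout or a
        # lost worker, not a genuine bug in the task itself.
        raise asyncio.exceptions.TimeoutError("task result unavailable") from exc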
Environment:
- Dask version: 2024.2.0
- Python version: 3.9.12
- Operating System: CentOS 7
- Install method (conda, pip, source): pip