
UnboundLocalError when collecting results #8514

Open
@Dominic-Stafford

Description


Describe the issue:

We are using distributed to run a process on HTCondor. Our cluster is somewhat unreliable, so jobs occasionally die for reasons we can't determine; we have therefore built a set-up that resubmits failed jobs. However, sometimes our manager process hits the following error:

Traceback (most recent call last):
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/pepper/htcondor.py", line 265, in _dask_map
    result = task.result(timeout=1)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/client.py", line 322, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/nfs/dust/cms/user/stafford/coffea/ttX_Xtott/pepper/ttX_Xtott_penv/lib/python3.9/site-packages/distributed/utils.py", line 436, in sync
    return result
UnboundLocalError: local variable 'result' referenced before assignment

This error is repeated many times and stops the overall manager process. I suspect the root cause is an issue with our cluster (it only happens intermittently), but could you please look into the UnboundLocalError itself? It appears to be spurious and hides whatever actually went wrong inside sync().
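
For what it's worth, the general pattern that produces this kind of error looks like the hand-written sketch below (this is only an illustration, not the actual code in distributed/utils.py): result is only bound on the success path, so when the timed-out path is taken the final return raises UnboundLocalError and masks the original exception.

    def collect_result(task, timeout=1):
        try:
            # `result` is only bound if this call returns normally
            result = task(timeout)
        except TimeoutError:
            # the real error is swallowed here and `result` is never assigned
            pass
        return result  # -> UnboundLocalError on the timed-out path


    def stuck_task(timeout):
        raise TimeoutError("simulated stuck task")


    collect_result(stuck_task)  # raises UnboundLocalError instead of TimeoutError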

Example:

Unfortunately, due to the intermittent nature of the issue, I haven't managed to create a minimal reproducible example. This is the event loop we use to gather the results:

        tasks = client.map(function, *iterables, pure=False, key=key)
        tasks_to_itemidx = dict(zip(tasks, range(len(tasks))))
        tasks = dask.distributed.as_completed(tasks)
        task_failures = {}
        for task in tasks:
            try:
                # This call should return immediately but sometimes Dask gets
                # stuck here. Unknown why. Specify timeout to circumvent.
                result = task.result(timeout=1)
            except asyncio.exceptions.TimeoutError:
                # Retry but silence the error
                tasks.add(self._dask_resubmit_failed_task(
                    function, task, tasks_to_itemidx, iterables, key))
            except Exception as e:
                logger.exception(e)
                failures = task_failures.get(task, 0)
                if self.retries is not None and failures >= self.retries:
                    raise
                logger.info(
                    f"Task failed {failures} times and will be retried")

                new_task = self._dask_resubmit_failed_task(
                    function, task, tasks_to_itemidx, iterables, key)
                task_failures[new_task] = failures + 1
            else:
                if result is None:
                    logger.error("Task returned 'None' (usually due to dask "
                                 "killing this worker).")
                    failures = task_failures.get(task, 0)
                    if self.retries is not None and failures >= self.retries:
                        raise RuntimeError(
                            "Number of retries was exceeded by a task "
                            "returning 'None'. This is usually due to dask "
                            "killing a worker for exceeding memory usage.")
                    logger.info(
                        f"Task failed {failures} times and will be retried")

                    new_task = self._dask_resubmit_failed_task(
                        function, task, tasks_to_itemidx, iterables, key)
                    task.cancel()
                    task_failures[new_task] = failures + 1
                else:
                    yield result
            del tasks_to_itemidx[task]
            if task in task_failures:
                del task_failures[task]
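
For reference, a stripped-down, runnable version of the same as_completed retry pattern (using a local in-process cluster and a toy workload instead of our HTCondor setup; the names here are illustrative, not from our code base) is:

    from dask.distributed import Client, as_completed


    def double(x):
        return x * 2


    if __name__ == "__main__":
        # In-process cluster so the sketch runs without HTCondor
        client = Client(processes=False)
        tasks = as_completed(client.map(double, range(5), pure=False))
        for task in tasks:
            try:
                result = task.result(timeout=1)
            except Exception:
                # Resubmit and feed the new future back into the loop; the real
                # code above uses tasks_to_itemidx to resubmit the matching item
                # and task_failures to cap the number of retries
                tasks.add(client.submit(double, 0, pure=False))
                continue
            print(result)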

Environment:

  • Dask version: 2024.2.0
  • Python version: 3.9.12
  • Operating System: CentOS 7
  • Install method (conda, pip, source): pip
