
Client hangs in case of unsatisfiable resources #8256

Open
sjdv1982 opened this issue Oct 10, 2023 · 8 comments
Labels
documentation (Improve or add to documentation), good first issue (Clearly described and easy to accomplish. Good for beginners to the project.)

Comments

@sjdv1982

Describe the issue:

When client.submit asks for resources that do not exist in that quantity (or do not exist at all), calling .result() on the returned future hangs forever. Expected behaviour: a Python exception.

Minimal Complete Verifiable Example:

import dask
from dask.distributed import LocalCluster, Client

if __name__ == "__main__":
    with dask.config.set({"distributed.worker.resources.ncores": 3}):  # if omitted, problem still occurs
        cluster = LocalCluster(n_workers=1)  # if n_workers > 1, problem still occurs (fortunately!)
    
    client = Client(cluster)
    
    task = client.submit(lambda: 1)
    print(task.result())    # works
    
    task = client.submit(lambda: 2, resources={"ncores": 1})
    print(task.result())    # works
    
    task = client.submit(lambda: 3, resources={"ncores": 4})
    print(task.result())    # hangs forever

Anything else we need to know?:

I am rather a beginner with Dask. I wouldn't mind writing a bugfix for this.

Environment:

  • Dask version: 2023.9.2
  • Python version: 3.10.13
  • Operating System: Ubuntu
  • Install method (conda, pip, source): conda-forge
@hendrikmakait
Member

Hi, @sjdv1982! Thanks for reporting your issue. The behavior you describe is, in fact, the expected behavior. The reason is that Dask clusters are dynamic, and a new worker satisfying the resource requirements might join the cluster after the tasks have been submitted.

One typical use case for this is running in a cloud environment, where it might take a moment to acquire GPU instances. See also #7170.
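
To make the dynamic behaviour concrete: such a task sits in the scheduler's "no-worker" state, and the pending future resolves as soon as a worker that actually advertises the requested resource joins the same scheduler. A minimal sketch of that (the scheduler address below is a placeholder, in practice use cluster.scheduler_address, and it assumes the dask-worker CLI is on PATH):

import subprocess

# Start an extra worker that advertises the requested resource; the task stuck
# in "no-worker" is then assigned to it and task.result() returns normally.
scheduler_address = "tcp://127.0.0.1:8786"  # placeholder; use cluster.scheduler_address
subprocess.Popen(["dask-worker", scheduler_address, "--resources", "ncores=4"])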

@hendrikmakait
Member

I've checked the documentation at https://distributed.dask.org/en/stable/resources.html and it looks like the expected behavior for this particular issue is not explained though. Would you be interested in submitting a PR that describes the behavior for (currently) unsatisfiable resource constraints?

@hendrikmakait added the documentation and good first issue labels and removed the needs triage label on Oct 10, 2023
@sjdv1982
Author

Hi @hendrikmakait,
Thank you for your response. I understand that the current behaviour is expected in a Cloud environment. I am willing to add a short description of this in the resources documentation.

However, the SpecCluster documentation says:

Cluster that requires a full specification of workers

The SpecCluster class expects a full specification of the Scheduler and
Workers to use.  It removes any handling of user inputs (like threads vs
processes, number of cores, and so on) and any handling of cluster resource
managers (like pods, jobs, and so on).  Instead, it expects this
information to be passed in scheduler and worker specifications.

Thus, IMO, the current behaviour is correct for a Cloud cluster but inappropriate for SpecCluster and its subclasses, where the worker resources are known statically.

@fjetter
Member

fjetter commented Oct 11, 2023

The scheduler does not distinguish how Dask was deployed, and we don't have the information available at runtime whether new workers with additional resources can be added.

I'm open to suggestions on how to improve this, but I wouldn't want to couple cluster-implementation-specific knowledge to the scheduler.

There are two things I could see happening:

  • resource verification, e.g. min/max checks at submission time (not entirely sure what the API would look like; a rough client-side sketch follows after this list)
  • a timeout if a task has been in the no-worker state for a certain amount of time
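
For illustration, a purely client-side version of the first option could look roughly like the sketch below. This is hypothetical, not an existing API, and it assumes that client.scheduler_info() reports each worker's "resources" mapping (it falls back to an empty dict otherwise).

from dask.distributed import Client

def verify_resources(client: Client, resources: dict) -> None:
    """Raise if no currently connected worker offers enough of each requested resource."""
    workers = client.scheduler_info().get("workers", {})
    for name, amount in resources.items():
        offered = [w.get("resources", {}).get(name, 0) for w in workers.values()]
        if not offered or max(offered) < amount:
            raise ValueError(
                f"No connected worker currently offers {name!r} >= {amount}; "
                "a task with this constraint would wait until such a worker joins."
            )

# hypothetical usage: call verify_resources(client, {"ncores": 4}) before client.submit(...)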

@sjdv1982
Author

sjdv1982 commented Oct 12, 2023

Hmm... in principle, I would suggest resource verification by the client. One could use the existing API client.scheduler.get_metadata(keys=["cluster-manager-info"]). Currently, clusters write only their name and type in there, but SpecCluster could store its "resources" entry in the worker specification (SpecCluster.new_spec for future workers).

However, to my surprise, it seems that resources are currently not stored in the SpecCluster worker specification. Therefore, when one does

with dask.config.set({"distributed.worker.resources.ncores": 3}):
    cluster = LocalCluster(n_workers=1)
cluster.scale_up(2)

the second worker does not have any resources attached to it!
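
One way to check this (a sketch; it assumes client.scheduler_info() exposes each worker's "resources" entry):

from dask.distributed import Client

client = Client(cluster)
for address, info in client.scheduler_info()["workers"].items():
    print(address, info.get("resources"))
# Per the observation above, only the original worker reports {'ncores': 3};
# the worker added by scale_up(2) reports no resources.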

Is this correct and expected behaviour??

If yes, I would suggest modifying the docstring of SpecCluster to indicate that worker resources are fully dynamic and not part of the worker specification, and also explaining this where @hendrikmakait suggested, i.e. that Dask clusters are always dynamic regarding resources, even for "static" cluster classes derived from SpecCluster (including all HPC Dask clusters from the jobqueue project).

I would prefer that SpecCluster instead read the resources config at __init__ and copy it into its worker spec (and into the scheduler's cluster-manager-info metadata) for all future workers. That would satisfy my "principle of least surprise". But I am rather a beginner with Dask and am not familiar with the expected behaviour.
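
Roughly, the proposal would amount to something like the sketch below (hypothetical; this is not current SpecCluster code, and the layout of new_spec is an assumption):

import dask

def resources_from_config() -> dict:
    """Read the worker resources config once, e.g. at cluster construction time."""
    return dask.config.get("distributed.worker.resources", {}) or {}

# Inside SpecCluster.__init__ one could then imagine something like:
#
#     resources = resources_from_config()
#     if resources:
#         self.new_spec.setdefault("options", {}).setdefault("resources", resources)
#
# so that workers created later by scale()/adapt() still advertise the
# resources, even after the dask.config.set contextmanager has exited.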

Thank you for your time.

@sjdv1982
Author

sjdv1982 commented Oct 12, 2023

In the same vein, I was very surprised to see the following code hang. It seems to me the obvious way to set up a cluster for a 2-GPU machine. Am I doing something wrong?

import dask
from dask.distributed import LocalCluster, Client

if __name__ == "__main__":
    with dask.config.set({"distributed.worker.resources.GPU": 2}):
        cluster = LocalCluster(n_workers=0, threads_per_worker=1)
        cluster.adapt(minimum_jobs=0, maximum_jobs=2)  # minimum_jobs=1 makes no difference

    client = Client(cluster)
    task = client.submit(lambda: 1, resources={"GPU": 1})
    print(task.result())    # hangs

@fjetter
Member

fjetter commented Nov 7, 2023

Is this correct and expected behaviour??

Probably not correct, but kind of expected. Since you are using the dask config context manager, the setting is reverted once the cluster is up, and as you already found out, the cluster does not cache it.
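
A possible workaround in the meantime (a sketch; it assumes LocalCluster forwards extra worker keyword arguments such as resources into its worker spec, so that workers added later inherit them as well):

from dask.distributed import LocalCluster, Client

# Pass resources as a worker kwarg instead of via dask.config.set, so it is
# part of the worker spec rather than transient configuration.
cluster = LocalCluster(n_workers=1, threads_per_worker=1, resources={"GPU": 2})
client = Client(cluster)

future = client.submit(lambda: 1, resources={"GPU": 1})
print(future.result())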

I think this should be fixed. Are you interested in contributing a patch for this?

@sjdv1982
Author

sjdv1982 commented Nov 8, 2023

Sure, I will be happy to contribute a patch.
