
Support multiple workers per VM #173

Open

eric-czech opened this issue Nov 17, 2020 · 33 comments
Labels: provider/gcp/vm (Cluster provider for GCP Instances)


@eric-czech (Contributor)

It may be more efficient to run many worker processes on a few large VMs, as opposed to one worker on each of many small VMs, presumably to reduce inter-VM communication. This suggestion came out of this thread on optimizing some Dask array workflows: https://github.com/pystatgen/sgkit/issues/390.

@quasiben mentioned that threads per worker can be controlled with worker_options={"nthreads": 2}, but there appears to be no way to run more than one worker on a single VM.
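
For context, a minimal sketch of the behaviour being described, assuming GCPCluster from dask-cloudprovider (instance count and options illustrative; running it for real requires GCP credentials): the thread count of the single worker is tunable, but each VM runs exactly one worker process.

from dask_cloudprovider.gcp import GCPCluster

# Four VMs, each running a single worker process with two threads;
# there is currently no kwarg that starts several workers on one VM.
cluster = GCPCluster(
    n_workers=4,
    worker_options={"nthreads": 2},
)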

@quasiben (Member)

I think for this to work we need to change:

"--spec",
"''%s''" # in yaml double single quotes escape the single quote
% json.dumps(
{
"cls": self.worker_class,
"opts": {
**worker_options,
"name": self.name,
},
}
),
]
)

To handle a spec which looks like the following:

worker_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}
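
As a rough illustration of why that shape could work, here is a minimal sketch of how distributed's run_spec (which backs the dask_spec CLI used on the VMs) consumes such a dict, starting one worker per entry; the scheduler address is illustrative:

import asyncio

from distributed.deploy.spec import run_spec

spec = {
    "worker-1": {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    "worker-2": {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}

async def main():
    # run_spec imports each "cls" string and instantiates it with its "opts",
    # so both Nannies start on this machine and connect to the scheduler.
    workers = await run_spec(spec, "tcp://scheduler:8786")
    await asyncio.gather(*(w.finished() for w in workers.values()))

asyncio.run(main())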

@jacobtomlinson (Member)

When running on a local machine, LocalCluster optimises this for you, right? On my laptop with 12 cores it runs three workers with four threads each.

Perhaps we can reuse this logic?
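
For reference, a self-contained sketch of the kind of core-splitting heuristic LocalCluster uses (distributed exposes a version of this as nprocesses_nthreads in distributed.deploy.utils; the exact split may differ between releases):

import math

def nprocesses_nthreads(n_cores):
    # Split n_cores into a balanced (processes, threads-per-process) pair:
    # with few cores use all processes; otherwise pick the smallest factor
    # of n_cores that is at least sqrt(n_cores) as the process count.
    if n_cores <= 4:
        processes = n_cores
    else:
        processes = min(
            f
            for f in range(1, n_cores + 1)
            if n_cores % f == 0 and f >= math.sqrt(n_cores)
        )
    return processes, n_cores // processes

print(nprocesses_nthreads(32))  # (8, 4): eight processes, four threads each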

@quasiben (Member)

You mean, have dask-cloudprovider inspect the system and assign workers/threads accordingly?

@jacobtomlinson (Member)

Yeah. We do this for LocalCluster here.

@jacobtomlinson added the provider/gcp/vm label Nov 20, 2020
@jacobtomlinson (Member)

dask/distributed#4377 allows us to detect and create workers/threads automatically.

dask-worker --nprocs=auto

Once a release is out we can update things here to use this new option.

@eric-valente

Any progress here?

@jacobtomlinson (Member)

@eric-valente with the latest release of dask and distributed you should be able to set worker_options={"nprocs": "auto"}. Although I haven't managed to come back here and test this yet.

If you could give it a go and report back it would be much appreciated!

@eric-valente

Hi @jacobtomlinson. Tried it with the packages below and no luck. The workers turn on in EC2 but won't connect to the scheduler, and then proceed to die. When I remove nprocs it works fine, but again with only one worker per VM and threads equal to the number of cores.

worker_options={'nprocs':'auto'}

dask 2021.4.1 pyhd8ed1ab_0 conda-forge
dask-cloudprovider 2021.3.1 pyhd8ed1ab_0 conda-forge
dask-core 2021.4.1 pyhd8ed1ab_0 conda-forge
distributed 2021.4.1 py38h578d9bd_0 conda-forge

@jacobtomlinson (Member)

Strange! Are you able to get any logs from the EC2 instances? They tend to get dumped in /var/log/cloud-init-output.log.

@eric-valente commented May 5, 2021

@jacobtomlinson

This error appears, then the worker dies:

[screenshot of the error traceback]

Here are my settings:

[screenshot of the cluster settings]

Note, this works if I simply remove nprocs from the worker_options. Thanks for your help!

@jacobtomlinson (Member)

Thanks @eric-valente. You also need to set the worker_class kwarg to "dask.distributed.Nanny".

The traceback you got above looks like a bug; I'll raise that upstream in distributed.

@quasiben what do you think about making dask.distributed.Nanny the default instead of dask.distributed.Worker?

@eric-valente

Thanks again for your help here @jacobtomlinson

Tried setting the worker class to Nanny, but it's still the same issue:

[screenshot of the error traceback]

[screenshot of the cluster settings]

@eric-valente

dask/distributed#4640 seems like it may be related.

@eric-valente commented May 6, 2021

Seems dask.distributed.Nanny does not accept nprocs either
https://distributed.dask.org/en/latest/_modules/distributed/nanny.html#Nanny

Fails with the init error above:
--spec '{"cls": "dask.distributed.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

Works:
--spec '{"cls": "dask.distributed.Nanny", "opts": {"ncores": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

@eric-valente

Seems like the cloud-init script that adds a worker uses this style of starting a worker:
python -m distributed.cli.dask_spec tcp://x.x.x.x:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

vs. this style:
dask-worker --nprocs=auto

@quasiben (Member) commented May 6, 2021

+1 to making dask.distributed.Nanny the default.

@eric-valente commented May 6, 2021

@jacobtomlinson Yeah, it seems like EC2Cluster uses python -m distributed.cli.dask_spec and passes in worker_class:

[screenshot of the relevant code]

As suggested above, I think you might need to accept multiple workers defined in worker_options and skip nprocs?

worker_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}

I think I could use worker_module, but it is not a valid parameter for EC2Cluster:

[screenshot of the error]

This would allow me to use dask-worker with --nprocs=x.

@jacobtomlinson (Member)

I think I could use worker_module but it is not a valid parameter for EC2Cluster

It should be; have you tried it?

@shireenrao commented Oct 19, 2021

Hello. Is there an update on this issue? I am trying to do something similar. Currently I am creating my cluster manually: I start the scheduler on one EC2 instance, and then on another EC2 instance I create the workers like this:

dask-worker  tcp://XX.X.X.XXX:8786 --nprocs n --nthreads 1

where n is the number of CPUs on that EC2 instance.

It would be great if this could be done by passing nprocs and nthreads as worker_options to EC2Cluster. If I can help in any way, please let me know.

Thank you

@jacobtomlinson (Member)

@shireenrao have you tried it?

I would expect the following to work.

cluster = EC2Cluster(..., worker_class="distributed.nanny.Nanny", worker_options={"nprocs": "n"})

@shireenrao

@jacobtomlinson - I tried that and it fails. This is the stack trace I see:

python -m distributed.cli.dask_spec tcp://XXX.X.X.XXX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"nprocs":"4", "name": "dask-44bad6fe-worker-ecaffb24"}}'
no environment.yml
distributed.nanny - INFO -         Start Nanny at: 'tcp://XXX.17.0.X:41233'
distributed.nanny - ERROR - Failed to initialize Worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - ERROR - Failed while trying to start worker process: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - INFO - Closing Nanny at 'tcp://XXX.17.0.X:41233'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 43, in <module>
    main()
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 39, in main
    asyncio.get_event_loop().run_until_complete(run())
  File "/opt/conda/lib/python3.8/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 33, in run
    servers = await run_spec(_spec, *args)
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 659, in run_spec
    await asyncio.gather(*workers.values())
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 692, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 811, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'

@jacobtomlinson (Member)

Sorry, my bad. Looks like you should be setting nthreads and ncores:

https://github.com/dask/distributed/blob/3afc6703fb480b9fe5c983c84d6ea115810dd1ff/distributed/nanny.py#L88-L89

@shireenrao

@jacobtomlinson - I am trying this on an EC2 instance with 36 vCPUs. Setting ncores and nthreads only starts 1 Nanny process with 36 threads. Here are the logs from the worker:

python -m distributed.cli.dask_spec tcp://XX.XXX.XX.XX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"ncores":36, "nthreads":1, "name": "dask-44bad6fe-worker-ecaffb24"}}'
/opt/conda/lib/python3.8/site-packages/distributed/nanny.py:172: UserWarning: the ncores= parameter has moved to nthreads=
  warnings.warn("the ncores= parameter has moved to nthreads=")
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.17.0.2:45419'
distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:42719
distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:42719
distributed.worker - INFO -          dashboard at:           172.17.0.2:45887
distributed.worker - INFO - Waiting to connect to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         36
distributed.worker - INFO -                Memory:                  68.59 GiB
distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-2ywv7v5h
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

From a Jupyter notebook session, this is what one sees for the cluster:

<Client: 'tcp://172.17.0.2:8786' processes=1 threads=36, memory=68.59 GiB>

Whereas when I start the worker manually with

dask-worker tcp://XX.XXX.XX.XX:8786 --nprocs=36

the Jupyter notebook shows the cluster as

<Client: 'tcp://172.17.0.2:8786' processes=36 threads=36, memory=68.59 GiB>

Not all of the CPUs are being utilized.

@jacobtomlinson (Member)

Right, I'm with you now, sorry. It seems that dask_spec doesn't support that today. Implementing it would have to be done upstream in distributed.

@ansar-sa commented Dec 2, 2021

Could we please request / create an issue upstream in distributed to integrate this work?

For my use case it makes more sense to have a few very large machines than lots of small VMs.

@jacobtomlinson (Member)

The spec in distributed now accepts a list of specs. This does assume that nodes are regular, as we would need to specify the number of workers and the cores/memory for each worker. But we should be able to implement this here now without needing any upstream changes.
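
For example, a minimal sketch of what that could look like on the dask-cloudprovider side, assuming regular nodes; the helper name and the workers_per_vm parameter are hypothetical, not existing API:

import json

def build_worker_spec(base_name, worker_class, worker_options, workers_per_vm):
    # distributed's dask_spec CLI treats a spec dict without a top-level
    # "cls" key as a mapping of worker name -> individual spec, so emitting
    # one entry per desired worker starts several workers from one command.
    return json.dumps(
        {
            f"{base_name}-{i}": {
                "cls": worker_class,
                "opts": {**worker_options, "name": f"{base_name}-{i}"},
            }
            for i in range(workers_per_vm)
        }
    )

print(build_worker_spec("dask-worker", "dask.distributed.Nanny", {"nthreads": 2}, 4))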

@cspagemarine

Has anybody been able to achieve this?

@emkademy commented Oct 5, 2022

Is there any news on this issue?

@kumarprabhu1988 commented Nov 4, 2022

@jacobtomlinson I tried this on EC2Cluster like this:

ec2_cluster = EC2Cluster(
    env_vars=env_vars,
    extra_bootstrap=EC2_BOOTSTRAP_COMMANDS,
    filesystem_size=cluster_config["volume_size"],
    instance_tags=cluster_config["ec2_instance_tags"],
    n_workers=cluster_config["n_workers"],
    #worker_class="distributed.nanny.Nanny", 
    worker_options={i: {"cls": Nanny, "options": {"nthreads": 2}} for i in range(4)},
    scheduler_instance_type=cluster_config["scheduler_instance_type"],
    auto_shutdown=False,
    shutdown_on_close=False,
    security=False,  # https://github.com/dask/dask-cloudprovider/issues/249,
    volume_tags=cluster_config["ec2_instance_tags"],
    worker_instance_type=cluster_config["worker_instance_type"],
)

but this errors out with a JSON encode error in VMCluster:

~/miniconda3/envs/pricing/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in __init__(self, scheduler, worker_module, worker_class, worker_options, *args, **kwargs)
    138                             "opts": {
    139                                 **worker_options,
--> 140                                 "name": self.name,
    141                             },
    142                         }

~/miniconda3/envs/pricing/lib/python3.7/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    229         cls is None and indent is None and separators is None and
    230         default is None and not sort_keys and not kw):
--> 231         return _default_encoder.encode(obj)
    232     if cls is None:
    233         cls = JSONEncoder

Any ideas? Will this require some changes in cloudprovider?

@jacobtomlinson (Member)

@kumarprabhu1988 I think the cls needs to be a string, and it probably needs to be "distributed.nanny.Nanny".
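
A hedged correction of the snippet above, with cls as an importable string so the spec survives json.dumps (note opts rather than options); whether VMCluster accepts a nested multi-worker dict like this is exactly what this issue tracks, so treat it as illustrative only:

worker_options = {
    str(i): {"cls": "distributed.nanny.Nanny", "opts": {"nthreads": 2}}
    for i in range(4)
}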

@kumarprabhu1988 commented Nov 4, 2022

@jacobtomlinson So I tried this, and now the EC2 worker machine creation fails and there are no workers created, but it is a silent failure: no errors.

@jacobtomlinson (Member)

Could you set debug=True and grab the /var/log/cloud-init-output.log logs from the instances so we can see what went wrong?

@kumarprabhu1988

Sure, I can do that. In the meantime I looked at the code and tried to pass in a worker configuration with some code changes, and it worked. Here's the change I made. Let me know what you think.
