Heterogeneous Computing Design #5201
Comments
Thank you for writing this up @madsbk. It's a good read. I apologize that it took me a couple of days to get to this.

Regarding dynamic annotations, I agree that this seems like a good idea; I've actually done similar things in custom code. I suggest a slight variation to your approach. Rather than include this information as part of the message

{"op": "task-finished", ..., "annotations": ..., "restrictions": ...}

we might send a few different messages:

{"op": "annotate-task", "key": ..., "annotations": ...}
{"op": "task-finished", ...}

The batched comms will guarantee ordering, and these will almost certainly be sent in the same batch, so I think this will have the same performance characteristics while reducing complexity and separating things out a little. As a nice side effect, we can set annotations, restrictions, and other things outside of the context of a specific task finishing. Thoughts?
I'll admit to being less personally excited by the idea of changing the type of stored data on the fly, but I can see that it might be helpful. I think that the separated design above might make this easier to implement.
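For illustration only, here is a minimal sketch of how the two messages from the comment above could be queued in the same batch. The helper function, key, and payload values are made up; it only assumes the worker's existing batched stream to the scheduler.

```python
# Sketch: queue an "annotate-task" message immediately before "task-finished"
# on the worker's batched stream to the scheduler. Both messages go out in
# the same batch, so ordering is preserved. Key and payloads are placeholders.
def report_task_finished(batched_stream, key, annotations):
    if annotations:
        batched_stream.send({"op": "annotate-task", "key": key, "annotations": annotations})
    batched_stream.send({"op": "task-finished", "key": key, "status": "OK"})


# Hypothetical usage:
# report_task_finished(worker.batched_stream, "inc-123",
#                      {"executor": "gpu", "resources": {"GPU": 1}})
```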
Good point, #5207 now introduces a dedicated …
I would like to discuss how we can make it easier to utilize a cluster of mixed architectures -- focusing on mixing GPU and CPU tasks/workers.
Background
Currently, a common setup is to have one Worker per GPU and nothing else. This is how a typical Dask-CUDA cluster looks, and it is a simple setup that works reasonably well. It lets Distributed handle memory transfers between GPUs seamlessly, and GPU tasks are not accidentally scheduled on machines without GPUs.
However, it also means that CPU tasks are scheduled on GPU workers and the CPU cores on those machines go underutilized. E.g., a DGX-1 has 8 GPUs and 80 CPU cores, so most of the time the 80 CPU cores are idling.
Distributed supports resource management, which makes it possible to restrict tasks to specific types of Workers. However, it requires manually annotating operations, which is very hard to do efficiently for individual operations. It is easy enough to annotate high-level operations on Dask collections, but annotating each chunk individually is hard. Additionally, it is not possible to annotate tasks dynamically based on task outputs.
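As a concrete reference point, manual annotation with the existing resource mechanism looks roughly like the sketch below. It assumes workers were started with a GPU resource (e.g. `dask-worker ... --resources "GPU=1"`), and the function name is just a placeholder.

```python
# Tasks created inside the dask.annotate block are restricted to workers that
# advertise a "GPU" resource. This works per high-level operation, but not per
# chunk, and not based on what a task actually returns.
import dask
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)


def gpu_function(pdf):
    return pdf  # placeholder for a function that would run on the GPU


with dask.annotate(resources={"GPU": 1}):
    result = df.map_partitions(gpu_function)
```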
Currently, mixing different datatypes within a Dask collection is not possible. E.g., in a Dask dataframe every chunk can be a cuDF or a Pandas dataframe, but not a mix of the two. This limits heterogeneous computing since it forces the user to choose between a pure GPU or a pure CPU dataframe computation.
Goals
I can think of three projects that could achieve the goals.
Dynamic Annotations and Restrictions
Make it possible for a Worker to update the annotations and restrictions of a task based on the task's output. The user specifies a function that the Worker calls after executing a task. The function returns a dict of annotations and restrictions (if not None), which the Worker sends back to the Scheduler as part of the "op": "task-finished" message. The Scheduler then updates the task and its dependent tasks with the new annotations and restrictions.

In order to implement "Detect GPU tasks" as proposed by @mrocklin and restrict GPU tasks to GPU Workers, a function could look something like the following, which makes the task's dependents use the Worker's default GPU executor:
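A minimal sketch of what such a function might look like. The hook signature, the return format, and the is_gpu_object helper are assumptions for illustration, not an existing Distributed API.

```python
# Hypothetical post-execution hook: inspect a task's output and, if it is a
# GPU object, return annotations/restrictions so the task's dependents run on
# the Worker's GPU executor and only on GPU Workers.
def annotate_based_on_output(key, result):
    if is_gpu_object(result):  # e.g. a cuDF DataFrame or CuPy array
        return {
            "annotations": {"executor": "gpu"},          # use the Worker's GPU executor
            "restrictions": {"resources": {"GPU": 1}},   # only run on GPU Workers
        }
    return None  # no change for CPU outputs


def is_gpu_object(obj):
    """Very rough type check; a real implementation would use type dispatching."""
    return type(obj).__module__.split(".")[0] in ("cudf", "cupy")
```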
Mixed Typed Collectives
Make it possible to create Dask collections with mixed-type underlying objects. This works in many cases already! E.g., calling map_partitions() with a function that returns a cuDF or a Pandas dataframe based on the partition_info argument works. The only operation that I have found to fail is concat(), which takes multiple chunks as input. We could implement concat() by first concatenating the cuDF dataframes, then the Pandas dataframes, and finally converting everything to cuDF before concatenating the results.

I have experimented with this approach, and it works like a charm, but we probably need a more generic design that makes use of type dispatching and is extendable like all the other backend functions.
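A rough sketch of that mixed-type concat, assuming the chunks are a mix of cuDF and Pandas dataframes. The mixed_concat name is a hypothetical helper; a real design would go through type dispatching as noted above.

```python
# Concatenate each backend type separately, then convert the Pandas result
# to cuDF before the final concat.
import pandas as pd
import cudf


def mixed_concat(dfs):
    gpu_parts = [df for df in dfs if isinstance(df, cudf.DataFrame)]
    cpu_parts = [df for df in dfs if isinstance(df, pd.DataFrame)]
    results = []
    if gpu_parts:
        results.append(cudf.concat(gpu_parts))
    if cpu_parts:
        # Convert the concatenated Pandas dataframes to cuDF before the final concat
        results.append(cudf.from_pandas(pd.concat(cpu_parts)))
    return cudf.concat(results) if len(results) > 1 else results[0]
```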
Spilling by Conversion
Make it possible for extensions like Dask-CUDA's DeviceHostFile to annotate tasks. This way, when DeviceHostFile spills GPU memory by converting to Pandas, it can inform the Worker and the Scheduler that the task and its dependents should be handled as CPU tasks, and the other way around when un-spilling.

cc @dask/gpu
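For illustration, a minimal sketch of the spill-by-conversion idea. The class and the annotate_cb hook are hypothetical; Dask-CUDA's actual DeviceHostFile interface differs.

```python
# Hypothetical sketch: spill a cuDF dataframe by converting it to Pandas
# instead of serializing it, and notify the Worker/Scheduler (via the assumed
# annotate_cb hook) that the key is now a CPU object; reverse the conversion
# and the annotation when un-spilling.
import pandas as pd
import cudf


class ConvertingSpillStore:
    def __init__(self, annotate_cb):
        self.data = {}
        self.annotate_cb = annotate_cb  # e.g. would send an "annotate-task" message

    def spill(self, key):
        value = self.data[key]
        if isinstance(value, cudf.DataFrame):
            self.data[key] = value.to_pandas()
            self.annotate_cb(key, {"executor": "default"})  # treat dependents as CPU tasks

    def unspill(self, key):
        value = self.data[key]
        if isinstance(value, pd.DataFrame):
            self.data[key] = cudf.from_pandas(value)
            self.annotate_cb(key, {"executor": "gpu"})  # treat dependents as GPU tasks
```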