
[RFC]: Interface and Abstraction for Distributed Inference Environment #3587

Closed

youkaichao opened this issue Mar 23, 2024 · 18 comments

@youkaichao (Member) commented Mar 23, 2024

This RFC describes a proposal for interfaces and abstractions for distributed inference environments. I plan to solicit discussions for a week (until March 31st) before I begin to actually refactor the code.

Motivation

The current distributed inference environment in vllm is quite tangled, and we often see deadlocks and hangs (see #3455 , #2770 , #3559 , to name a few). The problem becomes prominent when we try to upgrade to pytorch 2.2.0 (see #3442 ), because pytorch 2.2.0 upgrades nccl from 2.18.1 to 2.19.3 (compare the dependencies at https://pypi.org/pypi/torch/2.1.2/json and https://pypi.org/pypi/torch/2.2.0/json ), and nccl==2.19.3 breaks vllm due to increased memory cost during cudagraph capture (from 10MB per graph to 100MB per graph, which adds up to several GBs because we have dozens of cudagraphs).

TL;DR: distributed inference in the current codebase is a headache. If it works, hooray; if not, don't be surprised.

Proposal

Abstraction

I think we should have three levels of abstraction:

  1. Launcher, responsible for launching processes (potentially across multiple nodes). Currently it is ray, but we could also have other choices, such as Python's native multiprocessing in the single-node case. See [Core] Multiprocessing executor for single-node multi-GPU deployment #3466 for an example.
  2. Coordinator, responsible for coordinating shared resources (e.g. filesystem usage) and broadcasting some messages. Currently we don't have this, and there are lots of ad-hoc hacks, e.g. using filelock to lock the filesystem ( [Bugfix] use SoftLockFile instead of LockFile #3578 ), using TCP to initialize communication in cupy ( Use CuPy for CUDA graphs #2811 ), and using MPI to initialize communication in AMD's cupy version ( [ROCm] enable cupy in order to enable cudagraph mode for AMD GPUs #3123 ).
  3. Communicator, responsible for cross-device communication of large tensor data (e.g. perform allreduce). Currently we support nccl, and AMD also has its own communication library. Note that this is vendor-specific, and vendors usually have their own way of cross-device communication.

The messiest piece, and the one currently missing, is the Coordinator abstraction. More on this later.

Interface

Between each pair of consecutive abstraction levels lies an interface.

Interface between Launcher and Coordinator

After the Launcher launches the processes, it needs to tell them at least the following information:

  • launch_id, used to distinguish the current launch from possibly concurrent launches (e.g. when 4 users want to set up 4 inference engines on the same node, each with 2 GPUs). Note: the launch_id can be used as a "random seed" to draw a value for master_port, instead of keeping only one default master_port value and having to kill all processes after the last run crashes. A reference implementation would hash the launch_id to a port number and increase the port number until it finds the first free port; this is the strategy taken by Jupyter Notebook/Lab Server (see the sketch after this list).
  • world_size, the number of processes participating in the current launch (may span multiple nodes)
  • local_world_size, the number of processes participating in the current launch on the current node (not necessarily the same across nodes)
  • rank, ranging from 0 (inclusive) to world_size (exclusive), unique for each process in the launch
  • local_rank, ranging from 0 (inclusive) to local_world_size (exclusive), unique within each node; this can be used to assign devices within a node
  • master_addr, the IP address of the master node, should be reachable from all nodes
  • master_port, a free port in the master node, reserved for possible coordination
  • other custom information can be added, but the above are required.
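As a rough illustration of the launch_id-to-master_port strategy above (a minimal sketch; the function name and the port range are assumptions, not part of vLLM):

import hashlib
import socket

def pick_master_port(launch_id: str, lo: int = 20000, hi: int = 60000) -> int:
    """Hash launch_id into [lo, hi), then probe upward for the first free port."""
    port = lo + int(hashlib.sha256(launch_id.encode()).hexdigest(), 16) % (hi - lo)
    for _ in range(hi - lo):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("", port))
                return port  # free; hand it out as master_port
            except OSError:
                port = lo + (port + 1 - lo) % (hi - lo)  # taken, try the next one
    raise RuntimeError("no free port found in the configured range")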

How does the Launcher pass this information to each process? Basically we have two choices (a sketch of the environment-variable option follows the list):

  1. through environment variables, the simplest way, but this rules out thread-level distributed inference because environment variables are shared by all threads in a process. (However, thread-level distributed inference seems rare. Do we need to consider this?)
  2. through serialization and deserialization (e.g. passing bytes in a shared object store), the most general way, at the cost of the complexity and runtime overhead of designing and executing the serialization/deserialization
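For the environment-variable option, the Launcher could export these fields and each worker could read them back into a small struct. This is only a sketch; the variable names (e.g. VLLM_LAUNCH_ID) are illustrative, not an existing vLLM convention:

import os
from dataclasses import dataclass

@dataclass
class LaunchInfo:
    launch_id: str
    world_size: int
    local_world_size: int
    rank: int
    local_rank: int
    master_addr: str
    master_port: int

    @classmethod
    def from_env(cls) -> "LaunchInfo":
        # read back the fields the Launcher exported; names here are illustrative
        return cls(
            launch_id=os.environ["VLLM_LAUNCH_ID"],
            world_size=int(os.environ["WORLD_SIZE"]),
            local_world_size=int(os.environ["LOCAL_WORLD_SIZE"]),
            rank=int(os.environ["RANK"]),
            local_rank=int(os.environ["LOCAL_RANK"]),
            master_addr=os.environ["MASTER_ADDR"],
            master_port=int(os.environ["MASTER_PORT"]),
        )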

Interface between Coordinator and Communicator

Device communicators (e.g. nccl) often need to initialize the communication by sharing some unique token (see the nccl documentation). In addition, processes sometimes need to coordinate resources within a node or across the cluster.

In light of the above considerations, Coordinator should have at least the following interfaces (a minimal sketch follows the list):

  1. is_master(): tells whether the current process is the master process, i.e. a convenient wrapper for the boilerplate rank == 0
  2. is_local_master(): tells whether the current process is a local master process, i.e. a convenient wrapper for the boilerplate local_rank == 0
  3. broadcast(bytes, src): broadcasts a message (in the form of bytes) from rank src to all processes. The semantics are standard, so no further explanation is needed.
  4. barrier(): blocks until all processes reach this point. Also a standard communication primitive.
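A minimal sketch of what this interface could look like as a Python base class (the class layout and constructor are assumptions; only the four method names come from the list above):

from abc import ABC, abstractmethod

class Coordinator(ABC):
    """Control-plane coordination: tiny messages and process synchronization."""

    def __init__(self, rank: int, local_rank: int,
                 world_size: int, local_world_size: int):
        self.rank = rank
        self.local_rank = local_rank
        self.world_size = world_size
        self.local_world_size = local_world_size

    def is_master(self) -> bool:
        return self.rank == 0

    def is_local_master(self) -> bool:
        return self.local_rank == 0

    @abstractmethod
    def broadcast(self, message: bytes, src: int = 0) -> bytes:
        """Broadcast a small control message from rank `src` to every process."""

    @abstractmethod
    def barrier(self) -> None:
        """Block until all processes in the launch reach this point."""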

Note: more often than not, we want to execute something in just one process per node (e.g. creating directories, downloading files to the node). Inspired by this thread, we can write code like this:

if is_local_master():
    do_something() # download file, create directory, etc.
barrier()

Furthermore, there are more complicated requirements like "only one process in each node does something, but this something is different across nodes", essentially a requirement for local_barrier(), a function that blocks until all processes in the current node reach this point. It is debatable whether we want this (currently I don't see any such requirement in vllm).

Communicator interface

The following Communicator functionality is suggested (mostly taken from the nccl design); a sketch of the initialization flow follows the list:

  1. the master process gets a unique token to identify the communication group
  2. the master process broadcasts the unique token to all ranks
  3. each process initializes communication with the unique token plus its rank and world_size
  4. an in-place allreduce function: allreduce(char* input, size_t count, size_t dtype, size_t op). More functionality would be better (e.g. out-of-place allreduce, broadcast/reduce/scatter, etc.), but in-place allreduce is all we need currently.
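Mapping these steps onto code, a vendor Communicator could look roughly like this (a sketch only; `vendor_comm` and its methods stand in for a hypothetical vendor library and are not a real API):

class NcclLikeCommunicator:
    def __init__(self, coor, vendor_comm):
        # step 1: the master process obtains a unique token for the group
        token = vendor_comm.get_unique_id() if coor.is_master() else b""
        # step 2: broadcast the tiny token to all ranks over the control plane
        token = coor.broadcast(token, src=0)
        # step 3: every process joins the group with (token, rank, world_size)
        self._handle = vendor_comm.init(token, coor.rank, coor.world_size)
        self._vendor_comm = vendor_comm

    def allreduce(self, tensor) -> None:
        # step 4: in-place allreduce on the raw device buffer
        self._vendor_comm.all_reduce_inplace(
            self._handle, tensor.data_ptr(), tensor.numel(), tensor.dtype)

    def __del__(self):
        # destroy the group so the program can exit gracefully
        self._vendor_comm.destroy(self._handle)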

The intended usage would be something like this:

# inside each process
coor = Coordinator()  # initialize Coordinator, done by vllm
comm = Communicator(coor)  # hardware vendor can use `coor` to initialize their communicator
# allocate a 1024 x 1024 tensor on the device assigned by local_rank
data = torch.empty((1024, 1024), device=f"xpu:{coor.local_rank}")
comm.allreduce(data)  # hardware vendor can access the raw data via pytorch's [`Tensor.data_ptr`](https://pytorch.org/docs/stable/generated/torch.Tensor.data_ptr.html) mechanism
# please implement Communicator.__del__ to destroy the communicator, so that programs can exit gracefully

A reference implementation of Coordinator

A reference implementation of Coordinator can be torch.distributed, with the gloo backend designed to communicate CPU tensors.

Other candidates include MPI and a custom-implemented TCP store. However, since we live in the torch framework, torch.distributed is a natural choice that adds no new dependency.
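A minimal sketch of such a gloo-backed Coordinator, building on the Coordinator base class sketched earlier (the class name and constructor arguments are assumptions):

import torch.distributed as dist

class GlooCoordinator(Coordinator):
    def __init__(self, rank, local_rank, world_size, local_world_size,
                 master_addr, master_port):
        super().__init__(rank, local_rank, world_size, local_world_size)
        # CPU-only control-plane group; always available and independent of the
        # vendor's device communication library
        dist.init_process_group(
            backend="gloo",
            init_method=f"tcp://{master_addr}:{master_port}",
            rank=rank,
            world_size=world_size,
        )

    def broadcast(self, message: bytes, src: int = 0) -> bytes:
        # wrap the bytes in a list so every rank receives the master's copy
        obj = [message]
        dist.broadcast_object_list(obj, src=src)
        return obj[0]

    def barrier(self) -> None:
        dist.barrier()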

Note: torch.distributed can also be used as a fully functional communicator for GPU devices. However, torch.distributed.all_reduce is way more complicated than just an allreduce operation. It might initialize the autograd engine, keep track of gradients, or dispatch to different device kernels. Even under torch.inference_mode, its c10 engine might perform additional operations that break functionality like cudagraph. Therefore, I prefer to call vendor-provided communication libraries directly to bypass the problem. After all, we just want an allreduce operation on dense tensors, without any hustle and bustle.

Benefits

After we have the above abstraction and interface, we can have the following benefits:

  • We are always in a distributed environment, just with different values of world_size. Distributed concerns will always be considered, so we can easily scale to multi-node environments (if any LLM needs this).
  • Hardware vendors can plug in their communication libraries very easily. All they need to provide is integration of their device into pytorch's torch.Tensor (forward computation ops are enough) and a C library (an .so file would be enough) for calling communication ops on raw data (i.e. char* in C). If they want to move quickly, a single allreduce op is enough for inference. There is no need to wait for full functionality to land in pytorch.

Things not to be considered

We don't aim for a fully fledged distributed execution environment. Since inference tasks are almost stateless, we don't need to consider elasticity or fault tolerance. As opposed to training, we don't need to save checkpoints or resume from a previous failure.

@rkooo567 (Collaborator)

Thanks for the RFC! I have several questions.

  1. Should we consider the requirements of more advanced communication patterns, e.g., https://arxiv.org/abs/2401.09670 (prefill disaggregation)? The major requirement is that this needs a communicator established between two groups of workers, meaning there could be more than one communicator group (allreduce among tp workers, and send/recv between different groups of workers).
  2. When we initialize the engine, we initialize state for all workers by calling APIs such as init_model and init_communicator on each worker. Is this going to be part of the coordinator APIs, or the launcher?
  3. It is a little confusing to me that the coordinator is passed into the communicator, because the coordinator has a "broadcast" API which is typically implemented by the communicator. Is the broadcast implementation agnostic to the communicator in general?

@youkaichao (Member, Author)

Should we consider the requirements of more advanced communication patterns

It is possible, and it won't change the interface in this RFC. The burden would go to DistributedWorker (described below), on how it creates communicators using coordinators.

Is init_model, init_communicator going to be part of the coordinator APIs, or the launcher?

I think it should go into the API of a distributed worker, an abstraction that comes with the launcher. Each vLLM engine instance will have one launcher, which launches a set of DistributedWorkers, and then the launcher calls init_model on each DistributedWorker.

A default implementation of DistributedWorker can be:

class DistributedWorker:
    def __init__(self, args):
        self.coor = Coordinator()  # initialize Coordinator, done by vllm
        self.comm = Communicator(self.coor)  # hardware vendor can use `self.coor` to initialize their communicator

    def init_model(self, args):
        pass

    def run_model(self, args):
        pass
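To illustrate the lifecycle described above, the launcher-side flow could look roughly like this (a sketch; launch_engine and launcher.spawn are hypothetical names, not an existing vLLM API):

def launch_engine(launcher, world_size, args):
    # the launcher spawns one DistributedWorker per rank (possibly across nodes),
    # then drives every worker through the same lifecycle
    workers = launcher.spawn(DistributedWorker, num_processes=world_size, args=args)
    for worker in workers:
        worker.init_model(args)  # e.g. load weights, capture cudagraphs
    return workers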

It is a little confusing to me that the coordinator is passed into the communicator, because the coordinator has a "broadcast" API which is typically implemented by the communicator.

The emphasis of this RFC is to disentangle control-plane communication (Coordinator) and data-plane communication (Communicator). Both of them can have a broadcast operation, but Coordinator.broadcast (required) is designed to broadcast tiny control messages (e.g. 128 bytes), while Communicator.broadcast (optional) is designed to broadcast large chunks of data (e.g. 100MB tensors).

Communicators typically need control-plane communication to set up state before large chunks of data are communicated. E.g., in the design of nccl, before we can perform allreduce on tensors, we need to broadcast a unique id over CPUs so that the communicators know they belong to the same group.

@WoosukKwon (Collaborator)

This is awesome!

@rkooo567 (Collaborator)

The emphasis of this RFC is to disentangle control-plane communication (Coordinator) and data-plane communication (Communicator). Both of them can have a broadcast operation, but Coordinator.broadcast (required) is designed to broadcast tiny control messages (e.g. 128 bytes), while Communicator.broadcast (optional) is designed to broadcast large chunks of data (e.g. 100MB tensors).

In this case, do we decouple the communicator implementation from the coordinator's broadcast implementation? For example, let's say we have a new communicator implementation called Gccl, GcclCommunicator(). In that case, does Coordinator's broadcast implementation still use the default one, or Gccl under the hood?

I am worried it is a leaky abstraction if the coordinator uses the communicator's broadcast implementation under the hood. In that case, would it make more sense for the coordinator to accept the communicator as an input?

class DistributedWorker:
    def __init__(self, args):
        self.comm = Communicator()  # hardware vendor initializes their communicator directly
        self.coor = Coordinator(self.comm)  # Coordinator receives the communicator, done by vllm

class Coordinator:
    def broadcast(self):
        self.comm.broadcast(...)

@youkaichao (Member, Author)

do we decouple the communicator implementation from the coordinator's broadcast implementation?

Yes, they are decoupled. The reference implementation of coordinator uses pytorch's gloo backend, which is a CPU backend and is always available. If we have a new communicator GcclCommunicator, Coordinator's broadcast implementation still uses the default one, not the GcclCommunicator one.

The dependency chain is: the coordinator has its own broadcast mechanism (via MPI, TCPStore, or anything else) to broadcast tiny messages. The Communicator depends on the coordinator's broadcast operation to finish initialization, and then the Communicator can do all_reduce on large tensor data.

@rkooo567 (Collaborator)

Gotcha, that makes sense! Thanks for the clarification!

@WoosukKwon (Collaborator)

QQ: How does the custom all reduce backend fit into this abstraction?

@youkaichao (Member, Author)

QQ: How does the custom all reduce backend fit into this abstraction?

I'm not familiar with the custom allreduce kernel. At first glance, I feel it can be another implementation of Communicator, besides the nccl one.

@youkaichao (Member, Author) commented Apr 9, 2024

Progress tracker:

  • [Core] separate distributed_init from worker #3904 adds a CPU coordinator. By default, vllm has two process groups: one is the default world with a device-specific backend, the other is a cpu world with the gloo backend.
  • launch_id is renamed to instance_id; there is a get_vllm_instance_id function in vllm/utils.py. Currently it is only used in the ray gpu executor. In the future, I plan to make all executors respect it, by moving verbose logging under a folder named with the instance_id.

@youkaichao (Member, Author)

Summary of all the distributed inference cases we need to support: the cartesian product of single/multi vllm engine and single/multi node.

Precondition: each vllm engine requires at least one GPU. GPUs cannot be shared across vllm engines.

Here are all the possible combinations:

Single vllm Engine, Single Node

Users can use CUDA_VISIBLE_DEVICES to control the set of GPUs used for the vLLM instance.
e.g.
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server args

Single vllm Engine, Multiple Nodes

Currently we only support a ray-managed cluster. Only one node needs to execute this:

python -m vllm.entrypoints.openai.api_server args

Then ray will use all nodes in the cluster.

In the future we may need to support a torchrun/mpi style launcher, as described in #3902 (comment):

# multi node, on node 0
CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes 2 --nproc-per-node=n --rdzv_backend=c10d --rdzv_endpoint=${node_0_ip:port} -m vllm.entrypoints.openai.api_server $args
# multi node, on node 1
CUDA_VISIBLE_DEVICES=1,2,3,4 torchrun --nnodes 2 --nproc-per-node=n --rdzv_backend=c10d --rdzv_endpoint=${node_0_ip:port} -m vllm.entrypoints.openai.api_server $args

Multiple vllm Engines, Single Node

Users can use CUDA_VISIBLE_DEVICES to control the set of GPUs used for each vLLM instance.
e.g.

first engine:

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server args

second engine:

CUDA_VISIBLE_DEVICES=4,5,6,7 python -m vllm.entrypoints.openai.api_server args

Multiple vllm Engines, Multiple Nodes

In this case, each vllm engine should have its own nodes. No overlap is allowed. We don't need to do anything in this case.

Conclusion

Ideally, we should respect CUDA_VISIBLE_DEVICES set by users, in all cases.

@Jeffwan (Contributor) commented Apr 19, 2024

Communicator, responsible for cross-device communication of large tensor data (e.g. perform allreduce). Currently we support nccl, and AMD also has its own communication library. Note that this is vendor-specific, and vendors usually have their own way of cross-device communication.
The messiest piece, and the one currently missing, is the Coordinator abstraction. More on this later.

I think some companies are building their own XXCCL solution to

  1. optimize cross-node performance, e.g. discover the network topology and optimize communication based on their own data center settings.
  2. abstract heterogeneous devices and provide a hardware abstraction layer (HAL) (the term is from the https://arxiv.org/abs/2202.07848 paper).

While this is separate from the vllm engine itself, I am thinking it may be worth extending into this area, or using an existing library to simplify the work if one exists.

@Jeffwan (Contributor) commented Apr 19, 2024

through environment variables, the simplest way, but this rules out thread-level distributed inference because environment variables are shared by all threads in a process. (However, thread-level distributed inference seems rare. Do we need to consider this?)

I think the only use case here is the single-node multi-GPU case. In this case, could different threads just set the device via torch.cuda.set_device(device_id)? They share the envs, but in a non-distributed env, rank information should not be injected externally. Instead, the launcher just needs to manage the workers, and they should have a 1:1 mapping with CUDA_VISIBLE_DEVICES.

@Jeffwan (Contributor) commented Apr 20, 2024

launch_id, used to distinguish the current launch from possibly concurrent launches (e.g. when 4 users want to set up 4 inference engines on the same node, each with 2 GPUs). Note: the launch_id can be used as a "random seed" to draw a value for master_port, instead of keeping only one default master_port value and having to kill all processes after the last run crashes. A reference implementation would hash the launch_id to a port number and increase the port number until it finds the first free port; this is the strategy taken by Jupyter Notebook/Lab Server.

I didn't get this part.

  1. In order to distinguish different engines (4 engines with 2 GPUs each on one node), a new instance should always pick up random ports. Is master_port enough?

  2. If we do want some id, why not derive the launch_id from a random port number, instead of the reverse?

@youkaichao (Member, Author)

launch_id can even be useful beyond distributed communication, e.g. when we plan to put verbose logging in some folder, as partly done in #4079 .

@Jeffwan (Contributor) commented Apr 20, 2024

might perform additional operations that break functionality like cudagraph.

I am wondering whether the motivation is strong enough. It seems the two pain points are: 1. torch.distributed.all_reduce is not that clean and doesn't work with cudagraph; 2. heterogeneous devices are hard to maintain.

  1. for the 1st one, I feel the refactoring cost is a little high for one feature with a side effect (taking more memory).
  2. for the 2nd one, should we expect different vendors to support their communication backends at the torch.distributed level?

From the architecture level, splitting collective communication into a control plane and a data plane totally makes sense. I am just wondering whether it's possible to reuse one process group, which is cleaner, while still addressing the issues you listed.

@youkaichao (Member, Author)

First of all, thank you for your interest and feedback!

whether it's possible to reuse one process group, which is cleaner

A process group is just a way to organize processes; it does not create new processes. For tensor parallel of size k, we can have k processes, and they can form multiple process groups. That said, creating a process group is quite cheap. I don't see why we should stick to one group.

should we expect different vendors to support their communication backends at the torch.distributed level

That could be the case, but the progress can be really slow, e.g. we would have to wait at least several months for a pytorch release. Meanwhile, writing wrappers in vllm is much faster, e.g. we can do it in days.

@Jeffwan (Contributor) commented Apr 20, 2024

@youkaichao

A process group is just a way to organize processes; it does not create new processes. For tensor parallel of size k, we can have k processes, and they can form multiple process groups. That said, creating a process group is quite cheap. I don't see why we should stick to one group.

Yeah, the extra communication thread is cheap. I was thinking about debuggability, ports, etc. (we have a limited number of ports that can be exposed in our env). But anyway, considering that just two groups are needed here, I think my concern was unnecessary. I am buying into the idea.

@youkaichao (Member, Author)

Finished in #5293
