
[WIP][Core] fully composable launcher/task/coordinator/communicator design and implementation #3762

youkaichao (Member):

An implementation draft of #3587.

Check the test code for an example:

import pytest
import torch

from vllm.implementations.launcher.mp_launcher import MPLauncher
from vllm.implementations.coordinator import CoordinatorType
from vllm.implementations.communicator import CommunicatorType
from vllm.implementations.distributed_tasks.global_coordinator_task import GlobalCoordinatorDistributedTask
from vllm.implementations.distributed_tasks.group_coordinator_task import GroupCoordinatorDistributedTask

class AllReduceDistributedTask(GlobalCoordinatorDistributedTask):
    def post_init_distributed(self, **kwargs):
        tensor = torch.ones(16, 1024, 1024, dtype=torch.float32).cuda(
            self.coordinator.get_local_rank())
        self.communicator.all_reduce(tensor_in=tensor)
        result = tensor.mean().cpu().item()
        assert result == self.coordinator.get_local_world_size()

@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
def test_pynccl():
    MPLauncher(n_tasks=2).launch(
        task_type=AllReduceDistributedTask,
        coordinator_type=CoordinatorType.TORCH_DISTRIBUTED,
        communicator_type=CommunicatorType.PYNCCL,
    )

And here is the code defining a task type:

from vllm.implementations.coordinator import CoordinatorType, get_coordinator_class
from vllm.implementations.communicator import CommunicatorType, get_communicator_class
from vllm.interfaces.launcher import DistributedTask
from vllm.interfaces.communicator import Communicator
from vllm.interfaces.coordinator import Coordinator


class GlobalCoordinatorDistributedTask(DistributedTask):
    def run(self, *, coordinator_type: CoordinatorType,
            communicator_type: CommunicatorType, **kwargs):
        coordinator_cls = get_coordinator_class(coordinator_type)
        communicator_cls = get_communicator_class(communicator_type)
        self.coordinator: Coordinator = coordinator_cls()
        self.coordinator.initialize()
        self.communicator: Communicator = communicator_cls(self.coordinator)
        self.post_init_distributed(**kwargs)

    def post_init_distributed(self, **kwargs):
        """Subclasses can override this method to do whatever they want.
        They can use `self.coordinator` for global communication over the whole process group.
        They can use `self.communicator` for communication between devices.
        """
        return

We can see the full composability: MPLauncher only accepts launcher-specific args plus task_type, and passes the remaining args to the task, which initializes the coordinator and communicator.

It is worth noting that GlobalCoordinatorDistributedTask knows nothing about any specific coordinator or communicator. It just operates on the interfaces provided by Communicator and Coordinator.
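
To make the flow concrete, here is a rough, hypothetical sketch of the launcher side (the helper names and the zero-argument task construction are assumptions, not the actual MPLauncher code; the real launcher also has to set up rendezvous information such as rank and world size for the coordinator, which is omitted here):

import multiprocessing


def _run_task(task_type, kwargs):
    # Each worker process constructs the task and hands it the forwarded kwargs.
    task = task_type()
    task.run(**kwargs)


class SketchMPLauncher:
    """Launcher-specific args stop here; everything else flows through to the task."""

    def __init__(self, n_tasks: int):
        self.n_tasks = n_tasks

    def launch(self, *, task_type, **kwargs):
        procs = [
            multiprocessing.Process(target=_run_task, args=(task_type, kwargs))
            for _ in range(self.n_tasks)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()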

This is a draft implementation, and is open for discussion.

youkaichao (Member Author):

Here is a description of the general design:

[image: diagram of the general design]

youkaichao (Member Author):

Just for visibility: I also noticed that there are several concurrent efforts to refactor multi-GPU execution, e.g. #3466 and #3691. I will take a look at all of these and try to find out whether they can be unified to reduce redundant work.

njhill (Member) commented Mar 31, 2024:

@youkaichao #3466 has actually been ready now for about 6 weeks (it is a rebase of prior PR #2898). I just opened another one, #3763, which should be orthogonal/complementary to #3466. And I think either/both of these should address the performance issues that #3691 attempts to circumvent.

cadedaniel (Collaborator) left a comment:
Great stuff! Some questions:

  • Can the interfaces be kept as pure abstract classes (see the sketch after this list)? It is a little hard to follow the layers upon first reading. I feel we should avoid super().method() for user implementations as much as possible, as then the code relevant to the layer is completely contained within the implementation and not spread between implementation and interface.
  • The MPLauncher starts processes for the task. Is it possible with this design to run the Engine in a different process? The concern I have is that the Engine will do a lot of work that can contend for CPU with the model execution: things like receiving requests, async detokenization, and async tokenization. Of course we can push all of those to different threads, but the LLM ecosystem's dependency on Python means we won't be able to escape the GIL unless we cordon that work off into a different process.
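
A minimal sketch of what a pure-abstract interface could look like (hypothetical; only the all_reduce name is taken from the test code in this PR):

from abc import ABC, abstractmethod

import torch


class Communicator(ABC):
    """Pure interface: no shared logic, so each implementation is fully self-contained."""

    @abstractmethod
    def all_reduce(self, tensor_in: torch.Tensor) -> None:
        """Reduce tensor_in in place across all participating devices."""
        ...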

cadedaniel (Collaborator) commented on these lines of the Communicator interface:

from vllm.interfaces.coordinator import Coordinator


class Communicator(object):

why does this extend object?

youkaichao (Member Author):

I feel we should avoid super().method() for user implementations as much as possible, as then the code relevant to the layer is completely contained within the implementation and not spread between implementation and interface.

Technically it is doable. But then some code will be replicated over many implementations. There is a trade-off here.

Is it possible with this design to run the Engine in a different process?

Yes, we can. The launcher design is general, and we can have both CPU workers and GPU workers, although we need to think about how they coordinate with each other.

why does this extend object?

see https://stackoverflow.com/questions/4015417/why-do-python-classes-inherit-object

cadedaniel (Collaborator):

👍

btw, Python 2 was EOL'd a long time ago (in 2020 IIRC); we don't need to extend object anymore!
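
For reference, in Python 3 every class is a new-style class, so the two declarations below are equivalent (illustrative only):

class Communicator(object):  # old Python 2 style, still works
    ...


class Communicator:  # equivalent in Python 3
    ...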

cadedaniel (Collaborator):

I disagree that code will be replicated if we don't use super().method(). There is a third way: factor out common utilities that can be used across multiple implementations. Basically, composition vs. inheritance.
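
For illustration, the composition-style alternative could look roughly like this (a hypothetical helper, not code from this PR; only get_coordinator_class and get_communicator_class are taken from the PR):

from vllm.implementations.coordinator import CoordinatorType, get_coordinator_class
from vllm.implementations.communicator import CommunicatorType, get_communicator_class


def init_distributed(coordinator_type: CoordinatorType,
                     communicator_type: CommunicatorType):
    # Shared setup lives in a plain function that any task can call,
    # instead of in a base-class run() reached through super().
    coordinator = get_coordinator_class(coordinator_type)()
    coordinator.initialize()
    communicator = get_communicator_class(communicator_type)(coordinator)
    return coordinator, communicator


class AllReduceTask:
    def run(self, *, coordinator_type, communicator_type, **kwargs):
        # The task composes the utility instead of inheriting its behavior.
        self.coordinator, self.communicator = init_distributed(
            coordinator_type, communicator_type)
        # ... task-specific work using self.communicator ...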

zhuohan123 (Member):

I am a bit confused about this PR. Where will these communicators be used? How do they interact with the existing code?

youkaichao (Member Author):

Where will these communicators be used?

They will be used for GPU collective communication, like allreduce. For example (in the test code):

class AllReduceDistributedTask(GlobalCoordinatorDistributedTask):

    def post_init_distributed(self, **kwargs):
        tensor = torch.ones(16, 1024, 1024, dtype=torch.float32).cuda(
            self.coordinator.get_local_rank())
        self.communicator.all_reduce(tensor_in=tensor)
        result = tensor.mean().cpu().item()
        assert result == self.coordinator.get_local_world_size()

I haven't pushed this code into the existing codebase yet. This is just a draft for comments.

davidthomas426 commented Apr 9, 2024:

I think for several current uses, an interface like communicator.all_reduce is too high-level, as we may want the parallelization strategy to be configurable. An example is gather-logits-to-rank-0 with a single driver running sampling, vs. allgather-logits with sampling replicated on every rank. We could also replicate scheduling, in which case there would be no need to broadcast the bunch of input tensors that we currently broadcast.

But this can happen at higher abstraction layers, and in stages.
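
To illustrate the two strategies mentioned above, here is a rough, hypothetical sketch using plain torch.distributed (not code from this PR; it assumes the logits are sharded along the vocab dimension and uses greedy argmax as a stand-in for the sampler):

import torch
import torch.distributed as dist


def sample(full_logits: torch.Tensor) -> torch.Tensor:
    # Stand-in for the real sampler: greedy argmax over the vocab dimension,
    # which is deterministic and therefore safe to replicate across ranks.
    return full_logits.argmax(dim=-1)


def gather_to_rank0_and_sample(logits: torch.Tensor):
    # Strategy 1: gather the vocab-sharded logits to rank 0, where a single
    # driver runs sampling; the other ranks return nothing.
    world_size = dist.get_world_size()
    if dist.get_rank() == 0:
        shards = [torch.empty_like(logits) for _ in range(world_size)]
        dist.gather(logits, gather_list=shards, dst=0)
        return sample(torch.cat(shards, dim=-1))
    dist.gather(logits, dst=0)
    return None


def allgather_and_sample(logits: torch.Tensor):
    # Strategy 2: all-gather the shards so every rank holds the full logits
    # and replicates the (deterministic) sampling step locally.
    world_size = dist.get_world_size()
    shards = [torch.empty_like(logits) for _ in range(world_size)]
    dist.all_gather(shards, logits)
    return sample(torch.cat(shards, dim=-1))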

"Coordinator" and "Communicator" is a confusing split to me, and it doesn't help that the words sound too similar to each other. Maybe this is yak shaving, but could we consider different names that make the distinction clearer? Like "WorkerCoordinator" vs. "DeviceCommunicator", or something? Or even "CollectiveComms" or "CollectiveCommunicator" to make the "NCCL wrapper" idea more clear?

youkaichao (Member Author):

could we consider different names that make the distinction clearer? Like "WorkerCoordinator" vs. "DeviceCommunicator"

That's a great idea! Indeed, I ran into this exact problem when trying to convey this abstraction to others: they got confused about "Coordinator" vs. "Communicator". I think "WorkerCoordinator" and "DeviceCommunicator" are much better. Thank you for the proposal!

youkaichao (Member Author):

Closing this, as it has been broken up into several PRs.

youkaichao closed this Jun 14, 2024
youkaichao deleted the interface branch June 14, 2024 00:56