[WIP] [Core][P/D] CPU connector for PD disagg #18332
base: main
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs will not trigger a full CI run by default. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.
```python
dst_v.copy_(src_v, non_blocking=True)

def h2d_page_copy(src_buffer: torch.Tensor, dst_layer: torch.Tensor,
```
QQ: do you think we can tweak the code to have an invariant that `start_token_idx` and `stop_token_idx` are multiples of `block_size`? I think this would really simplify the implementation.
There will be some annoying corner cases with chunked prefill. For example, assume `max_num_batched_tokens` is 8192, and we have 2 requests with 8200 tokens each.
In the first scheduling step, the first 8192 tokens of the first request will be scheduled as a new request.
In the second scheduling step, the last 8 tokens of the first request will be scheduled as a "running request", and then the first 8192 - 8 = 8184 tokens of the second request will be scheduled as a new request.
In the above example, the second request will have a `stop_token_idx` that is not a multiple of `block_size`.
Sure, but this does not matter for P/D since the offloaded tokens are not "scheduled"; they are treated like a prefix cache hit from the scheduler's point of view.
Oh I see, it's because we are doing the sending chunk by chunk.
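To make the corner case above concrete, here is a small self-contained illustration of the boundary arithmetic. The scheduling numbers follow the example (`max_num_batched_tokens` = 8192, two requests of 8200 tokens); `block_size = 16` is an assumed value for illustration only.

```python
block_size = 16

# (request_id, start_token_idx, stop_token_idx) scheduled in each step
step1 = [(0, 0, 8192)]                  # request 0 fills the whole token budget
step2 = [(0, 8192, 8200),               # last 8 tokens of request 0
         (1, 0, 8192 - 8)]              # first 8184 tokens of request 1

for req_id, start, stop in step1 + step2:
    aligned = start % block_size == 0 and stop % block_size == 0
    print(f"req {req_id}: tokens [{start}, {stop}) block-aligned: {aligned}")

# Output:
# req 0: tokens [0, 8192) block-aligned: True
# req 0: tokens [8192, 8200) block-aligned: False
# req 1: tokens [0, 8184) block-aligned: False
```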
Thanks @ApostaC! Still reviewing but posting comments so far...
```python
# NOTE(ApostaC): For a single request, this function will be called
# two times if the first time we returned async_load flag as True.
# The second time will be the "real schedule" time
```
Actually this is no longer the case; it will now typically only be called the first time. It may still be called a second time if KV cache allocation fails the first time (without any call to `update_state_after_alloc` in between).
```python
prefill_request_id = kv_transfer_params["prefill_request_id"]
self._connect_request_ids(prefill_request_id, request_id)
self._should_be_ready_reqs.add(request_id)
```
I think we may want to move this to `update_state_after_alloc`. We should assume that `get_num_new_matched_tokens` is side-effect free; the multi-connector implementation will assume this. See discussion and MultiConnector updates in #18632.
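For illustration, a minimal sketch of that split, assuming the v1 connector scheduler-side methods roughly as shown below. The `_lookup_remote_tokens` helper is hypothetical; `_connect_request_ids` and `_should_be_ready_reqs` are the names from the snippet above.

```python
class CPUConnectorSchedulerSketch:
    """Hedged sketch of the suggested refactor, not the PR's actual class."""

    def __init__(self):
        self._req_id_map: dict[str, str] = {}
        self._should_be_ready_reqs: set[str] = set()

    def get_num_new_matched_tokens(self, request, num_computed_tokens):
        # Pure lookup, no side effects: MultiConnector may call this
        # speculatively and then pick a different connector.
        num_external_tokens = self._lookup_remote_tokens(request)  # hypothetical helper
        load_kv_async = num_external_tokens > 0
        return num_external_tokens, load_kv_async

    def update_state_after_alloc(self, request, blocks, num_external_tokens):
        # Bookkeeping happens only once KV cache blocks were actually
        # allocated for this request.
        if num_external_tokens == 0:
            return
        prefill_request_id = request.kv_transfer_params["prefill_request_id"]
        self._connect_request_ids(prefill_request_id, request.request_id)
        self._should_be_ready_reqs.add(request.request_id)

    def _connect_request_ids(self, prefill_req_id: str, decode_req_id: str):
        self._req_id_map[prefill_req_id] = decode_req_id

    def _lookup_remote_tokens(self, request) -> int:
        raise NotImplementedError  # would query the CPU-side metadata
```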
```python
)

# Create a destination spec
dest_spec = DestinationSpec(
```
How will we support heterogeneous TP?
Good question. I'm envisioning adding that support in follow-up PRs.
When everything is offloaded to CPU, it would be easier to rearrange the memory there.
One idea is to have a separate ring buffer for each KV head.
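For illustration, a rough sketch of the per-KV-head ring buffer idea (all names and sizes here are hypothetical, not part of this PR): if the CPU buffer is laid out per head, a prefiller and a decoder with different TP degrees only disagree on which heads each rank touches, not on the buffer layout.

```python
import torch

# Illustrative sizes only
num_kv_heads = 8
buffer_tokens = 4096        # ring capacity per head, in tokens
head_dim = 128

# One CPU ring buffer per KV head: head -> [buffer_tokens, 2 (K/V), head_dim]
rings = [torch.empty(buffer_tokens, 2, head_dim, dtype=torch.float16)
         for _ in range(num_kv_heads)]

def heads_for_rank(rank: int, tp_size: int) -> range:
    per_rank = num_kv_heads // tp_size
    return range(rank * per_rank, (rank + 1) * per_rank)

# A decoder rank with TP=4 reads 2 heads; a prefiller rank with TP=2 wrote 4.
print(list(heads_for_rank(rank=1, tp_size=4)))   # [2, 3]
print(list(heads_for_rank(rank=1, tp_size=2)))   # [4, 5, 6, 7]
```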
This pull request has merge conflicts that must be resolved before it can be merged.
```python
layer_id = self._get_layer_id(layer_name)
event = self._decoder_cuda_events.pop(layer_id, None)
if event is not None:
    event.synchronize()

if layer_id == len(self._gpu_kv_caches) - 1:
    # Free the memory for the whole request
    for p_req_id in self._inflight_h2d_requests:
        logger.info("Freeing request %s, current watermark: [%d, %d]",
                    p_req_id,
                    self._kv_receiver._allocator.low_watermark,
                    self._kv_receiver._allocator.high_watermark)
        self._kv_receiver.free_request(p_req_id)
    self._inflight_h2d_requests.clear()
```
Why do we need to synchronize/block here? The idea of the async parts of the connector API is that these loads can span forward passes. If we are blocking in-line then there's no need to return `load_kv_async=True` from the scheduler-side `request_finished` method, and no need to return anything from the worker-side `get_finished` method...
Sorry if I've misunderstood something here.
There are two "ready" steps:
- first is the TCP transfer
- second is the CPU -> GPU load

Only the TCP transfer happens async in this PR.
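For reference, a hedged sketch of how the second (CPU -> GPU) step could also be made non-blocking by polling the per-layer CUDA events instead of synchronizing. Names follow the snippet above; the function itself is illustrative, not the PR's code.

```python
import torch

def poll_finished_loads(decoder_cuda_events: dict[int, torch.cuda.Event],
                        inflight_h2d_requests: set[str],
                        kv_receiver) -> list[int]:
    """Poll each layer's H2D-copy event instead of blocking on it."""
    finished_layer_ids = []
    for layer_id, event in list(decoder_cuda_events.items()):
        if event.query():                      # copy for this layer is done
            del decoder_cuda_events[layer_id]
            finished_layer_ids.append(layer_id)
    # Free the CPU-side pages only once every layer's copy has completed.
    if not decoder_cuda_events:
        for p_req_id in inflight_h2d_requests:
            kv_receiver.free_request(p_req_id)
        inflight_h2d_requests.clear()
    return finished_layer_ids
```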
Some Notes:
I'm traveling these days. Will come back to this PR after this Wednesday.
Fixed the crash problem. Now lm_eval runs with the following output on the Llama-3.1-8B model:
Will clean up the debug code and push commits soon.
@njhill @robertgshaw2-redhat Now the crashing & hanging issue should be fixed.
TL;DR:
This PR implements a CPU-based connector for PD disaggregation, with the following features:
(Update May 22) Now the code is functional and can produce correct results in offline test scripts. There are a few more minor things to do.
High-level designs
Prefiller node
When a layer finishes prefill on the prefiller node, the following happens:
When each layer's prefill finishes, the prefill worker also does a quick "status check" on the "processing queue":
The following screenshot demonstrates the layer-wise async KV cache storing:
*(screenshot: layer-wise async KV cache storing)*
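As a rough illustration only (the stream and buffer handling below is a sketch under assumed names, not the PR's actual code), layer-wise asynchronous storing can look like this on the prefiller side:

```python
import torch

d2h_stream = torch.cuda.Stream()  # dedicated device-to-host copy stream

def offload_layer(gpu_kv_layer: torch.Tensor,
                  cpu_buffer: torch.Tensor,
                  block_ids: list[int]) -> torch.cuda.Event:
    """Copy one layer's KV pages to a pinned CPU buffer without blocking compute."""
    compute_stream = torch.cuda.current_stream()
    event = torch.cuda.Event()
    with torch.cuda.stream(d2h_stream):
        # Don't start copying until the compute stream has written this layer's KV.
        d2h_stream.wait_stream(compute_stream)
        for i, block_id in enumerate(block_ids):
            # non_blocking=True requires cpu_buffer to be in pinned memory
            cpu_buffer[i].copy_(gpu_kv_layer[block_id], non_blocking=True)
        event.record()
    return event  # checked later, before the pages are pushed over TCP
```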
Decoder node
Every time the decoder worker is called, it does the following:
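Based on the worker-side snippets in this thread, the per-step flow is roughly the sketch below. Helper names like `poll_finished_transfers` and `get_pages` are placeholders, and the `h2d_page_copy` call is approximate since its full argument list is elided in the diff above.

```python
import torch

def decoder_worker_step(kv_receiver, gpu_kv_caches, decoder_cuda_events,
                        inflight_h2d_requests):
    # 1. Poll the receiver for requests whose KV pages have finished the
    #    TCP transfer into the CPU ring buffer.
    for req_id in kv_receiver.poll_finished_transfers():
        inflight_h2d_requests.add(req_id)
        # 2. Launch non-blocking host-to-device page copies layer by layer,
        #    recording one CUDA event per layer for later synchronization.
        for layer_id, dst_layer in enumerate(gpu_kv_caches):
            src_buffer, block_ids = kv_receiver.get_pages(req_id, layer_id)
            h2d_page_copy(src_buffer, dst_layer, block_ids)  # see diff above
            event = torch.cuda.Event()
            event.record()
            decoder_cuda_events[layer_id] = event
    # 3. Before each layer's attention runs, the connector waits on (or polls)
    #    that layer's event, and frees the CPU pages after the last layer
    #    (see the worker-side snippet discussed earlier).
```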
Immediate TODO items
Future work