[WIP] [Core][P/D] CPU connector for PD disagg #18332


Open · wants to merge 29 commits into base: main

Conversation

@ApostaC (Collaborator) commented May 19, 2025

TL;DR:
This PR implements a CPU-based connector for PD disaggregation, with the following features:

  • Async layer-wise D2H copies on the prefiller side
  • (WIP) Async layer-wise H2D copies on the decoder side
  • (WIP) Fully async CPU-CPU data transmission (plan: NIXL with the Mooncake transfer engine, or UCX)

(Update May 22) The code is now functional and produces correct results in offline test scripts. A few minor items remain:

  • Backend perf benchmark: Mooncake vs UCX
  • Test TP compatibility
  • Add online examples
  • Remove hard-coded values
  • Add potential performance improvements
  • Fix the linter issues

High-level designs

Prefiller node

When a layer finishes prefill on the prefiller node, the following happens (a minimal sketch follows the screenshot below):

  • The prefill worker launches async D2H copies
  • The prefill worker sends a request to the decode worker (via a side-channel)
  • The prefill worker saves this request in the "processing queue"

After each layer's prefill finishes, the prefill worker also does a quick status check on the processing queue:

  • If a request has finished its D2H copy and received a "ready" response from the decode worker, the prefill worker starts sending the KV cache
  • If a request has finished sending, the prefill worker frees its resources and removes it from the processing queue

The following screenshot demonstrates the layer-wise async KV cache storing:
[screenshot omitted]
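A minimal sketch of this layer-wise async D2H pattern, assuming a dedicated CUDA copy stream, pinned CPU buffers, and one CUDA event per layer. All names here (e.g. `InflightPrefillRequest`, `launch_layer_d2h`) are hypothetical, not the PR's actual identifiers:

```python
from dataclasses import dataclass, field

import torch

# Illustrative only: a side stream dedicated to D2H copies.
d2h_stream = torch.cuda.Stream()


@dataclass
class InflightPrefillRequest:
    request_id: str
    # One CUDA event per layer that has been copied to the CPU buffer.
    layer_events: list = field(default_factory=list)
    # Set to True when the decoder replies "ready" over the side-channel.
    decoder_ready: bool = False


def launch_layer_d2h(gpu_kv_layer: torch.Tensor, cpu_buffer: torch.Tensor,
                     req: InflightPrefillRequest) -> None:
    """Kick off an async D2H copy of one layer without blocking prefill."""
    assert cpu_buffer.is_pinned()  # non_blocking D2H requires pinned host memory
    # Order the copy after this layer's prefill on the compute stream.
    d2h_stream.wait_stream(torch.cuda.current_stream())
    with torch.cuda.stream(d2h_stream):
        cpu_buffer.copy_(gpu_kv_layer, non_blocking=True)
        event = torch.cuda.Event()
        event.record()  # recorded on d2h_stream
        req.layer_events.append(event)


def poll_processing_queue(queue: list) -> list:
    """Return requests whose copies finished and whose decoder said 'ready'."""
    return [r for r in queue
            if r.decoder_ready and all(e.query() for e in r.layer_events)]
```

The key point of the sketch is that the copy is ordered after the layer's prefill via wait_stream while the compute stream itself never blocks; completion is later detected with non-blocking event.query() during the status check.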

Decoder node

Every time the decoder worker is called, it does the following:

  • Get new pending requests from the scheduler
  • Check pending requests from the prefiller side-channel, allocate memory to receive the data, and send a response back to the prefiller
  • Check whether any data-receiving tasks have finished and update the internal state
  • Load the KV cache to the GPU layer by layer (see the sketch after this list):
    • Start the async H2D copy of layer i+1
    • Wait for the async H2D copy of layer i to finish
    • Return to the model execution of layer i
  • Return the ready request IDs to the scheduler
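A minimal sketch of the double-buffered, layer-by-layer H2D loading described above, again with hypothetical names (`prefetch_layer`, `wait_for_layer`) and a dedicated copy stream; the PR's actual bookkeeping (per-request state, scheduler interaction) is more involved:

```python
import torch

# Illustrative only: names and structure are not the PR's actual code.
h2d_stream = torch.cuda.Stream()
h2d_events: dict[int, torch.cuda.Event] = {}  # layer_id -> copy-done event


def prefetch_layer(layer_id: int, cpu_buffer: torch.Tensor,
                   gpu_kv_layer: torch.Tensor) -> None:
    """Start the async H2D copy of one layer on the side stream."""
    with torch.cuda.stream(h2d_stream):
        gpu_kv_layer.copy_(cpu_buffer, non_blocking=True)
        event = torch.cuda.Event()
        event.record()  # recorded on h2d_stream
        h2d_events[layer_id] = event


def wait_for_layer(layer_id: int) -> None:
    """Make the compute stream wait for layer_id's copy (no host blocking)."""
    event = h2d_events.pop(layer_id, None)
    if event is not None:
        torch.cuda.current_stream().wait_event(event)


def load_kv_layer_by_layer(cpu_buffers: list, gpu_kv_caches: list) -> None:
    """Overlap layer i's execution with layer i+1's H2D copy."""
    num_layers = len(gpu_kv_caches)
    if num_layers == 0:
        return
    prefetch_layer(0, cpu_buffers[0], gpu_kv_caches[0])
    for i in range(num_layers):
        if i + 1 < num_layers:
            prefetch_layer(i + 1, cpu_buffers[i + 1], gpu_kv_caches[i + 1])
        wait_for_layer(i)
        # ... model forward for layer i runs here on the compute stream ...
```

With this structure, the H2D copy of layer i+1 overlaps the execution of layer i, so most of the load cost is hidden behind model execution.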

Immediate TODO items

  • Finish the functionality
  • Fix linter issues
  • Unit tests

Future work

  • Load-balancer
  • Separate the CPU connector into a different process
  • Async parallelism support

ApostaC added 3 commits May 19, 2025 05:26

@mergify mergify bot added the v1 label May 19, 2025
ApostaC added 16 commits May 20, 2025 00:28
@ApostaC ApostaC marked this pull request as ready for review May 28, 2025 15:39
ApostaC added 5 commits May 28, 2025 17:02
```python
    dst_v.copy_(src_v, non_blocking=True)


def h2d_page_copy(src_buffer: torch.Tensor, dst_layer: torch.Tensor,
```
Collaborator:

QQ - do you think we can tweak the code to have an invariant that start_token_idx and stop_token_idx are multiples of block_size? This would really simplify the implementation I think

Collaborator Author (@ApostaC):

There will be some annoying corner cases with chunked prefill.

For example, assume max_num_batched_tokens is 8192 and we have 2 requests with 8200 tokens each.
In the first scheduling step, the first 8192 tokens of the first request will be scheduled as a new request.
In the second scheduling step, the last 8 tokens of the first request will be scheduled as a "running request", and then the first 8192 - 8 = 8184 tokens of the second request will be scheduled as a new request.

In this example, the second request will have a stop_token_idx that is not a multiple of block_size.
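A tiny sketch of the arithmetic in this example, assuming block_size = 16 (an illustrative value):

```python
max_num_batched_tokens = 8192
block_size = 16  # illustrative block size
req_lens = [8200, 8200]

# Step 1: the token budget is filled entirely by the first request.
step1_req1 = min(req_lens[0], max_num_batched_tokens)        # 8192

# Step 2: the leftover 8 tokens of request 1 run first, then request 2
# gets whatever budget remains.
leftover_req1 = req_lens[0] - step1_req1                     # 8
first_chunk_req2 = max_num_batched_tokens - leftover_req1    # 8184

print(first_chunk_req2 % block_size)  # 8 -> stop_token_idx is not block-aligned
```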

Collaborator:

Sure, but this does not matter for P/D, since the offloaded tokens are not "scheduled"; they are treated like a prefix cache hit from the scheduler's POV.

Collaborator:

Oh I see, it's because we are doing the sending chunk by chunk.

Member @njhill left a comment:

Thanks @ApostaC! Still reviewing but posting comments so far...

Comment on lines +542 to +544
```python
# NOTE(ApostaC): For a single request, this function will be called
# two times if the first time we returned async_load flag as True.
# The second time will be the "real schedule" time
```
Member:

Actually this is no longer the case; now it will typically only be called once. It may still be called a second time if KV cache allocation fails the first time (without any call to update_state_after_alloc in between).

Comment on lines +580 to +582
```python
prefill_request_id = kv_transfer_params["prefill_request_id"]
self._connect_request_ids(prefill_request_id, request_id)
self._should_be_ready_reqs.add(request_id)
```
Member:

I think we may want to move this to update_state_after_alloc. We should assume that get_num_new_matched_tokens is side-effect free. The multi-connector implementation will assume this.

See discussion and MultiConnector updates in #18632.

```python
)

# Create a destination spec
dest_spec = DestinationSpec(
```
Collaborator:

How will we support heterogeneous TP?

Collaborator Author (@ApostaC):

Good question. I'm envisioning support for it in follow-up PRs.
Once everything is offloaded to the CPU, it will be easier to rearrange the memory there.

Collaborator:

One idea is to have a separate ring buffer for each KV head.


mergify bot commented Jun 3, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @ApostaC.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 3, 2025
@vllm-project vllm-project deleted a comment from gemini-code-assist bot Jun 3, 2025
Comment on lines +773 to +786
```python
layer_id = self._get_layer_id(layer_name)
event = self._decoder_cuda_events.pop(layer_id, None)
if event is not None:
    event.synchronize()

if layer_id == len(self._gpu_kv_caches) - 1:
    # Free the memory for the whole request
    for p_req_id in self._inflight_h2d_requests:
        logger.info("Freeing request %s, current watermark: [%d, %d]",
                    p_req_id,
                    self._kv_receiver._allocator.low_watermark,
                    self._kv_receiver._allocator.high_watermark)
        self._kv_receiver.free_request(p_req_id)
    self._inflight_h2d_requests.clear()
```
Member:

Why do we need to synchronize/block here? The idea of the async parts of the connector API is that these loads can span forward passes. If we are blocking in-line then there's no need to return load_kv_async=True from the scheduler side request_finished method, and no need to return anything from the worker side get_finished method...

Sorry if I've misunderstood something here

Collaborator:

There are two "ready" steps:

  • the first is the TCP transfer
  • the second is the CPU → GPU load

Only the TCP transfer happens asynchronously in this PR.
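For illustration only, the two stages could be tracked per request roughly like this (hypothetical names and structure, not the PR's actual data structures):

```python
from dataclasses import dataclass


@dataclass
class RecvState:
    tcp_done: bool = False  # CPU->CPU transfer from the prefiller finished (async)
    h2d_done: bool = False  # CPU->GPU load on the decoder finished (synchronous here)


def finished_recv_requests(states: dict) -> list:
    """Request IDs whose network transfer is complete; for these, the CPU->GPU
    load still happens synchronously, layer by layer, during model execution."""
    return [req_id for req_id, state in states.items() if state.tcp_done]
```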

@robertgshaw2-redhat (Collaborator) commented Jun 6, 2025

The justfile:

```
# Setting this allows creating a symlink to Justfile from another dir
set working-directory := "/home/rshaw/vllm/pd_examples/"

# Needed for the proxy server
vllm-directory := "/home/rshaw/vllm/" 

MODEL := "Qwen/Qwen3-0.6B"
P_PORT := "8100"
D_PORT := "8200"
PROXY_PORT := "8192"

prefill:
    CUDA_VISIBLE_DEVICES=0 \
    vllm serve {{MODEL}} \
      --port {{P_PORT}} \
      --disable-log-requests \
      --enforce-eager \
      --kv-transfer-config '{"kv_connector":"CPUConnector","kv_role":"kv_producer","kv_connector_extra_config": {"host": "localhost", "port": "54321", "size": 8}}'

decode:
    CUDA_VISIBLE_DEVICES=1 \
    vllm serve {{MODEL}} \
      --port {{D_PORT}} \
      --disable-log-requests \
      --enforce-eager \
      --kv-transfer-config '{"kv_connector":"CPUConnector","kv_role":"kv_consumer","kv_connector_extra_config": {"host": "localhost", "port": "54321", "size": 8}}'


proxy:
    python "{{vllm-directory}}tests/v1/kv_connector/cpu_kv_integration/toy_proxy_server.py" \
      --port {{PROXY_PORT}} \
      --prefiller-port {{P_PORT}} \
      --decoder-port {{D_PORT}}


send_request:
  curl -X POST http://localhost:{{PROXY_PORT}}/v1/completions \
    -H "Content-Type: application/json" \
    -d '{ \
      "model": "{{MODEL}}", \
      "prompt": "Red Hat is the best open source company by far across Linux, K8s, and AI, and vLLM has the greatest community in open source AI software infrastructure. I love vLLM because", \
      "max_tokens": 150, \
      "temperature": 0.7 \
    }'

benchmark:
  python {{vllm-directory}}/benchmarks/benchmark_serving.py --port {{PROXY_PORT}} --model {{MODEL}} --dataset-name random --random-input-len 1000 --random-output-len 100

benchmark_one INPUT_LEN:
  python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \
    --model {{MODEL}} \
    --input-len {{INPUT_LEN}} \
    --output-len 1 \
    --num-requests 5 \
    --seed $(date +%s) \
    --port {{PROXY_PORT}}
  

eval:
  lm_eval --model local-completions --tasks gsm8k \
    --model_args model={{MODEL}},base_url=http://127.0.0.1:{{PROXY_PORT}}/v1/completions,num_concurrent=50,max_retries=3,tokenized_requests=False \
    --limit 300
```

@robertgshaw2-redhat (Collaborator) commented Jun 6, 2025

Some notes:

  • I ran LM Eval with the justfile above. The PR crashes under load.
  • I ran the following benchmark_one on a local H100 machine ... so it's going over localhost. We see about 3s to send 10k tokens:
| Scenario | 100 Tokens | 1000 Tokens | 10000 Tokens |
|----------|-----------:|------------:|-------------:|
| P/D      | 168ms      | 406ms       | 3039ms       |
| No P/D   | 78ms       | 168ms       | 470ms        |

@ApostaC (Collaborator Author) commented Jun 9, 2025

I'm traveling these days. Will come back to this PR after this Wednesday.

ApostaC added 2 commits June 15, 2025 16:13
@mergify mergify bot removed the needs-rebase label Jun 15, 2025
@ApostaC (Collaborator Author) commented Jun 24, 2025

Fixed the crash problem. lm_eval now runs with the following output on the llama-3.1-8B model:

|Tasks|Version|     Filter     |n-shot|  Metric   |   |Value |   |Stderr|
|-----|------:|----------------|-----:|-----------|---|-----:|---|-----:|
|gsm8k|      3|flexible-extract|     5|exact_match|↑  |0.7933|±  |0.0234|
|     |       |strict-match    |     5|exact_match|↑  |0.7400|±  |0.0254|

Will clean up the debug code and push commits soon.

ApostaC added 2 commits June 25, 2025 20:16
@ApostaC (Collaborator Author) commented Jun 26, 2025

@njhill @robertgshaw2-redhat The crashing and hanging issues should now be fixed.
