[Docs] Improve V1 KVConnector interface documentation #19172

Merged 2 commits on Jun 6, 2025
29 changes: 26 additions & 3 deletions vllm/distributed/kv_transfer/kv_connector/v1/base.py
@@ -8,9 +8,15 @@
Scheduler-side: runs in the scheduler, binds metadata, which
is used by the worker-side to load/save KV cache.
get_num_new_matched_tokens() - get number of new tokens
that exist in the remote KV cache
that exist in the remote KV cache. Might be called multiple
times for a given request and should be side-effect free.
update_state_after_alloc() - update KVConnector state after
temporary buffer alloc by the CacheManager.
request_finished() - called when a request is finished, with
the computed kv cache blocks for the request.
Returns whether KV cache should be freed now or will be
freed asynchronously and optionally returns KV transfer
params.

Worker-side: runs in each worker, loads/saves KV cache to/from
the Connector based on the metadata.
@@ -19,6 +25,9 @@

save_kv_layer() - starts saving KV for layer i (maybe async)
wait_for_save() - blocks until all saves are done

get_finished() - called with ids of finished requests, returns
ids of requests that have completed async sending/recving.
"""

import enum
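The scheduler-side/worker-side split described in this docstring can be sketched as a small standalone interface. This is a hypothetical simplification — the real vLLM methods take `Request`, `KVCacheBlocks`, and scheduler-output types rather than bare strings and ints:

```python
from abc import ABC, abstractmethod


class ToyKVConnector(ABC):
    """Simplified sketch of the scheduler-side / worker-side split."""

    # Scheduler-side: runs in the scheduler process.
    @abstractmethod
    def get_num_new_matched_tokens(
            self, request_id: str,
            num_computed_tokens: int) -> tuple[int, bool]:
        """Side-effect free; may be called multiple times per request."""

    @abstractmethod
    def update_state_after_alloc(self, request_id: str,
                                 num_external_tokens: int) -> None:
        """Called after the CacheManager allocates blocks."""

    @abstractmethod
    def request_finished(self, request_id: str) -> bool:
        """True if the request's KV blocks will be freed asynchronously."""

    # Worker-side: runs in each worker process.
    @abstractmethod
    def save_kv_layer(self, layer_idx: int) -> None:
        """Start saving KV for one layer (possibly async)."""

    @abstractmethod
    def wait_for_save(self) -> None:
        """Block until all saves are done."""

    @abstractmethod
    def get_finished(
            self, finished_req_ids: set[str]) -> tuple[set[str], set[str]]:
        """Return (finished sending ids, finished recving ids)."""


class NoOpConnector(ToyKVConnector):
    """A connector that never matches or transfers anything."""

    def get_num_new_matched_tokens(self, request_id, num_computed_tokens):
        return 0, False  # no external tokens, so the async flag must be False

    def update_state_after_alloc(self, request_id, num_external_tokens):
        pass

    def request_finished(self, request_id):
        return False  # free KV blocks synchronously

    def save_kv_layer(self, layer_idx):
        pass

    def wait_for_save(self):
        pass

    def get_finished(self, finished_req_ids):
        return set(), set()
```

A no-op subclass like this is the minimal legal implementation: every scheduler-side answer says "nothing external", so the worker-side hooks never have work to do.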
@@ -184,7 +193,8 @@ def get_finished(
finished generating tokens.

Returns:
ids of requests that have finished asynchronous transfer,
ids of requests that have finished asynchronous transfer
(requests that previously returned True from request_finished()),
Comment on lines +196 to +197
Collaborator:
It would be great if we could add some description of the finished_req_ids argument.
IIUC, the existing NIXL connector does not use this argument, so it's a bit unclear what it means and how we should use it.

Member Author:
@ApostaC right, it's not used in the existing NIXL connector since this sends only prefill cache (after a single token is generated), but it might be needed in general sending/offload cases, where the connector needs to know when there are no more tokens to be generated for the request so that it can return that request's id once all remaining transfers for the request have completed.

You can see an example of this in the draft LMCache async saving PR that I opened: LMCache/LMCache#666

tuple of (sending/saving ids, recving/loading ids).
The finished saves/sends req ids must belong to a set provided in a
call to this method (this call or a prior one).
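A toy sketch of the contract stated above — ids returned by `get_finished()` as finished must have been supplied via `finished_req_ids` in the same or an earlier call. Here `transfer_done` is a hypothetical stand-in for a real transfer engine's completion signal:

```python
class AsyncSendTracker:
    """Sketch of the get_finished() contract for async sends."""

    def __init__(self):
        # Requests that finished generating but whose sends may still be
        # in flight.
        self._pending_sends: set[str] = set()

    def get_finished(self, finished_req_ids: set[str],
                     transfer_done: set[str]) -> tuple[set[str], set[str]]:
        # Requests newly finished generating join the pending set.
        self._pending_sends |= finished_req_ids
        # Only requests whose transfers completed AND that were previously
        # passed in via finished_req_ids may be reported back.
        done = self._pending_sends & transfer_done
        self._pending_sends -= done
        return done, set()
```

Calling `get_finished({"a"}, set())` reports nothing yet; a later `get_finished(set(), {"a"})` returns `"a"` as a finished send, because it was handed to the tracker in the prior call.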
@@ -215,7 +225,8 @@ def get_num_new_matched_tokens(
- The number of tokens that can be loaded from the
external KV cache beyond what is already computed.
- `True` if external KV cache tokens will be loaded
asynchronously (between scheduler steps).
asynchronously (between scheduler steps). Must be
`False` if the first element is 0.
"""
pass

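The invariant this hunk adds — the async-load flag must be `False` when zero tokens match — can be checked with a small helper (hypothetical; not part of vLLM):

```python
def validate_matched_tokens(result: tuple[int, bool]) -> tuple[int, bool]:
    """Enforce the documented get_num_new_matched_tokens() invariant:
    the async-load flag must be False when zero external tokens match."""
    num_tokens, load_async = result
    if num_tokens == 0 and load_async:
        raise ValueError(
            "load_async must be False when no external tokens match")
    return result
```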
@@ -225,6 +236,18 @@ def update_state_after_alloc(self, request: "Request",
num_external_tokens: int):
"""
Update KVConnector state after block allocation.

If get_num_new_matched_tokens previously returned True for a
request, this function may be called twice for that same request -
first when blocks are allocated for the connector tokens to be
asynchronously loaded into, and second when any additional blocks
are allocated, after the load/transfer is complete.

Args:
request (Request): the request object.
blocks (KVCacheBlocks): the blocks allocated for the request.
num_external_tokens (int): the number of tokens that will be
loaded from the external KV cache.
"""
pass

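The two-call pattern described in the new docstring can be illustrated with a minimal recorder (hypothetical names; in vLLM the scheduler makes these calls with real `Request` and `KVCacheBlocks` objects):

```python
class AllocLog:
    """Records the update_state_after_alloc() calls a request may see
    when get_num_new_matched_tokens() returned load_async=True."""

    def __init__(self):
        self.calls: list[tuple[str, int]] = []

    def update_state_after_alloc(self, request_id: str,
                                 num_external_tokens: int) -> None:
        self.calls.append((request_id, num_external_tokens))


conn = AllocLog()
# 1st call: blocks allocated as the destination of the async load.
conn.update_state_after_alloc("req-1", num_external_tokens=128)
# ... async transfer completes between scheduler steps ...
# 2nd call: any additional blocks allocated once the load is done.
conn.update_state_after_alloc("req-1", num_external_tokens=0)
```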
12 changes: 6 additions & 6 deletions vllm/v1/core/sched/scheduler.py
@@ -101,7 +101,7 @@ def __init__(
# This is flushed at the end of each scheduling step.
self.finished_req_ids: set[str] = set()

# P/D: requests in process of recving KV transfers
# KV Connector: requests in process of async KV loading or recving
self.finished_recving_kv_req_ids: set[str] = set()

# OPTIMIZATION: Cache the CachedRequestData objects to avoid creating
@@ -821,7 +821,7 @@ def update_from_output(
if not stopped:
new_running.append(request)

# P/D: update state for finished KV Transfers.
# KV Connector: update state for finished KV Transfers.
self._update_from_kv_xfer_finished(model_runner_output)

# Return the cached request data to the queue so they can be reused.
@@ -968,7 +968,7 @@ def shutdown(self) -> None:
self.kv_event_publisher.shutdown()

########################################################################
# P/D Related Methods
# KV Connector Related Methods
Contributor:
I think P/D (Prefill/Decode) is meaningful and should not be deleted entirely. Maybe we could add an explanation of P/D instead?

Member Author:
@yewentao256 the connector API that the scheduler interfaces with is more generic than just P/D, the same primitives can be used for example by connectors that implement offloading and reloading from a distributed kvcache. So here "KV connector" methods is more appropriate (P/D disagg is just one application of those).

Contributor:
Makes sense. Thanks for the explanation!

########################################################################

def get_kv_connector(self) -> Optional[KVConnectorBase_V1]:
@@ -991,7 +991,7 @@ def _connector_finished(

def _update_waiting_for_remote_kv(self, request: Request) -> bool:
"""
P/D: check if the request_id is finished_recving.
KV Connector: check if the request_id is finished_recving.

The finished_recving_kv_req_ids list is populated
on the previous steps()'s update_from_output based
@@ -1026,15 +1026,15 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool:
def _update_from_kv_xfer_finished(self,
model_runner_output: ModelRunnerOutput):
"""
P/D: update the scheduler state based on the output.
KV Connector: update the scheduler state based on the output.

The Worker side connectors add finished_recving and
finished_sending reqs to the output.
* if finished_sending: free the blocks
* if finished_recving: add to state so we can
schedule the request during the next step.
"""
# P/D: update recv and send status from last step.
# KV Connector:: update recv and send status from last step.
for req_id in (model_runner_output.finished_recving or ()):
logger.debug("Finished recving KV transfer for request %s", req_id)
self.finished_recving_kv_req_ids.add(req_id)
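The state update this hunk documents can be sketched in isolation (a hypothetical simplification of `_update_from_kv_xfer_finished`; the real method reads these sets from `ModelRunnerOutput` and updates the scheduler's request tables):

```python
from typing import Iterable, Optional


class ToySchedulerState:
    """Finished sends free their KV blocks; finished recvs are recorded
    so the request can be scheduled on the next step."""

    def __init__(self):
        self.finished_recving_kv_req_ids: set[str] = set()
        self.freed: list[str] = []  # stand-in for block-pool frees

    def _free_blocks(self, req_id: str) -> None:
        self.freed.append(req_id)

    def update_from_kv_xfer_finished(
            self,
            finished_sending: Optional[Iterable[str]],
            finished_recving: Optional[Iterable[str]]) -> None:
        for req_id in (finished_recving or ()):
            # Recorded so _update_waiting_for_remote_kv can move the
            # request out of WAITING_FOR_REMOTE_KVS next step.
            self.finished_recving_kv_req_ids.add(req_id)
        for req_id in (finished_sending or ()):
            self._free_blocks(req_id)
```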