[Docs] Improve V1 KVConnector interface documentation #19172
Changes from all commits
@@ -8,9 +8,15 @@
    Scheduler-side: runs in the scheduler, binds metadata, which
    is used by the worker-side to load/save KV cache.
        get_num_new_matched_tokens() - get number of new tokens
-           that exist in the remote KV cache
+           that exist in the remote KV cache. Might be called multiple
+           times for a given request and should be side-effect free.
        update_state_after_alloc() - update KVConnector state after
            temporary buffer alloc by the CacheManager.
+       request_finished() - called when a request is finished, with
+           the computed kv cache blocks for the request.
+           Returns whether KV cache should be freed now or will be
+           freed asynchronously and optionally returns KV transfer
+           params.

    Worker-side: runs in each worker, loads/saves KV cache to/from
    the Connector based on the metadata.
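
To make the new request_finished() description concrete, here is a minimal sketch of a connector honoring that contract. The class name, helpers, and exact signature are illustrative assumptions, not the vLLM API itself:

```python
# Illustrative sketch only: delay freeing KV blocks while an async save is
# still pending, and optionally hand back transfer params.
class _RequestFinishedSketch:
    def __init__(self) -> None:
        self._pending_saves: dict[str, list[int]] = {}

    def _should_save(self, request) -> bool:
        return True  # placeholder policy: always save/send this request's KV

    def request_finished(self, request, block_ids: list[int]):
        """Return (delay_free, optional KV transfer params)."""
        if not self._should_save(request):
            # Nothing to transfer: the KV blocks can be freed immediately.
            return False, None
        # An async save/send is in flight, so the scheduler should delay
        # freeing the blocks until get_finished() reports this request id.
        self._pending_saves[request.request_id] = block_ids
        return True, {"remote_notify": True}  # illustrative transfer params
```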
@@ -19,6 +25,9 @@

        save_kv_layer() - starts saving KV for layer i (maybe async)
        wait_for_save() - blocks until all saves are done
+       get_finished() - called with ids of finished requests, returns
+           ids of requests that have completed async sending/recving.
"""

import enum
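
As a rough illustration of the save_kv_layer()/wait_for_save() pairing described above, here is a sketch of fire-and-forget layer saves joined at the end of the step. The thread pool and the CPU copy are assumptions standing in for a real transfer engine:

```python
# Sketch only: start layer saves asynchronously, then block in wait_for_save().
from concurrent.futures import Future, ThreadPoolExecutor

import torch


class _AsyncSaveSketch:
    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=2)
        self._inflight: list[Future] = []  # saves started during this step

    def save_kv_layer(self, layer_name: str, kv_layer: torch.Tensor, **kwargs):
        # Kick off the save without blocking the forward pass.
        self._inflight.append(
            self._pool.submit(self._copy_out, layer_name, kv_layer))

    def wait_for_save(self) -> None:
        # Block until every save started in this step has completed.
        for fut in self._inflight:
            fut.result()
        self._inflight.clear()

    def _copy_out(self, layer_name: str, kv_layer: torch.Tensor) -> None:
        _ = kv_layer.to("cpu")  # placeholder "save" to CPU memory
```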
@@ -184,7 +193,8 @@ def get_finished(
            finished generating tokens.

        Returns:
-           ids of requests that have finished asynchronous transfer,
+           ids of requests that have finished asynchronous transfer
+           (requests that previously returned True from request_finished()),
            tuple of (sending/saving ids, recving/loading ids).
            The finished saves/sends req ids must belong to a set provided in a
            call to this method (this call or a prior one).

Comment on lines +196 to +197:

Review comment: Would be great if we can add some descriptions about the

Reply: @ApostaC right, it's not used in the existing NIXL connector since this sends only prefill cache (after a single token is generated) but might be needed in general sending/offload cases, where the connector needs to know when there are no more tokens to be generated from the request so that it can return that request's id once all remaining transfer for the request has completed. You can see an example of this in the draft LMCache async saving PR that I opened: LMCache/LMCache#666
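
Tying this to the review discussion above, a connector that saves beyond prefill needs to know when a request has stopped generating. A sketch (assumed bookkeeping, not the NIXL or LMCache implementation) of using the finished_req_ids argument for that:

```python
# Sketch only: report a request as "finished sending" only after both its
# generation and all of its outstanding saves are complete.
class _GetFinishedSketch:
    def __init__(self) -> None:
        self._gen_done: set[str] = set()          # no more tokens will be produced
        self._saves_pending: dict[str, int] = {}  # req id -> outstanding saves

    def get_finished(self, finished_req_ids: set[str]):
        self._gen_done |= finished_req_ids
        done_sending = {
            rid for rid in self._gen_done
            if self._saves_pending.get(rid, 0) == 0
        }
        self._gen_done -= done_sending
        # Returns (sending/saving ids, recving/loading ids); no async loads here.
        return done_sending or None, None
```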
@@ -215,7 +225,8 @@ def get_num_new_matched_tokens(
            - The number of tokens that can be loaded from the
              external KV cache beyond what is already computed.
            - `True` if external KV cache tokens will be loaded
-             asynchronously (between scheduler steps).
+             asynchronously (between scheduler steps). Must be
+             'False' if the first element is 0.
        """
        pass
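
A sketch of a get_num_new_matched_tokens() that respects the clarified contract (side-effect free, safe to call repeatedly, async flag False when nothing will be loaded); the _lookup_cached_prefix_len helper is an assumption:

```python
# Sketch only: a pure lookup against an external KV cache index.
class _MatchedTokensSketch:
    def _lookup_cached_prefix_len(self, request) -> int:
        return 0  # placeholder: query the remote/offloaded cache index here

    def get_num_new_matched_tokens(self, request, num_computed_tokens: int):
        num_external = self._lookup_cached_prefix_len(request)
        num_new = max(0, num_external - num_computed_tokens)
        if num_new == 0:
            # The async flag must be False when no tokens will be loaded.
            return 0, False
        # Tokens will be pulled in between scheduler steps (async load).
        return num_new, True
```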
@@ -225,6 +236,18 @@ def update_state_after_alloc(self, request: "Request",
                                 num_external_tokens: int):
        """
        Update KVConnector state after block allocation.
+
+        If get_num_new_matched_tokens previously returned True for a
+        request, this function may be called twice for that same request -
+        first when blocks are allocated for the connector tokens to be
+        asynchronously loaded into, and second when any additional blocks
+        are allocated, after the load/transfer is complete.
+
+        Args:
+            request (Request): the request object.
+            blocks (KVCacheBlocks): the blocks allocated for the request.
+            num_external_tokens (int): the number of tokens that will be
+                loaded from the external KV cache.
        """
        pass
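
To illustrate the possible double call documented above, a sketch of per-request bookkeeping (the field names and the way the second call is detected are assumptions):

```python
# Sketch only: distinguish the first allocation (destination of an async load)
# from the follow-up allocation made after the load completes.
class _AllocStateSketch:
    def __init__(self) -> None:
        self._seen: set[str] = set()
        self._loads_in_flight: dict[str, object] = {}

    def update_state_after_alloc(self, request, blocks,
                                 num_external_tokens: int):
        req_id = request.request_id
        if req_id not in self._seen:
            self._seen.add(req_id)
            if num_external_tokens > 0:
                # First call: these blocks are where the external KV cache
                # tokens will be asynchronously loaded into.
                self._loads_in_flight[req_id] = blocks
        else:
            # Second call, made once the load/transfer has completed and any
            # additional blocks have been allocated for the request.
            self._loads_in_flight.pop(req_id, None)
```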
@@ -101,7 +101,7 @@ def __init__(
        # This is flushed at the end of each scheduling step.
        self.finished_req_ids: set[str] = set()

-       # P/D: requests in process of recving KV transfers
+       # KV Connector: requests in process of async KV loading or recving
        self.finished_recving_kv_req_ids: set[str] = set()

        # OPTIMIZATION: Cache the CachedRequestData objects to avoid creating
@@ -821,7 +821,7 @@ def update_from_output(
            if not stopped:
                new_running.append(request)

-       # P/D: update state for finished KV Transfers.
+       # KV Connector: update state for finished KV Transfers.
        self._update_from_kv_xfer_finished(model_runner_output)

        # Return the cached request data to the queue so they can be reused.
@@ -968,7 +968,7 @@ def shutdown(self) -> None:
            self.kv_event_publisher.shutdown()

    ########################################################################
-   # P/D Related Methods
+   # KV Connector Related Methods
    ########################################################################

    def get_kv_connector(self) -> Optional[KVConnectorBase_V1]:

Review comment: I think P/D (Prefill/Decode) is meaningful and should not be deleted entirely. Maybe we add the explanation for P/D?

Reply: @yewentao256 the connector API that the scheduler interfaces with is more generic than just P/D, the same primitives can be used for example by connectors that implement offloading and reloading from a distributed kvcache. So here "KV connector" methods is more appropriate (P/D disagg is just one application of those).

Follow-up: Make sense. Thanks for the explanation!
@@ -991,7 +991,7 @@ def _connector_finished(

    def _update_waiting_for_remote_kv(self, request: Request) -> bool:
        """
-       P/D: check if the request_id is finished_recving.
+       KV Connector: check if the request_id is finished_recving.

        The finished_recving_kv_req_ids list is populated
        on the previous steps()'s update_from_output based
@@ -1026,15 +1026,15 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool:
    def _update_from_kv_xfer_finished(self,
                                      model_runner_output: ModelRunnerOutput):
        """
-       P/D: update the scheduler state based on the output.
+       KV Connector: update the scheduler state based on the output.

        The Worker side connectors add finished_recving and
        finished_sending reqs to the output.
        * if finished_sending: free the blocks
        # if finished_recving: add to state so we can
            scheduler the request during the next step.
        """
-       # P/D: update recv and send status from last step.
+       # KV Connector:: update recv and send status from last step.
        for req_id in (model_runner_output.finished_recving or ()):
            logger.debug("Finished recving KV transfer for request %s", req_id)
            self.finished_recving_kv_req_ids.add(req_id)
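
For context, the update this docstring describes boils down to something like the following sketch (scheduler and free_blocks stand in for the real Scheduler object and its block-freeing path; this is not the actual code):

```python
# Sketch only: finished receives make the waiting request schedulable again;
# finished sends mean the request's KV blocks can finally be freed.
def update_from_kv_xfer_finished_sketch(scheduler, model_runner_output):
    for req_id in (model_runner_output.finished_recving or ()):
        # Async KV load/receive done: schedule the request on the next step.
        scheduler.finished_recving_kv_req_ids.add(req_id)
    for req_id in (model_runner_output.finished_sending or ()):
        # Async KV save/send done: safe to free the request's blocks now.
        scheduler.free_blocks(scheduler.requests[req_id])
```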