feat: router supporting intra-worker dp routing #1285
Conversation
Walkthrough

The changes introduce generic worker identifiers throughout the key-value router modules, replacing the fixed i64 worker IDs so that a composite (WorkerId, DpRank) identifier can flow through indexing, scheduling, and routing.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Router
    participant Indexer
    participant Scheduler
    Client->>Router: schedule(token_ids, lora_id)
    Router->>Indexer: find_best_match(tokens)
    Indexer-->>Router: ((WorkerId, DpRank), overlap)
    Router->>Scheduler: schedule(overlap, isl_tokens)
    Scheduler-->>Router: (WorkerId, DpRank)
    Router-->>Client: (WorkerId, DpRank)
```
Actionable comments posted: 7
🔭 Outside diff range comments (5)
lib/llm/src/kv_router/recorder.rs (1)
105-107
: ⚠️ Potential issue: Mismatch between comment and test data

The comment says “second event from worker 2” but you still pass `worker_id = 1`:

```diff
-// Create second event from worker 2 using helper function
-let event2 = create_remove_event(1, 43, vec![2, 3]);
+// Create second event from worker 2 using helper function
+let event2 = create_remove_event(2, 43, vec![2, 3]);
```

The discrepancy doesn’t break the test, yet it can confuse future readers or cause false assumptions in downstream logic that groups events per worker.
components/metrics/src/lib.rs (1)
447-482
: 🛠️ Refactor suggestion: Unchecked `metrics[0]` indexing can panic

`Endpoint.data` is a `Vec`; indexing without a guard will panic if it’s empty. Although `postprocess_metrics` currently guarantees one element, defensive code is safer and future-proof:

```diff
- let metrics = endpoint.data.clone();
+ let Some(first) = endpoint.data.first() else { continue };
```

and replace every `metrics[0]` with `first`. This also avoids the unnecessary `clone()`. A standalone sketch of the guard pattern follows.
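As a side note, here is a minimal standalone sketch of the suggested `let`-`else` guard. The `Endpoint` struct below is a stand-in for illustration, not the project's real type:

```rust
// Stand-in type for illustration only; the real `Endpoint` differs.
struct Endpoint {
    data: Vec<u64>,
}

fn sum_first_metrics(endpoints: &[Endpoint]) -> u64 {
    let mut total = 0;
    for endpoint in endpoints {
        // Skip endpoints with no data instead of panicking on `data[0]`.
        let Some(first) = endpoint.data.first() else { continue };
        total += *first;
    }
    total
}

fn main() {
    let endpoints = vec![Endpoint { data: vec![] }, Endpoint { data: vec![7, 9] }];
    assert_eq!(sum_first_metrics(&endpoints), 7);
}
```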
lib/llm/src/kv_router.rs (1)
214-222
: 💡 Verification agent – ❓ Verification inconclusive

DpRank is dropped during routing – potential mis-routing; needs a fix or documentation

After selecting `(worker_id, dp_rank)`, the router calls `self.inner.direct(updated_request, instance_id.0)`, i.e. it strips the `DpRank` component of `instance_id`. If multiple DP ranks of the same worker expose independent KV caches, this routes the request to an arbitrary rank-0 instance and silently degrades hit-rate.

```rust
// TODO: this does not do dp routing
self.inner.direct(updated_request, instance_id.0).await
```

Please either:

- Propagate the full `instance_id` (including its `dp_rank`) into the backend call (e.g. extend `inner.direct` to accept both worker_id and dp_rank), or
- Introduce a secondary routing key for dp_rank, or
- Add a comment in the code/docs explaining why dropping dp_rank is safe until DP-routing support is added.

A rough illustration of the first option follows this list.
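For illustration only, a sketch of the first option. The `Request` shape, the `direct` signature, and the idea of carrying `dp_rank` on the request are hypothetical stand-ins, not the actual dynamo API:

```rust
// Hypothetical stand-ins; the real router and backend types differ.
type WorkerId = i64;
type DpRank = u32;

struct Request {
    prompt: String,
    // Carry the selected DP rank with the request so the backend can
    // target the right data-parallel rank instead of defaulting to rank 0.
    dp_rank: Option<DpRank>,
}

fn direct(request: &Request, worker_id: WorkerId) {
    println!(
        "routing {:?} to worker {} (dp_rank {:?})",
        request.prompt, worker_id, request.dp_rank
    );
}

fn route(mut request: Request, instance_id: (WorkerId, DpRank)) {
    // Propagate the rank rather than dropping `instance_id.1`.
    request.dp_rank = Some(instance_id.1);
    direct(&request, instance_id.0);
}

fn main() {
    let request = Request { prompt: "hello".into(), dp_rank: None };
    route(request, (3, 1));
}
```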
lib/llm/src/kv_router/scheduler.rs (1)
300-302
: ⚠️ Potential issue: Invalid use of `rand` API – code will not compile

`rand::rng()` and `random_range` do not exist in the `rand` crate.

```diff
-let mut rng = rand::rng();
-best_workers[rng.random_range(0..best_workers.len())]
+let mut rng = rand::thread_rng();
+best_workers[rng.gen_range(0..best_workers.len())]
```

Add `use rand::Rng;` (already present) and optionally `rand::seq::SliceRandom` if you want `choose`.

lib/llm/src/kv_router/indexer.rs (1)
795-797
: 🛠️ Refactor suggestion / ⚠️ Potential issue: `iter::repeat_n` is not available in `std::iter` – compilation will fail

`std::iter` does not expose `repeat_n`; it belongs to the itertools crate. Attempting to build will yield a “method not found” / “function not found” error.

```diff
- scores.frequencies.extend(iter::repeat_n(0, diff as usize));
+ // std alternative – no extra dependency required
+ scores
+     .frequencies
+     .extend(std::iter::repeat(0).take(diff as usize));
```

If you prefer `itertools`, import it and gate it behind an optional feature, but avoid hard-to-spot build breaks in the core path.
♻️ Duplicate comments (1)
lib/llm/src/kv_router/indexer.rs (1)
844-847
: Identical panic risk in the sharded indexer’s `apply_event`

See the rationale above – replace `unwrap()` with graceful error handling to prevent unexpected crashes in production. A generic sketch of the pattern follows.
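As a generic sketch of that pattern (the real `apply_event` signature is not shown in this review, so the `Result` below is a stand-in), log and skip instead of panicking:

```rust
// Stand-in for whatever fallible work the indexer's event handling does.
fn apply_event(raw: &str) -> Result<u64, std::num::ParseIntError> {
    raw.trim().parse::<u64>()
}

fn handle_event(raw: &str) {
    match apply_event(raw) {
        Ok(worker_id) => println!("applied event for worker {worker_id}"),
        // Log and skip the malformed event instead of crashing the task.
        Err(err) => eprintln!("failed to apply event {raw:?}: {err}"),
    }
}

fn main() {
    handle_event("42");
    handle_event("not-a-number");
}
```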
🧹 Nitpick comments (8)
components/metrics/src/main.rs (1)
201-202
: Address the incomplete dp_rank handling.

The TODO comment indicates that the current implementation only uses `worker_id.0` and doesn't handle `dp_rank` properly. This could lead to incorrect metrics aggregation when multiple DP ranks are involved. Consider implementing proper handling of the composite worker ID in metrics collection. This might involve:

- Updating the metrics collection to include DP rank as a dimension
- Modifying the `update_kv_hit_rate` method signature to accept the full composite ID
- Ensuring metrics are properly partitioned by both worker ID and DP rank

Would you like me to help design a solution for proper composite worker ID handling in metrics collection?
components/router/src/main.rs (1)
92-96
: Consider clarifying selector extensibility
`CustomWorkerSelector` simply delegates to `DefaultWorkerSelector`. If custom logic is never injected, you could:

- Omit the wrapper and pass `DefaultWorkerSelector` directly, or
- Add a brief comment stating what future heuristics will be added (e.g., DP-aware shard affinity).
Not blocking, just a readability note.
lib/llm/src/kv_router/recorder.rs (1)
96-102
: Alias usage is valid but may surprise newcomers

`KvRecorder::<T>::new` works because a type alias inherits the inherent impls of its target (`Recorder<_>`). A short doc-comment on the alias would help readers unfamiliar with this Rust feature; a standalone illustration follows.
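A small self-contained illustration of the feature, using stand-in types rather than the project's `Recorder`:

```rust
struct Recorder<T> {
    events: Vec<T>,
}

impl<T> Recorder<T> {
    fn new() -> Self {
        Recorder { events: Vec::new() }
    }
}

/// Alias over the generic recorder; it resolves to `Recorder<T>` and
/// therefore exposes `Recorder`'s inherent impls.
type KvRecorder<T> = Recorder<T>;

fn main() {
    // Method lookup sees `Recorder<i64>` behind the alias, so `new` works.
    let recorder = KvRecorder::<i64>::new();
    assert!(recorder.events.is_empty());
}
```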
components/metrics/src/lib.rs (1)
598-605
: Only the first DP-rank is surfaced to Prometheus

`postprocess_metrics` wraps a single `ForwardPassMetrics` in a `Vec`, yet upstream callers now provide one metrics struct per DP rank. Downstream `update()` ignores elements after index 0, losing visibility into multi-rank workers. Consider either aggregating the vector (e.g., sum/avg) or exporting per-rank labels (add an extra label for `dp_rank`); a sketch of the label approach follows.
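A sketch of the per-rank label approach, assuming the `prometheus` crate; the metric and label names here are made up for illustration:

```rust
use prometheus::{IntCounterVec, Opts, Registry};

fn main() {
    let registry = Registry::new();

    // One counter family with worker_id and dp_rank as label dimensions,
    // so per-rank values are not collapsed into a single series.
    let kv_hits = IntCounterVec::new(
        Opts::new("kv_cache_hits_total", "KV cache hits per worker and DP rank"),
        &["worker_id", "dp_rank"],
    )
    .expect("valid metric definition");
    registry
        .register(Box::new(kv_hits.clone()))
        .expect("metric registered once");

    // A composite (WorkerId, DpRank) identifier maps onto the two labels.
    let (worker_id, dp_rank): (i64, u32) = (42, 1);
    kv_hits
        .with_label_values(&[&worker_id.to_string(), &dp_rank.to_string()])
        .inc();
}
```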
lib/llm/src/kv_router/indexer.rs (4)
120-131
: Trailing tokens are silently ignored in `compute_block_hash_for_seq`

Using `chunks_exact` skips the final partial chunk (≤ `kv_block_size - 1` tokens). If that is intentional, add a doc comment warning callers; if not, switch to `chunks` to include the remainder:

```diff
- .chunks_exact(kv_block_size)
+ .chunks(kv_block_size)
```

Dropping data without explicit notice is error-prone, especially once different models use different block sizes.
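A tiny standalone check of the difference, using plain `std` slices rather than the indexer code:

```rust
fn main() {
    let tokens: Vec<u32> = vec![1, 2, 3, 4, 5, 6, 7];
    let kv_block_size = 4;

    // `chunks_exact` silently drops the trailing partial block.
    assert_eq!(tokens.chunks_exact(kv_block_size).count(), 1);
    assert_eq!(tokens.chunks_exact(kv_block_size).remainder(), &[5, 6, 7][..]);

    // `chunks` yields the remainder as a final, shorter block.
    assert_eq!(tokens.chunks(kv_block_size).count(), 2);
}
```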
406-411
: `inspect` is a readability outlier here

`Option::inspect` is okay, but the intent (“ensure non-increasing frequencies”) is clearer with a straightforward `if let`:

```diff
- self.frequencies
-     .last()
-     .inspect(|elem| debug_assert!(**elem >= frequency));
+ if let Some(&last) = self.frequencies.last() {
+     debug_assert!(last >= frequency);
+ }
```

Pure style, but helps future maintainers unfamiliar with `inspect`.
510-515
: Single-thread runtime: `current_thread()` is lighter than `multi_thread(1)`

`tokio::runtime::Builder::new_current_thread()` avoids the overhead of the multi-thread scheduler while maintaining the same semantics you want here.

```diff
-let runtime = tokio::runtime::Builder::new_multi_thread()
-    .worker_threads(1)
+let runtime = tokio::runtime::Builder::new_current_thread()
```

Not critical, but saves memory and a few CPU cycles.
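For reference, a minimal current-thread runtime, assuming tokio with the `rt` and `time` features enabled:

```rust
use std::time::Duration;

fn main() {
    // No extra worker threads are spawned, unlike
    // `new_multi_thread().worker_threads(1)`.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .expect("failed to build runtime");

    runtime.block_on(async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        println!("ran on the current thread");
    });
}
```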
691-692
: Broadcast channel capacity of 1 048 576 messages is excessive

An unbounded-looking buffer can hide back-pressure problems and blow up memory under load. Consider:

- Reducing the size (a few thousand is usually plenty).
- Forward-pressure strategies (drop oldest, reject new, etc.) depending on SLA (see the sketch after this list).
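If the channel in question is a tokio broadcast channel (an assumption here), a small capacity already gives drop-oldest behaviour: a lagging receiver gets a `Lagged` error and then resumes from the oldest retained message. A sketch, assuming tokio with the `rt` and `sync` features:

```rust
use tokio::sync::broadcast;

fn main() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("failed to build runtime");

    runtime.block_on(async {
        // Capacity 4: only the 4 most recent messages are retained.
        let (tx, mut rx) = broadcast::channel::<u64>(4);
        for i in 0..10u64 {
            tx.send(i).expect("receiver still alive");
        }

        // The slow receiver is told how many messages were dropped...
        match rx.recv().await {
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                println!("dropped {skipped} oldest messages");
            }
            other => println!("unexpected: {other:?}"),
        }
        // ...and then resumes from the oldest retained message.
        assert_eq!(rx.recv().await.expect("message available"), 6);
    });
}
```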
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- components/metrics/src/bin/mock_worker.rs (1 hunks)
- components/metrics/src/lib.rs (3 hunks)
- components/metrics/src/main.rs (3 hunks)
- components/router/src/main.rs (2 hunks)
- lib/llm/src/kv_router.rs (7 hunks)
- lib/llm/src/kv_router/indexer.rs (32 hunks)
- lib/llm/src/kv_router/metrics_aggregator.rs (2 hunks)
- lib/llm/src/kv_router/protocols.rs (3 hunks)
- lib/llm/src/kv_router/publisher.rs (1 hunks)
- lib/llm/src/kv_router/recorder.rs (5 hunks)
- lib/llm/src/kv_router/scheduler.rs (10 hunks)
- lib/llm/src/kv_router/scoring.rs (2 hunks)
- lib/llm/src/kv_router/worker.rs (0 hunks)
💤 Files with no reviewable changes (1)
- lib/llm/src/kv_router/worker.rs
🧰 Additional context used
🧬 Code Graph Analysis (2)
lib/llm/src/kv_router/publisher.rs (1)
- lib/llm/src/kv_router/indexer.rs (1): compute_block_hash_for_seq (119-131)

lib/llm/src/kv_router/recorder.rs (3)
- lib/llm/src/kv_router/indexer.rs (6): create_remove_event (915-932), new (150-156), new (197-199), new (385-390), new (574-576), new (764-766)
- lib/llm/src/kv_router/protocols.rs (1): new (194-196)
- lib/llm/src/recorder.rs (1): send_events (287-386)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and Test - vllm
🔇 Additional comments (9)
components/metrics/src/bin/mock_worker.rs (1)
17-17
: LGTM: Clean import reorganization.

The import path change from the `scheduler` to the `protocols` module aligns with the broader refactoring to centralize event definitions in the protocols module.

lib/llm/src/kv_router/metrics_aggregator.rs (1)
21-21
: LGTM: Import reorganization aligns with module restructuring.

The import path change from the `scheduler` to the `scoring` module is consistent with the broader refactoring.

lib/llm/src/kv_router/publisher.rs (1)
17-17
: LGTM: Clean import consolidation.

The consolidation of multiple imports into a single line using `protocols::*` improves readability and aligns with the module reorganization.

components/metrics/src/main.rs (3)
30-30
: LGTM: Import updates support composite worker ID type.

The import changes correctly add the necessary types (`WorkerId`, `DpRank`) to support the composite worker identifier.
183-183
: LGTM: Correct parameterization for composite worker ID.

The type parameter `(WorkerId, DpRank)` correctly reflects the new composite worker identifier structure.
189-191
: LGTM: Proper handling of composite worker ID in logging.

The logging correctly accesses both components of the worker ID tuple (`worker_id.0` and `worker_id.1`) for complete visibility.

components/router/src/main.rs (1)
28-32
: Generic worker-ID import looks good

The file correctly pulls in `WorkerId` and `DpRank`, and the downstream type annotation in `select_worker` compiles cleanly.
No further action required.

lib/llm/src/kv_router/recorder.rs (1)
31-33
: Explicit test alias helps readability

Introducing `type TestWorkerId = i64;` makes the generic tests clear and future-proof.

lib/llm/src/kv_router/indexer.rs (1)
81-140
: Trait bound sanity check

`RadixBlock`, `OverlapScores`, and several HashMap / HashSet usages require `T: Eq + Hash + Clone`. Ensure the `WorkerGeneral` trait declares these super-traits; otherwise downstream code will fail to compile. If `WorkerGeneral` already does, ignore this note. A generic sketch of why these bounds are needed follows.
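A generic sketch of why those super-traits matter for map keys; the `WorkerGeneral` bound and `OverlapScores` shape below are illustrative, not the PR's actual definitions:

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Illustrative super-trait bundle; the PR's real trait may differ.
trait WorkerGeneral: Eq + Hash + Clone {}
impl<T: Eq + Hash + Clone> WorkerGeneral for T {}

struct OverlapScores<T: WorkerGeneral> {
    scores: HashMap<T, u32>,
}

impl<T: WorkerGeneral> OverlapScores<T> {
    fn new() -> Self {
        Self { scores: HashMap::new() }
    }

    fn add_overlap(&mut self, worker: &T) {
        // `Eq + Hash` for the map lookup, `Clone` to own the inserted key.
        *self.scores.entry(worker.clone()).or_insert(0) += 1;
    }
}

fn main() {
    // A composite (WorkerId, DpRank) tuple satisfies the bounds for free.
    let mut scores: OverlapScores<(i64, u32)> = OverlapScores::new();
    scores.add_overlap(&(7, 0));
    scores.add_overlap(&(7, 0));
    assert_eq!(scores.scores[&(7, 0)], 2);
}
```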
To run other dp ranks headless on the same node or on other nodes, run:

```
VLLM_LOGGING_LEVEL=DEBUG CUDA_VISIBLE_DEVICES=1 VLLM_USE_V1=1 vllm serve Qwen/Qwen3-0.6B -dp 1 -dpr 1 --data-parallel-address 127.0.0.1 --data-parallel-rpc-port 62300 --data-parallel-size-local 1 --enforce-eager --headless --kv-events-config '{"enable_kv_cache_events": true, "publisher": "zmq"}' --enable-prefix-caching
```
V1 and prefix caching are enabled by default
VllmDecodeWorker:
  enforce-eager: true
  max-num-batched-tokens: 16384
why do we need to set that?
SimpleLoadBalancer:
  enable_disagg: false
  common-configs: [model, served_model_name]
  router_mode: kv

VllmDecodeWorker:
  enforce-eager: true
why do we need to set that?
Ah sorry. I think we pushed the changes to our config file for our local dev. We were hijacking your simple load balancer to do kv routing 😆, but those changes were not pushed. For now, we are still working on cleaning the python bits up.
model: Qwen/Qwen3-0.6B

block-size: 16
max-model-len: 16384
why do we need to set that?
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
model: Qwen/Qwen3-0.6B

block-size: 16
why do we need to set that?
VllmDecodeWorker:
  enforce-eager: true
  max-num-batched-tokens: 16384
  enable-prefix-caching: true
It is enabled by default in V1
SimpleLoadBalancer:
  enable_disagg: false
  common-configs: [model, served_model_name]
  router_mode: kv

VllmDecodeWorker:
Why don't we need `kv-events-config` specified here?
Oh, I see it's hard-coded in the config. What's the reason for that instead of using the config yaml?
Overview:
Make the current Router more general with respect to handling the type of worker id (beyond i64). This is directly motivated by the need to support direct dp routing for vllm v1.
Related PRs:
vllm emitting dp_rank in events
vllm supporting direct dp targeting
Summary by CodeRabbit
New Features
Refactor
Removals
Bug Fixes