
feat: router supporting intra-worker dp routing #1285


Status: Draft. Wants to merge 26 commits into main.

Conversation

Contributor

@PeaBrane PeaBrane commented May 30, 2025

Overview:

Make the current Router more general with respect to the worker ID type (beyond i64). Directly motivated by the need to support direct DP routing for vLLM v1.

Related PRs:

vllm emitting dp_rank in events

vllm supporting direct dp targeting

Summary by CodeRabbit

  • New Features

    • Expanded worker identification to support composite keys consisting of both WorkerId and DpRank, enabling more granular scheduling and routing decisions.
  • Refactor

    • Generalized internal logic to handle flexible worker ID types, replacing fixed types with generics across scheduling, routing, and metrics modules.
    • Updated metrics and endpoint data structures to support multiple metric entries per endpoint.
    • Adjusted worker selection and scoring logic to operate on (WorkerId, DpRank) pairs.
    • Simplified import statements and consolidated related imports for clarity.
  • Removals

    • Removed the KvRoutedIngress service worker abstraction.
  • Bug Fixes

    • Improved handling of metrics aggregation and endpoint processing for scenarios with multiple data entries.


copy-pr-bot bot commented May 30, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

Contributor

coderabbitai bot commented May 30, 2025

Walkthrough

The changes introduce generic worker identifiers throughout the key-value router modules, replacing fixed i64 worker IDs with generic types and supporting composite identifiers like (WorkerId, DpRank). This affects scheduling, event handling, metrics, and selection logic. Several struct and method signatures are updated, and one worker ingress implementation is removed. Imports and endpoint data structures are also refactored for consistency.
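A minimal sketch of what the composite identifier enables. The alias names follow the walkthrough above; the concrete trait bounds, integer widths, and helper functions in the PR may differ (`best_worker` here is purely illustrative, not the PR's API):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Type aliases in the spirit of protocols.rs (widths assumed, not confirmed).
type WorkerId = i64;
type DpRank = u32;

// A trait in the spirit of `WorkerGeneral`: any key usable in maps and logs.
trait WorkerGeneral: Eq + Hash + Clone + std::fmt::Debug {}
impl<T: Eq + Hash + Clone + std::fmt::Debug> WorkerGeneral for T {}

// Generic selection keyed by worker id: works for plain i64 or (WorkerId, DpRank).
fn best_worker<T: WorkerGeneral>(scores: &HashMap<T, u32>) -> Option<&T> {
    scores.iter().max_by_key(|(_, s)| **s).map(|(w, _)| w)
}

fn main() {
    let mut scores: HashMap<(WorkerId, DpRank), u32> = HashMap::new();
    scores.insert((7, 0), 3); // worker 7, dp rank 0: low overlap
    scores.insert((7, 1), 9); // worker 7, dp rank 1: high overlap
    assert_eq!(best_worker(&scores), Some(&(7, 1)));
}
```

The point of the generalization is that the same scheduling code can select among DP ranks of one worker exactly as it previously selected among workers.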

Changes

File(s) and change summary:

  • lib/llm/src/kv_router/protocols.rs: Introduced WorkerGeneral trait and generic worker ID usage. Refactored RouterResponse, WorkerSelectionResult, and event structs to be generic over worker ID types. Added WorkerId and DpRank type aliases.
  • lib/llm/src/kv_router/indexer.rs: Refactored all indexer types, traits, and methods to be generic over worker ID type T: WorkerGeneral. Updated data structures, channels, and test code accordingly.
  • lib/llm/src/kv_router/scheduler.rs: Updated scheduling logic and data structures to use (WorkerId, DpRank) as worker identifiers. Modified request/response types, event channels, and selection logic for composite keys. Removed unused imports and a redundant struct.
  • lib/llm/src/kv_router.rs: Propagated the (WorkerId, DpRank) composite identifier through router, scheduling, and event handling. Updated struct fields, method signatures, and trait implementations to use generic worker IDs. Adjusted response and event deserialization.
  • lib/llm/src/kv_router/recorder.rs: Made KvRecorder generic over the worker ID type. Updated test code to use the generic forms.
  • lib/llm/src/kv_router/metrics_aggregator.rs: Changed imports and endpoint data decoding to expect vectors of ForwardPassMetrics.
  • lib/llm/src/kv_router/publisher.rs: Consolidated multiple import statements into one. No logic changes.
  • lib/llm/src/kv_router/scoring.rs: Defined a new Endpoint struct locally with a vector of ForwardPassMetrics. Updated ProcessedEndpoints to use WorkerId and adjusted statistics calculation.
  • components/metrics/src/lib.rs: Consolidated imports. Changed metric data handling from a single struct to a vector of structs. Updated code to access the first element or wrap the metric in a vector. Modified postprocess_metrics to use a vector for Endpoint.data.
  • components/metrics/src/bin/mock_worker.rs, components/metrics/src/main.rs: Updated imports for KVHitRateEvent, WorkerId, and DpRank. Adjusted event deserialization and logging to handle composite worker IDs.
  • components/router/src/main.rs: Updated the CustomWorkerSelector method signature to return WorkerSelectionResult<(WorkerId, DpRank)>. Adjusted imports.
  • lib/llm/src/kv_router/worker.rs: Deleted. Removed the KvRoutedIngress struct and its builder/start logic for the NATS service worker, along with associated constants and re-exports.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Router
    participant Indexer
    participant Scheduler

    Client->>Router: schedule(token_ids, lora_id)
    Router->>Indexer: find_best_match(tokens)
    Indexer-->>Router: ((WorkerId, DpRank), overlap)
    Router->>Scheduler: schedule(overlap, isl_tokens)
    Scheduler-->>Router: (WorkerId, DpRank)
    Router-->>Client: (WorkerId, DpRank)

Poem

A hop and a leap, now workers are pairs,
Not just an ID, but a rank each one bears.
Schedules and scores, with generics they flow,
The router’s more clever—just watch it go!
Farewell to old code, new types in the sun,
This rabbit’s delighted: the refactor’s done!
🐇✨


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6bee243 and dab052c.

📒 Files selected for processing (3)
  • lib/llm/src/kv_router/protocols.rs (3 hunks)
  • lib/llm/src/kv_router/scheduler.rs (10 hunks)
  • lib/llm/src/kv_router/scoring.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • lib/llm/src/kv_router/scoring.rs
  • lib/llm/src/kv_router/protocols.rs
  • lib/llm/src/kv_router/scheduler.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build and Test - vllm
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🔭 Outside diff range comments (5)
lib/llm/src/kv_router/recorder.rs (1)

105-107: ⚠️ Potential issue

Mismatch between comment and test data

The comment says “second event from worker 2” but you still pass worker_id = 1.

-// Create second event from worker 2 using helper function
-let event2 = create_remove_event(1, 43, vec![2, 3]);
+// Create second event from worker 2 using helper function
+let event2 = create_remove_event(2, 43, vec![2, 3]);

The discrepancy doesn’t break the test, yet it can confuse future readers or cause false assumptions in downstream logic that groups events per worker.

components/metrics/src/lib.rs (1)

447-482: 🛠️ Refactor suggestion

Unchecked metrics[0] indexing can panic

Endpoint.data is a Vec; indexing without a guard will panic if it’s empty.
Although postprocess_metrics currently guarantees one element, defensive code is safer and future-proof:

-            let metrics = endpoint.data.clone();
+            let Some(first) = endpoint.data.first() else { continue };

and replace every metrics[0] with first.

This also avoids the unnecessary clone().
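The suggested guard can be sketched in isolation (`Metrics` and the `requests` field are stand-ins for the real ForwardPassMetrics, not the PR's actual types):

```rust
// Stand-in for ForwardPassMetrics.
#[derive(Debug)]
struct Metrics {
    requests: u64,
}

struct Endpoint {
    data: Vec<Metrics>,
}

// `first()` returns None on an empty Vec instead of panicking like `data[0]`,
// and borrowing avoids the clone of the whole vector.
fn first_metrics(endpoint: &Endpoint) -> Option<&Metrics> {
    endpoint.data.first()
}

fn main() {
    let full = Endpoint { data: vec![Metrics { requests: 42 }] };
    let empty = Endpoint { data: vec![] };
    assert_eq!(first_metrics(&full).map(|m| m.requests), Some(42));
    assert!(first_metrics(&empty).is_none());
}
```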

lib/llm/src/kv_router.rs (1)

214-222: 💡 Verification agent

❓ Verification inconclusive

DpRank is currently dropped during routing – potential mis-routing

After selecting (worker_id, dp_rank), the router calls

// TODO: this does not do dp routing
self.inner.direct(updated_request, instance_id.0).await

which strips off the dp_rank component of instance_id. If multiple DP ranks of the same worker expose independent KV caches, this routes the request to an arbitrary single rank and silently degrades hit-rate.

Please either:

  • Propagate the full instance_id (including its dp_rank) into the backend call (e.g. extend inner.direct to accept both worker_id and dp_rank), or
  • Introduce a secondary routing key for dp_rank, or
  • Document in the code why dropping dp_rank is safe until DP-routing support is added.
lib/llm/src/kv_router/scheduler.rs (1)

300-302: ⚠️ Potential issue

rand API usage is version-dependent – may not compile

rand::rng() and Rng::random_range only exist in rand 0.9+; if the workspace still pins rand 0.8, this will not compile.

-let mut rng = rand::rng();
-best_workers[rng.random_range(0..best_workers.len())]
+let mut rng = rand::thread_rng();
+best_workers[rng.gen_range(0..best_workers.len())]

use rand::Rng; is already present; optionally add rand::seq::SliceRandom if you want choose. If the workspace is on rand 0.9, the original code is fine as written.

lib/llm/src/kv_router/indexer.rs (1)

795-797: 🛠️ Refactor suggestion

⚠️ Potential issue

iter::repeat_n requires Rust 1.82+ – may fail on older toolchains

std::iter::repeat_n was only stabilized in Rust 1.82; on earlier toolchains it is provided by the itertools crate instead, and the build fails with a "function not found" error.

- scores.frequencies.extend(iter::repeat_n(0, diff as usize));
+ // works on any toolchain – no extra dependency required
+ scores
+     .frequencies
+     .extend(std::iter::repeat(0).take(diff as usize));

If the project's MSRV is at least 1.82, the original line is fine; otherwise prefer the std alternative over a hard-to-spot build break in the core path.
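The toolchain-independent padding form can be verified in isolation (`pad` is an illustrative helper, not the indexer's actual function):

```rust
// Pad a frequency vector with `diff` zeros, as the indexer does after a resize.
// repeat(..).take(..) works on any Rust version; repeat_n needs 1.82+.
fn pad(frequencies: &mut Vec<u32>, diff: usize) {
    frequencies.extend(std::iter::repeat(0).take(diff));
}

fn main() {
    let mut freqs = vec![3, 2];
    pad(&mut freqs, 3);
    assert_eq!(freqs, vec![3, 2, 0, 0, 0]);
}
```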

♻️ Duplicate comments (1)
lib/llm/src/kv_router/indexer.rs (1)

844-847: Identical panic risk in sharded indexer’s apply_event

See rationale above – replace unwrap() with graceful error handling to prevent unexpected crashes in production.

🧹 Nitpick comments (8)
components/metrics/src/main.rs (1)

201-202: Address the incomplete dp_rank handling.

The TODO comment indicates that the current implementation only uses worker_id.0 and doesn't handle dp_rank properly. This could lead to incorrect metrics aggregation when multiple DP ranks are involved.

Consider implementing proper handling of the composite worker ID in metrics collection. This might involve:

  1. Updating the metrics collection to include DP rank as a dimension
  2. Modifying the update_kv_hit_rate method signature to accept the full composite ID
  3. Ensuring metrics are properly partitioned by both worker ID and DP rank

Would you like me to help design a solution for proper composite worker ID handling in metrics collection?

components/router/src/main.rs (1)

92-96: Consider clarifying selector extensibility

CustomWorkerSelector simply delegates to DefaultWorkerSelector.
If custom logic is never injected, you could:

  1. Omit the wrapper and pass DefaultWorkerSelector directly, or
  2. Add a brief comment stating what future heuristics will be added (e.g., DP-aware shard affinity).

Not blocking, just a readability note.

lib/llm/src/kv_router/recorder.rs (1)

96-102: Alias usage is valid but may surprise newcomers

KvRecorder::<T>::new works because a type-alias inherits inherent impls from its target (Recorder<_>).
A short doc-comment on the alias would help readers unfamiliar with this Rust feature.

components/metrics/src/lib.rs (1)

598-605: Only the first DP-rank is surfaced to Prometheus

postprocess_metrics wraps a single ForwardPassMetrics in a Vec, yet upstream callers now provide one metrics struct per DP rank.
Downstream update() ignores elements after index 0, losing visibility into multi-rank workers.

Consider either aggregating the vector (e.g., sum/avg) or exporting per-rank labels (add an extra label for dp_rank).
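Both options can be sketched side by side. `Metrics` and `kv_active_blocks` are stand-ins for the real ForwardPassMetrics fields, which this sketch does not assume:

```rust
// Stand-in for ForwardPassMetrics: one entry per DP rank.
struct Metrics {
    kv_active_blocks: u64,
}

// Option A: aggregate across ranks before exporting a single gauge.
fn total_active_blocks(per_rank: &[Metrics]) -> u64 {
    per_rank.iter().map(|m| m.kv_active_blocks).sum()
}

// Option B: export one (dp_rank, value) pair per rank, so dp_rank can
// become an extra Prometheus label dimension.
fn per_rank_labels(per_rank: &[Metrics]) -> Vec<(usize, u64)> {
    per_rank
        .iter()
        .enumerate()
        .map(|(rank, m)| (rank, m.kv_active_blocks))
        .collect()
}

fn main() {
    let ranks = vec![
        Metrics { kv_active_blocks: 4 },
        Metrics { kv_active_blocks: 6 },
    ];
    assert_eq!(total_active_blocks(&ranks), 10);
    assert_eq!(per_rank_labels(&ranks), vec![(0, 4), (1, 6)]);
}
```

Per-rank labels preserve visibility into imbalance between ranks; aggregation keeps cardinality low. Either is better than silently dropping elements after index 0.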

lib/llm/src/kv_router/indexer.rs (4)

120-131: Trailing tokens are silently ignored in compute_block_hash_for_seq

Using chunks_exact skips the final partial chunk (≤ kv_block_size - 1 tokens).
If that is intentional, add a doc comment warning callers; if not, switch to chunks to include the remainder:

- .chunks_exact(kv_block_size)
+ .chunks(kv_block_size)

Dropping data without explicit notice is error-prone, especially once different models use different block sizes.
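The difference is easy to demonstrate with a hypothetical block size of 4 and 10 tokens:

```rust
fn main() {
    let tokens: Vec<u32> = (0..10).collect();
    // chunks_exact drops the trailing partial block: 2 blocks, tokens 8 and 9 ignored.
    let exact: Vec<&[u32]> = tokens.chunks_exact(4).collect();
    // chunks includes the remainder as a final short block: 3 blocks.
    let all: Vec<&[u32]> = tokens.chunks(4).collect();
    assert_eq!(exact.len(), 2);
    assert_eq!(all.len(), 3);
    assert_eq!(all[2], &[8, 9]);
}
```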


406-411: inspect is a readability outlier here

Option::inspect is okay, but the intent (“ensure non-increasing frequencies”) is clearer with a straightforward if let:

- self.frequencies
-     .last()
-     .inspect(|elem| debug_assert!(**elem >= frequency));
+ if let Some(&last) = self.frequencies.last() {
+     debug_assert!(last >= frequency);
+ }

Pure style, but helps future maintainers unfamiliar with inspect.


510-515: Single-thread runtime: current_thread() is lighter than multi_thread(1)

tokio::runtime::Builder::new_current_thread() avoids the overhead of the multi-thread scheduler while maintaining the same semantics you want here.

-let runtime = tokio::runtime::Builder::new_multi_thread()
-    .worker_threads(1)
+let runtime = tokio::runtime::Builder::new_current_thread()

Not critical, but saves memory and a few CPU cycles.


691-692: Broadcast channel capacity of 1 048 576 messages is excessive

An unbounded-looking buffer can hide back-pressure problems and blow up memory under load.
Consider:

  1. Reducing the size (a few thousand is usually plenty).
  2. Forward-pressure strategies (drop oldest, reject new, etc.) depending on SLA.
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6336143 and 6bee243.

📒 Files selected for processing (13)
  • components/metrics/src/bin/mock_worker.rs (1 hunks)
  • components/metrics/src/lib.rs (3 hunks)
  • components/metrics/src/main.rs (3 hunks)
  • components/router/src/main.rs (2 hunks)
  • lib/llm/src/kv_router.rs (7 hunks)
  • lib/llm/src/kv_router/indexer.rs (32 hunks)
  • lib/llm/src/kv_router/metrics_aggregator.rs (2 hunks)
  • lib/llm/src/kv_router/protocols.rs (3 hunks)
  • lib/llm/src/kv_router/publisher.rs (1 hunks)
  • lib/llm/src/kv_router/recorder.rs (5 hunks)
  • lib/llm/src/kv_router/scheduler.rs (10 hunks)
  • lib/llm/src/kv_router/scoring.rs (2 hunks)
  • lib/llm/src/kv_router/worker.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • lib/llm/src/kv_router/worker.rs
🧰 Additional context used
🧬 Code Graph Analysis (2)
lib/llm/src/kv_router/publisher.rs (1)
lib/llm/src/kv_router/indexer.rs (1)
  • compute_block_hash_for_seq (119-131)
lib/llm/src/kv_router/recorder.rs (3)
lib/llm/src/kv_router/indexer.rs (6)
  • create_remove_event (915-932)
  • new (150-156)
  • new (197-199)
  • new (385-390)
  • new (574-576)
  • new (764-766)
lib/llm/src/kv_router/protocols.rs (1)
  • new (194-196)
lib/llm/src/recorder.rs (1)
  • send_events (287-386)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (9)
components/metrics/src/bin/mock_worker.rs (1)

17-17: LGTM: Clean import reorganization.

The import path change from scheduler to protocols module aligns with the broader refactoring to centralize event definitions in the protocols module.

lib/llm/src/kv_router/metrics_aggregator.rs (1)

21-21: LGTM: Import reorganization aligns with module restructuring.

The import path change from scheduler to scoring module is consistent with the broader refactoring.

lib/llm/src/kv_router/publisher.rs (1)

17-17: LGTM: Clean import consolidation.

The consolidation of multiple imports into a single line using protocols::* improves readability and aligns with the module reorganization.

components/metrics/src/main.rs (3)

30-30: LGTM: Import updates support composite worker ID type.

The import changes correctly add the necessary types (WorkerId, DpRank) to support the composite worker identifier.


183-183: LGTM: Correct parameterization for composite worker ID.

The type parameter (WorkerId, DpRank) correctly reflects the new composite worker identifier structure.


189-191: LGTM: Proper handling of composite worker ID in logging.

The logging correctly accesses both components of the worker ID tuple (worker_id.0 and worker_id.1) for complete visibility.

components/router/src/main.rs (1)

28-32: Generic worker-ID import looks good

The file correctly pulls in WorkerId and DpRank, and the downstream type annotation in select_worker compiles cleanly.
No further action required.

lib/llm/src/kv_router/recorder.rs (1)

31-33: Explicit test alias helps readability

Introducing type TestWorkerId = i64; makes the generic tests clear and future-proof.

lib/llm/src/kv_router/indexer.rs (1)

81-140: Trait bound sanity check

RadixBlock, OverlapScores, and several HashMap / HashSet usages require
T: Eq + Hash + Clone. Ensure the WorkerGeneral trait declares these super-traits; otherwise downstream code will fail to compile.

If WorkerGeneral already does, ignore this note.
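A self-contained compile check of the required supertraits (the trait and struct here are sketches of what the PR's WorkerGeneral and OverlapScores would need, not their actual definitions):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Without Eq + Hash the HashMap below would not compile;
// Clone is needed wherever keys are copied out of selection results.
trait WorkerGeneral: Eq + Hash + Clone {}
impl WorkerGeneral for i64 {}
impl WorkerGeneral for (i64, u32) {}

struct OverlapScores<T: WorkerGeneral> {
    scores: HashMap<T, u32>,
}

fn main() {
    let mut s: OverlapScores<(i64, u32)> = OverlapScores { scores: HashMap::new() };
    s.scores.insert((1, 0), 7);
    assert_eq!(s.scores.get(&(1, 0)), Some(&7));
}
```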

@PeaBrane PeaBrane marked this pull request as draft May 30, 2025 06:46
@PeaBrane PeaBrane changed the title feat: more general handling of worker id type feat: router supporting intra-worker dp routing Jun 4, 2025
To run other DP ranks headless on the same node or on other nodes, run:

```
VLLM_LOGGING_LEVEL=DEBUG CUDA_VISIBLE_DEVICES=1 VLLM_USE_V1=1 vllm serve Qwen/Qwen3-0.6B -dp 1 -dpr 1 --data-parallel-address 127.0.0.1 --data-parallel-rpc-port 62300 --data-parallel-size-local 1 --enforce-eager --headless --kv-events-config '{"enable_kv_cache_events": true, "publisher": "zmq"}' --enable-prefix-caching
```
Contributor

V1 and prefix caching are enabled by default


```
VllmDecodeWorker:
  enforce-eager: true
  max-num-batched-tokens: 16384
```
Contributor

why do we need to set that?

```
SimpleLoadBalancer:
  enable_disagg: false
  common-configs: [model, served_model_name]
  router_mode: kv

VllmDecodeWorker:
  enforce-eager: true
```
Contributor

why do we need to set that?

Contributor Author

Ah sorry. I think we pushed the changes to our config file for our local dev. We were hijacking your simple load balancer to do kv routing 😆, but those changes were not pushed. For now, we are still working on cleaning the python bits up.

```
model: Qwen/Qwen3-0.6B

block-size: 16
max-model-len: 16384
```
Contributor

why do we need to set that?

```
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
model: Qwen/Qwen3-0.6B

block-size: 16
```
Contributor

why do we need to set that?


```
VllmDecodeWorker:
  enforce-eager: true
  max-num-batched-tokens: 16384
  enable-prefix-caching: true
```
Contributor

It is enabled by default in V1

```
SimpleLoadBalancer:
  enable_disagg: false
  common-configs: [model, served_model_name]
  router_mode: kv

VllmDecodeWorker:
```
Contributor

Why don't we need kv-events-config specified here?

Contributor

Oh, I see it's hard coded in the config, what's the reason for that instead of using config yaml?
