feat(runtime): disaggregated offline EAGLE3 assemble example + 2-node 7B e2e #16

Closed
maocheng23 wants to merge 7 commits into runtime-m6-disagg from runtime-m6-disagg-example

@maocheng23

Owner

Builds the assemble example for the M6 disaggregation seam (SharedDirFeatureStore, from #12): runs offline EAGLE3 training with the rollout/feature pool and the training pool on different GPU nodes sharing only a filesystem mount. The control plane carries only tensor-free SampleRef metadata; feature tensors travel through the shared store. Disaggregation changes where features live, not their values — so results match the colocated path.
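
To make the split between control plane and data plane concrete, here is a minimal sketch using only the standard library. `ToySampleRef` and `ToySharedDirStore` are hypothetical stand-ins, not the classes in this PR, and raw bytes play the role of feature tensors. The only thing that crosses between the two "pools" is a small JSON message; the payload itself goes through the shared directory.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class ToySampleRef:
    """Tensor-free handle: identifies a sample, carries no feature bytes."""
    sample_id: str
    store_uri: str  # where the payload lives on the shared mount
    nbytes: int     # size only, never the data itself


class ToySharedDirStore:
    """Hypothetical stand-in for a shared-directory feature store."""

    def __init__(self, root: Path) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def put(self, sample_id: str, payload: bytes) -> ToySampleRef:
        path = self.root / f"{sample_id}.bin"
        path.write_bytes(payload)
        return ToySampleRef(sample_id, path.as_uri(), len(payload))

    def get(self, ref: ToySampleRef) -> bytes:
        return (self.root / f"{ref.sample_id}.bin").read_bytes()


with tempfile.TemporaryDirectory() as shared_mount:
    # Producer pool: writes features to the shared mount.
    producer = ToySharedDirStore(Path(shared_mount))
    features = bytes(range(16))
    ref = producer.put("sample-0", features)

    # Control plane: only this small JSON message crosses between pools.
    wire_message = json.dumps(asdict(ref))

    # Consumer pool: a separate store object over the same directory.
    consumer = ToySharedDirStore(Path(shared_mount))
    fetched = consumer.get(ToySampleRef(**json.loads(wire_message)))

    identical = fetched == features
```

In this toy setup `identical` is true because the consumer reads back exactly the bytes the producer wrote, which is the property the description relies on.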

What's added

  • launch.py: build_disagg_eagle3_runtime (consumer side). Factored the shared trainer/loader assembly out of build_offline_eagle3_runtime into _assemble_offline_eagle3, so colocated and disaggregated paths are byte-identical by construction. Added a log_interval knob to both builders.
  • data_plane/disagg_ingest.py: ingest_offline_features (producer: load .ckpt → SharedDirFeatureStore.put) + a JSON ref-manifest (write/read_ref_manifest) as the tensor-free metadata bridge between pools (asserts the no-tensor invariant).
  • data_plane/disaggregated.py: retain_on_release (read-only mode). Offline training re-iterates the ref set across epochs; consume-once free deleted files mid-run (→ epoch-2 KeyError and corrupted epoch-1 data). Retain mode keeps files (mirrors LocalFeatureStore's file:// no-op release); online rollout keeps consume-once (default).
  • examples/disagg/: run_disagg_eagle3.py (role-branched producer/consumer), run_qwen2.5_7b_eagle3_disagg.sh (rcli --per-node wrapper), README.
  • tests/test_runtime/test_disagg_launch.py: CPU bit-exact differential + GPU FSDP train smoke.
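
The retain_on_release item is easiest to see with a toy reproduction. The sketch below is an assumption-laden illustration, not the code in data_plane/disaggregated.py: `ToyStore` and `run_epochs` are hypothetical names, and release either deletes the file (consume-once) or leaves it in place (retain mode).

```python
import tempfile
from pathlib import Path


class ToyStore:
    """Hypothetical file-backed store with the two release policies."""

    def __init__(self, root: Path, retain_on_release: bool = False) -> None:
        self.root = Path(root)
        self.retain_on_release = retain_on_release

    def put(self, key: str, payload: bytes) -> None:
        (self.root / key).write_bytes(payload)

    def get(self, key: str) -> bytes:
        path = self.root / key
        if not path.exists():
            raise KeyError(key)
        return path.read_bytes()

    def release(self, key: str) -> None:
        # Consume-once frees the file; retain mode leaves it on disk.
        if not self.retain_on_release:
            (self.root / key).unlink(missing_ok=True)


def run_epochs(store: ToyStore, keys: list[str], epochs: int) -> int:
    """Re-iterate the same keys each epoch; return how many epochs finished."""
    finished = 0
    for _ in range(epochs):
        try:
            for key in keys:
                store.get(key)
                store.release(key)
        except KeyError:
            break
        finished += 1
    return finished


results = {}
for retain in (False, True):
    with tempfile.TemporaryDirectory() as tmp:
        store = ToyStore(Path(tmp), retain_on_release=retain)
        keys = ["a", "b"]
        for key in keys:
            store.put(key, b"features")
        results[retain] = run_epochs(store, keys, epochs=3)
```

With consume-once the toy loop finishes one epoch and then hits KeyError on the first get of epoch 2; with retain mode all three epochs finish.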

Validation — alignment proven three ways

  1. CPU bit-exact differential: SharedDirFeatureStore serves byte-identical tensors to the colocated LocalFeatureStore file:// path; the ref manifest round-trips carrying no tensors. (33 data-plane tests green.)
  2. GPU FSDP: build_disagg_eagle3_runtime trains end-to-end through FSDP (31 runtime tests green on H200).
  3. 2-node 7B e2e: Qwen2.5-7B, ShareGPT features, on a 2×8×H200 pod over a shared /workspace mount. The producer ingests 64 features → shared store; the consumer trains. The run completes with EXIT=0, crosses epoch boundaries (retain fix), ploss starts at ~5.4 (≈ colocated baseline ~5.5), and acc 0.027→0.083 / acceptance 0.0013→0.034 climb over training (baseline direction). Per-step values are noisy at batch_size=1.
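
For readers who want to see what "the ref manifest round-trips carrying no tensors" can look like, here is a rough sketch. The helper names (`assert_tensor_free`, `write_manifest`, `read_manifest`) and the ref layout are assumptions made for illustration; the actual write/read_ref_manifest signatures in this PR may differ.

```python
import json
import tempfile
from pathlib import Path

# JSON-native leaf types; anything else (e.g. a tensor) is rejected.
_ALLOWED_LEAVES = (str, int, float, bool, type(None))


def assert_tensor_free(value, path="ref"):
    """Recursively check that a ref holds only plain metadata."""
    if isinstance(value, dict):
        for key, item in value.items():
            assert_tensor_free(item, f"{path}.{key}")
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            assert_tensor_free(item, f"{path}[{index}]")
    elif not isinstance(value, _ALLOWED_LEAVES):
        raise TypeError(f"{path} holds non-metadata {type(value).__name__}")


def write_manifest(path, refs):
    for ref in refs:
        assert_tensor_free(ref)
    Path(path).write_text(json.dumps(refs, indent=2))


def read_manifest(path):
    refs = json.loads(Path(path).read_text())
    for ref in refs:
        assert_tensor_free(ref)
    return refs


refs = [{
    "sample_id": "sample-0",
    "store_uri": "file:///workspace/features/sample-0",
    "feature_specs": {"hidden_states": {"shape": [4, 8], "dtype": "bfloat16"}},
}]

with tempfile.TemporaryDirectory() as tmp:
    manifest = Path(tmp) / "refs.json"
    write_manifest(manifest, refs)
    round_tripped = read_manifest(manifest)

    # A raw payload smuggled into a ref is refused before anything is written.
    try:
        write_manifest(manifest, [{"sample_id": "bad", "payload": b"\x00\x01"}])
        rejected = False
    except TypeError:
        rejected = True
```

The check runs on both write and read, so a manifest that somehow picked up non-metadata content fails loudly on either side of the bridge.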

🤖 Generated with Claude Code

maocheng23 and others added 7 commits June 27, 2026 11:37
Adds the consumer/producer assembly for the M6 disaggregation seam
(SharedDirFeatureStore), plus a runnable 2-node example:

- launch.py: build_disagg_eagle3_runtime (consumer side) + factor the shared
  offline trainer assembly out of build_offline_eagle3_runtime, so colocated and
  disaggregated paths produce byte-identical batches/training.
- data_plane/disagg_ingest.py: ingest_offline_features (producer: load .ckpt ->
  SharedDirFeatureStore.put) + JSON ref-manifest (the tensor-free metadata bridge
  between pools; asserts the no-tensor invariant).
- examples/disagg/: run_disagg_eagle3.py (role-branched producer/consumer driver),
  run_qwen2.5_7b_eagle3_disagg.sh (rcli --per-node wrapper), README.
- tests/test_runtime/test_disagg_launch.py: CPU bit-exact differential (disagg
  store serves identical tensors to the colocated path; manifest round-trips
  tensor-free; B9 auth) + a GPU FSDP train smoke.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

The thin launchers skip sanity_check(); the train_eagle3 builders read
args.target_batch_size/dp_size which only sanity_check derives. Call it on the
consumer after init_distributed (it needs the process group). Also wire
chat-template/cache-dir/learning-rate into the rcli wrapper.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Thread log_interval through build_offline/build_disagg_eagle3_runtime (default
50) so the example can emit a finer training curve; driver logs every 25 steps.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Offline training re-iterates the ref set across epochs, but SharedDirFeatureStore
consume-once-frees on release() -> epoch 2 get() raised KeyError. Add
retain_on_release (read-only mode): release() drops the lease but keeps the file,
mirroring LocalFeatureStore's file:// no-op release. The disagg consumer sets it;
online rollout keeps consume-once (default False). Whole-store cleanup at run end.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

DISAGG_ROLE=colocated runs the SAME model build + assembly via
build_offline_eagle3_runtime (LocalFeatureStore), so disagg vs colocated can be
compared on identical features/seed. Factored the shared model/optimizer build.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Disagg consumer vs colocated baseline on Qwen2.5-7B (identical features/seed):
training metrics (acceptance_rate/ploss/acc) match to ~5 sig figs; residual is
GPU floating-point noise, not the transport.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Lint-only: formats the files this PR adds/changes; no behavior change. The shell
wrapper is marked executable (check-shebang-scripts-are-executable).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23 maocheng23 force-pushed the runtime-m6-disagg-example branch from 549c301 to e7bb5e1 Compare June 27, 2026 18:37
@Boreas618

Mooncake integration: torch.save serialization vs zero-copy pointer API

The current MooncakeFeatureStore serializes tensors through torch.save into a BytesIO buffer, then calls store.put(key, bytes) on the Mooncake KV API. On the consumer side it does the reverse: store.get(key) → torch.load → .clone(). This effectively treats Mooncake as a generic networked byte-blob store and bypasses its core value proposition: RDMA zero-copy transfer.

How SGLang uses Mooncake (the canonical integration)

SGLang's Mooncake integration (sglang/srt/disaggregation/mooncake/conn.py, sglang/srt/mem_cache/storage/mooncake_store/) follows a zero-copy pattern throughout:

  1. Register raw memory regions with the engine (store.register_buffer(tensor.data_ptr(), nbytes)).
  2. Transfer via pointer + length — store.batch_put_from(keys, ptrs, sizes) / store.batch_get_into(keys, ptrs, sizes) for the KV store path, or engine.batch_transfer_sync_write(session_id, src_ptrs, dst_ptrs, lengths) for the PD disaggregation path.
  3. No serialization, no pickle, no intermediate byte buffers — the transfer engine reads directly from (and writes directly into) registered memory.
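
The call shape of the three steps above can be sketched without Mooncake installed. `FakePointerStore` below is a hypothetical in-process stand-in: it borrows the method names quoted in this comment (register_buffer, batch_put_from, batch_get_into), but the argument handling, return values, and error behaviour are assumptions, and it copies through a Python dict rather than doing any RDMA transfer. A bytearray address plays the role of tensor.data_ptr().

```python
import ctypes


class FakePointerStore:
    """In-process stand-in that mimics a pointer + length KV interface.

    It is NOT Mooncake: this class copies through a dict purely to show
    how callers pass keys, raw addresses, and sizes.
    """

    def __init__(self) -> None:
        self._blobs = {}
        self._registered = {}  # base pointer -> registered length

    def register_buffer(self, ptr: int, size: int) -> None:
        self._registered[ptr] = size

    def _check(self, ptr: int, size: int) -> None:
        if self._registered.get(ptr, 0) < size:
            raise ValueError("buffer not registered or too small")

    def batch_put_from(self, keys, ptrs, sizes) -> None:
        for key, ptr, size in zip(keys, ptrs, sizes):
            self._check(ptr, size)
            self._blobs[key] = ctypes.string_at(ptr, size)

    def batch_get_into(self, keys, ptrs, sizes) -> None:
        for key, ptr, size in zip(keys, ptrs, sizes):
            self._check(ptr, size)
            blob = self._blobs[key]
            ctypes.memmove(ptr, blob, min(size, len(blob)))


def address_of(buf: bytearray) -> int:
    """Raw address of a bytearray (valid while buf is alive and not resized)."""
    return ctypes.addressof((ctypes.c_char * len(buf)).from_buffer(buf))


store = FakePointerStore()
src = bytearray(b"hidden-states-0123")
dst = bytearray(len(src))  # pre-allocated receive buffer

src_ptr, dst_ptr = address_of(src), address_of(dst)
store.register_buffer(src_ptr, len(src))
store.register_buffer(dst_ptr, len(dst))

# No pickle and no BytesIO: the caller hands over pointers and lengths only.
store.batch_put_from(["s0/hidden"], [src_ptr], [len(src)])
store.batch_get_into(["s0/hidden"], [dst_ptr], [len(dst)])
```

After the two calls, `dst` holds the same bytes as `src`, filled in place with no deserialization step on the receiving side.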

Cost of the current approach

  • Extra CPU work: pickle serialization (torch.save) and deserialization (torch.load) on every sample, both producer and consumer side.
  • 2× peak memory: the original CPU tensors and their serialized bytes representation coexist in memory during put().
  • Extra copies: .detach().cpu() → torch.save to BytesIO → store.put (memcpy into Mooncake segment) on the producer; store.get → torch.load → .clone() on the consumer. That is at least 3 extra copies per sample, versus zero with the pointer API.
  • No RDMA benefit: the generic put(key, bytes) path does not leverage RDMA one-sided writes; it goes through Mooncake's internal copy path.

For the offline one-shot ingestion use case this may be acceptable today, but it will not scale to larger feature sets or an online hot path.

Suggested refactoring

Replace the serialize-then-put pattern with Mooncake's zero-copy pointer API. The SampleRef.feature_specs already carries per-tensor shape and dtype, which is all the consumer needs to pre-allocate receive buffers.

Producer side:

  • Store each tensor as a separate Mooncake key (e.g. {store_id}/{sample_id}/{feature_name}).
  • Ensure the tensor is contiguous in CPU pinned memory (or GPU memory if co-located with the model).
  • Call store.register_buffer(tensor.data_ptr(), tensor.nbytes) then store.batch_put_from([key], [tensor.data_ptr()], [tensor.nbytes]).
  • Embed the generation counter in SampleRef.metadata (already done) instead of inside the serialized payload.

Consumer side:

  • Pre-allocate an empty tensor from feature_specs (torch.empty(shape, dtype=dtype)).
  • Register it, then call store.batch_get_into([key], [buf.data_ptr()], [buf.nbytes]) — the tensor is populated in-place with no deserialization.
  • The existing clone-on-fetch policy can remain as an opt-in safety net on top.
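
As a rough sketch of the bookkeeping this implies, the snippet below derives per-feature keys and receive-buffer sizes from a feature_specs mapping. The mapping layout (name → shape and dtype string), the `ITEMSIZE` table, and `plan_receive_buffers` are assumptions for illustration; a bytearray stands in for the tensor that torch.empty(shape, dtype=dtype) would allocate.

```python
import math

# Bytes per element for a few dtype names (assumed string spellings).
ITEMSIZE = {"float32": 4, "float16": 2, "bfloat16": 2, "int64": 8}


def plan_receive_buffers(store_id, sample_id, feature_specs):
    """Return (keys, sizes, buffers) for one sample, one entry per feature."""
    keys, sizes, buffers = [], [], []
    for name, spec in feature_specs.items():
        nbytes = math.prod(spec["shape"]) * ITEMSIZE[spec["dtype"]]
        keys.append(f"{store_id}/{sample_id}/{name}")
        sizes.append(nbytes)
        buffers.append(bytearray(nbytes))  # stand-in for torch.empty(...)
    return keys, sizes, buffers


specs = {
    "hidden_states": {"shape": [4, 8], "dtype": "bfloat16"},
    "input_ids": {"shape": [4], "dtype": "int64"},
}
keys, sizes, buffers = plan_receive_buffers("store0", "sample42", specs)
```

The resulting `keys` and `sizes` lists, together with the buffer addresses, are what a batched get-into call would consume for one sample.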

Lifecycle:

  • Manage register_buffer / deregister_buffer alongside the existing _generation / _freed bookkeeping.
  • The retain_on_release offline mode and the _hold_producer_until_consumed protocol are unaffected — they are control-plane concerns orthogonal to the data transport.

This aligns with SGLang's proven pattern and would make the Mooncake backend a genuine upgrade over SharedDirFeatureStore, rather than a functionally-equivalent alternative with different plumbing.

@maocheng23

Owner Author

@Boreas618 Good call, and we have already done this zero-copy part upstream.
Right now the codebase is a little messy between the two repos.
sgl-project#621
I will sync everything when this part gets merged.

@maocheng23

Owner Author

Superseded: upstreamed + merged as sgl-project#610 (in up-11-m5-recovery). Closing this fork-internal PR.

@maocheng23 maocheng23 closed this Jun 29, 2026