Skip to content

[DataFlow runtime · online] Online disaggregated training (StreamingRefChannel + build_disagg_online_*)#622

Merged
jiapingW merged 2 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-17-online-disagg
Jun 30, 2026
Merged

[DataFlow runtime · online] Online disaggregated training (StreamingRefChannel + build_disagg_online_*)#622
jiapingW merged 2 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-17-online-disagg

Conversation

@maocheng23

Copy link
Copy Markdown
Collaborator

Online disaggregated training on top of the Mooncake zero-copy store (#621). Adds StreamingRefChannel/StreamingRefQueue (cross-process append-only tensor-free SampleRef stream with backpressure + EOF) and build_disagg_online_producer/build_disagg_online_consumer in launch.py. Cross-pool consume-once free works via shared Mooncake remove(); from SampleRef down the trainer path == colocated online. NO hot-switch.

Validated 2-node real-Mooncake online e2e over RDMA (rollout pool streams refs → trainer pool trains FSDP steps cross-node, consume-once). Rebased onto current #621.

Stacked: #608#621this → up-18 staleness. Supersedes the fork-only PR maocheng23#18.

🤖 Generated with Claude Code

maocheng23 and others added 2 commits June 29, 2026 13:20
…ocess online ref stream

The offline disagg path hands the consumer a STATIC ref manifest written once.
Online disaggregation needs a continuous stream: the rollout producer commits
SampleRefs while the trainer consumes them, on another node. StreamingRefChannel
is that control-plane channel:

* tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors
  go through the FeatureStore (Mooncake), so no shared *data* mount is needed.
* poll() tail-reads complete lines from the last offset, buffering a partial
  trailing line so a half-written record is never parsed.
* mark_consumed()/consumed_remote() give the producer a cross-process
  backpressure signal (in_flight_remote) with no shared in-process state.
* close() drops an EOF sentinel so stream() terminates once drained;
  idle_timeout_s guards against a dead producer.

Filesystem-backed (any shared control mount); a networked control plane slots in
behind the same publish/poll API later. 7 CPU tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…consumer} + StreamingRefQueue

Wires online disaggregated training: a rollout producer pool streams features to
a trainer pool on a different node, tensors over Mooncake, refs over the
StreamingRefChannel.

* StreamingRefQueue: adapts the channel to the SampleRefQueue protocol
  (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are
  available or the channel is closed-and-drained; ack() advances the channel's
  consumed counter (the producer's backpressure signal).
* build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via
  SGLangAdapter) put() consume-once features into a Mooncake store and publish
  refs to the channel. drive_producer() runs until the prompt pool drains,
  pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer
  can't overrun the segment, then closes the channel.
* build_disagg_online_consumer: the online trainer assembly (target_head=None)
  reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake
  store. The loader frees each sample on read (get -> release -> remote remove).

The cross-pool consume-once free works through the shared Mooncake remove (proven
in the mooncake cross-process tests); from SampleRef down the trainer path is
identical to colocated online.

Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once
free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU --
producer streams, consumer trains through FSDP end to end). StreamingRefQueue
covered via the integration test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23 maocheng23 requested a review from FrankLeeeee as a code owner June 29, 2026 20:22
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

@jiapingW jiapingW merged commit 8adec57 into dataflow-up-16-zerocopy Jun 30, 2026
1 check passed
@jiapingW jiapingW deleted the dataflow-up-17-online-disagg branch June 30, 2026 13:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants