[DataFlow runtime] Online disaggregated training (build_disagg_online_* + StreamingRefChannel)#18
Closed
maocheng23 wants to merge 2 commits into
…ocess online ref stream

The offline disagg path hands the consumer a STATIC ref manifest written once. Online disaggregation needs a continuous stream: the rollout producer commits SampleRefs while the trainer consumes them, on another node. StreamingRefChannel is that control-plane channel:

* tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors go through the FeatureStore (Mooncake), so no shared *data* mount is needed.
* poll() tail-reads complete lines from the last offset, buffering a partial trailing line so a half-written record is never parsed.
* mark_consumed()/consumed_remote() give the producer a cross-process backpressure signal (in_flight_remote) with no shared in-process state.
* close() drops an EOF sentinel so stream() terminates once drained; idle_timeout_s guards against a dead producer.

Filesystem-backed (any shared control mount); a networked control plane slots in behind the same publish/poll API later. 7 CPU tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
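The partial-line-safe tail read and the EOF sentinel described in this commit can be illustrated with a minimal sketch. This is not the PR's StreamingRefChannel: the class name `TailReader`, the `publish` helper, the record fields, and the `{"__eof__": true}` sentinel shape are all assumptions made for illustration.

```python
import json
import os
import tempfile


class TailReader:
    """Hypothetical consumer side of an append-only JSONL ref stream."""

    EOF = {"__eof__": True}  # assumed sentinel shape, not taken from the PR

    def __init__(self, path):
        self.path = path
        self.offset = 0      # byte offset of the next unread byte
        self.partial = b""   # trailing bytes that do not end in a newline yet
        self.closed = False

    def poll(self):
        """Return every complete record appended since the previous call."""
        if not os.path.exists(self.path):
            return []
        with open(self.path, "rb") as f:
            f.seek(self.offset)
            chunk = f.read()
        self.offset += len(chunk)
        lines = (self.partial + chunk).split(b"\n")
        # The last element is b"" when the data ended on a newline,
        # otherwise it is a half-written record that must wait.
        self.partial = lines.pop()
        records = []
        for line in lines:
            if not line.strip():
                continue
            record = json.loads(line)
            if record == self.EOF:
                self.closed = True
            else:
                records.append(record)
        return records


def publish(path, record):
    """Append one record as a single newline-terminated JSON line."""
    with open(path, "ab") as f:
        f.write(json.dumps(record).encode() + b"\n")


def demo():
    path = os.path.join(tempfile.mkdtemp(), "refs.jsonl")
    reader = TailReader(path)
    publish(path, {"key": "s0"})
    with open(path, "ab") as f:
        f.write(b'{"key": "s')       # simulate a half-written record
    first = reader.poll()            # only the complete record is returned
    with open(path, "ab") as f:
        f.write(b'1"}\n')            # the writer finishes the record
    publish(path, TailReader.EOF)
    second = reader.poll()
    return first, second, reader.closed
```

Splitting on the newline byte and holding back the final fragment is what keeps a half-written record out of `json.loads`; the reader only advances its byte offset, so it needs no state shared with the writer.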
…consumer} + StreamingRefQueue

Wires online disaggregated training: a rollout producer pool streams features to a trainer pool on a different node, tensors over Mooncake, refs over the StreamingRefChannel.

* StreamingRefQueue: adapts the channel to the SampleRefQueue protocol (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are available or the channel is closed-and-drained; ack() advances the channel's consumed counter (the producer's backpressure signal).
* build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via SGLangAdapter) put() consume-once features into a Mooncake store and publish refs to the channel. drive_producer() runs until the prompt pool drains, pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer can't overrun the segment, then closes the channel.
* build_disagg_online_consumer: the online trainer assembly (target_head=None) reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake store. The loader frees each sample on read (get -> release -> remote remove).

The cross-pool consume-once free works through the shared Mooncake remove (proven in the mooncake cross-process tests); from SampleRef down the trainer path is identical to colocated online.

Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU -- producer streams, consumer trains through FSDP end to end). StreamingRefQueue covered via the integration test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
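The high-watermark pause described for drive_producer() can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: the function name `drive_with_backpressure`, its parameters, and the injected `sleep` callable are hypothetical, and the in-flight count is assumed to be published minus consumed.

```python
import time


def drive_with_backpressure(prompts, produce, in_flight_remote, close,
                            high_watermark=4, poll_interval_s=0.01,
                            sleep=time.sleep):
    """Publish one sample per prompt, pausing while the consumer lags.

    produce(prompt)     -- hypothetical: generates features and publishes a ref
    in_flight_remote()  -- hypothetical: refs published but not yet consumed
    close()             -- hypothetical: signals end of stream to the consumer
    """
    published = 0
    for prompt in prompts:
        # Wait while the backlog exceeds the watermark. Because the check is
        # "exceeds", the backlog can briefly reach high_watermark + 1.
        while in_flight_remote() > high_watermark:
            sleep(poll_interval_s)
        produce(prompt)
        published += 1
    close()  # the prompt pool is drained, so end the stream
    return published
```

Passing `sleep` as a parameter is only a testing convenience: a fake sleep can advance a simulated consumer, so the loop can be exercised without real waiting.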
Superseded: re-opened upstream (sgl-project, online-disagg PR based on sgl-project#621). Closing this fork-only PR.
Stacked on dataflow-up-16-zerocopy (= upstream sgl-project#614 → sgl-project#612). Opened on the fork because its base isn't upstream yet; retarget to sgl-project once sgl-project#614 lands.

What
Makes online disaggregated training work: a rollout producer pool streams features to a trainer consumer pool on another node — tensors over Mooncake, refs over a new streaming control channel. (Offline disagg hands the consumer a static manifest; online needs a continuous stream.)
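The split between a tensor data plane and a ref control plane, together with consume-once freeing, might look like the sketch below. It uses an in-memory dictionary as a stand-in for the feature store and a Python list as a stand-in for the control channel; none of this is the Mooncake API or the PR's SampleRef schema, and every name (`ConsumeOnceStore`, `produce`, `consume`, the `key` and `num_values` fields) is hypothetical.

```python
import json


class ConsumeOnceStore:
    """In-memory stand-in for a feature store (NOT the Mooncake API)."""

    def __init__(self):
        self._blobs = {}

    def put(self, key, payload):
        self._blobs[key] = payload

    def take(self, key):
        # Read and free in one step; a second take() of the same key
        # raises KeyError because the entry is already gone.
        return self._blobs.pop(key)

    def __len__(self):
        return len(self._blobs)


def produce(store, control_log, sample_id, features):
    """Store the payload, then publish a metadata-only ref as a JSON line."""
    key = f"sample/{sample_id}"
    store.put(key, features)
    ref = {"key": key, "num_values": len(features)}  # no payload in the ref
    control_log.append(json.dumps(ref))
    return ref


def consume(store, control_log):
    """Resolve each ref against the store, freeing the payload on read."""
    for line in control_log:
        ref = json.loads(line)
        yield ref, store.take(ref["key"])
```

Because the ref carries only a key and metadata, the control channel stays small and needs no access to the payload bytes; the store is empty again once every ref has been consumed.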
* StreamingRefChannel: cross-process, append-only, tensor-free SampleRef stream (partial-line-safe tail reads; in_flight_remote backpressure; close() EOF sentinel). No shared data mount.
* StreamingRefQueue: adapts the channel to the loader's SampleRefQueue protocol; get() blocks until refs arrive or the channel closes.
* build_disagg_online_producer: RolloutWorker(s) put() consume-once features into Mooncake + publish refs; drive_producer() runs to pool drain with backpressure, then closes.
* build_disagg_online_consumer: online trainer (target_head=None) reading the channel + a consume-once Mooncake store; cross-pool free via the shared Mooncake remove().

Validation (2× H200)

* GPU (test_disagg_online_launch): producer streams 8 refs → consumer trains 2 FSDP optimizer steps from the stream. Green.
* CPU integration (test_disagg_online) + StreamingRefChannel (7 tests). Green.

No hot-switch. Two-axis staleness is the planned next PR.
🤖 Generated with Claude Code