[DataFlow runtime · online] Online disaggregated training (StreamingRefChannel + build_disagg_online_*)#622
Merged
Conversation
…ocess online ref stream The offline disagg path hands the consumer a STATIC ref manifest written once. Online disaggregation needs a continuous stream: the rollout producer commits SampleRefs while the trainer consumes them, on another node. StreamingRefChannel is that control-plane channel: * tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors go through the FeatureStore (Mooncake), so no shared *data* mount is needed. * poll() tail-reads complete lines from the last offset, buffering a partial trailing line so a half-written record is never parsed. * mark_consumed()/consumed_remote() give the producer a cross-process backpressure signal (in_flight_remote) with no shared in-process state. * close() drops an EOF sentinel so stream() terminates once drained; idle_timeout_s guards against a dead producer. Filesystem-backed (any shared control mount); a networked control plane slots in behind the same publish/poll API later. 7 CPU tests. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…consumer} + StreamingRefQueue Wires online disaggregated training: a rollout producer pool streams features to a trainer pool on a different node, tensors over Mooncake, refs over the StreamingRefChannel. * StreamingRefQueue: adapts the channel to the SampleRefQueue protocol (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are available or the channel is closed-and-drained; ack() advances the channel's consumed counter (the producer's backpressure signal). * build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via SGLangAdapter) put() consume-once features into a Mooncake store and publish refs to the channel. drive_producer() runs until the prompt pool drains, pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer can't overrun the segment, then closes the channel. * build_disagg_online_consumer: the online trainer assembly (target_head=None) reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake store. The loader frees each sample on read (get -> release -> remote remove). The cross-pool consume-once free works through the shared Mooncake remove (proven in the mooncake cross-process tests); from SampleRef down the trainer path is identical to colocated online. Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU -- producer streams, consumer trains through FSDP end to end). StreamingRefQueue covered via the integration test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
jiapingW
approved these changes
Jun 30, 2026
This was referenced Jul 4, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Online disaggregated training on top of the Mooncake zero-copy store (#621). Adds
StreamingRefChannel/StreamingRefQueue(cross-process append-only tensor-free SampleRef stream with backpressure + EOF) andbuild_disagg_online_producer/build_disagg_online_consumerinlaunch.py. Cross-pool consume-once free works via shared Mooncakeremove(); from SampleRef down the trainer path == colocated online. NO hot-switch.Validated 2-node real-Mooncake online e2e over RDMA (rollout pool streams refs → trainer pool trains FSDP steps cross-node, consume-once). Rebased onto current #621.
Stacked: #608 → #621 → this → up-18 staleness. Supersedes the fork-only PR maocheng23#18.
🤖 Generated with Claude Code