Skip to content

[DataFlow runtime] Online disaggregated training (build_disagg_online_* + StreamingRefChannel)#18

Closed
maocheng23 wants to merge 2 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-17-online-disagg
Closed

[DataFlow runtime] Online disaggregated training (build_disagg_online_* + StreamingRefChannel)#18
maocheng23 wants to merge 2 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-17-online-disagg

Conversation

@maocheng23

Copy link
Copy Markdown
Owner

Stacked on dataflow-up-16-zerocopy (= upstream sgl-project#614sgl-project#612). Opened on the fork because its base isn't upstream yet; retarget to sgl-project once sgl-project#614 lands.

What

Makes online disaggregated training work: a rollout producer pool streams features to a trainer consumer pool on another node — tensors over Mooncake, refs over a new streaming control channel. (Offline disagg hands the consumer a static manifest; online needs a continuous stream.)

  • StreamingRefChannel — cross-process, append-only, tensor-free SampleRef stream (partial-line-safe tail reads; in_flight_remote backpressure; close() EOF sentinel). No shared data mount.
  • StreamingRefQueue — adapts it to the loader's SampleRefQueue protocol; get() blocks until refs arrive or the channel closes.
  • build_disagg_online_producer — RolloutWorker(s) put() consume-once features into Mooncake + publish refs; drive_producer() runs to pool drain with backpressure, then closes.
  • build_disagg_online_consumer — online trainer (target_head=None) reading the channel + a consume-once Mooncake store; cross-pool free via the shared Mooncake remove().

Validation (2× H200)

No hot-switch. Two-axis staleness is the planned next PR.

🤖 Generated with Claude Code

@maocheng23 maocheng23 force-pushed the dataflow-up-16-zerocopy branch from 73023aa to 207e0d4 Compare June 28, 2026 23:55
@maocheng23 maocheng23 force-pushed the dataflow-up-17-online-disagg branch from b1255f7 to 57da206 Compare June 28, 2026 23:56
@maocheng23 maocheng23 force-pushed the dataflow-up-16-zerocopy branch from 207e0d4 to f057907 Compare June 29, 2026 03:57
@maocheng23 maocheng23 force-pushed the dataflow-up-17-online-disagg branch from 57da206 to d2c0382 Compare June 29, 2026 03:57
maocheng23 and others added 2 commits June 29, 2026 13:20
…ocess online ref stream

The offline disagg path hands the consumer a STATIC ref manifest written once.
Online disaggregation needs a continuous stream: the rollout producer commits
SampleRefs while the trainer consumes them, on another node. StreamingRefChannel
is that control-plane channel:

* tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors
  go through the FeatureStore (Mooncake), so no shared *data* mount is needed.
* poll() tail-reads complete lines from the last offset, buffering a partial
  trailing line so a half-written record is never parsed.
* mark_consumed()/consumed_remote() give the producer a cross-process
  backpressure signal (in_flight_remote) with no shared in-process state.
* close() drops an EOF sentinel so stream() terminates once drained;
  idle_timeout_s guards against a dead producer.

Filesystem-backed (any shared control mount); a networked control plane slots in
behind the same publish/poll API later. 7 CPU tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…consumer} + StreamingRefQueue

Wires online disaggregated training: a rollout producer pool streams features to
a trainer pool on a different node, tensors over Mooncake, refs over the
StreamingRefChannel.

* StreamingRefQueue: adapts the channel to the SampleRefQueue protocol
  (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are
  available or the channel is closed-and-drained; ack() advances the channel's
  consumed counter (the producer's backpressure signal).
* build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via
  SGLangAdapter) put() consume-once features into a Mooncake store and publish
  refs to the channel. drive_producer() runs until the prompt pool drains,
  pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer
  can't overrun the segment, then closes the channel.
* build_disagg_online_consumer: the online trainer assembly (target_head=None)
  reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake
  store. The loader frees each sample on read (get -> release -> remote remove).

The cross-pool consume-once free works through the shared Mooncake remove (proven
in the mooncake cross-process tests); from SampleRef down the trainer path is
identical to colocated online.

Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once
free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU --
producer streams, consumer trains through FSDP end to end). StreamingRefQueue
covered via the integration test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23

Copy link
Copy Markdown
Owner Author

Superseded: re-opened upstream (sgl-project, online-disagg PR based on sgl-project#621). Closing this fork-only PR.

@maocheng23 maocheng23 closed this Jun 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant