[DataFlow runtime · online] O1.2 — named builder + interleaved async loop#625
Merged
jiapingW merged 1 commit intoJul 2, 2026
Merged
Conversation
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Collaborator
Author
Code reviewNo high-confidence issues found. Checked for bugs in the interleaved producer/trainer shutdown paths, backpressure stop handling, and named online runtime wiring. |
…loop Replace the drain-then-fit shape (generate the whole prompt pool, then train) with a live loop in which the producer and trainer run concurrently. - run_disagg_online_interleaved: the producer streams refs on a daemon thread while trainer.fit consumes on the main thread (StreamingRefQueue blocks until closed-and-drained, so the trainer tracks the producer). Symmetric, hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so the producer doesn't block on the in-flight watermark after the consumer stops draining; producer-finishes-first closes the channel; producer-raise closes the channel (so the consumer can't hang) and re-raises on the main thread. - build_disagg_online_eagle3_runtime: the named single-process builder from the roadmap; composes producer + consumer over one shared metadata store, one consume-once feature store, and a producer/consumer StreamingRefChannel pair, and returns (trainer, loader, run). Uses the in-process generate stub (no live SGLang server, no Ray -- those are O1.3 / O2). - drive_producer gains a cooperative should_stop predicate + finally-close. CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate, trainer-stops-first cooperative wind-down, producer-exception propagation + consumer unblock. GPU test for the named builder's interleaved run() added to test_disagg_online_launch.py. Stacked on O1.1; implements stage O1.2 (#618). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
5f79d08 to
d6a6fd3
Compare
9dba169 to
17a8770
Compare
Collaborator
Author
|
Addressed review feedback (self-review pass).
Validated: full |
Base automatically changed from
dataflow-up-19-online-shared-control-plane
to
dataflow-up-16-zerocopy
July 2, 2026 04:44
jiapingW
approved these changes
Jul 2, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stage O1.2 of the online roadmap (#618): the named single-process runtime builder and the interleaved async loop that replaces drain-then-fit. Stacked on O1.1 (#624) — review/merge that first; the diff here is against the O1.1 branch.
Problem
The colocated online path was drain-then-fit: generate the whole prompt pool, then train. (Root cause: the in-process
SampleRefQueue.getis non-blocking, so the loader ended the instant the queue was momentarily empty.)Change
run_disagg_online_interleaved: the producer streams refs on a daemon thread whiletrainer.fitconsumes on the main thread (StreamingRefQueueblocks until closed-and-drained, so the trainer tracks the producer). Symmetric, hang-free shutdown:max_steps) → a cooperativeshould_stopwinds the producer down instead of blocking on the in-flight watermark after the consumer stops draining;finally(consumer can't hang) + the exception re-raised on the main thread.build_disagg_online_eagle3_runtime: the named single-process builder from the roadmap; composes producer + consumer over one shared metadata store, one consume-once feature store, and a producer/consumerStreamingRefChannelpair, returning(trainer, loader, run). Uses the in-processgenerate_eagle3_datastub — no live SGLang server and no Ray (those are O1.3 / O2).drive_producergains a cooperativeshould_stoppredicate + afinally-close so it never strands the consumer.Tests
tests/test_runtime/test_disagg_online_interleave.py(CPU): the three shutdown paths (full-drain-then-terminate, trainer-stops-first cooperative wind-down, producer-exception propagation + consumer unblock). A GPU end-to-end test for the named builder's interleavedrun()is added totest_disagg_online_launch.py.Next
O1.3 — live SGLang-server hidden-state capture (the roadmap's gating risk; needs a GPU + the capture spike first).
🤖 Generated with Claude Code