[DataFlow runtime] Online disaggregated training roadmap + PR plan (train-with-decode) #618
…-with-decode)

Adds `refactor/dataflow-online-roadmap.md`: the plan for taking the M6 DataFlow runtime from the shipped offline disaggregated path to a live "train-with-decode" disaggregated run (TorchSpec-style). Records the decision NOT to pursue draft-weight sync / staleness (unnecessary when streaming frozen-target hidden states), a competitive scorecard vs TorchSpec/DeepSpec, the target Ray-actor architecture, and a decomposition of milestones O1/O2 into a stacked series of small, reviewable PRs.

Docs only; no code change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Code Review
This pull request introduces a comprehensive roadmap document (refactor/dataflow-online-roadmap.md) outlining the design, milestones, and PR decomposition for implementing "train-with-decode" online disaggregated training. The feedback highlights a critical design gap in milestone O1.1 regarding cross-process synchronization for SampleRefQueue (which currently uses in-process threading.Condition), and points out several incorrect file paths for rollout_worker.py and sglang_adapter.py that should be corrected to reference the inference/ subdirectory.
in-process-only for the single-host case; make `SampleRefQueue` operate over the shared
store. Wire `reconcile_on_restart` into the launcher.
When transitioning SampleRefQueue to operate over a shared cross-process store (like SQLite or Redis), note that the current implementation of SampleRefQueue relies on threading.Condition (which is strictly in-process) to block and notify on get() and put() operations. For a true cross-process control plane, you will need to address how blocking/notification is handled across process boundaries (e.g., via database polling, Redis pub/sub, or a distributed queue mechanism) to avoid busy-waiting or missed signals.
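One of the options named above, database polling, can be sketched with the standard-library `sqlite3` module. This is a minimal illustration, not the project's `SampleRefQueue`: the class name `PollingRefQueue`, the `refs` table layout, and the `poll_min` / `poll_max` parameters are all hypothetical. A blocking `get()` is emulated by retrying a short lease transaction with exponential backoff, which bounds busy-waiting and cannot miss a signal because there is no signal to miss.

```python
import sqlite3
import time


class PollingRefQueue:
    """Hypothetical cross-process ref queue; blocking is emulated by polling SQLite."""

    def __init__(self, db_path, poll_min=0.01, poll_max=0.5):
        # isolation_level=None selects autocommit mode, so transactions are
        # opened explicitly with BEGIN IMMEDIATE in _try_lease().
        self._conn = sqlite3.connect(db_path, timeout=5.0, isolation_level=None)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS refs ("
            " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
            " sample_id TEXT UNIQUE NOT NULL,"
            " leased INTEGER NOT NULL DEFAULT 0)"
        )
        self._poll_min = poll_min
        self._poll_max = poll_max

    def put(self, sample_id):
        """Commit a ref; returns False if this sample_id was already committed."""
        cur = self._conn.execute(
            "INSERT OR IGNORE INTO refs (sample_id) VALUES (?)", (sample_id,)
        )
        return cur.rowcount == 1

    def _try_lease(self):
        # BEGIN IMMEDIATE takes the write lock up front, so two processes
        # cannot lease the same row.
        self._conn.execute("BEGIN IMMEDIATE")
        try:
            row = self._conn.execute(
                "SELECT seq, sample_id FROM refs WHERE leased = 0 ORDER BY seq LIMIT 1"
            ).fetchone()
            if row is not None:
                self._conn.execute("UPDATE refs SET leased = 1 WHERE seq = ?", (row[0],))
            self._conn.execute("COMMIT")
        except BaseException:
            self._conn.execute("ROLLBACK")
            raise
        return None if row is None else row[1]

    def get(self, timeout=None):
        """Block (by polling with backoff) until a ref can be leased."""
        deadline = None if timeout is None else time.monotonic() + timeout
        delay = self._poll_min
        while True:
            sample_id = self._try_lease()
            if sample_id is not None:
                return sample_id
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("no unleased ref before timeout")
                delay = min(delay, remaining)
            time.sleep(delay)
            # Back off so an idle consumer does not hammer the database.
            delay = min(delay * 2, self._poll_max)
```

Polling trades wake-up latency (at most `poll_max`) for simplicity. A pub/sub or distributed-queue mechanism would reduce that latency at the cost of an extra dependency.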
ref); consumer loop (lease → `get` → train → ack at optimizer step); bounded sample pool
(count-based).
- *Out:* live SGLang server (O1.3), Ray (O2), byte-backpressure (O2.3).
- *Files:* `runtime/launch.py`, `runtime/rollout_worker.py`, a small online driver.
cf. TorchSpec's SGLang patch) writing to `MooncakeFeatureStore`; producer commits
`SampleRef`s.
- *Out:* multi-engine pool, Ray.
- *Files:* `runtime/sglang_adapter.py` (server-backed), `runtime/rollout_worker.py`, possibly
Add `docs/roadmap/`: per-phase Goal/Target/Implementation/Tests/Done-when across three tracks (domain-refactor, online-disaggregation, eval-and-breadth) plus a README index. The online track folds in the former online-disaggregation roadmap (#618) so there is one roadmap home.

Apply two scope decisions to `plan.md` and the roadmap:
- Frozen target, no weight sync. "Train-with-decode" = a frozen target streaming hidden states (W2/W3), not a serve-and-push workload. Drop the W4 weight lifecycle (`WeightVersion`/`WeightPublisher`/`update_draft_weights`/`ServingTrafficStream`); `draft_weight_version` is provenance only.
- Ray is an open decision, not a non-goal. Candidate for the O2 scale-out orchestrator; decision gate in the online roadmap.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Superseded by the consolidated roadmap in #630. This roadmap is now folded into
Milestones carry over as O1.1 / O1.2 (in review) → O1.3 live frozen-target capture (next) → O2
Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.
- build_disagg_online_{producer,consumer} accept metadata_store /
metadata_db_path; both processes share one SQLiteMetadataStore instead of
a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
drops durably-trained refs on the append-only channel re-read (no duplicate
train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
them consumed so the producer's in_flight_remote backpressure stays exact.
CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
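The restart path described in this commit can be sketched as follows. This is an illustration under stated assumptions, not the code in the PR: `reconcile` and `SkippingReader` are hypothetical stand-ins for `reconcile_on_restart` and `StreamingRefQueue`, refs are reduced to plain string ids, and the append-only channel is modeled as a list. It shows the two properties the message calls out: durably trained refs are dropped on re-read, and skipped refs still count as consumed so a producer-side in-flight count stays exact.

```python
def reconcile(committed, acked, optimizer_step_durable):
    """Split committed ref ids into (already_trained, requeue).

    Hypothetical stand-in for reconcile_on_restart: acked refs are released,
    unacked refs are requeued, and everything is requeued when the optimizer
    step that covered the acks was not made durable.
    """
    if not optimizer_step_durable:
        return frozenset(), list(committed)
    acked = set(acked)
    trained = frozenset(r for r in committed if r in acked)
    requeue = [r for r in committed if r not in trained]
    return trained, requeue


class SkippingReader:
    """Hypothetical reader over an append-only channel with a skip set."""

    def __init__(self, skip_ids=()):
        self.skip_ids = frozenset(skip_ids)
        self.consumed = 0  # what a producer would subtract from its in-flight count

    def read(self, channel):
        for ref_id in channel:
            # Count the ref as consumed whether it is delivered or skipped,
            # otherwise the producer's backpressure accounting would drift.
            self.consumed += 1
            if ref_id in self.skip_ids:
                continue
            yield ref_id


# Restarted consumer: "a" and "b" were acked at a durable optimizer step.
channel = ["a", "b", "c", "d"]
trained, requeue = reconcile(channel, acked={"a", "b"}, optimizer_step_durable=True)
reader = SkippingReader(skip_ids=trained)
to_train = list(reader.read(channel))
```

Here `to_train` contains only the committed-but-unacked tail, while `reader.consumed` covers all four refs on the channel.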
…loop

Replace the drain-then-fit shape (generate the whole prompt pool, then train) with a live loop in which the producer and trainer run concurrently.
- run_disagg_online_interleaved: the producer streams refs on a daemon thread while trainer.fit consumes on the main thread (StreamingRefQueue blocks until closed-and-drained, so the trainer tracks the producer). Symmetric, hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so the producer doesn't block on the in-flight watermark after the consumer stops draining; producer-finishes-first closes the channel; producer-raise closes the channel (so the consumer can't hang) and re-raises on the main thread.
- build_disagg_online_eagle3_runtime: the named single-process builder from the roadmap; composes producer + consumer over one shared metadata store, one consume-once feature store, and a producer/consumer StreamingRefChannel pair, and returns (trainer, loader, run). Uses the in-process generate stub (no live SGLang server, no Ray -- those are O1.3 / O2).
- drive_producer gains a cooperative should_stop predicate + finally-close.

CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate, trainer-stops-first cooperative wind-down, producer-exception propagation + consumer unblock. GPU test for the named builder's interleaved run() added to test_disagg_online_launch.py.

Stacked on O1.1; implements stage O1.2 (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
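The three shutdown cases in this commit can be illustrated with a small standard-library sketch. It is an assumption-laden model, not `run_disagg_online_interleaved` itself: the function `run_interleaved`, its parameters, and the use of `queue.Queue` in place of `StreamingRefChannel` are hypothetical. A bounded queue stands in for the in-flight watermark, a `should_stop` event lets the producer give up instead of blocking once the trainer has stopped, and a `closed` event set in a `finally` block guarantees the consumer is unblocked even when the producer raises.

```python
import queue
import threading


def run_interleaved(produce, train_step, max_steps=None, max_in_flight=4):
    """Run produce() on a daemon thread while train_step consumes on this thread."""
    channel = queue.Queue(maxsize=max_in_flight)  # bounded: models the watermark
    should_stop = threading.Event()               # set when the trainer is done
    closed = threading.Event()                    # set when the producer is done
    errors = []

    def put_or_stop(ref):
        # Retry a short-timeout put so a full channel never blocks forever
        # once the consumer has stopped draining.
        while not should_stop.is_set():
            try:
                channel.put(ref, timeout=0.05)
                return True
            except queue.Full:
                pass
        return False

    def producer():
        try:
            for ref in produce():
                if not put_or_stop(ref):
                    break
        except Exception as exc:
            errors.append(exc)  # re-raised on the main thread below
        finally:
            closed.set()        # always close, so the consumer cannot hang

    thread = threading.Thread(target=producer, daemon=True)
    thread.start()

    steps = 0
    try:
        while max_steps is None or steps < max_steps:
            try:
                ref = channel.get(timeout=0.05)
            except queue.Empty:
                # closed is set only after the last put, so closed-and-empty
                # means every produced ref has been consumed.
                if closed.is_set() and channel.empty():
                    break
                continue
            train_step(ref)
            steps += 1
    finally:
        should_stop.set()
        thread.join()
    if errors:
        raise errors[0]
    return steps
```

Using two events rather than a sentinel item avoids a close that would itself block on a full channel after the trainer has stopped.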
Docs only. Adds `refactor/dataflow-online-roadmap.md`: the plan for taking the M6 DataFlow runtime from the shipped offline disaggregated path to a live "train-with-decode" disaggregated run (the architecture behind TorchSpec / EAGLE 3.1).

Decision recorded

Contents

`MooncakeFeatureStore`, `SampleRef`) is at parity / arguably cleaner.

O1.1 shared cross-process control plane · O1.2 `build_disagg_online_eagle3_runtime` + async loop (in-process gen stub) · O1.3 live SGLang-server hidden-state capture 🔴 gating.

O2.1 actor/placement layer · O2.2 multi-producer/trainer + DP-resharding · O2.3 cross-pool backpressure · O2.4 streaming-pool lifetime + online store-bug fixes.

No code change. Opening for design review before the O1.1 PR.
🤖 Generated with Claude Code