[DataFlow runtime] Online disaggregated training roadmap + PR plan (train-with-decode) #618

Closed
maocheng23 wants to merge 1 commit into sgl-project:main from maocheng23:refactor/dataflow-online-roadmap

Conversation

@maocheng23

Collaborator

Docs only. Adds refactor/dataflow-online-roadmap.md — the plan for taking the M6 DataFlow runtime from the shipped offline disaggregated path to a live "train-with-decode" disaggregated run (the architecture behind TorchSpec / EAGLE 3.1).

Decision recorded

  • Regime: train-with-decode, not on-policy. A live engine runs the frozen target and streams hidden states into training; the draft is not in the generation loop.
  • Consequence: weight-sync / staleness is OUT of scope — the streamed data is independent of the draft weights, so there is nothing to re-sync (TorchSpec, the SOTA comparable, has no weight-sync either). This removes the previously-floated "weight sync + two-axis staleness gate" milestone.

Contents

  • Competitive scorecard vs TorchSpec (orchestration/live-gen — where we're behind) and DeepSpec (breadth/eval — orthogonal edge). Our data plane (MooncakeFeatureStore, SampleRef) is at parity / arguably cleaner.
  • Target Ray-actor architecture.
  • PR decomposition (the point — no mega-PRs, merge bottom-up):
    • O1 — live single-pair: O1.1 shared cross-process control plane · O1.2 build_disagg_online_eagle3_runtime + async loop (in-process gen stub) · O1.3 live SGLang-server hidden-state capture 🔴 gating.
    • O2 — Ray orchestration + scale-out: O2.1 actor/placement layer · O2.2 multi-producer/trainer + DP-resharding · O2.3 cross-pool backpressure · O2.4 streaming-pool lifetime + online store-bug fixes.
    • Parallel eval/breadth track (DeepSpec parity), independent.

No code change. Opening for design review before the O1.1 PR.

🤖 Generated with Claude Code

…-with-decode)

Adds refactor/dataflow-online-roadmap.md: the plan for taking the M6 DataFlow
runtime from the shipped offline disaggregated path to a live "train-with-decode"
disaggregated run (TorchSpec-style). Records the decision NOT to pursue draft-weight
sync / staleness (unnecessary when streaming frozen-target hidden states), a
competitive scorecard vs TorchSpec/DeepSpec, the target Ray-actor architecture, and a
decomposition of milestones O1/O2 into a stacked series of small, reviewable PRs.

Docs only; no code change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a comprehensive roadmap document (refactor/dataflow-online-roadmap.md) outlining the design, milestones, and PR decomposition for implementing "train-with-decode" online disaggregated training. The feedback highlights a critical design gap in milestone O1.1 regarding cross-process synchronization for SampleRefQueue (which currently uses in-process threading.Condition), and points out several incorrect file paths for rollout_worker.py and sglang_adapter.py that should be corrected to reference the inference/ subdirectory.

Comment on lines +122 to +123
in-process-only for the single-host case; make `SampleRefQueue` operate over the shared
store. Wire `reconcile_on_restart` into the launcher.

Contributor

medium

When transitioning SampleRefQueue to operate over a shared cross-process store (like SQLite or Redis), note that the current implementation of SampleRefQueue relies on threading.Condition (which is strictly in-process) to block and notify on get() and put() operations. For a true cross-process control plane, you will need to address how blocking/notification is handled across process boundaries (e.g., via database polling, Redis pub/sub, or a distributed queue mechanism) to avoid busy-waiting or missed signals.
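
One of the options mentioned above, database polling, can be sketched as follows. This is a minimal illustration and not the SpecForge implementation: `PollingRefQueue`, its `refs` table, and the `poll_interval` parameter are hypothetical names, and the only library used is the standard `sqlite3` module. A blocking `get()` is emulated with a bounded sleep between claim attempts, so waiters neither spin nor depend on an in-process `threading.Condition`.

```python
import sqlite3
import time


class PollingRefQueue:
    """Hypothetical cross-process queue: blocking get() via bounded polling on SQLite."""

    def __init__(self, db_path, poll_interval=0.05):
        # isolation_level=None puts the connection in autocommit mode so that
        # transactions can be opened and closed explicitly below.
        self._conn = sqlite3.connect(db_path, timeout=5.0, isolation_level=None)
        self._poll_interval = poll_interval
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS refs ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "sample_id TEXT NOT NULL, "
            "claimed INTEGER NOT NULL DEFAULT 0)"
        )

    def put(self, sample_id):
        # Autocommit: the row is visible to other processes as soon as this returns.
        self._conn.execute("INSERT INTO refs (sample_id) VALUES (?)", (sample_id,))

    def _try_claim(self):
        # BEGIN IMMEDIATE takes the write lock up front, so two consumers
        # cannot select and claim the same row.
        self._conn.execute("BEGIN IMMEDIATE")
        try:
            row = self._conn.execute(
                "SELECT id, sample_id FROM refs WHERE claimed = 0 ORDER BY id LIMIT 1"
            ).fetchone()
            if row is not None:
                self._conn.execute("UPDATE refs SET claimed = 1 WHERE id = ?", (row[0],))
            self._conn.execute("COMMIT")
        except Exception:
            self._conn.execute("ROLLBACK")
            raise
        return None if row is None else row[1]

    def get(self, timeout=None):
        # Poll with a sleep between attempts instead of waiting on a condition variable.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            sample_id = self._try_claim()
            if sample_id is not None:
                return sample_id
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError("no ref available before the deadline")
            time.sleep(self._poll_interval)
```

Polling trades wake-up latency (up to one `poll_interval`) for simplicity; a pub/sub or distributed-queue approach would avoid that latency at the cost of an extra dependency.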

ref); consumer loop (lease → `get` → train → ack at optimizer step); bounded sample pool
(count-based).
- *Out:* live SGLang server (O1.3), Ray (O2), byte-backpressure (O2.3).
- *Files:* `runtime/launch.py`, `runtime/rollout_worker.py`, a small online driver.

Contributor

medium

The file rollout_worker.py is located under the inference/ subdirectory (i.e., specforge/runtime/inference/rollout_worker.py). Please update the path to runtime/inference/rollout_worker.py to prevent confusion during implementation.

cf. TorchSpec's SGLang patch) writing to `MooncakeFeatureStore`; producer commits
`SampleRef`s.
- *Out:* multi-engine pool, Ray.
- *Files:* `runtime/sglang_adapter.py` (server-backed), `runtime/rollout_worker.py`, possibly

Contributor

medium

Both sglang_adapter.py and rollout_worker.py are located under the inference/ subdirectory (i.e., specforge/runtime/inference/). Please update these paths to runtime/inference/sglang_adapter.py and runtime/inference/rollout_worker.py respectively to keep the roadmap accurate.

maocheng23 added a commit that referenced this pull request Jun 30, 2026
Add docs/roadmap/ — per-phase Goal/Target/Implementation/Tests/Done-when
across three tracks (domain-refactor, online-disaggregation, eval-and-breadth)
plus a README index. The online track folds in the former online-disaggregation
roadmap (#618) so there is one roadmap home.

Apply two scope decisions to plan.md and the roadmap:
- Frozen target, no weight sync. "Train-with-decode" = a frozen target streaming
  hidden states (W2/W3), not a serve-and-push workload. Drop the W4 weight
  lifecycle (WeightVersion/WeightPublisher/update_draft_weights/ServingTrafficStream);
  draft_weight_version is provenance only.
- Ray is an open decision, not a non-goal. Candidate for the O2 scale-out
  orchestrator; decision gate in the online roadmap.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23

Collaborator Author

Superseded by the consolidated roadmap in #630.

This roadmap is now folded into docs/roadmap/online-disaggregation.md (one roadmap home,
cross-linked from plan.md), with two scope decisions applied:

  • Frozen target — no weight sync. Train-with-decode = a frozen target streaming hidden states;
    weight-sync / hot draft-update / on-policy are out of scope. draft_weight_version is provenance
    only.
  • Ray = open (a candidate for O2 scale-out, with a decision gate), not committed and no longer a
    flat non-goal.

Milestones carry over as O1.1 / O1.2 (in review) → O1.3 live frozen-target capture (next) → O2
scale-out → O3 hardening. Closing in favor of #630.

@maocheng23 maocheng23 closed this Jun 30, 2026
maocheng23 added a commit that referenced this pull request Jul 1, 2026
Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.

- build_disagg_online_{producer,consumer} accept metadata_store /
  metadata_db_path; both processes share one SQLiteMetadataStore instead of
  a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
  set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
  drops durably-trained refs on the append-only channel re-read (no duplicate
  train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
  them consumed so the producer's in_flight_remote backpressure stays exact.

CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
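
The restart behaviour described in this commit message (skip durably-trained refs on the append-only re-read, but still count them as consumed) can be illustrated with a small stand-alone sketch. `SkippingRefReader` below is a hypothetical stand-in, not the real `StreamingRefQueue`; the channel is modelled as a plain list of ref ids, and the in-flight count is assumed to be `produced - consumed`.

```python
from typing import Iterable, Iterator, List


class SkippingRefReader:
    """Hypothetical reader over an append-only channel that drops already-trained refs."""

    def __init__(self, channel: List[str], skip_ids: Iterable[str] = ()):
        self._channel = channel            # re-read from the start after a restart
        self._skip_ids = set(skip_ids)     # ids a reconcile step found durably trained
        self.consumed = 0                  # every ref read, including skipped ones
        self.skipped = 0

    def __iter__(self) -> Iterator[str]:
        for ref_id in self._channel:
            # Count skipped refs as consumed so a producer computing
            # in_flight = produced - consumed is not left waiting on them.
            self.consumed += 1
            if ref_id in self._skip_ids:
                self.skipped += 1
                continue
            yield ref_id


# Usage: "a" and "b" were acked before the crash; "c" and "d" were committed but unacked.
channel = ["a", "b", "c", "d"]
reader = SkippingRefReader(channel, skip_ids={"a", "b"})
retrained = list(reader)                   # only the unacked tail is trained again
in_flight = len(channel) - reader.consumed
```

In this sketch only the unacked tail reaches the trainer again, while the consumed count still covers all four refs, which is the property the commit message attributes to `skip_ids`.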
maocheng23 added a commit that referenced this pull request Jul 1, 2026
…loop

Replace the drain-then-fit shape (generate the whole prompt pool, then
train) with a live loop in which the producer and trainer run concurrently.

- run_disagg_online_interleaved: the producer streams refs on a daemon thread
  while trainer.fit consumes on the main thread (StreamingRefQueue blocks
  until closed-and-drained, so the trainer tracks the producer). Symmetric,
  hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so
  the producer doesn't block on the in-flight watermark after the consumer
  stops draining; producer-finishes-first closes the channel; producer-raise
  closes the channel (so the consumer can't hang) and re-raises on the main
  thread.
- build_disagg_online_eagle3_runtime: the named single-process builder from
  the roadmap; composes producer + consumer over one shared metadata store,
  one consume-once feature store, and a producer/consumer StreamingRefChannel
  pair, and returns (trainer, loader, run). Uses the in-process generate
  stub (no live SGLang server, no Ray -- those are O1.3 / O2).
- drive_producer gains a cooperative should_stop predicate + finally-close.

CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate,
trainer-stops-first cooperative wind-down, producer-exception propagation +
consumer unblock. GPU test for the named builder's interleaved run() added to
test_disagg_online_launch.py. Stacked on O1.1; implements stage O1.2 (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
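
The shutdown cases listed in this commit message (trainer finishes first, producer finishes first, producer raises) can be sketched with the standard `threading` and `queue` modules. This is an assumption-laden illustration, not `run_disagg_online_interleaved` itself: `run_interleaved`, `produce`, `train_step`, and the sentinel-based channel close are hypothetical simplifications of the real channel and trainer.

```python
import queue
import threading

_CLOSED = object()  # sentinel marking "channel closed" in this sketch


def run_interleaved(produce, train_step, max_steps=None, capacity=4):
    """Run a producer on a daemon thread while the caller's thread trains."""
    channel = queue.Queue(maxsize=capacity)   # bounded: gives count-based backpressure
    stop = threading.Event()                  # cooperative should_stop flag
    producer_error = []

    def offer(item):
        # Bounded put that gives up once the consumer has stopped draining,
        # so the producer can never block forever on a full channel.
        while not stop.is_set():
            try:
                channel.put(item, timeout=0.05)
                return True
            except queue.Full:
                continue
        return False

    def producer():
        try:
            for item in produce():
                if not offer(item):
                    return
        except Exception as exc:
            producer_error.append(exc)
        finally:
            offer(_CLOSED)                    # close the channel so the consumer cannot hang

    thread = threading.Thread(target=producer, daemon=True)
    thread.start()
    steps = 0
    try:
        while max_steps is None or steps < max_steps:
            item = channel.get()              # blocks until an item or the close sentinel
            if item is _CLOSED:
                break
            train_step(item)
            steps += 1
    finally:
        stop.set()                            # trainer-finishes-first: release the producer
        thread.join()
    if producer_error:
        raise producer_error[0]               # re-raise on the calling thread
    return steps
```

The sentinel goes through the same `offer` path as ordinary items, which is what keeps the wind-down symmetric: whichever side finishes first, the other side has a bounded wait.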