Skip to content

Merge DataFlow runtime branch into main#648

Open
maocheng23 wants to merge 30 commits into
mainfrom
dataflow-up-16-zerocopy
Open

Merge DataFlow runtime branch into main#648
maocheng23 wants to merge 30 commits into
mainfrom
dataflow-up-16-zerocopy

Conversation

@maocheng23

Copy link
Copy Markdown
Collaborator

Summary

Notes

Testing

  • Not run locally; integration PR creation only.

maocheng23 and others added 30 commits June 28, 2026 20:41
Replace the torch.save/torch.load pickle round-trip with Mooncake's native
raw-buffer DMA. One hard-pinned object per *tensor*, keyed
{store_id}/{sid}/g{gen}/{name}: put() writes each tensor straight from its
storage via put_from(ptr); get() reads each straight into a tensor allocated
from the ref's FeatureSpec via get_into(ptr). Shape/dtype travel on the ref, so
there is no serialized header. The generation lives in the key (like
SharedDirFeatureStore's filename gen), so a re-put supersedes the old key set
and a stale ref's keys are gone -> get() raises (B5). Hard-pin is preserved
(put_from carries the ReplicateConfig). Falls back to the pickle blob path when
the backend lacks put_from/get_into (zero_copy=False or an older mooncake).

Source + receive buffers are registered with the transfer engine around the DMA:
RDMA rejects an unregistered address (AddressNotRegistered, -800); TCP ignores
the registration. Validated cross-node on 2x H200 over BOTH tcp and rdma --
producer put_from on node 0, consumer get_into + FSDP train on node 1.

Tests: the fake now simulates put_from/get_into via ctypes, so the full contract
runs on the zero-copy path (28 tests, real gated test green on a live master).
Adds zero-copy specifics (per-tensor keys, no-pickle-on-the-wire, re-put
supersede, pickle fallback) + cross-process contract tests (producer/consumer as
separate instances over one backend) + abort-under-lease-defer and
re-put-remove-failure edge cases.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…or_keys

_store_get_tensor only checked get_into's return for a negative error code, so
a short read (0 <= rc < nb) into the freshly torch.empty'd receive buffer was
accepted, handing the trainer a tensor with an uninitialized garbage tail.
Unlike the pickle path (torch.load reconstructs whole tensors), the raw-buffer
path cannot otherwise detect under-fill. get_into returns the bytes read (a
full read == nb), so require rc == nb and raise KeyError on a short read. Adds
a zero-copy regression test that truncates the stored blob.

Also remove the dead _tensor_keys() helper: it has no call sites, while
_try_physical_free and _sample_exists each inline the same per-tensor-key
comprehension.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ocess online ref stream

The offline disagg path hands the consumer a STATIC ref manifest written once.
Online disaggregation needs a continuous stream: the rollout producer commits
SampleRefs while the trainer consumes them, on another node. StreamingRefChannel
is that control-plane channel:

* tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors
  go through the FeatureStore (Mooncake), so no shared *data* mount is needed.
* poll() tail-reads complete lines from the last offset, buffering a partial
  trailing line so a half-written record is never parsed.
* mark_consumed()/consumed_remote() give the producer a cross-process
  backpressure signal (in_flight_remote) with no shared in-process state.
* close() drops an EOF sentinel so stream() terminates once drained;
  idle_timeout_s guards against a dead producer.

Filesystem-backed (any shared control mount); a networked control plane slots in
behind the same publish/poll API later. 7 CPU tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…consumer} + StreamingRefQueue

Wires online disaggregated training: a rollout producer pool streams features to
a trainer pool on a different node, tensors over Mooncake, refs over the
StreamingRefChannel.

* StreamingRefQueue: adapts the channel to the SampleRefQueue protocol
  (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are
  available or the channel is closed-and-drained; ack() advances the channel's
  consumed counter (the producer's backpressure signal).
* build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via
  SGLangAdapter) put() consume-once features into a Mooncake store and publish
  refs to the channel. drive_producer() runs until the prompt pool drains,
  pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer
  can't overrun the segment, then closes the channel.
* build_disagg_online_consumer: the online trainer assembly (target_head=None)
  reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake
  store. The loader frees each sample on read (get -> release -> remote remove).

The cross-pool consume-once free works through the shared Mooncake remove (proven
in the mooncake cross-process tests); from SampleRef down the trainer path is
identical to colocated online.

Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once
free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU --
producer streams, consumer trains through FSDP end to end). StreamingRefQueue
covered via the integration test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
[DataFlow runtime · online] Online disaggregated training (StreamingRefChannel + build_disagg_online_*)
Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.

- build_disagg_online_{producer,consumer} accept metadata_store /
  metadata_db_path; both processes share one SQLiteMetadataStore instead of
  a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
  set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
  drops durably-trained refs on the append-only channel re-read (no duplicate
  train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
  them consumed so the producer's in_flight_remote backpressure stays exact.

CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…loop

Replace the drain-then-fit shape (generate the whole prompt pool, then
train) with a live loop in which the producer and trainer run concurrently.

- run_disagg_online_interleaved: the producer streams refs on a daemon thread
  while trainer.fit consumes on the main thread (StreamingRefQueue blocks
  until closed-and-drained, so the trainer tracks the producer). Symmetric,
  hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so
  the producer doesn't block on the in-flight watermark after the consumer
  stops draining; producer-finishes-first closes the channel; producer-raise
  closes the channel (so the consumer can't hang) and re-raises on the main
  thread.
- build_disagg_online_eagle3_runtime: the named single-process builder from
  the roadmap; composes producer + consumer over one shared metadata store,
  one consume-once feature store, and a producer/consumer StreamingRefChannel
  pair, and returns (trainer, loader, run). Uses the in-process generate
  stub (no live SGLang server, no Ray -- those are O1.3 / O2).
- drive_producer gains a cooperative should_stop predicate + finally-close.

CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate,
trainer-stops-first cooperative wind-down, producer-exception propagation +
consumer unblock. GPU test for the named builder's interleaved run() added to
test_disagg_online_launch.py. Stacked on O1.1; implements stage O1.2 (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…erized builders

Adding a draft model is now a StrategySpec entry, not a new build_*_runtime
family. The topology stays a named builder; the model becomes a `strategy=`
parameter resolved through a registry. launch.py no longer grows as
(topologies x models).

- registry.py (new): StrategySpec + register_strategy/resolve_strategy/
  available_strategies + concat_collate. eagle3 spec fully wired
  (reader/transform/collate/online-collate/adapter).
- launch.py: extract _assemble_trainer + _assemble_rollout_workers shared by
  every topology (offline / disagg-offline / online / disagg-online
  producer+consumer + one-process + interleaved); each builder takes `strategy=`
  and resolves a spec. eagle3-named builders kept as back-compat aliases;
  eagle3 behavior is byte-identical.
- scripts/train_eagle3_dataflow.py, examples/disagg/run_disagg_eagle3.py: use
  the strategy-neutral builders.
- tests/test_runtime/test_strategy_registry.py (new, CPU): registry / alias /
  unwired-strategy-guard contract.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…e + online)

DFlash now trains through the runtime via a StrategySpec + a DFlashAdapter, with
ZERO launch.py changes (the spec seam from the previous commit carries it).

- registry.py: dflash spec — offline reader (OfflineManifestReader with dflash
  feature_keys, no aux/target swap), per-sample transform, padding collate; online
  via DFlashAdapter; supports_online=True.
- inference/dflash_adapter.py (new): wraps generate_dflash_data, emits
  {input_ids, hidden_states, loss_mask}; verify_capture self-skips the eagle3
  aux/target checks (different feature names + __aux_layer_ids__=None).
- tests/_fixtures.py: write_offline_files_dflash + build_dflash (tiny Qwen3 target
  -> DFlash draft + TargetEmbeddingsAndHead -> OnlineDFlashModel).
- tests/test_dflash_launch.py + test_dflash_online_launch.py (new, GPU): offline
  and online dflash train end-to-end through FSDP.
- tests/test_strategy_registry.py: dflash-fully-wired assertions.

DFlash is online-only in production (no offline dumper exists yet — prepare_
hidden_states.py is eagle3-only), so the offline path is exercised with synthetic
fixtures while online is its real workflow.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…dent loss

Domino is the third algorithm on the composable launch — a StrategySpec plus the
ONE genuine shared-contract extension the analysis predicted.

- strategy.py: StepContext{global_step, total_steps} threaded into forward_loss
  (optional; eagle3/dflash ignore it). DominoTrainStrategy: reuses the DFlash
  feature schema + adapter; its forward_loss reads ctx to compute the decaying
  lambda_base that blends Domino's base loss (mirrors train_domino.get_lambda_base).
- trainer.py: TrainerCore.train_step / eval_step accept a StepContext; fit passes
  StepContext(global_step, total_steps=max_steps). Backward-compatible.
- contracts.py: DraftStrategyName += "domino".
- registry.py: domino spec — reuses DFlash transform/collate/adapter, domino reader
  (strategy tag) + DominoTrainStrategy. No new builder, no launch.py change.
- tests/_fixtures.py: build_domino (DFlash draft w/ projector_type="domino" head ->
  OnlineDominoModel).
- tests/test_domino_launch.py (new): CPU lambda-schedule test + offline/online GPU
  end-to-end.

Adding domino touched ZERO launch.py and reused the dflash data path — exactly the
"new algorithm = a spec + its loss" goal.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… boundary

Extract a backend-agnostic `TargetEngine` ABC (modeling/target/base.py) with a
generic `capture(...)` entry point + a real `backend` tag, replacing the two
EAGLE3-/DFlash-named ABCs as the shared base:

  TargetEngine
  ├── Eagle3TargetEngine  (was Eagle3TargetModel)  {HF,SGLang,Custom}
  └── DFlashTargetEngine  (was DFlashTargetModel)   {SGLang,HF}

- `capture()` / `set_capture_layers()` are thin dispatchers onto the unchanged
  `generate_eagle3_data` / `generate_dflash_data` / `set_aux_hidden_states_layers`,
  so extraction is byte-identical — this PR is pure structure/naming, no logic.
- Real `backend` class attr on every leaf ("sglang"/"hf"/"custom"); the inference
  adapters' health() stops reading getattr(..., "unknown").
- Generic `get_target_engine(strategy=, backend=)` factory (modeling/target/factory.py),
  loaders imported lazily so `import specforge` still works without the pinned sglang
  (dflash_target_model imports sglang unconditionally — kept off the eager path).
- All pre-Phase-B names (`Eagle3TargetModel`, `get_eagle3_target_model`, ...) kept as
  aliases; scripts/tests untouched. `sglang_server` factory branch lands in B2.
- Add tests/test_runtime/test_target_engine_abc.py (hierarchy, backend tags, capture
  dispatch, alias identity, factory dispatch).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ang version

Extract EVERY sglang internal + the duplicated extend/capture forward into one
version-pinned boundary, `sglang_backend/capture.py::SGLangCaptureBackend`, and
have the algorithm engines COMPOSE it instead of embedding it:

  SGLangCaptureBackend  (the only place that imports sglang.srt.* for capture)
    · build()            ServerArgs / ModelConfig / SGLangRunner wiring (unified)
    · _forward_extend()  the single ScheduleBatch/ForwardBatch capture forward
    · _maybe_prepare_mlp_sync_batch()  ONE (0.5.9) prepare_mlp_sync signature
    · extend / extend_vlm / extend_dflash / get_rope_index / set_eagle3_capture_layers

  SGLangEagle3TargetEngine / SGLangDFlashTargetEngine  now hold a backend and do
  only torch-side output shaping — they import ZERO sglang internals (verified by
  tests/test_runtime/test_sglang_capture_backend.py, a pure-AST invariant).

Why: before this, both sglang engines imported ~20 sglang symbols and each carried
its own near-duplicate `_extend`; the two copies had drifted to DIFFERENT sglang
API versions (eagle3 = module-level prepare_mlp_sync_batch_raw(attn_cp_size=);
dflash = the removed Scheduler.prepare_mlp_sync_batch_raw(spec_algorithm=)). A
sglang bump touched every subclass and the copies could silently diverge. Now a
bump touches one file; "put the pieces together" (capture backend + shaping +
adapter) instead of tangling the version into each algorithm.

Behavior:
- Byte-identical on the test configs (TP=1/2, dp=1): require_mlp_sync is False so
  the unified mlp-sync branch is skipped identically; construction, req building,
  the forward, splitting/shard logic, and pool-clear ordering are transplanted
  verbatim (`import specforge` stays sglang-optional via lazy import in
  from_pretrained; the engine forward is still under @torch.no_grad).
- Two deliberate, flagged changes: (1) DFlash's mlp-sync now uses the same 0.5.9
  signature as eagle3 — its old Scheduler.* call was latent-broken for dp>1;
  (2) dropped a stray debug print() in DFlash set_capture_layers.

Also adds the `sglang_server` backend (SGLangServerEagle3TargetEngine): selectable
via get_eagle3_target_model(backend="sglang_server"), construction raises an
actionable NotImplementedError until the live-capture depth is set by the O1.3
spike (docs/roadmap/online-disaggregation.md §O1.3).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Introduce the domain training layer `specforge/training/` with a caller-facing
`Trainer` that composes the whole training spine behind one object + `.fit()`:

    FeatureDataLoader  +  FSDPTrainingBackend.prepare_model (FSDP wrap)
                       +  spec.make_strategy -> TrainerCore -> TrainerController

`Trainer` is the canonical assembler now; `launch._assemble_trainer` delegates to
it and returns the same `(TrainerController, FeatureDataLoader)` tuple, so every
`build_*_runtime` path is byte-for-byte unchanged (no fork — one wiring path).
The runtime seam (TrainerController / TrainerCore / DraftTrainStrategy /
FSDPTrainingBackend) is untouched; this is the domain facade over it. Topology
(offline/online/disagg) stays invisible to Trainer — absorbed by the (ref source
+ FeatureStore) it is handed. No HiddenStateStream: the loader is the stream.

- specforge/training/{__init__.py (PEP 562 lazy Trainer export), trainer.py}
- launch.py: _assemble_trainer delegates to Trainer; drops the now-unused
  FeatureDataLoader / FSDPTrainingBackend / ParallelConfig / TrainerCore /
  TrainerController imports.
- tests/test_runtime/test_domain_trainer.py: fakes the runtime pieces and asserts
  the composition (refs enqueued, loader/backend/core/controller args, ack_fn
  wired to the DataFlowController, .fit() delegates over the loader).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…docs + gate)

Makes the Phase B abstractions load-bearing instead of forward-only scaffolding.

1. adapter cutover: SGLangAdapter / DFlashAdapter now call the engine's generic
   `capture(...)` instead of `generate_eagle3_data` / `generate_dflash_data`
   (which stay as byte-identical back-compat aliases). This is the dataflow
   runtime's real capture path, so `TargetEngine.capture()` is now exercised.
2. factory cutover: the dataflow test fixtures + online launch tests build targets
   via `get_target_engine(strategy=, backend=)`; the old `get_eagle3_target_model`
   / `get_dflash_target_model` stay as shims (legacy scripts untouched).
3. docs: runtime/inference/DESIGN.md, runtime/ARCHITECTURE.md, and the adapter
   docstrings now describe the boundary as `TargetEngine.capture` (sglang glue in
   SGLangCaptureBackend) instead of `Eagle3TargetModel.generate_eagle3_data`.
4. explicit B gate: tests/test_runtime/test_phase_b_gate.py — `capture() ≡ legacy
   generate_*_data()` **bytewise** (eagle3 + dflash), a stable captured-feature
   digest, and per-step-loss reproducibility through the full online capture→train
   path (which now routes through `capture()`), instead of relying only on the
   full-suite's indirect coverage.

No behavior change: `capture()` dispatches to the legacy method, so byte-identical.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Colocated runs now pay nothing for the disagg control plane on the SAME code
path (plan.md §C / roadmap domain-refactor.md §C). DeploymentMode becomes
load-bearing rather than decorative.

- NoOpMetadataStore: a MetadataStore that retains nothing (no dedup index, no
  durable marker) for local_colocated runs.
- resolve_control_plane(mode, run_id) -> (controller, durable_ack): local_colocated
  gets the no-op store + durable_ack=False (loader releases features as it
  consumes them); dataflow_colocated/disaggregated keep the durable store +
  optimizer-boundary ack. Disagg builders are untouched.
- build_offline_runtime / build_online_runtime route through the selector and
  thread durable_ack; the domain Trainer skips the ack transaction and the
  offline enqueue when durable_ack is False.

Gates: test_noop_metadata_store (contract + selection, CPU) and
test_colocated_vs_disagg_equiv (per-step loss bit-identical across the no-op
colocated and full disagg control planes — only the control plane differs).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…resolver, stronger gate

Post-review fixes for #636 (adversarial review vs the #630 roadmap):

- build_offline_runtime / build_online_runtime take deployment_mode
  (default local_colocated) + metadata_db_path, so the mode parameter
  actually varies at the builder surface instead of being a hardcoded
  literal, and tests/tools can drive any mode through the ONE builder.
- resolve_control_plane: drop the never-passed sample_queue/backpressure
  kwargs; the non-colocated arm now delegates the default store to
  DataFlowController's own InMemory default instead of re-encoding the
  selection policy (SQLite only when metadata_db_path is given).
- Domain Trainer: the colocated offline path (durable_ack=False) keeps
  the control plane's assert_no_tensors contract on every ref even
  though the durable enqueue is skipped.
- test_colocated_vs_disagg_equiv: both legs now run through
  build_offline_runtime (no hand-copied assembly); the disagg leg uses a
  real SQLiteMetadataStore and asserts the durable ack marker was
  written; the deterministic-algorithms flag is restored after the test
  instead of leaking process-wide.
- test_noop_metadata_store: cover metadata_db_path -> SQLite selection
  (and local_colocated ignoring it).
- Docs: DESIGN.md + NoOpMetadataStore state the scope of the no-op axis
  explicitly (leasing/queue bookkeeping stays shared; backpressure is
  already opt-in) and the consequences of retaining nothing (status()
  zeros, no dedup, no TrainLease ref reconstruction).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Self-review of the fix commit confirmed the new deployment_mode /
metadata_db_path parameters were a hazard on the ONLINE builder: the
online loader consumes a rank-private in-process SampleRefQueue that is
fed only by commit_sample()==True, so a shared durable store dedups
each sample onto exactly one rank's queue (divergent per-rank batch
counts -> mismatched FSDP collectives -> hang), and a reused db file
starves a restarted run to zero steps. The offline builder keeps the
parameter (its refs-mode loader iterates the full ref list regardless
of commit dedup, which is what the equivalence gate exercises); online
durable-store runs remain the disagg online builders' job. Documented
at the pin site.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…-control-plane

[DataFlow runtime · online] O1.1 — shared cross-process control plane
…loop

[DataFlow runtime · online] O1.2 — named builder + interleaved async loop
…unch

[DataFlow runtime] Composable launch: StrategySpec registry + parameterized builders
[DataFlow runtime] DFlash end-to-end on the composable launch (offline + online)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
[DataFlow runtime] Domino end-to-end + StepContext for schedule-dependent loss
[DataFlow runtime] Phase B1 — TargetEngine ABC + de-EAGLE3 the target boundary
…e-backend

[DataFlow runtime] Phase B2 — decouple the target engine from the sglang version
[DataFlow runtime] Phase B3 — domain Trainer wrapping the runtime seam
…-cutover

[DataFlow runtime] Phase B4 — adopt the de-EAGLE3 surface (cutover + docs + gate)
…htweight

[DataFlow runtime] Phase C — colocated lightweight control plane
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants