Merge DataFlow runtime branch into main#648
Open
maocheng23 wants to merge 30 commits into
Open
Conversation
Replace the torch.save/torch.load pickle round-trip with Mooncake's native
raw-buffer DMA. One hard-pinned object per *tensor*, keyed
{store_id}/{sid}/g{gen}/{name}: put() writes each tensor straight from its
storage via put_from(ptr); get() reads each straight into a tensor allocated
from the ref's FeatureSpec via get_into(ptr). Shape/dtype travel on the ref, so
there is no serialized header. The generation lives in the key (like
SharedDirFeatureStore's filename gen), so a re-put supersedes the old key set
and a stale ref's keys are gone -> get() raises (B5). Hard-pin is preserved
(put_from carries the ReplicateConfig). Falls back to the pickle blob path when
the backend lacks put_from/get_into (zero_copy=False or an older mooncake).
Source + receive buffers are registered with the transfer engine around the DMA:
RDMA rejects an unregistered address (AddressNotRegistered, -800); TCP ignores
the registration. Validated cross-node on 2x H200 over BOTH tcp and rdma --
producer put_from on node 0, consumer get_into + FSDP train on node 1.
Tests: the fake now simulates put_from/get_into via ctypes, so the full contract
runs on the zero-copy path (28 tests, real gated test green on a live master).
Adds zero-copy specifics (per-tensor keys, no-pickle-on-the-wire, re-put
supersede, pickle fallback) + cross-process contract tests (producer/consumer as
separate instances over one backend) + abort-under-lease-defer and
re-put-remove-failure edge cases.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…or_keys _store_get_tensor only checked get_into's return for a negative error code, so a short read (0 <= rc < nb) into the freshly torch.empty'd receive buffer was accepted, handing the trainer a tensor with an uninitialized garbage tail. Unlike the pickle path (torch.load reconstructs whole tensors), the raw-buffer path cannot otherwise detect under-fill. get_into returns the bytes read (a full read == nb), so require rc == nb and raise KeyError on a short read. Adds a zero-copy regression test that truncates the stored blob. Also remove the dead _tensor_keys() helper: it has no call sites, while _try_physical_free and _sample_exists each inline the same per-tensor-key comprehension. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ocess online ref stream The offline disagg path hands the consumer a STATIC ref manifest written once. Online disaggregation needs a continuous stream: the rollout producer commits SampleRefs while the trainer consumes them, on another node. StreamingRefChannel is that control-plane channel: * tensor-free append-only JSONL (asserts no-tensor on publish); feature tensors go through the FeatureStore (Mooncake), so no shared *data* mount is needed. * poll() tail-reads complete lines from the last offset, buffering a partial trailing line so a half-written record is never parsed. * mark_consumed()/consumed_remote() give the producer a cross-process backpressure signal (in_flight_remote) with no shared in-process state. * close() drops an EOF sentinel so stream() terminates once drained; idle_timeout_s guards against a dead producer. Filesystem-backed (any shared control mount); a networked control plane slots in behind the same publish/poll API later. 7 CPU tests. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…consumer} + StreamingRefQueue Wires online disaggregated training: a rollout producer pool streams features to a trainer pool on a different node, tensors over Mooncake, refs over the StreamingRefChannel. * StreamingRefQueue: adapts the channel to the SampleRefQueue protocol (get/ack/fail) the FeatureDataLoader consumes. get() blocks until refs are available or the channel is closed-and-drained; ack() advances the channel's consumed counter (the producer's backpressure signal). * build_disagg_online_producer: RolloutWorker(s) (HF/SGLang target via SGLangAdapter) put() consume-once features into a Mooncake store and publish refs to the channel. drive_producer() runs until the prompt pool drains, pausing while in_flight_remote() exceeds a high-watermark so a lagging trainer can't overrun the segment, then closes the channel. * build_disagg_online_consumer: the online trainer assembly (target_head=None) reading refs from a StreamingRefQueue + tensors from a consume-once Mooncake store. The loader frees each sample on read (get -> release -> remote remove). The cross-pool consume-once free works through the shared Mooncake remove (proven in the mooncake cross-process tests); from SampleRef down the trainer path is identical to colocated online. Tests: test_disagg_online (CPU integration -- stream -> loader -> consume-once free, backpressure, blocks-until-close) + test_disagg_online_launch (GPU -- producer streams, consumer trains through FSDP end to end). StreamingRefQueue covered via the integration test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
[DataFlow runtime · online] Online disaggregated training (StreamingRefChannel + build_disagg_online_*)
Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.
- build_disagg_online_{producer,consumer} accept metadata_store /
metadata_db_path; both processes share one SQLiteMetadataStore instead of
a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
drops durably-trained refs on the append-only channel re-read (no duplicate
train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
them consumed so the producer's in_flight_remote backpressure stays exact.
CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…loop Replace the drain-then-fit shape (generate the whole prompt pool, then train) with a live loop in which the producer and trainer run concurrently. - run_disagg_online_interleaved: the producer streams refs on a daemon thread while trainer.fit consumes on the main thread (StreamingRefQueue blocks until closed-and-drained, so the trainer tracks the producer). Symmetric, hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so the producer doesn't block on the in-flight watermark after the consumer stops draining; producer-finishes-first closes the channel; producer-raise closes the channel (so the consumer can't hang) and re-raises on the main thread. - build_disagg_online_eagle3_runtime: the named single-process builder from the roadmap; composes producer + consumer over one shared metadata store, one consume-once feature store, and a producer/consumer StreamingRefChannel pair, and returns (trainer, loader, run). Uses the in-process generate stub (no live SGLang server, no Ray -- those are O1.3 / O2). - drive_producer gains a cooperative should_stop predicate + finally-close. CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate, trainer-stops-first cooperative wind-down, producer-exception propagation + consumer unblock. GPU test for the named builder's interleaved run() added to test_disagg_online_launch.py. Stacked on O1.1; implements stage O1.2 (#618). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…erized builders Adding a draft model is now a StrategySpec entry, not a new build_*_runtime family. The topology stays a named builder; the model becomes a `strategy=` parameter resolved through a registry. launch.py no longer grows as (topologies x models). - registry.py (new): StrategySpec + register_strategy/resolve_strategy/ available_strategies + concat_collate. eagle3 spec fully wired (reader/transform/collate/online-collate/adapter). - launch.py: extract _assemble_trainer + _assemble_rollout_workers shared by every topology (offline / disagg-offline / online / disagg-online producer+consumer + one-process + interleaved); each builder takes `strategy=` and resolves a spec. eagle3-named builders kept as back-compat aliases; eagle3 behavior is byte-identical. - scripts/train_eagle3_dataflow.py, examples/disagg/run_disagg_eagle3.py: use the strategy-neutral builders. - tests/test_runtime/test_strategy_registry.py (new, CPU): registry / alias / unwired-strategy-guard contract. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…e + online)
DFlash now trains through the runtime via a StrategySpec + a DFlashAdapter, with
ZERO launch.py changes (the spec seam from the previous commit carries it).
- registry.py: dflash spec — offline reader (OfflineManifestReader with dflash
feature_keys, no aux/target swap), per-sample transform, padding collate; online
via DFlashAdapter; supports_online=True.
- inference/dflash_adapter.py (new): wraps generate_dflash_data, emits
{input_ids, hidden_states, loss_mask}; verify_capture self-skips the eagle3
aux/target checks (different feature names + __aux_layer_ids__=None).
- tests/_fixtures.py: write_offline_files_dflash + build_dflash (tiny Qwen3 target
-> DFlash draft + TargetEmbeddingsAndHead -> OnlineDFlashModel).
- tests/test_dflash_launch.py + test_dflash_online_launch.py (new, GPU): offline
and online dflash train end-to-end through FSDP.
- tests/test_strategy_registry.py: dflash-fully-wired assertions.
DFlash is online-only in production (no offline dumper exists yet — prepare_
hidden_states.py is eagle3-only), so the offline path is exercised with synthetic
fixtures while online is its real workflow.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…dent loss
Domino is the third algorithm on the composable launch — a StrategySpec plus the
ONE genuine shared-contract extension the analysis predicted.
- strategy.py: StepContext{global_step, total_steps} threaded into forward_loss
(optional; eagle3/dflash ignore it). DominoTrainStrategy: reuses the DFlash
feature schema + adapter; its forward_loss reads ctx to compute the decaying
lambda_base that blends Domino's base loss (mirrors train_domino.get_lambda_base).
- trainer.py: TrainerCore.train_step / eval_step accept a StepContext; fit passes
StepContext(global_step, total_steps=max_steps). Backward-compatible.
- contracts.py: DraftStrategyName += "domino".
- registry.py: domino spec — reuses DFlash transform/collate/adapter, domino reader
(strategy tag) + DominoTrainStrategy. No new builder, no launch.py change.
- tests/_fixtures.py: build_domino (DFlash draft w/ projector_type="domino" head ->
OnlineDominoModel).
- tests/test_domino_launch.py (new): CPU lambda-schedule test + offline/online GPU
end-to-end.
Adding domino touched ZERO launch.py and reused the dflash data path — exactly the
"new algorithm = a spec + its loss" goal.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… boundary
Extract a backend-agnostic `TargetEngine` ABC (modeling/target/base.py) with a
generic `capture(...)` entry point + a real `backend` tag, replacing the two
EAGLE3-/DFlash-named ABCs as the shared base:
TargetEngine
├── Eagle3TargetEngine (was Eagle3TargetModel) {HF,SGLang,Custom}
└── DFlashTargetEngine (was DFlashTargetModel) {SGLang,HF}
- `capture()` / `set_capture_layers()` are thin dispatchers onto the unchanged
`generate_eagle3_data` / `generate_dflash_data` / `set_aux_hidden_states_layers`,
so extraction is byte-identical — this PR is pure structure/naming, no logic.
- Real `backend` class attr on every leaf ("sglang"/"hf"/"custom"); the inference
adapters' health() stops reading getattr(..., "unknown").
- Generic `get_target_engine(strategy=, backend=)` factory (modeling/target/factory.py),
loaders imported lazily so `import specforge` still works without the pinned sglang
(dflash_target_model imports sglang unconditionally — kept off the eager path).
- All pre-Phase-B names (`Eagle3TargetModel`, `get_eagle3_target_model`, ...) kept as
aliases; scripts/tests untouched. `sglang_server` factory branch lands in B2.
- Add tests/test_runtime/test_target_engine_abc.py (hierarchy, backend tags, capture
dispatch, alias identity, factory dispatch).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ang version
Extract EVERY sglang internal + the duplicated extend/capture forward into one
version-pinned boundary, `sglang_backend/capture.py::SGLangCaptureBackend`, and
have the algorithm engines COMPOSE it instead of embedding it:
SGLangCaptureBackend (the only place that imports sglang.srt.* for capture)
· build() ServerArgs / ModelConfig / SGLangRunner wiring (unified)
· _forward_extend() the single ScheduleBatch/ForwardBatch capture forward
· _maybe_prepare_mlp_sync_batch() ONE (0.5.9) prepare_mlp_sync signature
· extend / extend_vlm / extend_dflash / get_rope_index / set_eagle3_capture_layers
SGLangEagle3TargetEngine / SGLangDFlashTargetEngine now hold a backend and do
only torch-side output shaping — they import ZERO sglang internals (verified by
tests/test_runtime/test_sglang_capture_backend.py, a pure-AST invariant).
Why: before this, both sglang engines imported ~20 sglang symbols and each carried
its own near-duplicate `_extend`; the two copies had drifted to DIFFERENT sglang
API versions (eagle3 = module-level prepare_mlp_sync_batch_raw(attn_cp_size=);
dflash = the removed Scheduler.prepare_mlp_sync_batch_raw(spec_algorithm=)). A
sglang bump touched every subclass and the copies could silently diverge. Now a
bump touches one file; "put the pieces together" (capture backend + shaping +
adapter) instead of tangling the version into each algorithm.
Behavior:
- Byte-identical on the test configs (TP=1/2, dp=1): require_mlp_sync is False so
the unified mlp-sync branch is skipped identically; construction, req building,
the forward, splitting/shard logic, and pool-clear ordering are transplanted
verbatim (`import specforge` stays sglang-optional via lazy import in
from_pretrained; the engine forward is still under @torch.no_grad).
- Two deliberate, flagged changes: (1) DFlash's mlp-sync now uses the same 0.5.9
signature as eagle3 — its old Scheduler.* call was latent-broken for dp>1;
(2) dropped a stray debug print() in DFlash set_capture_layers.
Also adds the `sglang_server` backend (SGLangServerEagle3TargetEngine): selectable
via get_eagle3_target_model(backend="sglang_server"), construction raises an
actionable NotImplementedError until the live-capture depth is set by the O1.3
spike (docs/roadmap/online-disaggregation.md §O1.3).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Introduce the domain training layer `specforge/training/` with a caller-facing
`Trainer` that composes the whole training spine behind one object + `.fit()`:
FeatureDataLoader + FSDPTrainingBackend.prepare_model (FSDP wrap)
+ spec.make_strategy -> TrainerCore -> TrainerController
`Trainer` is the canonical assembler now; `launch._assemble_trainer` delegates to
it and returns the same `(TrainerController, FeatureDataLoader)` tuple, so every
`build_*_runtime` path is byte-for-byte unchanged (no fork — one wiring path).
The runtime seam (TrainerController / TrainerCore / DraftTrainStrategy /
FSDPTrainingBackend) is untouched; this is the domain facade over it. Topology
(offline/online/disagg) stays invisible to Trainer — absorbed by the (ref source
+ FeatureStore) it is handed. No HiddenStateStream: the loader is the stream.
- specforge/training/{__init__.py (PEP 562 lazy Trainer export), trainer.py}
- launch.py: _assemble_trainer delegates to Trainer; drops the now-unused
FeatureDataLoader / FSDPTrainingBackend / ParallelConfig / TrainerCore /
TrainerController imports.
- tests/test_runtime/test_domain_trainer.py: fakes the runtime pieces and asserts
the composition (refs enqueued, loader/backend/core/controller args, ack_fn
wired to the DataFlowController, .fit() delegates over the loader).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…docs + gate) Makes the Phase B abstractions load-bearing instead of forward-only scaffolding. 1. adapter cutover: SGLangAdapter / DFlashAdapter now call the engine's generic `capture(...)` instead of `generate_eagle3_data` / `generate_dflash_data` (which stay as byte-identical back-compat aliases). This is the dataflow runtime's real capture path, so `TargetEngine.capture()` is now exercised. 2. factory cutover: the dataflow test fixtures + online launch tests build targets via `get_target_engine(strategy=, backend=)`; the old `get_eagle3_target_model` / `get_dflash_target_model` stay as shims (legacy scripts untouched). 3. docs: runtime/inference/DESIGN.md, runtime/ARCHITECTURE.md, and the adapter docstrings now describe the boundary as `TargetEngine.capture` (sglang glue in SGLangCaptureBackend) instead of `Eagle3TargetModel.generate_eagle3_data`. 4. explicit B gate: tests/test_runtime/test_phase_b_gate.py — `capture() ≡ legacy generate_*_data()` **bytewise** (eagle3 + dflash), a stable captured-feature digest, and per-step-loss reproducibility through the full online capture→train path (which now routes through `capture()`), instead of relying only on the full-suite's indirect coverage. No behavior change: `capture()` dispatches to the legacy method, so byte-identical. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Colocated runs now pay nothing for the disagg control plane on the SAME code path (plan.md §C / roadmap domain-refactor.md §C). DeploymentMode becomes load-bearing rather than decorative. - NoOpMetadataStore: a MetadataStore that retains nothing (no dedup index, no durable marker) for local_colocated runs. - resolve_control_plane(mode, run_id) -> (controller, durable_ack): local_colocated gets the no-op store + durable_ack=False (loader releases features as it consumes them); dataflow_colocated/disaggregated keep the durable store + optimizer-boundary ack. Disagg builders are untouched. - build_offline_runtime / build_online_runtime route through the selector and thread durable_ack; the domain Trainer skips the ack transaction and the offline enqueue when durable_ack is False. Gates: test_noop_metadata_store (contract + selection, CPU) and test_colocated_vs_disagg_equiv (per-step loss bit-identical across the no-op colocated and full disagg control planes — only the control plane differs). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…resolver, stronger gate Post-review fixes for #636 (adversarial review vs the #630 roadmap): - build_offline_runtime / build_online_runtime take deployment_mode (default local_colocated) + metadata_db_path, so the mode parameter actually varies at the builder surface instead of being a hardcoded literal, and tests/tools can drive any mode through the ONE builder. - resolve_control_plane: drop the never-passed sample_queue/backpressure kwargs; the non-colocated arm now delegates the default store to DataFlowController's own InMemory default instead of re-encoding the selection policy (SQLite only when metadata_db_path is given). - Domain Trainer: the colocated offline path (durable_ack=False) keeps the control plane's assert_no_tensors contract on every ref even though the durable enqueue is skipped. - test_colocated_vs_disagg_equiv: both legs now run through build_offline_runtime (no hand-copied assembly); the disagg leg uses a real SQLiteMetadataStore and asserts the durable ack marker was written; the deterministic-algorithms flag is restored after the test instead of leaking process-wide. - test_noop_metadata_store: cover metadata_db_path -> SQLite selection (and local_colocated ignoring it). - Docs: DESIGN.md + NoOpMetadataStore state the scope of the no-op axis explicitly (leasing/queue bookkeeping stays shared; backpressure is already opt-in) and the consequences of retaining nothing (status() zeros, no dedup, no TrainLease ref reconstruction). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Self-review of the fix commit confirmed the new deployment_mode / metadata_db_path parameters were a hazard on the ONLINE builder: the online loader consumes a rank-private in-process SampleRefQueue that is fed only by commit_sample()==True, so a shared durable store dedups each sample onto exactly one rank's queue (divergent per-rank batch counts -> mismatched FSDP collectives -> hang), and a reused db file starves a restarted run to zero steps. The offline builder keeps the parameter (its refs-mode loader iterates the full ref list regardless of commit dedup, which is what the equivalence gate exercises); online durable-store runs remain the disagg online builders' job. Documented at the pin site. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…-control-plane [DataFlow runtime · online] O1.1 — shared cross-process control plane
…loop [DataFlow runtime · online] O1.2 — named builder + interleaved async loop
…unch [DataFlow runtime] Composable launch: StrategySpec registry + parameterized builders
[DataFlow runtime] DFlash end-to-end on the composable launch (offline + online)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
[DataFlow runtime] Domino end-to-end + StepContext for schedule-dependent loss
[DataFlow runtime] Phase B1 — TargetEngine ABC + de-EAGLE3 the target boundary
…e-backend [DataFlow runtime] Phase B2 — decouple the target engine from the sglang version
[DataFlow runtime] Phase B3 — domain Trainer wrapping the runtime seam
…-cutover [DataFlow runtime] Phase B4 — adopt the de-EAGLE3 surface (cutover + docs + gate)
…htweight [DataFlow runtime] Phase C — colocated lightweight control plane
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
dataflow-up-16-zerocopyintomain.Notes
Testing