[DataFlow runtime] Phase C — colocated lightweight control plane#636
Conversation
Colocated runs now pay nothing for the disagg control plane on the SAME code path (plan.md §C / roadmap domain-refactor.md §C). DeploymentMode becomes load-bearing rather than decorative. - NoOpMetadataStore: a MetadataStore that retains nothing (no dedup index, no durable marker) for local_colocated runs. - resolve_control_plane(mode, run_id) -> (controller, durable_ack): local_colocated gets the no-op store + durable_ack=False (loader releases features as it consumes them); dataflow_colocated/disaggregated keep the durable store + optimizer-boundary ack. Disagg builders are untouched. - build_offline_runtime / build_online_runtime route through the selector and thread durable_ack; the domain Trainer skips the ack transaction and the offline enqueue when durable_ack is False. Gates: test_noop_metadata_store (contract + selection, CPU) and test_colocated_vs_disagg_equiv (per-step loss bit-identical across the no-op colocated and full disagg control planes — only the control plane differs). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a lightweight local_colocated deployment mode to bypass the overhead of the disaggregated control plane, adding a NoOpMetadataStore and a resolve_control_plane helper. Feedback on these changes highlights several important improvements: defining a default no-op lambda for ack_fn in Trainer to prevent potential TypeErrors, validating the deployment_mode parameter to avoid silent misconfigurations, dynamically allocating ports in tests to prevent collisions, and cleaning up temporary directories to avoid disk space leaks.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| ack_fn = None | ||
| if durable_ack: | ||
|
|
||
| def ack_fn(ids, step): | ||
| controller.ack_train_refs( | ||
| trainer_id, ids, global_step=step, optimizer_durable=True | ||
| ) | ||
|
|
There was a problem hiding this comment.
If TrainerController expects ack_fn to be a callable, passing None when durable_ack is False will result in a TypeError when it is invoked. It is safer and more robust to define ack_fn as a no-op lambda by default.
ack_fn = lambda ids, step: None\n if durable_ack:\n\n def ack_fn(ids, step):\n controller.ack_train_refs(\n trainer_id, ids, global_step=step, optimizer_durable=True\n )| if deployment_mode == "local_colocated": | ||
| store: MetadataStore = NoOpMetadataStore() | ||
| durable_ack = False | ||
| else: | ||
| store = ( | ||
| SQLiteMetadataStore(metadata_db_path) | ||
| if metadata_db_path | ||
| else InMemoryMetadataStore() | ||
| ) | ||
| durable_ack = True |
There was a problem hiding this comment.
To prevent silent misconfigurations or typos in the deployment_mode parameter, it is highly recommended to explicitly validate the mode against the expected values and raise a ValueError for unrecognized modes.
if deployment_mode == \"local_colocated\":\n store: MetadataStore = NoOpMetadataStore()\n durable_ack = False\n elif deployment_mode in (\"dataflow_colocated\", \"disaggregated\"):\n store = (\n SQLiteMetadataStore(metadata_db_path)\n if metadata_db_path\n else InMemoryMetadataStore()\n )\n durable_ack = True\n else:\n raise ValueError(\n f\"Unknown deployment_mode: {deployment_mode!r}. \"\n \"Expected 'local_colocated', 'dataflow_colocated', or 'disaggregated'.\"\n )| def test_loss_curve_independent_of_control_plane(self): | ||
| from tests.test_runtime import _fixtures as fx | ||
|
|
||
| fx.build_single_rank_distributed(port="29571") |
There was a problem hiding this comment.
Hardcoding a specific port for distributed process group initialization can lead to flaky test failures due to port collisions in concurrent test environments. It is highly recommended to dynamically allocate a free port.
import socket\n with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n s.bind((\"\", 0))\n port = str(s.getsockname()[1])\n fx.build_single_rank_distributed(port=port)| from specforge.training import Trainer | ||
|
|
||
| TTT, N, MAX_LEN = 3, 8, 512 | ||
| workdir = tempfile.mkdtemp(prefix="coloc_disagg_") |
There was a problem hiding this comment.
The temporary directory created by tempfile.mkdtemp is never cleaned up, which can lead to disk space leaks over time. Registering a cleanup handler using self.addCleanup ensures that the directory is deleted after the test finishes.
import shutil\n workdir = tempfile.mkdtemp(prefix=\"coloc_disagg_\")\n self.addCleanup(shutil.rmtree, workdir)…resolver, stronger gate Post-review fixes for #636 (adversarial review vs the #630 roadmap): - build_offline_runtime / build_online_runtime take deployment_mode (default local_colocated) + metadata_db_path, so the mode parameter actually varies at the builder surface instead of being a hardcoded literal, and tests/tools can drive any mode through the ONE builder. - resolve_control_plane: drop the never-passed sample_queue/backpressure kwargs; the non-colocated arm now delegates the default store to DataFlowController's own InMemory default instead of re-encoding the selection policy (SQLite only when metadata_db_path is given). - Domain Trainer: the colocated offline path (durable_ack=False) keeps the control plane's assert_no_tensors contract on every ref even though the durable enqueue is skipped. - test_colocated_vs_disagg_equiv: both legs now run through build_offline_runtime (no hand-copied assembly); the disagg leg uses a real SQLiteMetadataStore and asserts the durable ack marker was written; the deterministic-algorithms flag is restored after the test instead of leaking process-wide. - test_noop_metadata_store: cover metadata_db_path -> SQLite selection (and local_colocated ignoring it). - Docs: DESIGN.md + NoOpMetadataStore state the scope of the no-op axis explicitly (leasing/queue bookkeeping stays shared; backpressure is already opt-in) and the consequences of retaining nothing (status() zeros, no dedup, no TrainLease ref reconstruction). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Self-review of the fix commit confirmed the new deployment_mode / metadata_db_path parameters were a hazard on the ONLINE builder: the online loader consumes a rank-private in-process SampleRefQueue that is fed only by commit_sample()==True, so a shared durable store dedups each sample onto exactly one rank's queue (divergent per-rank batch counts -> mismatched FSDP collectives -> hang), and a reused db file starves a restarted run to zero steps. The offline builder keeps the parameter (its refs-mode loader iterates the full ref list regardless of commit dedup, which is what the equivalence gate exercises); online durable-store runs remain the disagg online builders' job. Documented at the pin site. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Phase C — colocated lightweight control plane
Implements Phase C of the reconciled architecture plan (#630,
docs/roadmap/domain-refactor.md§C): colocated runs pay nothing for the disaggregation control plane, on the same code path — not a fork.Stacked on #635 (Phase B4). Part of the
dataflow-up-*stack.What changed
NoOpMetadataStore(control_plane/metadata_store.py) — aMetadataStorethat retains nothing (no committed-ref index, no durable marker) forlocal_colocatedruns.resolve_control_plane(deployment_mode, run_id, …) -> (controller, durable_ack)(control_plane/controller.py) — makesDeploymentModeload-bearing rather than decorative:local_colocated→NoOpMetadataStore+durable_ack=False(the loader releases each feature as it consumes it; no durable ack transaction).dataflow_colocated/disaggregated→ durable store (SQLite when a path is given, else in-memory) + optimizer-boundary ack.launch.py— the two colocated builders (build_offline_runtime,build_online_runtime) route through the selector and threaddurable_ack. Disagg builders are untouched.Trainerskips the durable-ack callback (and the offline enqueue) whendurable_ack=False.Only the (metadata store + ack policy) differ between modes — the trainer, loader, and
SampleRefQueueare byte-for-byte identical, so the training result is provably independent of the deployment mode.Tests / gates
test_noop_metadata_store.py— the no-op contract +DeploymentModeselection (CPU).test_colocated_vs_disagg_equiv.py— same seed + fixtures produce a bit-identical per-step loss curve through the no-op colocated plane and the full disaggregated plane (only the control plane differs).Validation
Full
tests/test_runtimesuite 231 OK (2 skipped, 1 xfail) on 8×H200 — no regressions.Review + fixes (July 1)
An adversarial review of the C+D stack against the #630 roadmap surfaced the findings below, fixed in the two
[Phase C review fixes]commits (the fixes were themselves re-reviewed by a second adversarial pass). Revalidated: fulltests/test_runtime= 239 OK (2 skip, 1 xfail) on 8×H200 — 231 baseline + 8 new gates.deployment_modeis a real parameter onbuild_offline_runtime(+metadata_db_path), so the mode varies at the builder surface; the equivalence gate now drives BOTH legs through the one builder, with the disagg leg on a real SQLite store and the durable ack marker asserted.build_online_runtimestays deliberately pinned tolocal_colocated(documented at the pin site): the online loader consumes a rank-private queue fed bycommit_sample()==True, so a shared durable store would dedup each sample onto a single rank's queue (divergent per-rank batch counts → mismatched FSDP collectives) — durable-store online runs are the disagg online builders' job.resolve_control_planeslimmed: never-passed kwargs dropped; default-store selection delegated to the controller instead of re-encoding the policy.assert_no_tensorscontract on every ref even though the durable enqueue is skipped.DESIGN.md+NoOpMetadataStorestate the no-op axis precisely (store + durable ack; leasing/queue bookkeeping stays shared — in-process, no I/O; backpressure already opt-in) and the consequences of retaining nothing (status()counts read 0, no commit dedup, noTrainLeaseref reconstruction). Roadmap §C wording updated on docs: reconciled SpecForge architecture plan (DataFlow runtime + domain layer) #630 to match.