Skip to content

[DataFlow runtime] Phase C — colocated lightweight control plane#636

Merged
jiapingW merged 5 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-28-colocated-lightweight
Jul 3, 2026
Merged

[DataFlow runtime] Phase C — colocated lightweight control plane#636
jiapingW merged 5 commits into
dataflow-up-16-zerocopyfrom
dataflow-up-28-colocated-lightweight

Conversation

@maocheng23

@maocheng23 maocheng23 commented Jul 1, 2026

Copy link
Copy Markdown
Collaborator

Phase C — colocated lightweight control plane

Implements Phase C of the reconciled architecture plan (#630, docs/roadmap/domain-refactor.md §C): colocated runs pay nothing for the disaggregation control plane, on the same code path — not a fork.

Stacked on #635 (Phase B4). Part of the dataflow-up-* stack.

What changed

  • NoOpMetadataStore (control_plane/metadata_store.py) — a MetadataStore that retains nothing (no committed-ref index, no durable marker) for local_colocated runs.
  • resolve_control_plane(deployment_mode, run_id, …) -> (controller, durable_ack) (control_plane/controller.py) — makes DeploymentMode load-bearing rather than decorative:
    • local_colocatedNoOpMetadataStore + durable_ack=False (the loader releases each feature as it consumes it; no durable ack transaction).
    • dataflow_colocated / disaggregated → durable store (SQLite when a path is given, else in-memory) + optimizer-boundary ack.
  • launch.py — the two colocated builders (build_offline_runtime, build_online_runtime) route through the selector and thread durable_ack. Disagg builders are untouched.
  • The domain Trainer skips the durable-ack callback (and the offline enqueue) when durable_ack=False.

Only the (metadata store + ack policy) differ between modes — the trainer, loader, and SampleRefQueue are byte-for-byte identical, so the training result is provably independent of the deployment mode.

Tests / gates

  • test_noop_metadata_store.py — the no-op contract + DeploymentMode selection (CPU).
  • test_colocated_vs_disagg_equiv.py — same seed + fixtures produce a bit-identical per-step loss curve through the no-op colocated plane and the full disaggregated plane (only the control plane differs).

Validation

Full tests/test_runtime suite 231 OK (2 skipped, 1 xfail) on 8×H200 — no regressions.

Review + fixes (July 1)

An adversarial review of the C+D stack against the #630 roadmap surfaced the findings below, fixed in the two [Phase C review fixes] commits (the fixes were themselves re-reviewed by a second adversarial pass). Revalidated: full tests/test_runtime = 239 OK (2 skip, 1 xfail) on 8×H200 — 231 baseline + 8 new gates.

  • deployment_mode is a real parameter on build_offline_runtime (+ metadata_db_path), so the mode varies at the builder surface; the equivalence gate now drives BOTH legs through the one builder, with the disagg leg on a real SQLite store and the durable ack marker asserted.
  • build_online_runtime stays deliberately pinned to local_colocated (documented at the pin site): the online loader consumes a rank-private queue fed by commit_sample()==True, so a shared durable store would dedup each sample onto a single rank's queue (divergent per-rank batch counts → mismatched FSDP collectives) — durable-store online runs are the disagg online builders' job.
  • resolve_control_plane slimmed: never-passed kwargs dropped; default-store selection delegated to the controller instead of re-encoding the policy.
  • The colocated offline path keeps the control plane's assert_no_tensors contract on every ref even though the durable enqueue is skipped.
  • Scope docs: DESIGN.md + NoOpMetadataStore state the no-op axis precisely (store + durable ack; leasing/queue bookkeeping stays shared — in-process, no I/O; backpressure already opt-in) and the consequences of retaining nothing (status() counts read 0, no commit dedup, no TrainLease ref reconstruction). Roadmap §C wording updated on docs: reconciled SpecForge architecture plan (DataFlow runtime + domain layer) #630 to match.
  • Test hygiene: the deterministic-algorithms flag is restored after the gate instead of leaking process-wide.

Colocated runs now pay nothing for the disagg control plane on the SAME code
path (plan.md §C / roadmap domain-refactor.md §C). DeploymentMode becomes
load-bearing rather than decorative.

- NoOpMetadataStore: a MetadataStore that retains nothing (no dedup index, no
  durable marker) for local_colocated runs.
- resolve_control_plane(mode, run_id) -> (controller, durable_ack): local_colocated
  gets the no-op store + durable_ack=False (loader releases features as it
  consumes them); dataflow_colocated/disaggregated keep the durable store +
  optimizer-boundary ack. Disagg builders are untouched.
- build_offline_runtime / build_online_runtime route through the selector and
  thread durable_ack; the domain Trainer skips the ack transaction and the
  offline enqueue when durable_ack is False.

Gates: test_noop_metadata_store (contract + selection, CPU) and
test_colocated_vs_disagg_equiv (per-step loss bit-identical across the no-op
colocated and full disagg control planes — only the control plane differs).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a lightweight local_colocated deployment mode to bypass the overhead of the disaggregated control plane, adding a NoOpMetadataStore and a resolve_control_plane helper. Feedback on these changes highlights several important improvements: defining a default no-op lambda for ack_fn in Trainer to prevent potential TypeErrors, validating the deployment_mode parameter to avoid silent misconfigurations, dynamically allocating ports in tests to prevent collisions, and cleaning up temporary directories to avoid disk space leaks.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +103 to +110
ack_fn = None
if durable_ack:

def ack_fn(ids, step):
controller.ack_train_refs(
trainer_id, ids, global_step=step, optimizer_durable=True
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If TrainerController expects ack_fn to be a callable, passing None when durable_ack is False will result in a TypeError when it is invoked. It is safer and more robust to define ack_fn as a no-op lambda by default.

        ack_fn = lambda ids, step: None\n        if durable_ack:\n\n            def ack_fn(ids, step):\n                controller.ack_train_refs(\n                    trainer_id, ids, global_step=step, optimizer_durable=True\n                )

Comment on lines +382 to +391
if deployment_mode == "local_colocated":
store: MetadataStore = NoOpMetadataStore()
durable_ack = False
else:
store = (
SQLiteMetadataStore(metadata_db_path)
if metadata_db_path
else InMemoryMetadataStore()
)
durable_ack = True

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent silent misconfigurations or typos in the deployment_mode parameter, it is highly recommended to explicitly validate the mode against the expected values and raise a ValueError for unrecognized modes.

    if deployment_mode == \"local_colocated\":\n        store: MetadataStore = NoOpMetadataStore()\n        durable_ack = False\n    elif deployment_mode in (\"dataflow_colocated\", \"disaggregated\"):\n        store = (\n            SQLiteMetadataStore(metadata_db_path)\n            if metadata_db_path\n            else InMemoryMetadataStore()\n        )\n        durable_ack = True\n    else:\n        raise ValueError(\n            f\"Unknown deployment_mode: {deployment_mode!r}. \"\n            \"Expected 'local_colocated', 'dataflow_colocated', or 'disaggregated'.\"\n        )

def test_loss_curve_independent_of_control_plane(self):
from tests.test_runtime import _fixtures as fx

fx.build_single_rank_distributed(port="29571")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding a specific port for distributed process group initialization can lead to flaky test failures due to port collisions in concurrent test environments. It is highly recommended to dynamically allocate a free port.

        import socket\n        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n            s.bind((\"\", 0))\n            port = str(s.getsockname()[1])\n        fx.build_single_rank_distributed(port=port)

from specforge.training import Trainer

TTT, N, MAX_LEN = 3, 8, 512
workdir = tempfile.mkdtemp(prefix="coloc_disagg_")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The temporary directory created by tempfile.mkdtemp is never cleaned up, which can lead to disk space leaks over time. Registering a cleanup handler using self.addCleanup ensures that the directory is deleted after the test finishes.

        import shutil\n        workdir = tempfile.mkdtemp(prefix=\"coloc_disagg_\")\n        self.addCleanup(shutil.rmtree, workdir)

maocheng23 and others added 2 commits July 1, 2026 14:03
…resolver, stronger gate

Post-review fixes for #636 (adversarial review vs the #630 roadmap):

- build_offline_runtime / build_online_runtime take deployment_mode
  (default local_colocated) + metadata_db_path, so the mode parameter
  actually varies at the builder surface instead of being a hardcoded
  literal, and tests/tools can drive any mode through the ONE builder.
- resolve_control_plane: drop the never-passed sample_queue/backpressure
  kwargs; the non-colocated arm now delegates the default store to
  DataFlowController's own InMemory default instead of re-encoding the
  selection policy (SQLite only when metadata_db_path is given).
- Domain Trainer: the colocated offline path (durable_ack=False) keeps
  the control plane's assert_no_tensors contract on every ref even
  though the durable enqueue is skipped.
- test_colocated_vs_disagg_equiv: both legs now run through
  build_offline_runtime (no hand-copied assembly); the disagg leg uses a
  real SQLiteMetadataStore and asserts the durable ack marker was
  written; the deterministic-algorithms flag is restored after the test
  instead of leaking process-wide.
- test_noop_metadata_store: cover metadata_db_path -> SQLite selection
  (and local_colocated ignoring it).
- Docs: DESIGN.md + NoOpMetadataStore state the scope of the no-op axis
  explicitly (leasing/queue bookkeeping stays shared; backpressure is
  already opt-in) and the consequences of retaining nothing (status()
  zeros, no dedup, no TrainLease ref reconstruction).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Self-review of the fix commit confirmed the new deployment_mode /
metadata_db_path parameters were a hazard on the ONLINE builder: the
online loader consumes a rank-private in-process SampleRefQueue that is
fed only by commit_sample()==True, so a shared durable store dedups
each sample onto exactly one rank's queue (divergent per-rank batch
counts -> mismatched FSDP collectives -> hang), and a reused db file
starves a restarted run to zero steps. The offline builder keeps the
parameter (its refs-mode loader iterates the full ref list regardless
of commit dedup, which is what the equivalence gate exercises); online
durable-store runs remain the disagg online builders' job. Documented
at the pin site.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@maocheng23 maocheng23 marked this pull request as ready for review July 2, 2026 23:38
@maocheng23 maocheng23 requested a review from FrankLeeeee as a code owner July 2, 2026 23:38
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Base automatically changed from dataflow-up-27-target-engine-cutover to dataflow-up-16-zerocopy July 3, 2026 02:27
@jiapingW jiapingW self-requested a review July 3, 2026 02:28
@jiapingW jiapingW merged commit 8fb067c into dataflow-up-16-zerocopy Jul 3, 2026
1 check passed
@jiapingW jiapingW deleted the dataflow-up-28-colocated-lightweight branch July 3, 2026 02:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants