W7: finish data decentralization (bounded shard cache, streaming router fit, peer-verifiable routing)#15

Merged
MasonFlint44 merged 8 commits into main from w7-data-decentralization
Jun 19, 2026
@MasonFlint44

Owner

Closes the central data dependencies that remained after Phase 0d's data.ship: spec (which already removed corpus bytes from the wire). Design: docs/w7-data-decentralization-design.md.

What lands

Slice a — bounded worker shard cache. The two stateful worker loops kept materialized shards in a plain dict that never evicted, so a worker failing over across many paths accumulated every shard it ever leased in RAM (the on-disk cache bounds re-streaming, not memory). Replaced with _ShardCache, a bounded LRU keyed by path; an evicted shard re-materializes from the spec (cheap when the disk cache is warm) or reloads from bytes on its next lease — byte-identical training, eviction changes only when a shard is rebuilt. Exposed as run.worker_max_shards / opendipaco join --max-shards (None = library default).
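
To make the eviction behaviour concrete, here is a minimal sketch of a bounded LRU keyed by path. It is not the actual `_ShardCache`: the class name `BoundedShardCache`, the `get_or_build` method, and the `build` callback are assumptions for illustration. The default of 4 is the library default cited in the slice-a commit message.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional

DEFAULT_MAX_SHARDS = 4  # library default cited in the slice-a commit message


class BoundedShardCache:
    """Hypothetical stand-in for _ShardCache: a bounded LRU keyed by path."""

    def __init__(self, max_shards: Optional[int] = None) -> None:
        # None falls back to the default, as a stripped manifest would.
        self.max_shards = DEFAULT_MAX_SHARDS if max_shards is None else max_shards
        if self.max_shards < 1:
            raise ValueError("max_shards must be >= 1")
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get_or_build(self, path: Hashable, build: Callable[[Hashable], Any]) -> Any:
        """Return the cached shard for `path`, rebuilding it after an eviction."""
        if path in self._entries:
            self._entries.move_to_end(path)  # mark as most recently used
            return self._entries[path]
        shard = build(path)  # re-materialize; contents do not depend on the cache
        self._entries[path] = shard
        while len(self._entries) > self.max_shards:
            self._entries.popitem(last=False)  # evict the least recently used
        return shard

    def __contains__(self, path: Hashable) -> bool:
        return path in self._entries

    def __len__(self) -> int:
        return len(self._entries)
```

Because `build` is a pure function of the path in this sketch, eviction only changes when a shard is rebuilt, never what it contains, which is the property the slice relies on.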

Slice b — bounded-memory streaming router fit. In spec mode the server still loaded the whole corpus to fit the k-means router and count tokens. fit_routing_from_source streams a bounded sample (the first N docs) to fit; build_server_corpus routes the scheduler/coordinator through it + SpecCorpus.build's streaming counts pass, so the server never holds the full corpus. Gated by data.router_sample (spec-mode-only); off by default and byte-identical to the in-hand path when unset. Sampling changes the centroids → it's an opt-in, §0f-adjacent dynamics lever.
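
The bounded-sample idea can be sketched as follows. This is an illustration under assumptions, not `fit_routing_from_source` itself: the function name `fit_routing_sketch`, the injected `fit_kmeans` callback, and the shape of the returned dict are hypothetical. The round-robin fallback for fewer than `num_paths` docs follows the review fix described for slice b.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, List


def fit_routing_sketch(
    doc_stream: Iterable[Any],
    *,
    num_paths: int,
    sample: int,
    fit_kmeans: Callable[[List[Any], int], Any],
) -> Dict[str, Any]:
    """Fit a router on the first `sample` docs of a stream (bounded memory)."""
    # islice pulls at most `sample` items, so the rest of the corpus is
    # never read into memory.
    docs = list(islice(doc_stream, sample))
    if len(docs) < num_paths:
        # Too few docs to fit K clusters: degrade instead of crashing.
        return {"kind": "round_robin", "num_paths": num_paths}
    return {
        "kind": "kmeans",
        "centroids": fit_kmeans(docs, num_paths),
        "fit": {"sample": sample},  # recorded so a peer could repeat the fit
    }
```

Passing a generator as `doc_stream` keeps peak memory proportional to `sample` rather than to the corpus size.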

Slice c — peer-verifiable routing. verify_routing re-runs the deterministic sampled fit from the spec's own public source and compares to the shipped centroids; opendipaco join --verify-routing refuses to train on a mismatch (the check fires worker-side at the _materialize_from_spec seam, memoized per spec). Opt-in, off by default — belt-and-suspenders, not a security boundary (a wrong router only wastes compute; weights are grant/quorum-gated).
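
A rough sketch of the verify-once-per-spec check is below. The `RoutingVerifier` class, the `refit` callback, the JSON-based fingerprint, and the spec layout are assumptions made for illustration; the local `RoutingVerificationError` only stands in for the project's exception of the same name. The 1e-3 tolerance is the value mentioned in the review-fix commit.

```python
import hashlib
import json
import math
from typing import Any, Callable, Dict, List, Set

Centroids = List[List[float]]


class RoutingVerificationError(Exception):
    """Local stand-in for the project's exception of the same name."""


def spec_fingerprint(spec: Dict[str, Any]) -> str:
    # Hypothetical fingerprint: hash of a canonical JSON dump of the spec.
    return hashlib.sha256(json.dumps(spec, sort_keys=True).encode()).hexdigest()


def centroids_match(shipped: Centroids, recomputed: Centroids, tol: float = 1e-3) -> bool:
    """Tolerant element-wise compare; shape mismatches count as a mismatch."""
    if len(shipped) != len(recomputed):
        return False
    return all(
        len(a) == len(b)
        and all(math.isclose(x, y, rel_tol=0.0, abs_tol=tol) for x, y in zip(a, b))
        for a, b in zip(shipped, recomputed)
    )


class RoutingVerifier:
    def __init__(self, refit: Callable[[Dict[str, Any]], Centroids]) -> None:
        self._refit = refit
        self._verified: Set[str] = set()  # fingerprints that already passed

    def check(self, spec: Dict[str, Any]) -> None:
        fp = spec_fingerprint(spec)
        if fp in self._verified:
            return  # memoized: a cache re-materialization does not re-fit
        if not centroids_match(spec["routing"]["centroids"], self._refit(spec)):
            raise RoutingVerificationError("shipped centroids do not match re-fit")
        self._verified.add(fp)
```

Only passing specs are memoized here, so a mismatching spec is re-checked (and refused again) if it is ever presented a second time.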

Volunteer-local knobs (worker_max_shards, worker_max_batch, verify_routing) are stripped from the published manifest so a joiner never inherits the operator's hardware-specific values.
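
As a small illustration of the stripping step, assuming the run section of the manifest is a flat dict (the function name and the manifest layout are hypothetical; only the three knob names come from this PR):

```python
from typing import Any, Dict

# Knob names taken from the PR description.
VOLUNTEER_LOCAL_KNOBS = ("worker_max_shards", "worker_max_batch", "verify_routing")


def strip_volunteer_local(run_config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the run config without hardware-specific knobs."""
    return {k: v for k, v in run_config.items() if k not in VOLUNTEER_LOCAL_KNOBS}
```

A joiner rebuilding its config from the stripped dict then sees the knobs as absent and falls back to its own defaults.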

Reviewed

Each slice got a recall-biased multi-agent code-review pass; fixes folded in (see commits):

  • a: manifest leak — worker_max_shards/worker_max_batch now stripped (volunteer-local).
  • b: dropped round-robin fallback — a too-small router_sample (or C4 filtering) now degrades to round-robin instead of crashing KMeansRouter.fit at startup.
  • c: RoutingVerificationError was swallowed by the coordinator path's broad except (infinite nack-retry instead of refuse); it now propagates fatally, as the contract promised. A re-fit that degrades to round-robin now yields "can't verify" (warn and proceed) rather than a refusal. The centroid compare is dtype-safe, with its tolerance relaxed for cross-stack k-means.
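
The fix in item c follows a common pattern: re-raise the fatal exception in its own clause ahead of the broad handler. A minimal sketch, where `serve_one_task`, `handle_task`, and `nack` are hypothetical names and the exception class is a local stand-in for the project's:

```python
from typing import Any, Callable


class RoutingVerificationError(Exception):
    """Local stand-in for the project's exception of the same name."""


def serve_one_task(
    handle_task: Callable[[Any], None],
    task: Any,
    nack: Callable[[Any, Exception], None],
) -> bool:
    """Run one task; nack recoverable failures, let fatal ones escape."""
    try:
        handle_task(task)
    except RoutingVerificationError:
        # Fatal by contract: must not be turned into a nack-and-retry.
        raise
    except Exception as exc:
        # Anything else is treated as transient: nack so the task is re-leased.
        nack(task, exc)
        return False
    return True
```

Clause order matters: Python tries `except` clauses top to bottom, so the specific clause has to come before `except Exception`.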

Owed (tracked in docs/remaining-gaps.md)

  • Decentralized EM re-sharding — still central, not in the async/sharded loop; decentralizing it changes global assignments and needs consensus (§0f/research, rides the WAN run).
  • --verify-routing is a no-op in schedule.mode: decentralized (no _materialize_from_spec seam; warned at startup).
  • verify_routing's centroid compare is tolerance-based across very different numerical stacks (k-means isn't bit-identical across BLAS/torch builds; it only ever refuses, never silently trains on a bad router).

Testing

uv run pytest green (full suite, exit 0). New: tests/test_shard_cache.py; additions to tests/test_data_spec.py, tests/test_join.py, tests/test_launch.py. Central/unsampled paths kept byte-identical (asserted).

🤖 Generated with Claude Code

MasonFlint44 and others added 7 commits June 17, 2026 21:24
…uter fit/verify)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The two stateful worker loops kept materialized shards in a plain dict that
never evicted: a long-lived worker failing over across many paths accumulated
every shard it ever leased in RAM (the on-disk cache bounds re-streaming, not
memory). Replace it with `_ShardCache`, a bounded LRU keyed by path; an evicted
shard is re-materialized from the spec (cheap when the disk cache is warm) or
reloaded from bytes on its next lease. Training is byte-identical — eviction
changes only *when* a shard is rebuilt, never its contents; a one-path worker
holds a single entry and never evicts. The decentralized worker uses
corpus.shard() directly and is untouched.

Exposed as run.worker_max_shards (None = library default 4) and
`opendipaco join --max-shards`.

Review fixes folded in:
- manifest: strip worker_max_shards AND worker_max_batch — both are volunteer-
  local hardware knobs (like max_mbps); a joiner must fall back to its own
  default, not inherit the operator's value. worker_max_shards is now Optional
  so a stripped manifest (-> None) rebuilds into a valid config.
- _ShardCache(None) -> library default (the stripped-manifest path).
- honest __iter__ comment: the snapshot is cheap single-thread safety, not a
  lock substitute.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
In spec mode the server still loaded the whole corpus into RAM to fit the
k-means router and count tokens (build_documents -> fit on all docs) -- the live
central data-authority memory cost the spec path was meant to remove.

Add data/spec.py::fit_routing_from_source: stream the public source, fit on the
first `sample` document prefixes (bounded RAM), return a kmeans_routing dict.
Deterministic in (source, sample, seq_len, feature_dim, seeds) so any peer
reproduces identical centroids -- the basis for slice c's verify_routing.

roles.py::build_server_corpus is the new scheduler/coordinator entry: with
data.router_sample set (spec mode) it streams a sampled fit + SpecCorpus.build
counts pass, never holding the full corpus; otherwise it falls back to the exact
in-hand build (byte-identical). Factored out _spec_source() to drop the
source-dispatch duplication. Wired run_scheduler / run_coordinator / sharded
run_local; the decentralized worker keeps the in-hand path (it needs a real
ShardedCorpus).

data.router_sample knob: None = off; validated >= 1 and spec-mode-only at load.
Sampling changes the centroids -> different shards, so it is opt-in and NOT
byte-identical (a 0f-adjacent dynamics lever, off by default).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…st determinism

Code-review fixes for the streaming router fit:
- fit_routing_from_source degrades to round-robin when the streamed sample
  yields fewer than num_paths docs, mirroring the in-hand builder's
  `len(docs) < num_paths` fallback. The streaming path previously went straight
  to KMeansRouter.fit, which raises "need >= K samples" — reachable via a
  too-small router_sample (default num_paths=16) or aggressive C4 filtering,
  crashing the scheduler/coordinator at startup.
- remove the feat_seed parameter: no caller passed it (always 0), and the
  featurizer seed must equal the routing-dict seed anyway; hardcode 0 to match
  the in-hand path.
- tighten the determinism docstring: k-means++ seeds off randperm(n) where n is
  the streamed doc count, so reproducibility rests on bit-identical stream replay
  (synthetic regenerates exactly; pin the C4 revision) — the contract slice c's
  verify_routing depends on.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A worker can now refuse a shipped router that doesn't match the corpus it claims
to come from, instead of trusting whatever bytes the spec carries.

- data/spec.py::verify_routing re-runs the slice-b deterministic sampled fit from
  the spec's own public source and compares to the shipped centroids. The sampled
  fit records sample/router_seed in routing["fit"] so verification is
  self-contained; round-robin verifies trivially; an in-hand (full-corpus) spec
  has no fit metadata and raises "can't verify".
- enforcement at the materialization seam (_materialize_from_spec, shared by both
  worker loops), not at join time (the spec arrives per-task, not in the
  manifest): verified once per spec (memoized by fingerprint so an LRU
  re-materialization doesn't re-stream), fatal RoutingVerificationError on
  mismatch (refuse to train), warn-then-proceed when unverifiable.
- --verify-routing join flag / run.verify_routing knob, volunteer-local (stripped
  from the manifest), off by default (re-streaming the sample costs bandwidth).

Belt-and-suspenders, not a security boundary: a wrong router only wastes compute
(weights are grant/quorum-gated), and a self-consistent malicious spec (a
different public corpus + matching centroids) is the W8 data-poisoning question.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Code-review fixes for routing verification:
- RoutingVerificationError now escapes _serve_connection's broad except (it was
  caught -> nack -> continue, so the coordinator/run_worker path re-leased the
  same poisoned spec forever and re-fit every retry). Re-raised before the broad
  handler so it propagates past the OSError/ConnectionError-only reconnect loop
  and the worker exits, matching the sharded path and the "refuse to train"
  contract.
- verify_routing: a re-fit that degrades to round-robin (live source now yields
  < num_paths docs) raises "cannot verify" instead of returning False -> the
  worker warns and proceeds rather than an honest worker refusing on a transient
  data-availability dip.
- compare centroids in float32 so a round-tripped/float64 tensor yields a verdict
  instead of allclose raising (which escaped the ValueError-only catch).
- relax the centroid tolerance to 1e-3 and document that k-means isn't
  bit-identical across BLAS/torch stacks (tolerant compare; only ever refuses,
  never silently trains on a bad router). Docstring also scoped honestly: it
  confirms centroids match the recorded fit params over the stated source, not
  that the params/routing are benign (attacker-picks-seed is a W8 question).
- decentralized mode warns that --verify-routing is a no-op there (no
  _materialize_from_spec seam) instead of the join client implying it's active.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Roadmap W7 -> mostly landed (bounded shard cache, sampled streaming router fit,
peer-verifiable routing); remaining-gaps records the owed pieces: decentralized
EM re-sharding (§0f/research), verify_routing no-op in decentralized mode, and
the cross-stack-tolerant centroid compare.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 33916d282e


Comment thread src/opendipaco/data/spec.py Outdated
Comment on lines +235 to +238

    recomputed = fit_routing_from_source(
        spec["source"], num_paths=spec["num_paths"], vocab_size=feat["vocab_size"],
        seq_len=spec["seq_len"], sample=fit["sample"], feature_dim=feat["feature_dim"],
        router_seed=fit["router_seed"], doc_source=source, tokenizer=tokenizer)


P2: Verify the featurizer seed as well

When --verify-routing is enabled, this re-fit ignores routing["featurizer"]["seed"]: fit_routing_from_source always uses the default seed, while _build_predictor later routes documents with the shipped f["seed"]. A spec that changes only the featurizer seed keeps the same centroids, passes this allclose check, and then materializes different shard assignments, so the tampered/wrong router is not actually refused.

Owner Author


Good catch — confirmed and fixed in cf6006f. fit_routing_from_source regained a feat_seed parameter (recorded in the routing dict via kmeans_routing(..., seed=feat_seed)), and verify_routing now re-fits with the shipped featurizer["seed"], so a spec that flips only the featurizer seed reproduces different centroids → mismatch → refused. Also added a featurizer["kind"] guard so a future/tampered kind reports "cannot verify" rather than being blindly re-fit as bag-of-tokens. Test: test_verify_routing_detects_tampered_featurizer_seed.
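
A toy example of why the featurizer seed has to be part of the re-fit: with a seeded random projection, the same tokens map to different feature vectors under a different seed, so centroids fitted under one seed say nothing about routing under another. The projection below is a deliberately simplified stand-in, not the project's bag-of-tokens featurizer, and the function name and parameters are assumptions.

```python
import random
from typing import List, Sequence


def project_bag_of_tokens(
    tokens: Sequence[int], *, vocab_size: int, feature_dim: int, seed: int
) -> List[float]:
    """Sum one seeded random row per token id (toy random projection)."""
    rng = random.Random(seed)
    # One random row per vocabulary id, fully determined by the seed.
    rows = [[rng.gauss(0.0, 1.0) for _ in range(feature_dim)] for _ in range(vocab_size)]
    features = [0.0] * feature_dim
    for tok in tokens:
        row = rows[tok % vocab_size]
        for j in range(feature_dim):
            features[j] += row[j]
    return features


doc = [3, 1, 4, 1, 5]
same_a = project_bag_of_tokens(doc, vocab_size=8, feature_dim=4, seed=0)
same_b = project_bag_of_tokens(doc, vocab_size=8, feature_dim=4, seed=0)
other = project_bag_of_tokens(doc, vocab_size=8, feature_dim=4, seed=1)
```

Here `same_a` and `same_b` are identical while `other` differs, which is why a re-fit that uses the shipped seed turns a seed-only tamper into a centroid mismatch.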

…nvalid configs

Codex (P2): verify_routing re-fit the router ignoring the shipped
featurizer["seed"] (always 0 after slice b dropped feat_seed as "dead"), while
_build_predictor routes at materialization with the shipped seed. A spec that
keeps the legit centroids but flips only the featurizer seed would pass
verification yet route through a different projection. Reintroduce feat_seed on
fit_routing_from_source (recorded in the routing dict); verify_routing now
re-fits with the shipped seed, so a seed-only tamper produces a centroid
mismatch and is refused.

Whole-PR code-review hardening:
- verify_routing guards featurizer["kind"] == "bag_of_tokens" (matching
  _build_predictor) so a future/tampered featurizer kind reports can't-verify
  instead of being blindly re-fit as bag-of-tokens and falsely rejected.
- config: data.router_sample now requires data.routing: kmeans (round-robin fits
  no router, so the sample was silently ignored).
- config: schedule.mode: decentralized rejects data.ship: spec at load -- the
  decentralized worker materializes via corpus.shard(), which a SpecCorpus can't
  serve, so it would otherwise crash on the first lease (central/rendezvous spec
  is unaffected; the gate keys on schedule.mode).
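
The load-time gates described in this PR could be sketched as below. The `DataSection` dataclass, the `validate` function, the error messages, and the `"bytes"` placeholder for the non-spec ship mode are all assumptions; only the option names and the rules themselves come from the commit messages.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataSection:
    ship: str = "bytes"           # "bytes" is a placeholder for the non-spec mode
    routing: str = "round_robin"
    router_sample: Optional[int] = None  # None = off


def validate(data: DataSection, schedule_mode: str) -> None:
    """Raise ValueError for combinations the PR rejects at config load."""
    if schedule_mode == "decentralized" and data.ship == "spec":
        raise ValueError("schedule.mode: decentralized cannot use data.ship: spec")
    if data.router_sample is None:
        return
    if data.router_sample < 1:
        raise ValueError("data.router_sample must be >= 1")
    if data.ship != "spec":
        raise ValueError("data.router_sample is spec-mode-only")
    if data.routing != "kmeans":
        raise ValueError("data.router_sample requires data.routing: kmeans")
```

Failing at load keeps an invalid combination from surfacing later as a crash on the first lease or as a silently ignored knob.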

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@MasonFlint44 MasonFlint44 merged commit fa58df2 into main Jun 19, 2026
2 checks passed
@MasonFlint44 MasonFlint44 deleted the w7-data-decentralization branch June 19, 2026 18:09