29 changes: 29 additions & 0 deletions examples/disagg/README.md
@@ -20,6 +20,35 @@ The control plane carries only tensor-free `SampleRef` metadata (the manifest);
feature tensors travel through the shared store. `build_disagg_eagle3_runtime`
reuses the exact offline trainer assembly, so results align by construction.

## Backends

The feature transport is selected by `DISAGG_BACKEND` (default `shared_dir`):

| backend | store | shared *data* mount? |
|---|---|---|
| `shared_dir` (default) | `SharedDirFeatureStore` (`torch.save` on a POSIX mount) | required |
| `mooncake` | `MooncakeFeatureStore` (RDMA/TCP network object store) | not needed |

`mooncake` is the M6 **fast path**: the producer `put()`s and the consumer `get()`s
features by key, peer-to-peer across nodes, so feature tensors need no shared *data*
mount (only the small ref manifest still goes through `DISAGG_MANIFEST`). Each object
is hard-pinned so that Mooncake's cache LRU never evicts a committed feature. Because
a Mooncake object lives in the **producer's** memory segment, the producer must stay
alive until the consumer finishes: the example holds it open until the consumer writes
`<manifest>.consumed` (or until `DISAGG_PRODUCER_HOLD_S` seconds elapse). Enable with:

```bash
export DISAGG_BACKEND=mooncake
export MOONCAKE_LOCAL_HOSTNAME=<this-node-ip>
export MOONCAKE_METADATA_SERVER=<metadata url>
export MOONCAKE_MASTER_SERVER_ADDR=<master host:port>
export MOONCAKE_PROTOCOL=tcp # or rdma
```
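
Before launching, it can help to check that the variables for the selected backend are actually set. The following is a minimal sketch, not part of this change: `missing_backend_env` and `REQUIRED_ENV` are hypothetical names, and rejecting an unknown backend is an assumption made here for illustration. `MOONCAKE_PROTOCOL` is left out because the example script defaults it to `tcp`.

```python
import os

# Variables the example script reads without a default, per backend.
REQUIRED_ENV = {
    "shared_dir": ("DISAGG_MANIFEST", "DISAGG_STORE_ROOT"),
    "mooncake": (
        "DISAGG_MANIFEST",
        "MOONCAKE_LOCAL_HOSTNAME",
        "MOONCAKE_METADATA_SERVER",
        "MOONCAKE_MASTER_SERVER_ADDR",
    ),
}


def missing_backend_env(env=None):
    """Hypothetical helper: return (backend, list of unset required variables)."""
    env = os.environ if env is None else env
    backend = env.get("DISAGG_BACKEND", "shared_dir")
    if backend not in REQUIRED_ENV:
        # Assumption for this sketch: fail loudly on a typo in DISAGG_BACKEND.
        raise ValueError(f"unknown DISAGG_BACKEND: {backend!r}")
    # Empty strings count as unset, matching a shell `export VAR=` mistake.
    missing = [name for name in REQUIRED_ENV[backend] if not env.get(name)]
    return backend, missing


if __name__ == "__main__":
    backend, missing = missing_backend_env()
    if missing:
        raise SystemExit(f"backend {backend}: set {', '.join(missing)}")
    print(f"backend {backend}: environment looks complete")
```

Passing a plain dict instead of `os.environ` keeps the check easy to exercise in a unit test.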

Requires the `mooncake` package and a running Mooncake master/metadata service;
verify the end-to-end path on a Mooncake-enabled GPU host. The store contract itself
is unit-tested backend-agnostically in `tests/test_runtime/test_mooncake_store.py`.
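
The producer/consumer handshake for the `mooncake` backend is a sentinel file next to the manifest. As a rough sketch of that pattern (the helper names `wait_for_sentinel` and `signal_sentinel` are hypothetical, and the short poll interval is chosen only to keep the demo fast), the producer side polls with a deadline while the consumer side creates the marker:

```python
import os
import tempfile
import time


def wait_for_sentinel(path, hold_s, poll_s=0.05):
    """Poll until `path` exists; return False if `hold_s` seconds pass first."""
    deadline = time.monotonic() + hold_s
    while not os.path.exists(path):
        if time.monotonic() > deadline:
            return False  # timed out: the caller decides whether to exit anyway
        time.sleep(poll_s)
    return True


def signal_sentinel(path):
    """Create an empty marker file (idempotent)."""
    open(path, "w").close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        consumed = os.path.join(tmp, "refs.json") + ".consumed"
        # Nothing has signalled yet, so a zero-length hold times out.
        print("before signal:", wait_for_sentinel(consumed, hold_s=0.0))
        signal_sentinel(consumed)  # what the consumer does after training
        print("after signal:", wait_for_sentinel(consumed, hold_s=1.0))
```

Using `time.monotonic()` for the deadline keeps the hold immune to wall-clock adjustments. Note that the marker must live on storage both nodes can see, which is why the manifest path stays on a small shared mount even when the feature data does not.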

## Run it (rcli, 2 nodes)

1. Generate offline features on node 0 (any EAGLE3 feature generator), e.g. into
104 changes: 92 additions & 12 deletions examples/disagg/run_disagg_eagle3.py
@@ -18,14 +18,32 @@
features live, not their values, so the training curve matches the colocated
offline baseline.

Role is taken from ``DISAGG_ROLE`` (``producer``/``consumer``); if unset it is
derived from ``RCLI_NODE_RANK`` (0 -> producer, else consumer). Shared paths +
auth come from the environment so one wrapper can drive both nodes:

DISAGG_STORE_ROOT=/workspace/disagg_store # shared mount, both pools
DISAGG_MANIFEST=/workspace/disagg_store/refs.json
Backend is selected by ``DISAGG_BACKEND`` (default ``shared_dir``):

* ``shared_dir`` — ``SharedDirFeatureStore`` over a shared POSIX mount.
* ``mooncake`` — ``MooncakeFeatureStore``, the M6 fast path: a network object
store (RDMA/TCP), so the feature tensors need **no shared data mount**. Caveat:
a Mooncake object lives in the *producer's* memory segment, so the producer
process must stay alive until the consumer has read everything — this example
holds the producer open until the consumer writes ``<manifest>.consumed`` (or
``DISAGG_PRODUCER_HOLD_S`` seconds elapse). The small ref manifest + sentinels
still travel through ``DISAGG_MANIFEST``.

Role is taken from ``DISAGG_ROLE`` (``producer``/``consumer``/``colocated``); if
unset it is derived from ``RCLI_NODE_RANK`` (0 -> producer, else consumer).
Config comes from the environment so one wrapper can drive both nodes:

DISAGG_MANIFEST=/workspace/disagg_store/refs.json # small shared control plane
DISAGG_STORE_ID=eagle3-disagg # producer/consumer must match
DISAGG_AUTH_TOKEN=<secret> # optional (B9 auth)
# backend=shared_dir (default):
DISAGG_STORE_ROOT=/workspace/disagg_store # shared *data* mount
# backend=mooncake:
DISAGG_BACKEND=mooncake
MOONCAKE_LOCAL_HOSTNAME=<this-node-ip>
MOONCAKE_METADATA_SERVER=<metadata server url>
MOONCAKE_MASTER_SERVER_ADDR=<master host:port>
MOONCAKE_PROTOCOL=tcp # or "rdma"
"""

import os
@@ -50,6 +68,8 @@
write_ref_manifest,
)
from specforge.runtime.data_plane.disaggregated import AuthPolicy, SharedDirFeatureStore
from specforge.runtime.data_plane.feature_store import FeatureStore
from specforge.runtime.data_plane.mooncake_store import MooncakeFeatureStore
from specforge.runtime.launch import (
build_disagg_eagle3_runtime,
build_offline_eagle3_runtime,
@@ -65,17 +85,72 @@ def _role() -> str:
return "producer" if os.environ.get("RCLI_NODE_RANK", "0") == "0" else "consumer"


def _store(args, *, retain_on_release: bool = False) -> SharedDirFeatureStore:
def _backend() -> str:
return os.environ.get("DISAGG_BACKEND", "shared_dir")


def _store(args, *, retain_on_release: bool = False) -> FeatureStore:
token = os.environ.get("DISAGG_AUTH_TOKEN") or None
store_id = os.environ.get("DISAGG_STORE_ID", RUN_ID)
if _backend() == "mooncake":
# Fast path: producer put()s and consumer get()s by key across nodes over
# the Mooncake object store -- no shared *data* mount. store_id namespaces
# the keys, so producer and consumer must agree on it (as with shared_dir).
setup_kwargs = {
"local_hostname": os.environ["MOONCAKE_LOCAL_HOSTNAME"],
"metadata_server": os.environ["MOONCAKE_METADATA_SERVER"],
"master_server_addr": os.environ["MOONCAKE_MASTER_SERVER_ADDR"],
"protocol": os.environ.get("MOONCAKE_PROTOCOL", "tcp"),
}
# The contributed segment must hold the hard-pinned feature set, so size
# it for the workload (default 1 GiB is fine only for tiny sets).
for env_key, kw in (
("MOONCAKE_GLOBAL_SEGMENT_SIZE", "global_segment_size"),
("MOONCAKE_LOCAL_BUFFER_SIZE", "local_buffer_size"),
):
if os.environ.get(env_key):
setup_kwargs[kw] = int(os.environ[env_key])
return MooncakeFeatureStore(
store_id=store_id,
setup_kwargs=setup_kwargs,
auth=AuthPolicy(token),
credential=token,
retain_on_release=retain_on_release,
)
return SharedDirFeatureStore(
os.environ["DISAGG_STORE_ROOT"],
store_id=os.environ.get("DISAGG_STORE_ID", RUN_ID),
store_id=store_id,
auth=AuthPolicy(token),
credential=token,
retain_on_release=retain_on_release,
)


def _hold_producer_until_consumed(manifest: str) -> None:
"""Keep the producer (and its Mooncake memory segment) alive until the
consumer signals completion, since a Mooncake object lives in the producing
process's segment. shared_dir does not need this (files persist on the mount).
"""
consumed = manifest + ".consumed"
hold_s = float(os.environ.get("DISAGG_PRODUCER_HOLD_S", "3600"))
deadline = time.monotonic() + hold_s
print(
f"[producer] mooncake backend: holding segment until {consumed} "
f"(<= {hold_s:.0f}s)",
flush=True,
)
while not os.path.exists(consumed):
if time.monotonic() > deadline:
print(
"[producer] hold timed out before consumer signalled; exiting "
"(consumer may lose features)",
flush=True,
)
return
time.sleep(2)
print("[producer] consumer signalled done; releasing segment", flush=True)


def run_producer(args) -> None:
manifest = os.environ["DISAGG_MANIFEST"]
store = _store(args)
@@ -88,11 +163,14 @@ def run_producer(args) -> None:
)
write_ref_manifest(refs, manifest)
open(manifest + ".done", "w").close() # completion marker the consumer waits on
location = getattr(store, "root", f"mooncake://{store.store_id}")
print(
f"[producer] ingested {len(refs)} samples into {store.root}; "
f"[producer] ingested {len(refs)} samples into {location}; "
f"manifest -> {manifest}",
flush=True,
)
if _backend() == "mooncake":
_hold_producer_until_consumed(manifest)


def _build_model_and_optimizer(args):
@@ -196,9 +274,8 @@ def run_consumer(args) -> None:
# offline ref set is re-iterated across epochs -> retain on release (read-only)
store = _store(args, retain_on_release=True)
refs = read_ref_manifest(manifest)
print(
f"[consumer] training from {len(refs)} disagg refs in {store.root}", flush=True
)
location = getattr(store, "root", f"mooncake://{store.store_id}")
print(f"[consumer] training from {len(refs)} disagg refs in {location}", flush=True)

trainer, loader = build_disagg_eagle3_runtime(
feature_store=store,
@@ -222,6 +299,9 @@ def run_consumer(args) -> None:
)
trainer.fit(loader)
destroy_distributed()
if _backend() == "mooncake":
# release the producer holding its Mooncake segment open (see docstring)
open(manifest + ".consumed", "w").close()


def main() -> None:
12 changes: 12 additions & 0 deletions examples/disagg/run_qwen2.5_7b_eagle3_disagg.sh
@@ -29,6 +29,18 @@ set -euo pipefail
export DISAGG_STORE_ROOT DISAGG_MANIFEST
export DISAGG_STORE_ID="${DISAGG_STORE_ID:-eagle3-disagg}"
export DISAGG_AUTH_TOKEN="${DISAGG_AUTH_TOKEN:-disagg-secret}"

# --- Optional: Mooncake fast-path backend (network object store, no shared data
# mount). Uncomment + point at your Mooncake master/metadata to route features
# over Mooncake instead of $DISAGG_STORE_ROOT. The manifest still uses
# $DISAGG_MANIFEST, and the producer holds its segment open until the consumer
# finishes (see run_disagg_eagle3.py). ---
# export DISAGG_BACKEND=mooncake
# export MOONCAKE_LOCAL_HOSTNAME="$(hostname -i | awk '{print $1}')"
# export MOONCAKE_METADATA_SERVER="http://<metadata-host>:8080/metadata"
# export MOONCAKE_MASTER_SERVER_ADDR="<master-host>:50051"
# export MOONCAKE_PROTOCOL=tcp # or "rdma"
# export MOONCAKE_GLOBAL_SEGMENT_SIZE=$((8*1024*1024*1024)) # >= total feature bytes
export FLASHINFER_DISABLE_VERSION_CHECK=1
export HOME=/root HF_HOME=/root/.cache/huggingface TRITON_CACHE_DIR=/root/.triton
export PYTHONPATH="$SF_HOME:$SF_HOME/scripts:${PYTHONPATH:-}"
2 changes: 2 additions & 0 deletions specforge/runtime/data_plane/__init__.py
@@ -9,6 +9,7 @@
load_feature_file,
spec_from_tensor,
)
from specforge.runtime.data_plane.mooncake_store import MooncakeFeatureStore
from specforge.runtime.data_plane.offline_reader import (
OfflineManifestReader,
list_feature_files,
@@ -26,5 +27,6 @@
"OfflineManifestReader",
"list_feature_files",
"SharedDirFeatureStore",
"MooncakeFeatureStore",
"AuthPolicy",
]