
[DataFlow runtime · online] O1.2 — named builder + interleaved async loop #625

Merged: jiapingW merged 1 commit into dataflow-up-16-zerocopy from dataflow-up-20-online-async-loop on Jul 2, 2026
@maocheng23

Collaborator

Stage O1.2 of the online roadmap (#618): the named single-process runtime builder and the interleaved async loop that replaces drain-then-fit. Stacked on O1.1 (#624) — review/merge that first; the diff here is against the O1.1 branch.

Problem

The colocated online path was drain-then-fit: generate the whole prompt pool, then train. (Root cause: the in-process SampleRefQueue.get is non-blocking, so the loader ended the instant the queue was momentarily empty.)
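
To illustrate the root cause, here is a minimal sketch using only the standard-library `queue.Queue` rather than the real SampleRefQueue. The names `drain_nonblocking`, `drain_blocking`, `slow_producer`, `run`, and the `_CLOSED` sentinel are hypothetical and exist only for this example. A non-blocking loader treats a momentarily empty queue as end-of-stream, while a blocking loader waits until the producer explicitly closes the stream.

```python
import queue
import threading
import time

_CLOSED = object()  # hypothetical sentinel meaning "the producer is done"


def drain_nonblocking(q):
    """Loader that mistakes a momentarily empty queue for end-of-stream."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items  # ends as soon as the queue happens to be empty


def drain_blocking(q):
    """Loader that blocks until the producer signals it has closed."""
    items = []
    while True:
        item = q.get()  # waits for the next item instead of giving up
        if item is _CLOSED:
            return items
        items.append(item)


def slow_producer(q, n, close):
    """Emit n items with a small delay, optionally closing the stream."""
    for i in range(n):
        time.sleep(0.01)
        q.put(i)
    if close:
        q.put(_CLOSED)


def run(drain, close):
    """Run one producer thread against the given loader."""
    q = queue.Queue()
    t = threading.Thread(target=slow_producer, args=(q, 5, close), daemon=True)
    t.start()
    got = drain(q)
    t.join()
    return got
```

With a producer that is slower than the loader, `run(drain_nonblocking, close=False)` returns before the producer has emitted everything, whereas `run(drain_blocking, close=True)` returns all five items.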

Change

  • run_disagg_online_interleaved: the producer streams refs on a daemon thread while trainer.fit consumes on the main thread (StreamingRefQueue blocks until closed-and-drained, so the trainer tracks the producer). Symmetric, hang-free shutdown:
    • trainer finishes first (e.g. max_steps) → a cooperative should_stop winds the producer down instead of blocking on the in-flight watermark after the consumer stops draining;
    • producer finishes first → closes the channel, the loader drains the tail;
    • producer raises → channel closed in finally (consumer can't hang) + the exception re-raised on the main thread.
  • build_disagg_online_eagle3_runtime: the named single-process builder from the roadmap; composes producer + consumer over one shared metadata store, one consume-once feature store, and a producer/consumer StreamingRefChannel pair, returning (trainer, loader, run). Uses the in-process generate_eagle3_data stub — no live SGLang server and no Ray (those are O1.3 / O2).
  • drive_producer gains a cooperative should_stop predicate + a finally-close so it never strands the consumer.
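
The three shutdown paths above can be sketched roughly as follows. This is not the code in this PR: `ClosableQueue`, `drive_producer_sketch`, and `run_interleaved_sketch` are hypothetical stand-ins built on the standard library, the queue is unbounded (so there is no in-flight watermark), and `fit` is any callable that iterates the channel.

```python
import queue
import threading


class ClosableQueue:
    """Hypothetical stand-in for a blocking, closable ref queue."""

    _CLOSED = object()

    def __init__(self):
        self._q = queue.Queue()

    def put(self, item):
        self._q.put(item)

    def close(self):
        self._q.put(self._CLOSED)

    def __iter__(self):
        # Blocks on each get; ends only once the close marker is reached.
        while True:
            item = self._q.get()
            if item is self._CLOSED:
                return
            yield item


def drive_producer_sketch(generate, channel, should_stop):
    """Stream refs until exhausted or asked to stop; always close the channel."""
    try:
        for ref in generate():
            if should_stop():
                break  # cooperative wind-down: the consumer stopped first
            channel.put(ref)
    finally:
        channel.close()  # the consumer can never be stranded


def run_interleaved_sketch(generate, fit):
    """Run the producer on a daemon thread while fit consumes on this thread."""
    channel = ClosableQueue()
    stop = threading.Event()
    errors = []

    def target():
        try:
            drive_producer_sketch(generate, channel, stop.is_set)
        except BaseException as exc:  # captured here, re-raised on the main thread
            errors.append(exc)

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    try:
        result = fit(channel)
    finally:
        stop.set()  # trainer finished (or failed): ask the producer to stop
        thread.join()
    if errors:
        raise errors[0]
    return result
```

For example, `run_interleaved_sketch(lambda: iter(range(5)), list)` drains all five refs, while a `fit` that breaks out after three items against an infinite generator still returns because the stop event ends the producer loop. The unconditional `thread.join()` is a simplification; a producer that blocks inside `generate()` would need a join timeout.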

Tests

tests/test_runtime/test_disagg_online_interleave.py (CPU): the three shutdown paths (full-drain-then-terminate, trainer-stops-first cooperative wind-down, producer-exception propagation + consumer unblock). A GPU end-to-end test for the named builder's interleaved run() is added to test_disagg_online_launch.py.

Next

O1.3 — live SGLang-server hidden-state capture (the roadmap's gating risk; needs a GPU + the capture spike first).

🤖 Generated with Claude Code

@gemini-code-assist

Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

@maocheng23

Collaborator Author

Code review

No high-confidence issues found. Checked for bugs in the interleaved producer/trainer shutdown paths, backpressure stop handling, and named online runtime wiring.

…loop

Replace the drain-then-fit shape (generate the whole prompt pool, then
train) with a live loop in which the producer and trainer run concurrently.

- run_disagg_online_interleaved: the producer streams refs on a daemon thread
  while trainer.fit consumes on the main thread (StreamingRefQueue blocks
  until closed-and-drained, so the trainer tracks the producer). Symmetric,
  hang-free shutdown: trainer-finishes-first sets a cooperative should_stop so
  the producer doesn't block on the in-flight watermark after the consumer
  stops draining; producer-finishes-first closes the channel; producer-raise
  closes the channel (so the consumer can't hang) and re-raises on the main
  thread.
- build_disagg_online_eagle3_runtime: the named single-process builder from
  the roadmap; composes producer + consumer over one shared metadata store,
  one consume-once feature store, and a producer/consumer StreamingRefChannel
  pair, and returns (trainer, loader, run). Uses the in-process generate
  stub (no live SGLang server, no Ray -- those are O1.3 / O2).
- drive_producer gains a cooperative should_stop predicate + finally-close.

CPU tests (test_disagg_online_interleave.py): full-drain-then-terminate,
trainer-stops-first cooperative wind-down, producer-exception propagation +
consumer unblock. GPU test for the named builder's interleaved run() added to
test_disagg_online_launch.py. Stacked on O1.1; implements stage O1.2 (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23 maocheng23 force-pushed the dataflow-up-19-online-shared-control-plane branch from 5f79d08 to d6a6fd3 Compare July 1, 2026 00:29
@maocheng23 maocheng23 force-pushed the dataflow-up-20-online-async-loop branch from 9dba169 to 17a8770 Compare July 1, 2026 00:29
@maocheng23

Collaborator Author

Addressed review feedback (self-review pass).

  • run_disagg_online_interleaved: hardened shutdown. After thread.join(timeout) it now checks is_alive() and raises instead of silently returning with a leaked, still-running daemon producer; a trainer exception no longer masks a producer exception (they are chained, producer surfaced as root cause); a producer failure raised after the join no longer goes unreported.
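
A rough sketch of what this hardening could look like, under assumptions: `finish_interleaved_sketch` and its parameters are hypothetical, and using `raise ... from ...` to make the producer error the cause of the trainer error is one plausible reading of "chained", not necessarily how the PR does it.

```python
import threading


def finish_interleaved_sketch(thread, producer_errors, trainer_exc, timeout):
    """Join the producer thread and surface every failure exactly once.

    producer_errors: list the producer thread appends its exception to.
    trainer_exc: exception raised by the trainer, or None.
    """
    thread.join(timeout)
    if thread.is_alive():
        # Do not return silently with a leaked, still-running producer.
        raise RuntimeError(
            "producer thread still running after join timeout"
        ) from trainer_exc

    # Read the producer error only after the join, so late failures are seen.
    producer_exc = producer_errors[0] if producer_errors else None

    if trainer_exc is not None:
        if producer_exc is not None:
            # Chain both: the producer failure becomes the root cause.
            raise trainer_exc from producer_exc
        raise trainer_exc
    if producer_exc is not None:
        raise producer_exc
```

When both sides fail, the caller sees the trainer exception with the producer exception available as `__cause__`, so neither is lost.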

Validated: full tests/test_runtime = 200 OK (2 skipped, 1 xfail), zero failures, on a 2-node H200 pod. Lint clean (black 24.10.0 / isort 5.13.2 / autoflake).

Base automatically changed from dataflow-up-19-online-shared-control-plane to dataflow-up-16-zerocopy July 2, 2026 04:44
@jiapingW jiapingW merged commit 15085dc into dataflow-up-16-zerocopy Jul 2, 2026
1 check passed
@jiapingW jiapingW deleted the dataflow-up-20-online-async-loop branch July 2, 2026 05:19
2 participants