[DataFlow runtime · online M-O1.1] SQLiteSampleRefQueue: durable cross-process queue#19

Closed
maocheng23 wants to merge 1 commit into dataflow-up-16-zerocopy from dataflow-up-17-online-controlplane

@maocheng23

Owner

Stacks on sgl-project#614 (dataflow-up-16-zerocopy). Merge bottom-up. First implementation PR of the online disaggregated stack planned in sgl-project#618 (refactor/dataflow-online-roadmap.md, milestone O1.1).

What

The in-process SampleRefQueue couples producer and consumer to one OS process (a threading.Condition over in-memory dicts). A disaggregated run puts the rollout pool and the trainer pool in separate processes, so pending/leased state must be shared across processes, not just threads.

Adds SQLiteSampleRefQueue — a drop-in for DataFlowController(sample_queue=...) that keeps the exact put/get/ack/fail/depth/in_flight contract but holds the queue in SQLite, so a producer process and a consumer process over one DB file see one queue.

  • Lease race closed by BEGIN IMMEDIATE around read-then-lease (WAL + busy_timeout let a second process wait out the writer lock) — two consumer processes never double-lease.
  • Wall-clock leases (monotonic clocks aren't comparable across processes) + timeout reclaim.
  • Partitioned (DP-resharding) lease is non-blocking while the global pool is non-empty — matches the in-process queue's shard semantics.
  • Metadata-only (assert_no_tensors on put); durable across process restart.
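
The lease path described in the bullets above can be sketched roughly as follows. This is a minimal illustration using only the standard-library `sqlite3` module, not the PR's actual code: the `refs` table, its columns, the function names `open_queue`/`put`/`lease`/`ack`, and the 30-second default lease are all hypothetical, and partitioning and `fail` handling are left out.

```python
import sqlite3
import time


def open_queue(path: str) -> sqlite3.Connection:
    """Open (or create) the shared queue file. Schema is hypothetical."""
    # isolation_level=None disables sqlite3's implicit transactions, so the
    # explicit BEGIN IMMEDIATE / COMMIT statements below are the only ones.
    conn = sqlite3.connect(path, timeout=5.0, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5 s for the writer lock
    conn.execute(
        "CREATE TABLE IF NOT EXISTS refs ("
        " sample_id TEXT PRIMARY KEY,"
        " state TEXT NOT NULL DEFAULT 'pending',"
        " lease_deadline REAL)"
    )
    return conn


def put(conn: sqlite3.Connection, sample_id: str) -> None:
    # INSERT OR IGNORE makes a repeated put of the same id a no-op.
    conn.execute("INSERT OR IGNORE INTO refs (sample_id) VALUES (?)", (sample_id,))


def lease(conn: sqlite3.Connection, n: int, lease_seconds: float = 30.0) -> list:
    # Wall-clock time: time.monotonic() values are not comparable across processes.
    now = time.time()
    # Take the write lock before reading, so no other connection can
    # select the same pending rows between our SELECT and our UPDATE.
    conn.execute("BEGIN IMMEDIATE")
    try:
        # Reclaim leases whose deadline has passed.
        conn.execute(
            "UPDATE refs SET state='pending', lease_deadline=NULL"
            " WHERE state='leased' AND lease_deadline < ?",
            (now,),
        )
        rows = conn.execute(
            "SELECT sample_id FROM refs WHERE state='pending'"
            " ORDER BY rowid LIMIT ?",
            (n,),
        ).fetchall()
        ids = [row[0] for row in rows]
        conn.executemany(
            "UPDATE refs SET state='leased', lease_deadline=? WHERE sample_id=?",
            [(now + lease_seconds, sample_id) for sample_id in ids],
        )
        conn.execute("COMMIT")
        return ids
    except BaseException:
        conn.execute("ROLLBACK")
        raise


def ack(conn: sqlite3.Connection, sample_id: str) -> None:
    # Only a currently leased row can be acknowledged and removed.
    conn.execute(
        "DELETE FROM refs WHERE sample_id=? AND state='leased'", (sample_id,)
    )
```

In this sketch, two connections opened on the same file (standing in for the producer and consumer processes) lease disjoint ids, because the second `lease` call cannot start its read until the first has committed. With a plain deferred `BEGIN`, the second connection could read before the first one writes and then hit a busy error when its read transaction tries to upgrade to a write, which is presumably why the PR takes the lock up front.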

No controller change needed

commit/lease/ack already route only through sample_queue + MetadataStore, so a shared SQLiteSampleRefQueue + SQLiteMetadataStore makes the train-side control plane cross-process as-is — see test_cross_process_commit_lease_ack_then_reconcile (two controllers, separate instances, share commit → lease → ack and reconcile from the shared durable marker).

Tests

tests/test_runtime/test_durable_queue.py (11 tests, CPU, green locally via an isolated import of the runtime modules): cross-instance commit/lease/ack, no-double-lease, partition disjoint/complete + reshard-remainder-once, non-blocking empty shard, fail retry/drop, lease-timeout reclaim, idempotent put, durable-across-reopen, and the cross-controller integration test.

Not in this PR (next in the stack)

O1.2 engine-agnostic FeatureSource + build_disagg_online_eagle3_runtime (async loop); O1.3 live SGLang capture (via the stable public return_hidden_states API — no version coupling); O2.x Ray actors / backpressure / streaming-pool lifetime.

🤖 Generated with Claude Code

…line O1.1)

The in-process SampleRefQueue couples producer and consumer to one OS process
(threading.Condition over in-memory dicts), so a disaggregated *run* (rollout pool
and trainer pool as separate processes) cannot share lease/ack state. Adds
SQLiteSampleRefQueue: a drop-in for DataFlowController(sample_queue=...) that keeps
the exact put/get/ack/fail/depth/in_flight contract but holds pending/leased state
in SQLite, so producer and consumer processes over one DB file see one queue.

- Lease race closed by BEGIN IMMEDIATE around read-then-lease (WAL + busy_timeout
  let a second process wait out the writer lock) -- two consumers never double-lease.
- Wall-clock leases (monotonic is not comparable across processes) + timeout reclaim.
- Partitioned (DP-resharding) lease is non-blocking while the global pool is
  non-empty, matching the in-process queue.
- Metadata-only (assert_no_tensors on put), durable across reopen.

No controller change: commit/lease/ack already route only through sample_queue +
MetadataStore, so a shared SQLite queue + SQLiteMetadataStore makes the train-side
control plane cross-process as-is (test_cross_process_commit_lease_ack_then_reconcile).

First PR of the online disaggregated stack (refactor/dataflow-online-roadmap.md, O1.1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23

Owner Author

Superseded: this duplicates the existing StreamingRefChannel/StreamingRefQueue on dataflow-up-17-online-disagg. Pivoting to reviewing and hardening the existing online-disagg/staleness branches instead. (Roadmap context: sgl-project#618.)

maocheng23 closed this Jun 29, 2026