
[CORE-16225] 2/3: Cloud I/O Scheduler: Reservation-based policy#30595

Draft
oleiman wants to merge 3 commits into dev from ct/core-16225/cloud-io-min-share-scheduler

Conversation

@oleiman
Member

@oleiman oleiman commented May 24, 2026

Sequence:

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@oleiman oleiman self-assigned this May 24, 2026
@oleiman oleiman changed the title from "Ct/core 16225/cloud io min share scheduler" to "[CORE-16225] 2/2: Cloud I/O Scheduler: Min-share" May 24, 2026
@oleiman
Member Author

oleiman commented May 24, 2026

/ci-repeat 1

@oleiman oleiman changed the base branch from ct/core-16225/cloud-io-base-scheduler to dev May 24, 2026 05:25
@oleiman
Member Author

oleiman commented May 24, 2026

/ci-repeat 1

@vbotbuildovich
Collaborator

vbotbuildovich commented May 24, 2026

Retry command for Build#84908

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_config_test.py::ClusterConfigTest.test_valid_settings
tests/rptest/tests/cluster_config_test.py::ClusterConfigTest.test_rpk_export_import
tests/rptest/tests/shadow_linking_rnot_test.py::ShadowLinkingRandomOpsTest.test_node_operations@{"failures":true,"workload_set":"cloud_combos"}

@vbotbuildovich
Collaborator

CI test results

test results on build#84908
| test_status | test_class | test_method | test_arguments | test_kind | job_url | passed | reason | test_history |
|---|---|---|---|---|---|---|---|---|
| FAIL | ClusterConfigTest | test_rpk_export_import | null | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e588b-ab50-443f-a3bf-315bc7817eaa | 0/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterConfigTest&test_method=test_rpk_export_import |
| FAIL | ClusterConfigTest | test_rpk_export_import | null | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e5890-c54d-4460-81ed-7ac38add545f | 0/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterConfigTest&test_method=test_rpk_export_import |
| FAIL | ClusterConfigTest | test_valid_settings | null | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e588b-ab4e-4472-9eaf-bc542e6482fd | 0/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterConfigTest&test_method=test_valid_settings |
| FAIL | ClusterConfigTest | test_valid_settings | null | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e5890-c54a-4805-bd94-7ee63eea61bf | 0/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterConfigTest&test_method=test_valid_settings |
| FLAKY(PASS) | ShadowLinkingReplicationTests | test_auto_prefix_trimming | {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud", "with_failures": false} | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e5890-c549-42a6-87d2-886f255df986 | 10/11 | Test PASSES after retries. No significant increase in flaky rate(baseline=0.0022, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming |
| FLAKY(FAIL) | ShadowLinkingRandomOpsTest | test_node_operations | {"failures": true, "workload_set": "cloud_combos"} | integration | https://buildkite.com/redpanda/redpanda/builds/84908#019e5890-c54b-42d0-b1d0-08cf0dd9da37 | 9/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingRandomOpsTest&test_method=test_node_operations |

@oleiman oleiman changed the title from "[CORE-16225] 2/2: Cloud I/O Scheduler: Min-share" to "[CORE-16225] 2/3: Cloud I/O Scheduler: Min-share" May 25, 2026
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch 2 times, most recently from d1d7b19 to f484b2e Compare May 25, 2026 23:39
@oleiman oleiman changed the title from "[CORE-16225] 2/3: Cloud I/O Scheduler: Min-share" to "[CORE-16225] 2/3: Cloud I/O Scheduler: Reservation-based policy" May 25, 2026
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch 3 times, most recently from fe957fb to d6e0755 Compare May 26, 2026 16:35
@oleiman oleiman requested a review from andrwng May 26, 2026 22:26
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch from d6e0755 to 271737a Compare May 27, 2026 21:24
@oleiman oleiman requested a review from Copilot May 27, 2026 23:44
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a per-shard cloud I/O admission scheduler framework and threads a new cloud_io::group_id label through key cloud read/write paths so the client pool can apply policy-driven gating (with a new reservation-based policy implementation added in cloud_io).

Changes:

  • Add cloud_io::scheduler + scheduler_types (policy_type, group_id, configs) and a full reservation_policy implementation with unit tests.
  • Integrate scheduler configuration into cloud_storage::configuration and pass it into cloud_storage_clients::client_pool, adding group-aware acquire overloads and tagging some L0/L1 operations with group_id.
  • Update cloud_io::remote_api (and test mocks) to accept group_id for download/upload/stream operations, and propagate the tag into client acquisition.

Reviewed changes

Copilot reviewed 36 out of 36 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/v/redpanda/application_services.cc Pass scheduler config into the cloud storage client pool construction.
src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc Update mocks/EXPECT_CALLs for new group_id parameter.
src/v/cloud_topics/level_zero/reader/tests/mocks.h Update mock remote API signatures to include group_id.
src/v/cloud_topics/level_zero/reader/materialized_extent.cc Tag L0 downloads with group_id::consumer_fetch.
src/v/cloud_topics/level_zero/batcher/tests/remote_mock.h Update test remote mock APIs for group_id.
src/v/cloud_topics/level_zero/batcher/batcher.cc Tag L0 uploads with group_id::producer_upload.
src/v/cloud_topics/level_one/metastore/tests/garbage_collector_test.cc Update test I/O interface override to accept group_id.
src/v/cloud_topics/level_one/frontend_reader/level_one_reader.cc Tag L1 reads with group_id::consumer_fetch.
src/v/cloud_topics/level_one/common/file_io.h Extend io::read_object override to accept group_id.
src/v/cloud_topics/level_one/common/file_io.cc Plumb group_id into remote download stream calls.
src/v/cloud_topics/level_one/common/fake_io.h Extend fake I/O interface to accept group_id.
src/v/cloud_topics/level_one/common/fake_io.cc Accept (unused) group_id in fake implementation.
src/v/cloud_topics/level_one/common/BUILD Add scheduler_types dep for group id/config types.
src/v/cloud_topics/level_one/common/abstract_io.h Add group_id to I/O interface for read operations.
src/v/cloud_topics/level_one/common/abstract_io.cc Plumb group_id through read_object_as_iobuf.
src/v/cloud_storage/configuration.h Add scheduler config field to cloud storage configuration.
src/v/cloud_storage/configuration.cc Initialize scheduler config in S3/ABS config builders.
src/v/cloud_storage/BUILD Add scheduler_types dependency.
src/v/cloud_storage_clients/tests/client_pool_builder.h Allow tests to pass scheduler config into pool construction.
src/v/cloud_storage_clients/client_pool.h Add scheduler + group-aware acquire overloads; track scheduler waiters.
src/v/cloud_storage_clients/client_pool.cc Instantiate/stop scheduler; gate acquisitions by group_id; add peer admit+borrow helper.
src/v/cloud_storage_clients/BUILD Add scheduler libs as dependencies.
src/v/cloud_io/tests/scheduler_test.cc Add unit tests for default (passthrough) scheduler behavior.
src/v/cloud_io/tests/reservation_policy_test.cc Add extensive unit tests for reservation policy semantics.
src/v/cloud_io/tests/BUILD Register new scheduler/reservation policy test targets.
src/v/cloud_io/scheduler.h Define scheduler wrapper interface over policies.
src/v/cloud_io/scheduler.cc Implement scheduler wrapper and passthrough policy selection (reservation currently throws).
src/v/cloud_io/scheduler_types.h Define policy/group IDs and scheduler config types.
src/v/cloud_io/scheduler_policy.h Define the scheduler policy abstract base class.
src/v/cloud_io/reservation_policy.h Add reservation-based admission policy API and documentation.
src/v/cloud_io/reservation_policy.cc Implement reservation-based admission policy logic.
src/v/cloud_io/reservation_policy_types.h Define reservation policy internal state types and invariants.
src/v/cloud_io/remote.h Extend remote API methods to accept group_id.
src/v/cloud_io/remote.cc Pass group_id through to client_pool acquisition for download/upload paths.
src/v/cloud_io/remote_api.h Extend remote API interface for group-tagged operations.
src/v/cloud_io/BUILD Add new scheduler/reservation policy libraries and deps.

Comment on lines +45 to +52
std::unique_ptr<scheduler_policy>
scheduler::make_policy(size_t capacity, scheduler_config cfg) {
    switch (cfg.policy) {
    case policy_type::passthrough:
        return std::make_unique<passthrough>(capacity);
    case policy_type::reservation:
        throw std::runtime_error("reservation_policy: not implemented");
    }
Comment on lines 325 to 332
if (likely(!_idle_clients.empty())) {
    co_await _sched->admit(gid, as);
    if (_idle_clients.empty()) {
        _sched->release(gid);
        continue;
    }
    client = pop_most_recently_used();
} else if (
Comment on lines +255 to +285
void reservation_policy::set_target_reserved(group_id g, size_t value) {
    auto& gs = _groups[g];
    // Reconcile the reservation lane to reflect the new target. Compute
    // the delta against current_reserved() (the derived current size) so
    // that any reclamation or refill since the last call are accounted
    // for; the lane may have ebbed and flowed between calls.
    const size_t cur = gs.current_reserved();
    if (value > cur) {
        const size_t delta = value - cur;
        vassert(
          _shared.current() >= delta,
          "set_target_reserved({}, {}): would underflow _shared "
          "(current={}, delta={})",
          to_string_view(g),
          value,
          _shared.current(),
          delta);
        _shared.consume(delta);
        gs.reserved_sem.signal(delta);
    } else if (value < cur) {
        const size_t delta = cur - value;
        vassert(
          gs.reserved_sem.current() >= delta,
          "set_target_reserved({}, {}): would underflow reserved_sem "
          "(current={}, in_flight={}, delta={})",
          to_string_view(g),
          value,
          gs.reserved_sem.current(),
          gs.reserved_in_flight,
          delta);
        gs.reserved_sem.consume(delta);
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch from 271737a to 63e36f7 Compare May 28, 2026 18:10
@oleiman oleiman added claude-review and removed claude-review labels May 28, 2026
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch 2 times, most recently from 3e838c1 to eea2b1d Compare May 29, 2026 03:51
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch 4 times, most recently from e2ee996 to 167deb1 Compare June 1, 2026 05:53
oleiman added 3 commits May 31, 2026 22:56
Add reservation_policy_types.h with the per-group runtime state vocabulary
used by the admission scheduler:

- reservation_waiter: an intrusive list hook and a monotonic
  sequence number for FIFO ordering across groups.
- reservation_group_state: holds the reservation semaphore,
  target_reserved, in-flight and reserved-in-flight counters,
  queued waiters, admit totals, and inactive_since. Encapsulates
  the predicates and state mutations composed by the policy.
- default_dwell_duration: how long an idle group keeps its
  reservation lane before reclaim returns those slots to the
  common pool.

Header-only.

Also adds runtime config for reservation policy to pre-existing scheduler_config
in scheduler_types.h.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Add the reservation-based admission policy. Each group has a reservation lane
backed by a dedicated semaphore. Slots that aren't in a lane live in a common
pool that any group can claim.

Mechanics:

- Admit: try the group's reservation lane first, then the common
  pool. If both are empty, queue the caller on the group's
  intrusive waiter list.
- Release: a reserved-lane in-flight slot routes back into the
  lane (to a queued waiter or to the lane semaphore). A
  common-pool in-flight slot first tries to dispatch a queued
  waiter, then to refill an under-target lane, and finally falls
  back to the common pool.
- Dispatch: prefer the oldest waiter from a group below its
  target_reserved (under-target preference). Otherwise pick the
  global oldest waiter.
- Reclaim and refill: capacity moves between a group's
  reservation lane and the common pool. An idle group's lane is
  reclaimed when its dwell window expires. A common-pool release
  with no queued waiter refills the most under-target group up
  to its target_reserved.

Also splits scheduler_policy into its own Bazel target so reservation_policy
depends on just the abstract interface.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
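
The admit, release, dispatch, and reclaim/refill steps described in this commit message can be pictured with a small synchronous model. This is only a sketch under stated assumptions: `toy_reservation_policy`, `lane`, `group`, and every member name are hypothetical, there are no coroutines, semaphores, abort sources, or dwell timers, and refill picks the group with the largest absolute deficit, which may differ from the PR's actual priority rule.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>

// Hypothetical toy model; names echo the commit message, not the PR's API.
enum class group : std::size_t { producer_upload, consumer_fetch, default_group };
enum class lane { reserved, common, queued };

class toy_reservation_policy {
public:
    explicit toy_reservation_policy(std::size_t capacity) : _common(capacity) {}

    // Startup-only in this toy: carve n slots out of the common pool.
    void set_target_reserved(group g, std::size_t n) {
        assert(n <= _common);
        auto& gs = state(g);
        gs.target = gs.lane_free = n;
        _common -= n;
    }

    // Admit: reservation lane first, then common pool, else queue the caller.
    lane admit(group g) {
        auto& gs = state(g);
        if (gs.lane_free > 0) { --gs.lane_free; ++gs.reserved_in_flight; return lane::reserved; }
        if (_common > 0) { --_common; return lane::common; }
        gs.waiters.push_back(_next_seq++);
        return lane::queued;
    }

    // Release a slot held in `held` (reserved or common). Returns the group
    // whose queued waiter inherited the slot, if any.
    std::optional<group> release(group g, lane held) {
        auto& gs = state(g);
        if (held == lane::reserved) {
            if (!gs.waiters.empty()) { gs.waiters.pop_front(); return g; }
            --gs.reserved_in_flight;
            ++gs.lane_free;
            return std::nullopt;
        }
        if (auto next = dispatch_next()) { return next; }
        if (auto* s = most_under_target()) { ++s->lane_free; return std::nullopt; }
        ++_common;
        return std::nullopt;
    }

    // Reclaim: return a group's free lane slots to the common pool.
    // The real policy gates this on a dwell window; the toy does not.
    void reclaim(group g) { _common += state(g).lane_free; state(g).lane_free = 0; }

    std::size_t common_free() const { return _common; }
    std::size_t lane_free(group g) const { return _groups[static_cast<std::size_t>(g)].lane_free; }

private:
    struct group_state {
        std::size_t target = 0, lane_free = 0, reserved_in_flight = 0;
        std::deque<std::uint64_t> waiters; // global sequence numbers, FIFO
        std::size_t current_reserved() const { return lane_free + reserved_in_flight; }
        bool under_target() const { return current_reserved() < target; }
    };

    group_state& state(group g) { return _groups[static_cast<std::size_t>(g)]; }

    // Dispatch: oldest waiter of an under-target group, else the global oldest.
    std::optional<group> dispatch_next() {
        std::optional<std::size_t> best;
        for (std::size_t i = 0; i < _groups.size(); ++i) {
            const auto& s = _groups[i];
            if (s.waiters.empty()) { continue; }
            if (!best) { best = i; continue; }
            const auto& b = _groups[*best];
            const bool older = s.waiters.front() < b.waiters.front();
            if (s.under_target() != b.under_target() ? s.under_target() : older) { best = i; }
        }
        if (!best) { return std::nullopt; }
        _groups[*best].waiters.pop_front();
        return static_cast<group>(*best);
    }

    // Refill candidate: the group furthest below its target, if any.
    group_state* most_under_target() {
        group_state* pick = nullptr;
        std::size_t deficit = 0;
        for (auto& s : _groups) {
            const auto cur = s.current_reserved();
            if (cur < s.target && s.target - cur > deficit) { pick = &s; deficit = s.target - cur; }
        }
        return pick;
    }

    std::array<group_state, 3> _groups{};
    std::size_t _common;
    std::uint64_t _next_seq = 0;
};
```

With a capacity of 2 and one slot reserved for `producer_upload`, two producer admits take the lane and then the common pool, a `consumer_fetch` admit queues, and releasing the producer's common slot hands it straight to the queued consumer.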
Tests covering admit/release accounting, queueing and dispatch, abort handling,
reservation lane isolation, and the reclaim and refill cycle.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the ct/core-16225/cloud-io-min-share-scheduler branch from 167deb1 to cba3744 Compare June 1, 2026 05:56
@oleiman oleiman requested a review from Copilot June 1, 2026 06:57
@oleiman oleiman added claude-review and removed claude-review labels Jun 1, 2026
Comment on lines +181 to +189

    // Common-pool release: try to dispatch a queued waiter, then
    // refill a reservation lane, then fall back to the common pool.
    if (dispatch_next()) {
        return;
    }
    if (const auto target = pick_refill_candidate(); target.has_value()) {
        _groups[*target].grant_reserved_slot();
    } else {

Lifetime concern: stop() synchronously cancels waiters and returns ss::now(), but it doesn't wait for the suspended admit() coroutines to actually unwind. There is a window between cancel_waiter() setting the exception on w.p and the coroutine resuming/destroying its sub (the abort_source subscription). During that window the subscription is still active and captures &gs, this, and &w by reference.

If the policy is destroyed in that window (e.g. caller does co_await scheduler.stop() then drops the scheduler before the reactor cycles back to the queued continuations) and the caller subsequently fires as.request_abort() from a still-alive abort_source, the subscription lambda dereferences a dangling gs/this (it also calls _now_fn() on the dead policy).

The destructor only asserts gs.waiters.empty(), which passes after cancel — it doesn't catch coroutines that have been unlinked but not yet resumed.

Consider gating outstanding admit() invocations behind an ss::gate (or tracking an in-flight admit counter) and having stop() await that gate closing, so the policy is guaranteed to outlive every active subscription. Even just documenting the contract on stop() / the destructor would help — right now the safety requirement is implicit.
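
To make the suggestion concrete, here is a minimal single-threaded sketch of the "in-flight admit counter" idea. It is not Seastar's `ss::gate`: `toy_gate`, `enter`, `leave`, and `close` are hypothetical names, and the drain callback stands in for the future that `stop()` would await.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical single-threaded stand-in for a gate; not Seastar's ss::gate.
class toy_gate {
public:
    // Called at the top of each admit(); refuses new entries once closed.
    void enter() {
        if (_closed) {
            throw std::runtime_error("gate closed");
        }
        ++_holders;
    }

    // Called once an admit() has fully unwound (subscription destroyed).
    void leave() {
        assert(_holders > 0);
        --_holders;
        maybe_finish();
    }

    // Called by stop(); on_drained runs only when no admit() is in flight.
    void close(std::function<void()> on_drained) {
        _closed = true;
        _on_drained = std::move(on_drained);
        maybe_finish();
    }

    std::size_t holders() const { return _holders; }

private:
    void maybe_finish() {
        if (_closed && _holders == 0 && _on_drained) {
            // Clear before invoking so the callback fires at most once.
            auto cb = std::exchange(_on_drained, nullptr);
            cb();
        }
    }

    std::size_t _holders = 0;
    bool _closed = false;
    std::function<void()> _on_drained;
};
```

In this model the policy may only be destroyed after the drain callback has run, which is the ordering guarantee the comment above asks for.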

Comment on lines +226 to +235
    if (_dispatch_counter++ % 1000 == 0) {
        vlog(
          log.debug,
          "reservation_policy: dispatch #{} picked={} | {}",
          _dispatch_counter,
          to_string_view(gs.id),
          fmt::join(_groups, " "));
    }

    return true;

Minor: the [&w, &gs, this](...) noexcept lambda calls _now_fn(), which is a std::function<...> and not guaranteed non-throwing. The default in the constructor (ss::lowres_clock::now) is fine, but if a test ever installs a throwing now_fn via set_now_fn_for_test, this lambda will std::terminate. Probably fine for now (no caller does this), but worth either documenting the non-throw expectation on now_fn_t or wrapping the call.
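
One way to "wrap the call" is sketched below. The shape of `now_fn_t` and the use of `std::chrono::steady_clock` are assumptions for illustration (the PR's default is `ss::lowres_clock::now`), and `safe_now` is a hypothetical helper, not something in the PR.

```cpp
#include <chrono>
#include <functional>

// Assumed shapes for illustration; the PR uses its own clock and now_fn_t.
using clock_type = std::chrono::steady_clock;
using now_fn_t = std::function<clock_type::time_point()>;

// Calls fn but never lets an exception escape, so it is safe to use from a
// noexcept lambda. Falls back to the real clock if fn is empty or throws.
inline clock_type::time_point safe_now(const now_fn_t& fn) noexcept {
    try {
        if (fn) {
            return fn();
        }
    } catch (...) {
        // Swallow: a test-installed now_fn must not be able to terminate us.
    }
    return clock_type::now();
}
```

Documenting a non-throw requirement on `now_fn_t` is the cheaper alternative; the wrapper only matters if a throwing test hook is considered plausible.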

    case policy_type::passthrough:
        return std::make_unique<passthrough>(capacity);
    case policy_type::reservation:
        throw std::runtime_error("reservation_policy: not implemented");

The reservation enumerator is now publicly exposed in policy_type, but selecting it via cluster config will trip this runtime_error at scheduler construction. Understood that wiring lands in the 3/3 PR, but until then this is a footgun — anyone who flips the config will crash on startup. Consider either (a) holding off on adding the enumerator until the wiring lands, or (b) replacing throw with vassert(false, ...) so the failure is obviously an unimplemented-code path rather than a runtime configuration error.


void reservation_policy::set_now_fn_for_test(now_fn_t fn) {
    _now_fn = std::move(fn);
}

Cosmetic: if (_dispatch_counter++ % 1000 == 0) fires on counter values 0, 1000, 2000, ... The log message then prints the post-increment value (1, 1001, 2001, ...), which is slightly off-by-one in the human-readable text. Pre-incrementing or reading the counter separately would make "dispatch #N" match a multiple of 1000.
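
The off-by-one is easy to reproduce in isolation. The two helpers below are hypothetical and only collect the "#N" values that would be printed: the first mirrors the post-increment gating from the snippet, the second captures the counter before bumping it.

```cpp
#include <cstdint>
#include <vector>

// Mirrors `if (counter++ % every == 0) log(counter)`: the logged value is
// read after the increment, so it is one past the multiple that matched.
inline std::vector<std::uint64_t>
logged_post_increment(std::uint64_t dispatches, std::uint64_t every) {
    std::vector<std::uint64_t> out;
    std::uint64_t counter = 0;
    for (std::uint64_t i = 0; i < dispatches; ++i) {
        if (counter++ % every == 0) {
            out.push_back(counter);
        }
    }
    return out;
}

// Reads the counter once, then tests and logs the same value.
inline std::vector<std::uint64_t>
logged_captured(std::uint64_t dispatches, std::uint64_t every) {
    std::vector<std::uint64_t> out;
    std::uint64_t counter = 0;
    for (std::uint64_t i = 0; i < dispatches; ++i) {
        const std::uint64_t n = counter++;
        if (n % every == 0) {
            out.push_back(n);
        }
    }
    return out;
}
```

Over 2001 dispatches with a period of 1000, the first variant records 1, 1001, 2001 and the second records 0, 1000, 2000.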

@claude

claude Bot commented Jun 1, 2026

Code Review

Nice, well-documented introduction of the reservation policy. The header comments do a great job of laying out the dual-pool model and the refill/reclaim cycle, and the test suite covers the interesting state transitions (saturation, abort, cross-group dispatch, reclaim-then-refill, dispatch-vs-refill ordering, FIFO across under-target groups). Below are the higher-level concerns; line-specific notes are inline.

Correctness / safety

  • stop() lifetime (inline on reservation_policy.cc) — The biggest concern. stop() cancels waiters synchronously and returns ss::now(), but the suspended admit() coroutines aren't guaranteed to have unwound by the time stop() resolves. Their abort_source subscriptions still capture &gs/this and will dangle if the policy is destroyed before the reactor runs the scheduled continuations and the caller subsequently fires request_abort(). Recommend an ss::gate (or counter) that stop() awaits, or at minimum a documented contract on the destructor/stop().
  • make_policy throws for reservation (inline) — The enum is publicly exposed but make_policy throws. Setting policy = reservation via config crashes startup. vassert(false, "not yet implemented") would make the staged-delivery intent clearer.
  • set_target_reserved is documented as unsafe under live traffic when shrinking (header comment is good). Just make sure the eventual config-driven path in 3/3 deals with this — the assertion will abort the process otherwise.

Minor

  • The subscription lambda calls _now_fn() from a noexcept context (inline). Default impl is fine; document the non-throw expectation on now_fn_t.
  • The dispatch-counter log uses the post-increment value, so it prints "dispatch #1" on the first hit (inline).
  • refill_priority_ratio does current_reserved() * 1000 / target_reserved; not a realistic overflow at current capacities, but a one-liner comment noting the assumption (or using a smaller scale / float) would harden it against future growth.
  • reservation_policy.h includes <memory> but I don't see a std::unique_ptr/shared_ptr use in the header itself. Probably removable.
  • reservation_policy(size_t capacity, ...) initializes _shared(0, …) then _shared.signal(capacity). If ssx::semaphore accepts an initial count, doing _shared(capacity, …) is one fewer step (and one fewer way to forget the signal in a future edit).
  • waiter_count() is O(n) on intrusive_list::size() — already documented; tests using it are fine, but worth keeping it out of any hot path (it isn't used in one today).
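
For the `refill_priority_ratio` note above, a guarded version of the `current * 1000 / target` computation might look like the following. The function name, the per-mille scale, and the choice to return `std::nullopt` for a zero target or a would-be overflow are all assumptions for illustration, not the PR's behavior.

```cpp
#include <cstddef>
#include <limits>
#include <optional>

// Per-mille fill ratio of a lane. Returns nullopt when target is zero or
// when current * 1000 would wrap around in std::size_t.
constexpr std::optional<std::size_t>
fill_ratio_permille(std::size_t current, std::size_t target) {
    constexpr std::size_t scale = 1000;
    if (target == 0) {
        return std::nullopt;
    }
    if (current > std::numeric_limits<std::size_t>::max() / scale) {
        return std::nullopt;
    }
    return current * scale / target;
}
```

At realistic slot counts the overflow branch is unreachable, so a one-line comment stating that assumption is probably enough in the real code.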

Design observations (not blockers)

  • A never-active group (inactive_since == nullopt) drains its pre-allocated lane on the first reclaim sweep by any other group. The header comment on reservation_group_state is explicit about this, and it's the right call (otherwise capacity is wasted forever waiting for a group that may never show up) — just confirming the trade-off is intentional: the startup "warm lane so the first admit doesn't pay refill latency" benefit only applies when the owning group goes first.
  • Tie-breaking in pick_refill_candidate and dispatch_next falls back to enum iteration order (producer_upload before consumer_fetch before default_group). The "DispatchAndRefillAcrossUnderTargetGroups" test relies on this — fine, but a future enum reordering would silently change that test's expected outcome.
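
If the enum-order dependence is worth pinning, a compile-time check is one low-cost option. The sketch below uses a hypothetical mirror of `group_id` (the real underlying type and enumerator values are not visible in this conversation) and a hypothetical `break_tie` helper.

```cpp
#include <cstdint>
#include <type_traits>

// Hypothetical mirror of the enumerators named in the review.
enum class group_id : std::uint8_t { producer_upload, consumer_fetch, default_group };

constexpr auto rank(group_id g) noexcept {
    return static_cast<std::underlying_type_t<group_id>>(g);
}

// Pin the order that tie-breaking depends on; reordering the enum then
// fails the build instead of silently changing dispatch and refill results.
static_assert(rank(group_id::producer_upload) < rank(group_id::consumer_fetch));
static_assert(rank(group_id::consumer_fetch) < rank(group_id::default_group));

// Explicit tie-break: on equal priority, the lower-ranked group wins.
constexpr group_id break_tie(group_id a, group_id b) noexcept {
    return rank(a) <= rank(b) ? a : b;
}
```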

Tests

Coverage looks good for single-shard behavior. Things I didn't see covered (possibly out of scope for this PR):

  • A test for stop() itself — that queued waiters are aborted with abort_requested_exception and the destructor passes afterward.
  • An abort that races with dispatch (e.g. waiter dispatched in the same task as request_abort()) — the cancel_waiter is-linked guard should already handle it, but a regression test would pin the behavior.

Overall this looks solid for a 2/3 staging PR; the stop() lifetime question is the main thing I'd want addressed before this can be relied on by the wiring in 30596.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

Comment on lines 46 to 53
std::unique_ptr<scheduler_policy>
scheduler::make_policy(size_t capacity, scheduler_config cfg) {
    switch (cfg.policy) {
    case policy_type::passthrough:
        return std::make_unique<passthrough>(capacity);
    case policy_type::reservation:
        throw std::runtime_error("reservation_policy: not implemented");
    }
Comment on lines +73 to +77
/// Reservations ebb and flow with demand: an active under-target group's
/// lane refills toward target_reserved; an idle past-dwell group's lane
/// is reclaimed to zero. set_target_reserved at startup pre-allocates the
/// lane so the first admits don't pay refill latency; after that the
/// cycle is purely demand-driven.
Comment on lines +19 to +21
#include <algorithm>
#include <limits>
#include <utility>

Labels

area/build area/redpanda claude-review Adding this label to a PR will trigger a workflow to review the code using claude.


3 participants