[CORE-16225] 2/3: Cloud I/O Scheduler: Reservation-based policy#30595
[CORE-16225] 2/3: Cloud I/O Scheduler: Reservation-based policy#30595oleiman wants to merge 3 commits into
Conversation
|
/ci-repeat 1 |
|
/ci-repeat 1 |
Retry command for Build#84908please wait until all jobs are finished before running the slash command |
d1d7b19 to
f484b2e
Compare
fe957fb to
d6e0755
Compare
d6e0755 to
271737a
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a per-shard cloud I/O admission scheduler framework and threads a new cloud_io::group_id label through key cloud read/write paths so the client pool can apply policy-driven gating (with a new reservation-based policy implementation added in cloud_io).
Changes:
- Add
cloud_io::scheduler+scheduler_types(policy_type,group_id, configs) and a fullreservation_policyimplementation with unit tests. - Integrate scheduler configuration into
cloud_storage::configurationand pass it intocloud_storage_clients::client_pool, adding group-aware acquire overloads and tagging some L0/L1 operations withgroup_id. - Update
cloud_io::remote_api(and test mocks) to acceptgroup_idfor download/upload/stream operations, and propagate the tag into client acquisition.
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/redpanda/application_services.cc | Pass scheduler config into the cloud storage client pool construction. |
| src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc | Update mocks/EXPECT_CALLs for new group_id parameter. |
| src/v/cloud_topics/level_zero/reader/tests/mocks.h | Update mock remote API signatures to include group_id. |
| src/v/cloud_topics/level_zero/reader/materialized_extent.cc | Tag L0 downloads with group_id::consumer_fetch. |
| src/v/cloud_topics/level_zero/batcher/tests/remote_mock.h | Update test remote mock APIs for group_id. |
| src/v/cloud_topics/level_zero/batcher/batcher.cc | Tag L0 uploads with group_id::producer_upload. |
| src/v/cloud_topics/level_one/metastore/tests/garbage_collector_test.cc | Update test I/O interface override to accept group_id. |
| src/v/cloud_topics/level_one/frontend_reader/level_one_reader.cc | Tag L1 reads with group_id::consumer_fetch. |
| src/v/cloud_topics/level_one/common/file_io.h | Extend io::read_object override to accept group_id. |
| src/v/cloud_topics/level_one/common/file_io.cc | Plumb group_id into remote download stream calls. |
| src/v/cloud_topics/level_one/common/fake_io.h | Extend fake I/O interface to accept group_id. |
| src/v/cloud_topics/level_one/common/fake_io.cc | Accept (unused) group_id in fake implementation. |
| src/v/cloud_topics/level_one/common/BUILD | Add scheduler_types dep for group id/config types. |
| src/v/cloud_topics/level_one/common/abstract_io.h | Add group_id to I/O interface for read operations. |
| src/v/cloud_topics/level_one/common/abstract_io.cc | Plumb group_id through read_object_as_iobuf. |
| src/v/cloud_storage/configuration.h | Add scheduler config field to cloud storage configuration. |
| src/v/cloud_storage/configuration.cc | Initialize scheduler config in S3/ABS config builders. |
| src/v/cloud_storage/BUILD | Add scheduler_types dependency. |
| src/v/cloud_storage_clients/tests/client_pool_builder.h | Allow tests to pass scheduler config into pool construction. |
| src/v/cloud_storage_clients/client_pool.h | Add scheduler + group-aware acquire overloads; track scheduler waiters. |
| src/v/cloud_storage_clients/client_pool.cc | Instantiate/stop scheduler; gate acquisitions by group_id; add peer admit+borrow helper. |
| src/v/cloud_storage_clients/BUILD | Add scheduler libs as dependencies. |
| src/v/cloud_io/tests/scheduler_test.cc | Add unit tests for default (passthrough) scheduler behavior. |
| src/v/cloud_io/tests/reservation_policy_test.cc | Add extensive unit tests for reservation policy semantics. |
| src/v/cloud_io/tests/BUILD | Register new scheduler/reservation policy test targets. |
| src/v/cloud_io/scheduler.h | Define scheduler wrapper interface over policies. |
| src/v/cloud_io/scheduler.cc | Implement scheduler wrapper and passthrough policy selection (reservation currently throws). |
| src/v/cloud_io/scheduler_types.h | Define policy/group IDs and scheduler config types. |
| src/v/cloud_io/scheduler_policy.h | Define the scheduler policy abstract base class. |
| src/v/cloud_io/reservation_policy.h | Add reservation-based admission policy API and documentation. |
| src/v/cloud_io/reservation_policy.cc | Implement reservation-based admission policy logic. |
| src/v/cloud_io/reservation_policy_types.h | Define reservation policy internal state types and invariants. |
| src/v/cloud_io/remote.h | Extend remote API methods to accept group_id. |
| src/v/cloud_io/remote.cc | Pass group_id through to client_pool acquisition for download/upload paths. |
| src/v/cloud_io/remote_api.h | Extend remote API interface for group-tagged operations. |
| src/v/cloud_io/BUILD | Add new scheduler/reservation policy libraries and deps. |
| std::unique_ptr<scheduler_policy> | ||
| scheduler::make_policy(size_t capacity, scheduler_config cfg) { | ||
| switch (cfg.policy) { | ||
| case policy_type::passthrough: | ||
| return std::make_unique<passthrough>(capacity); | ||
| case policy_type::reservation: | ||
| throw std::runtime_error("reservation_policy: not implemented"); | ||
| } |
| if (likely(!_idle_clients.empty())) { | ||
| co_await _sched->admit(gid, as); | ||
| if (_idle_clients.empty()) { | ||
| _sched->release(gid); | ||
| continue; | ||
| } | ||
| client = pop_most_recently_used(); | ||
| } else if ( |
| void reservation_policy::set_target_reserved(group_id g, size_t value) { | ||
| auto& gs = _groups[g]; | ||
| // Reconcile the reservation lane to reflect the new target. Compute | ||
| // the delta against current_reserved() (the derived current size) so | ||
| // that any reclamation or refill since the last call are accounted | ||
| // for; the lane may have ebbed and flowed between calls. | ||
| const size_t cur = gs.current_reserved(); | ||
| if (value > cur) { | ||
| const size_t delta = value - cur; | ||
| vassert( | ||
| _shared.current() >= delta, | ||
| "set_target_reserved({}, {}): would underflow _shared " | ||
| "(current={}, delta={})", | ||
| to_string_view(g), | ||
| value, | ||
| _shared.current(), | ||
| delta); | ||
| _shared.consume(delta); | ||
| gs.reserved_sem.signal(delta); | ||
| } else if (value < cur) { | ||
| const size_t delta = cur - value; | ||
| vassert( | ||
| gs.reserved_sem.current() >= delta, | ||
| "set_target_reserved({}, {}): would underflow reserved_sem " | ||
| "(current={}, in_flight={}, delta={})", | ||
| to_string_view(g), | ||
| value, | ||
| gs.reserved_sem.current(), | ||
| gs.reserved_in_flight, | ||
| delta); | ||
| gs.reserved_sem.consume(delta); |
271737a to
63e36f7
Compare
3e838c1 to
eea2b1d
Compare
e2ee996 to
167deb1
Compare
Add reservation_policy_types.h with the per-group runtime state vocabulary used by the admission scheduler: - reservation_waiter: an intrusive list hook and a monotonic sequence number for FIFO ordering across groups. - reservation_group_state: holds the reservation semaphore, target_reserved, in-flight and reserved-in-flight counters, queued waiters, admit totals, and inactive_since. Encapsulates the predicates and state mutations composed by the policy. - default_dwell_duration: how long an idle group keeps its reservation lane before reclaim returns those slots to the common pool. Header-only. Also adds runtime config for reservation policy to pre-existing scheduler_config in scheduler_types.h. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Add the reservation-based admission policy. Each group has a reservation lane backed by a dedicated semaphore. Slots that aren't in a lane live in a common pool that any group can claim. Mechanics: - Admit: try the group's reservation lane first, then the common pool. If both are empty, queue the caller on the group's intrusive waiter list. - Release: a reserved-lane in-flight slot routes back into the lane (to a queued waiter or to the lane semaphore). A common-pool in-flight slot first tries to dispatch a queued waiter, then to refill an under-target lane, and finally falls back to the common pool. - Dispatch: prefer the oldest waiter from a group below its target_reserved (under-target preference). Otherwise pick the global oldest waiter. - Reclaim and refill: capacity moves between a group's reservation lane and the common pool. An idle group's lane is reclaimed when its dwell window expires. A common-pool release with no queued waiter refills the most under-target group up to its target_reserved. Also splits scheduler_policy into its own Bazel target so reservation_policy depends on just the abstract interface. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Tests covering admit/release accounting, queueing and dispatch, abort handling, reservation lane isolation, and the reclaim and refill cycle. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
167deb1 to
cba3744
Compare
|
|
||
| // Common-pool release: try to dispatch a queued waiter, then | ||
| // refill a reservation lane, then fall back to the common pool. | ||
| if (dispatch_next()) { | ||
| return; | ||
| } | ||
| if (const auto target = pick_refill_candidate(); target.has_value()) { | ||
| _groups[*target].grant_reserved_slot(); | ||
| } else { |
There was a problem hiding this comment.
Lifetime concern: stop() synchronously cancels waiters and returns ss::now(), but it doesn't wait for the suspended admit() coroutines to actually unwind. There is a window between cancel_waiter() setting the exception on w.p and the coroutine resuming/destroying its sub (the abort_source subscription). During that window the subscription is still active and captures &gs, this, and &w by reference.
If the policy is destroyed in that window (e.g. caller does co_await scheduler.stop() then drops the scheduler before the reactor cycles back to the queued continuations) and the caller subsequently fires as.request_abort() from a still-alive abort_source, the subscription lambda dereferences a dangling gs/this (it also calls _now_fn() on the dead policy).
The destructor only asserts gs.waiters.empty(), which passes after cancel — it doesn't catch coroutines that have been unlinked but not yet resumed.
Consider gating outstanding admit() invocations behind an ss::gate (or tracking an in-flight admit counter) and having stop() await that gate closing, so the policy is guaranteed to outlive every active subscription. Even just documenting the contract on stop() / the destructor would help — right now the safety requirement is implicit.
| if (_dispatch_counter++ % 1000 == 0) { | ||
| vlog( | ||
| log.debug, | ||
| "reservation_policy: dispatch #{} picked={} | {}", | ||
| _dispatch_counter, | ||
| to_string_view(gs.id), | ||
| fmt::join(_groups, " ")); | ||
| } | ||
|
|
||
| return true; |
There was a problem hiding this comment.
Minor: the [&w, &gs, this](...) noexcept lambda calls _now_fn(), which is a std::function<...> and not guaranteed non-throwing. The default in the constructor (ss::lowres_clock::now) is fine, but if a test ever installs a throwing now_fn via set_now_fn_for_test, this lambda will std::terminate. Probably fine for now (no caller does this), but worth either documenting the non-throw expectation on now_fn_t or wrapping the call.
| case policy_type::passthrough: | ||
| return std::make_unique<passthrough>(capacity); | ||
| case policy_type::reservation: | ||
| throw std::runtime_error("reservation_policy: not implemented"); |
There was a problem hiding this comment.
The reservation enumerator is now publicly exposed in policy_type, but selecting it via cluster config will trip this runtime_error at scheduler construction. Understood that wiring lands in the 3/3 PR, but until then this is a footgun — anyone who flips the config will crash on startup. Consider either (a) holding off on adding the enumerator until the wiring lands, or (b) replacing throw with vassert(false, ...) so the failure is obviously an unimplemented-code path rather than a runtime configuration error.
|
|
||
| void reservation_policy::set_now_fn_for_test(now_fn_t fn) { | ||
| _now_fn = std::move(fn); | ||
| } |
There was a problem hiding this comment.
Cosmetic: if (_dispatch_counter++ % 1000 == 0) fires on counter values 0, 1000, 2000, ... The log message then prints the post-increment value (1, 1001, 2001, ...), which is slightly off-by-one in the human-readable text. Pre-incrementing or reading the counter separately would make "dispatch #N" match a multiple of 1000.
Code ReviewNice, well-documented introduction of the reservation policy. The header comments do a great job of laying out the dual-pool model and the refill/reclaim cycle, and the test suite covers the interesting state transitions (saturation, abort, cross-group dispatch, reclaim-then-refill, dispatch-vs-refill ordering, FIFO across under-target groups). Below are the higher-level concerns; line-specific notes are inline. Correctness / safety
Minor
Design observations (not blockers)
TestsCoverage looks good for single-shard behavior. Things I didn't see covered (possibly out of scope for this PR):
Overall this looks solid for a 2/3 staging PR; the |
| std::unique_ptr<scheduler_policy> | ||
| scheduler::make_policy(size_t capacity, scheduler_config cfg) { | ||
| switch (cfg.policy) { | ||
| case policy_type::passthrough: | ||
| return std::make_unique<passthrough>(capacity); | ||
| case policy_type::reservation: | ||
| throw std::runtime_error("reservation_policy: not implemented"); | ||
| } |
| /// Reservations ebb and flow with demand: an active under-target group's | ||
| /// lane refills toward target_reserved; an idle past-dwell group's lane | ||
| /// is reclaimed to zero. set_target_reserved at startup pre-allocates the | ||
| /// lane so the first admits don't pay refill latency; after that the | ||
| /// cycle is purely demand-driven. |
| #include <algorithm> | ||
| #include <limits> | ||
| #include <utility> |
Sequence:
Backports Required
Release Notes