ct: Compute data threshold across shards #29061
base: dev
Conversation
Pull request overview
This PR optimizes the write request scheduler by replacing task queue-based byte counting with atomic counters for cross-shard data threshold evaluation. The scheduler now polls more frequently (20ms intervals) and can trigger early uploads when sufficient data is accumulated, reducing latency compared to waiting for the full scheduling interval.
Key Changes:
- Atomic counters replace synchronous cross-shard queries for byte counting
- Time-based fallback now checks data thresholds before waiting for deadline
- Frequent polling (20ms) enables early upload triggers when data threshold is met
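A minimal sketch of the threshold-before-deadline check described in the last two bullets; the function and parameter names are hypothetical and the policy is simplified:

```cpp
#include <chrono>
#include <cstdint>

// Decide whether to trigger an upload now: early if the accumulated bytes
// already exceed the threshold, otherwise only once the scheduling deadline
// since the last upload has passed and there is something to upload.
inline bool should_upload_now(
  uint64_t pending_bytes,
  uint64_t threshold_bytes,
  std::chrono::steady_clock::time_point last_upload,
  std::chrono::milliseconds max_wait,
  std::chrono::steady_clock::time_point now) {
    if (pending_bytes >= threshold_bytes) {
        return true; // data threshold met before the deadline
    }
    return pending_bytes > 0 && (now - last_upload) >= max_wait;
}
```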
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| write_request_scheduler.h | Added atomic counter references vector and modified apply_time_based_fallback signature to track last upload time |
| write_request_scheduler.cc | Implemented atomic counter collection at startup, modified fallback logic to check data threshold before deadline, and reduced polling interval to 20ms |
| write_pipeline.h | Converted _stage_bytes from plain array to cache-aligned atomic counters and added stage_bytes_ref() accessor |
| write_pipeline.cc | Updated byte access methods to work with atomic counters using relaxed memory ordering |
| write_request_scheduler_bench.cc | Updated benchmark test to pass required last_upload timestamp parameter |
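A minimal sketch of the cache-aligned counters described for write_pipeline in the table above, assuming relaxed memory ordering as stated; the stage count and every name except stage_bytes_ref() are hypothetical:

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

// Each counter gets its own cache line so the producer updating it and the
// scheduler polling it from another shard do not false-share.
struct alignas(64) padded_counter {
    std::atomic<uint64_t> value{0};
};

class write_pipeline_stage_bytes {
public:
    // Hypothetical stage count; the real pipeline defines its own stages.
    static constexpr size_t num_stages = 3;

    void add(size_t stage, uint64_t bytes) {
        // Relaxed ordering: the counters are advisory sizes, not used to
        // publish or acquire any other data.
        _stage_bytes[stage].value.fetch_add(bytes, std::memory_order_relaxed);
    }

    uint64_t get(size_t stage) const {
        return _stage_bytes[stage].value.load(std::memory_order_relaxed);
    }

    // Reference handed to the scheduler at startup (mirrors stage_bytes_ref()).
    const std::atomic<uint64_t>& stage_bytes_ref(size_t stage) const {
        return _stage_bytes[stage].value;
    }

private:
    std::array<padded_counter, num_stages> _stage_bytes{};
};
```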
```cpp
co_await ss::sleep_abortable(_scheduling_interval(), _as);
co_await apply_time_based_fallback();
co_await ss::sleep_abortable(
  std::chrono::milliseconds(20) /*TODO: use constant*/, _as);
```
Copilot AI (Dec 18, 2025)
The hardcoded polling interval of 20ms should be extracted to a named constant or configuration parameter. This magic number appears inline with a TODO comment, suggesting it was intended to be refactored. Consider adding it as a static constexpr member or configuration binding.
src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc
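One way to address this comment, sketched as a static constexpr member with a hypothetical name (a configuration binding would be the other option the comment mentions):

```cpp
#include <chrono>

class write_request_scheduler_intervals {
public:
    // Hypothetical named constant replacing the inline 20ms literal; the
    // scheduler loop would pass this to ss::sleep_abortable() instead of
    // constructing the duration in place.
    static constexpr std::chrono::milliseconds data_threshold_poll_interval{20};
};

static_assert(
  write_request_scheduler_intervals::data_threshold_poll_interval
  == std::chrono::milliseconds{20});
```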
For the failing test, I tried to get it working by advancing the manual clock, but something else seems to be going on. Apply this patch and you'll get a convenient crash.
Force-pushed 9cd4826 to d1eb058
```cpp
std::mutex mutex;

// Id of the shard which is preparing to upload L0
std::optional<ss::shard_id> in_flight{std::nullopt};
```
not used
Force-pushed d1eb058 to a104e46
Force-pushed e799915 to e3e9780
Retry command for Build #78359: please wait until all jobs are finished before running the slash command.
/ci-repeat 1
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
It's known that there is some data left.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Use padded atomic counter to track per-stage work.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Currently, the write_request_scheduler uses the 'map' method to collect information from every shard. This is done on shard zero: tasks are submitted to every shard's queue and then awaited, which has relatively high latency. This commit changes the algorithm slightly. In the start method the scheduler collects references to the counters of all stages. The counters are atomic, so instead of using map, the time-based-fallback policy reads the counter values directly.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
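A sketch of that startup step, assuming the pipeline is a Seastar sharded service exposing the stage_bytes_ref() accessor mentioned in the file summary; the surrounding class, members, and stage count are hypothetical:

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace ss = seastar;

// Stand-in for the real write_pipeline: one atomic byte counter per stage.
struct write_pipeline {
    const std::atomic<uint64_t>& stage_bytes_ref(size_t stage) const {
        return _stage_bytes[stage];
    }
    std::array<std::atomic<uint64_t>, 3> _stage_bytes{};
};

class write_request_scheduler {
public:
    explicit write_request_scheduler(ss::sharded<write_pipeline>& pipeline)
      : _pipeline(pipeline) {}

    // One cross-shard pass at startup collects counter references; later
    // evaluations read them directly instead of submitting a task to every
    // shard's queue (the real code collects a reference for every stage).
    ss::future<> start() {
        _counters = co_await _pipeline.map(
          [](const write_pipeline& p) { return &p.stage_bytes_ref(0); });
    }

    // Used by the time-based-fallback policy: sum the counters directly.
    uint64_t pending_bytes() const {
        uint64_t total = 0;
        for (const auto* c : _counters) {
            total += c->load(std::memory_order_relaxed);
        }
        return total;
    }

private:
    ss::sharded<write_pipeline>& _pipeline;
    std::vector<const std::atomic<uint64_t>*> _counters;
};
```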
This commit introduces a radical change to the write request scheduler. Instead of triggering L0 uploads via two different policies, the scheduler now uses a single policy. It is implemented as a background fiber that wakes up frequently and checks the amount of data on every shard. If the total amount of data is larger than the limit, or the timeout is reached and the shard has the most data across all shards, the upload is triggered.
This change eliminates the coordinator shard. The time of the last upload is tracked via an atomic variable. Conflicts are handled via a standard mutex (no actual blocking is used). There is no difference between shards when it comes to scheduling.
The change also eliminates an extra x-shard hop. Previously the coordinator shard would submit a task to the target shard, and the target shard would then pull write requests from all shards. Now every shard can decide to initiate the upload; when this happens, the shard just pulls requests from all shards.
Synchronization:
- There is an x-shard context shared between all shards. This context object contains a set of atomic counters that track the amount of pending work (in bytes).
- The context also tracks the time of the last upload so all shards can agree on it.
- Finally, the context stores the mutex which is used to avoid races. The mutex is locked in order to pull requests from other shards. While the requests are pulled, no other shard can trigger a new L0 upload. This doesn't prevent us from uploading L0 objects in parallel in the future.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
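A sketch of the x-shard context and per-shard trigger decision described in the commit message above; the names, and the reading of the trigger condition (size limit exceeded or timeout reached, acted on by the shard currently holding the most data), are assumptions:

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical x-shard context shared by all shards: per-shard pending bytes,
// the time of the last upload, and a mutex held only while one shard pulls
// write requests from its peers.
struct upload_context {
    std::vector<std::atomic<uint64_t>> pending_bytes; // one counter per shard
    std::atomic<int64_t> last_upload_ms{0};           // millis since some epoch
    std::mutex pull_mutex;                            // serializes pulling requests
};

// Every shard runs this check in its background fiber. Only the shard holding
// the most data returns true, so no coordinator shard is needed.
inline bool should_trigger(
  const upload_context& ctx,
  size_t my_shard,
  uint64_t limit_bytes,
  std::chrono::milliseconds timeout,
  int64_t now_ms) {
    uint64_t total = 0;
    uint64_t max_bytes = 0;
    size_t max_shard = 0;
    for (size_t s = 0; s < ctx.pending_bytes.size(); ++s) {
        auto b = ctx.pending_bytes[s].load(std::memory_order_relaxed);
        total += b;
        if (b > max_bytes) {
            max_bytes = b;
            max_shard = s;
        }
    }
    if (total == 0 || max_shard != my_shard) {
        return false;
    }
    bool timed_out
      = now_ms - ctx.last_upload_ms.load(std::memory_order_relaxed)
        >= timeout.count();
    return total >= limit_bytes || timed_out;
}
```

Pulling write requests from the other shards would then happen while holding pull_mutex, so only one shard collects at a time, which still leaves room for parallel L0 uploads later.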
to reflect changes in the write_request_scheduler
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Force-pushed e3e9780 to dea806d
Retry command for Build #78533: please wait until all jobs are finished before running the slash command.
@Lazin did you triage the test failures?
Use atomic counters to avoid going through the task queue to evaluate the amount of outstanding bytes. Do the evaluation frequently (20ms period) and trigger x-shard upload once we have enough data.
Backports Required
Release Notes