
Conversation

@Lazin Lazin commented Dec 18, 2025

Use atomic counters to avoid going through the task queue when evaluating the amount of outstanding bytes. Do the evaluation frequently (20ms period) and trigger an x-shard upload once we have enough data.
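
A minimal sketch of the idea, with hypothetical names (shard_pending_bytes, outstanding_bytes) standing in for the actual scheduler code:

#include <atomic>
#include <cstdint>
#include <vector>

// One counter per shard: writers bump their own counter, and any shard
// can sum all of them directly instead of dispatching tasks through the
// per-shard task queues.
std::vector<std::atomic<uint64_t>*> shard_pending_bytes;

uint64_t outstanding_bytes() {
    uint64_t total = 0;
    for (auto* counter : shard_pending_bytes) {
        // Relaxed ordering is enough: this is an advisory threshold
        // check, not a synchronization point.
        total += counter->load(std::memory_order_relaxed);
    }
    return total;
}

The 20ms evaluation then reduces to reading this sum and comparing it against the configured threshold.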

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x

Release Notes

  • none

Copilot AI review requested due to automatic review settings December 18, 2025 20:31
@Lazin Lazin marked this pull request as draft December 18, 2025 20:31
Copilot AI left a comment

Pull request overview

This PR optimizes the write request scheduler by replacing task queue-based byte counting with atomic counters for cross-shard data threshold evaluation. The scheduler now polls more frequently (20ms intervals) and can trigger early uploads when sufficient data is accumulated, reducing latency compared to waiting for the full scheduling interval.

Key Changes:

  • Atomic counters replace synchronous cross-shard queries for byte counting
  • Time-based fallback now checks data thresholds before waiting for deadline
  • Frequent polling (20ms) enables early upload triggers when data threshold is met
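
A sketch of the reordered decision described above, using assumed names (should_upload, max_delay); the real code folds this into the fallback fiber:

#include <chrono>
#include <cstdint>

using steady = std::chrono::steady_clock;

// Check the data threshold first; fall back to the time-based deadline
// only when not enough data has accumulated yet.
bool should_upload(
  uint64_t total_bytes,
  uint64_t threshold,
  steady::time_point last_upload,
  steady::duration max_delay) {
    if (total_bytes >= threshold) {
        return true; // early trigger: enough data across all shards
    }
    return steady::now() - last_upload >= max_delay; // deadline fallback
}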

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Summary per file:

  • write_request_scheduler.h: Added atomic counter references vector and modified apply_time_based_fallback signature to track last upload time
  • write_request_scheduler.cc: Implemented atomic counter collection at startup, modified fallback logic to check data threshold before deadline, and reduced polling interval to 20ms
  • write_pipeline.h: Converted _stage_bytes from plain array to cache-aligned atomic counters and added stage_bytes_ref() accessor
  • write_pipeline.cc: Updated byte access methods to work with atomic counters using relaxed memory ordering
  • write_request_scheduler_bench.cc: Updated benchmark test to pass required last_upload timestamp parameter

co_await ss::sleep_abortable(_scheduling_interval(), _as);
co_await apply_time_based_fallback();
co_await ss::sleep_abortable(
  std::chrono::milliseconds(20) /*TODO: use constant*/, _as);

Copilot AI Dec 18, 2025

The hardcoded polling interval of 20ms should be extracted to a named constant or configuration parameter. This magic number appears inline with a TODO comment, suggesting it was intended to be refactored. Consider adding it as a static constexpr member or configuration binding.
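
One way to address this, sketched with an assumed name (poll_interval); the actual fix might instead use a configuration binding:

#include <chrono>

class write_request_scheduler {
public:
    // Named constant replacing the inline 20ms literal.
    static constexpr auto poll_interval = std::chrono::milliseconds(20);
};

// At the sleep site the magic number then becomes:
//   co_await ss::sleep_abortable(write_request_scheduler::poll_interval, _as);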

dotnwat commented Dec 19, 2025

For the failing test, I tried to get it working by advancing the manual clock, but something else seems to be going on. Apply this patch and you'll get a convenient crash

diff --git a/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc b/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc
index b203a1c6ae..ee5db8f98c 100644
--- a/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc
+++ b/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc
@@ -224,17 +224,23 @@ TEST_F(L0ObjectSizeDistFixture, ThreeToOne) {
     }

     // upload the built batches from each core
-    pipeline
-      .invoke_on_all([&](auto& p) {
-          return seastar::async([&] {
-              auto data = std::move(batches[seastar::this_shard_id()]);
-              p.write_and_debounce(
-                 test_ntp0, min_epoch, std::move(data), deadline)
-                .discard_result()
-                .get();
-          });
-      })
-      .get();
+    auto f = pipeline.invoke_on_all([&](auto& p) {
+        return seastar::async([&] {
+            auto data = std::move(batches[seastar::this_shard_id()]);
+            p.write_and_debounce(
+               test_ntp0, min_epoch, std::move(data), deadline)
+              .discard_result()
+              .get();
+        });
+    });
+
+    while (true) {
+        ss::manual_clock::advance(10ms);
+        seastar::yield().get();
+        if (f.available()) {
+            break;
+        }
+    }

@Lazin Lazin force-pushed the ct/compute-data-threshold-across-shards branch from 9cd4826 to d1eb058 Compare December 23, 2025 00:35
std::mutex mutex;

// Id of the shard which is preparing to upload L0
std::optional<ss::shard_id> in_flight{std::nullopt};
Lazin (Contributor Author) commented:

not used

@Lazin Lazin force-pushed the ct/compute-data-threshold-across-shards branch from d1eb058 to a104e46 Compare December 23, 2025 17:49
@Lazin Lazin requested review from dotnwat and wdberkeley December 23, 2025 17:49
@Lazin Lazin marked this pull request as ready for review December 23, 2025 17:50
@Lazin Lazin force-pushed the ct/compute-data-threshold-across-shards branch 2 times, most recently from e799915 to e3e9780 Compare December 23, 2025 19:00
vbotbuildovich commented Dec 23, 2025

Retry command for Build#78359

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":true}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":true}

vbotbuildovich commented Dec 23, 2025

CI test results

test results on build#78359
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": false, "new_bootstrap": false} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb1-3ebf-41ec-b8d8-d550956ea85e FLAKY 1/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": true, "new_bootstrap": false} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb1-3ec1-4a76-86db-0aa4d10c3472 FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": false, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb1-3eb4-4cc1-b0c2-37e487957e08 FLAKY 2/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": false, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4d02-615f-4b26-8777-a2573b714fec FLAKY 2/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": true, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb1-3eb6-43cd-8c6b-7ff7183a3a52 FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": true, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb3-16e4-49ea-af64-924c0faa8f29 FLAKY 1/5 https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
RedpandaOIDCTest test_admin_invalidate_keys null integration https://buildkite.com/redpanda/redpanda/builds/78359#019b4cb3-16ec-416d-8526-8b7982fa7149 FLAKY 10/11 Test PASSES after retries. No significant increase in flaky rate (baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=RedpandaOIDCTest&test_method=test_admin_invalidate_keys
test results on build#78533
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
PartitionMovementTest test_cross_core_movements {"cloud_storage_type": 1} integration https://buildkite.com/redpanda/redpanda/builds/78533#019b8ea6-d18d-4761-9ec3-5bb6ff27def1 FLAKY 10/11 Test PASSES after retries. No significant increase in flaky rate (baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=PartitionMovementTest&test_method=test_cross_core_movements
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": false, "new_bootstrap": false} integration https://buildkite.com/redpanda/redpanda/builds/78533#019b8ea5-842d-4948-9f1c-faaf697b9d3b FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": true, "new_bootstrap": false} integration https://buildkite.com/redpanda/redpanda/builds/78533#019b8ea5-8422-4cb4-96b7-461c96a7b8ee FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": false, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78533#019b8ea5-8423-45b3-8c85-6a3d511adacf FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart
NodesDecommissioningTest test_node_is_not_allowed_to_join_after_restart {"cloud_topic": true, "new_bootstrap": true} integration https://buildkite.com/redpanda/redpanda/builds/78533#019b8ea5-8425-4848-a482-8a452500be65 FAIL 0/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0000, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_node_is_not_allowed_to_join_after_restart

Lazin commented Dec 24, 2025

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":true}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":true}

Lazin added 7 commits January 5, 2026 09:14
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
it's known that there is some data left

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Use padded atomic counter to track per-stage work.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
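
A sketch of what a padded per-stage counter looks like; the struct name and the 64-byte alignment are assumptions:

#include <atomic>
#include <cstdint>

// Aligning each counter to its own cache line (64 bytes on x86-64)
// prevents false sharing when different shards update adjacent stage
// counters in parallel.
struct alignas(64) padded_counter {
    std::atomic<uint64_t> value{0};
};

static_assert(sizeof(padded_counter) == 64);
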
Currently, the write_request_scheduler uses the 'map' method to collect
information from every shard. This is done on shard zero: tasks are
submitted to every shard's queue and then awaited, which has relatively
high latency.

This commit changes the algorithm slightly. In the start method the
scheduler collects references to the counters of all stages. The
counters are atomic. Instead of using map, the time-based-fallback
policy reads the counter values directly.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
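
Roughly, the shape of this change (stage_bytes_ref() is the accessor mentioned above; the surrounding types here are simplified stand-ins):

#include <atomic>
#include <cstdint>
#include <functional>
#include <vector>

// One reference per stage, collected once in start(); afterwards the
// fallback policy only performs plain atomic loads.
using counter_ref = std::reference_wrapper<const std::atomic<uint64_t>>;

uint64_t total_pending(const std::vector<counter_ref>& counters) {
    uint64_t total = 0;
    for (const auto& c : counters) {
        total += c.get().load(std::memory_order_relaxed);
    }
    return total;
}

Because the counters are atomics, any shard can read them without submitting tasks, at the cost of observing slightly stale values.
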
This commit introduces a radical change to the write request scheduler.
Instead of triggering L0 uploads via two different policies, the
scheduler now uses a single policy. It is implemented as a background
fiber that wakes up frequently and checks the amount of data on every
shard. If the total amount of data exceeds the limit, or the timeout is
reached and the shard has the most data across all shards, the upload
is triggered.

This change eliminates the coordinator shard. The time of the last
upload is tracked via an atomic variable. Conflicts are handled via a
standard mutex (no actual blocking occurs). There is no difference
between shards when it comes to scheduling.

The change also eliminates an extra x-shard hop. Previously the
coordinator shard would submit a task to the target shard, and the
target shard would then pull write requests from all shards. Now every
shard can decide to initiate the upload; when this happens, the shard
simply pulls requests from all shards.

Synchronization:
- there is an x-shard context shared between all shards. This context
  object contains a set of atomic counters that track the amount of
  pending work (in bytes).
- the context also tracks the time of the last upload so all shards can
  agree on it.
- finally, the context stores the mutex which is used to avoid races.

The mutex is locked in order to pull requests from other shards. While
the requests are being pulled, no other shard can trigger a new L0
upload. This doesn't prevent us from uploading L0 objects in parallel
in the future.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
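
A sketch of the shared context described in this commit message; the field names and fixed stage count are assumptions, not the actual code:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>

struct xshard_context {
    static constexpr std::size_t num_stages = 4; // assumed stage count

    // Pending work in bytes per stage, updated by every shard.
    std::atomic<uint64_t> pending_bytes[num_stages] = {};

    // Last upload time as a raw tick count so that all shards can agree
    // on it without any cross-shard messaging.
    std::atomic<int64_t> last_upload_ticks{0};

    // Held while one shard pulls write requests from the others; taken
    // with try_lock so the reactor never actually blocks on it.
    std::mutex mutex;
};

A shard that wins the try_lock pulls requests from all shards and performs the upload; a shard that loses simply re-checks on its next wake-up.
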
to reflect changes in the write_request_scheduler

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
@Lazin Lazin force-pushed the ct/compute-data-threshold-across-shards branch from e3e9780 to dea806d Compare January 5, 2026 14:31
vbotbuildovich commented Jan 5, 2026

Retry command for Build#78533

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":false}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":false,"new_bootstrap":true}
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_node_is_not_allowed_to_join_after_restart@{"cloud_topic":true,"new_bootstrap":true}

dotnwat commented Jan 5, 2026

@Lazin did you triage the test failures?
