From 4781c138fa5219b89f3f9970e4015d5c798d1ee4 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 21 Jun 2024 08:50:07 +0900 Subject: [PATCH] fix: #1851 Separate (un)setting throttling for inter/intra-broker rebalanacing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename: ReplicationThrottleHelper.clearThrottles → clearInterBrokerThrottles - Add ReplicationThrottleHelper.clearIntraBrokerThrottles - Executor.intraBrokerMoveReplicas now calls ReplicationThrottleHelper.setLogDirThrottles, ReplicationThrottleHelper.clearIntraBrokerThrottles --- .../cruisecontrol/executor/Executor.java | 24 ++++++++++---- .../executor/ReplicationThrottleHelper.java | 30 ++++++++++++++--- .../ReplicationThrottleHelperTest.java | 32 ++++++++++++------- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index b9020e337..ff729109c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -7,6 +7,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.AtomicDouble; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache; @@ -1630,10 +1631,6 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); } - if (_logDirThrottle != null) { - throttleHelper.setLogDirThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), - _logDirThrottle); - } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); @@ -1655,8 +1652,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - if (_replicationThrottle != null || _logDirThrottle != null) { - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + if (_replicationThrottle != null) { + throttleHelper.clearInterBrokerThrottles(completedTasks, inProgressTasks); } } @@ -1686,13 +1683,15 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc } } - private void intraBrokerMoveReplicas() { + private void intraBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("Starting {} intra-broker partition movements.", numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; + Set participatingBrokers = Sets.newHashSet(); // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. @@ -1700,6 +1699,12 @@ private void intraBrokerMoveReplicas() { LOG.info("Executor will execute {} task(s)", tasksToExecute.size()); if (!tasksToExecute.isEmpty()) { + if (_logDirThrottle != null) { + participatingBrokers = throttleHelper.setLogDirThrottles( + tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), + _logDirThrottle + ); + } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); executeIntraBrokerReplicaMovements(tasksToExecute, _adminClient, _executionTaskManager, _config); @@ -1724,6 +1729,11 @@ private void intraBrokerMoveReplicas() { waitForIntraBrokerReplicaTasksToFinish(); inExecutionTasks = inExecutionTasks(); } + + if (_logDirThrottle != null) { + throttleHelper.clearIntraBrokerThrottles(participatingBrokers); + } + if (inExecutionTasks().isEmpty()) { LOG.info("Intra-broker partition movements finished."); } else if (_stopSignal.get() != NO_STOP_EXECUTION) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index e7c3a1301..ea74019eb 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -89,13 +89,15 @@ void setReplicationThrottles(List replicaMovementProposals, l } } - void setLogDirThrottles(List replicaMovementProposals, long throttleRate) - throws ExecutionException, InterruptedException, TimeoutException { + Set setLogDirThrottles(List replicaMovementProposals, long throttleRate) + throws ExecutionException, InterruptedException, TimeoutException { LOG.info("Setting a log dir throttle of {} bytes/sec", throttleRate); Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); for (Integer broker: participatingBrokers) { setLogDirThrottledRateIfNecessary(broker, throttleRate); } + + return participatingBrokers; } // Determines if a candidate task is ready to have its throttles removed. @@ -116,7 +118,7 @@ boolean taskIsInProgress(ExecutionTask task) { } // clear throttles for a specific list of execution tasks - void clearThrottles(List completedTasks, List inProgressTasks) + void clearInterBrokerThrottles(List completedTasks, List inProgressTasks) throws ExecutionException, InterruptedException, TimeoutException { List completedProposals = completedTasks @@ -157,6 +159,13 @@ void clearThrottles(List completedTasks, List inPr } } + void clearIntraBrokerThrottles(Set participatingBrokers) + throws ExecutionException, InterruptedException, TimeoutException { + for (int broker : participatingBrokers) { + removeLogDirThrottledRateFromBroker(broker); + } + } + private Set getParticipatingBrokers(List replicaMovementProposals) { Set participatingBrokers = new TreeSet<>(); for (ExecutionProposal proposal : replicaMovementProposals) { @@ -351,7 +360,6 @@ private void removeThrottledRatesFromBroker(Integer brokerId) Config brokerConfigs = getBrokerConfigs(brokerId); ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE); ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_THROTTLED_RATE); - ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); List ops = new ArrayList<>(); if (currLeaderThrottle != null) { if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { @@ -369,6 +377,20 @@ private void removeThrottledRatesFromBroker(Integer brokerId) ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE)); } } + if (!ops.isEmpty()) { + changeBrokerConfigs(brokerId, ops); + } + } + + /** + * Remove {@value #LOG_DIR_THROTTLED_RATE} on broker {@param brokerId} + * + * @param brokerId broker to remove {@value #LOG_DIR_THROTTLED_RATE} + */ + private void removeLogDirThrottledRateFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException { + Config brokerConfigs = getBrokerConfigs(brokerId); + ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); + List ops = new ArrayList<>(); if (currLogDirThrottle != null) { if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index 49fb2de4d..8e7ddf6de 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -4,6 +4,7 @@ package com.linkedin.kafka.cruisecontrol.executor; +import com.google.common.collect.Sets; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness; import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo; @@ -146,7 +147,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { ExecutionTask mockCompleteTask = prepareMockCompleteTask(proposal); EasyMock.replay(mockAdminClient); - throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); EasyMock.verify(mockAdminClient, mockCompleteTask); // Case 2: a situation where Topic0 gets deleted after its configs were read. @@ -165,7 +166,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); EasyMock.verify(mockAdminClient, mockCompleteTask); } @@ -242,13 +243,20 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { assertExpectedLogDirThrottledRateForBroker(1, throttleRate); assertExpectedLogDirThrottledRateForBroker(2, throttleRate); - // We expect all throttles to be cleaned up - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + // We expect all inter-broker throttles to be cleaned up (not intra-broker throttles) + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); } assertExpectedThrottledReplicas(TOPIC0, ""); + + assertExpectedLogDirThrottledRateForBroker(0, throttleRate); + assertExpectedLogDirThrottledRateForBroker(1, throttleRate); + assertExpectedLogDirThrottledRateForBroker(2, throttleRate); + + // We expect all intra-broker throttles to be cleaned up + throttleHelper.clearIntraBrokerThrottles(Sets.newHashSet(0, 1, 2)); for (int i = 0; i < clusterSize(); i++) { assertExpectedLogDirThrottledRateForBroker(i, null); } @@ -308,12 +316,12 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { // Existing throttled replicas are unchanged for topic 1: assertExpectedThrottledReplicas(TOPIC1, "1:1"); - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); // We expect all throttles related to replica movement to be removed. Specifically, // any throttles related to partitions which were not moved will remain. // However, we do expect the broker throttles to be removed. - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); } @@ -357,7 +365,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -369,7 +377,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); // passing an inProgress task that is not complete should have no effect. - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -382,7 +390,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { // Completing the in-progress task and the "*" should not be cleaned up. inProgressTask.completed(3); - throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); @@ -422,7 +430,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { assertExpectedReplicationThrottledRateForBroker(3, throttleRate); assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3"); - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -432,7 +440,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { assertExpectedThrottledReplicas(TOPIC0, "1:0,1:2,1:3"); // passing an inProgress task that is not complete should have no effect. - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -443,7 +451,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { // Completing the in-progress task and clearing the throttles should clean everything up. inProgressTask.completed(3); - throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null);