Skip to content

Commit

Permalink
fix: #1851 Separate (un)setting throttling for inter/intra-broker reb…
Browse files Browse the repository at this point in the history
…alanacing

    - Rename: ReplicationThrottleHelper.clearThrottles → clearInterBrokerThrottles
    - Add ReplicationThrottleHelper.clearIntraBrokerThrottles
    - Executor.intraBrokerMoveReplicas now calls ReplicationThrottleHelper.setLogDirThrottles, ReplicationThrottleHelper.clearIntraBrokerThrottles
  • Loading branch information
dongjinleekr committed Jun 20, 2024
1 parent 1df4d93 commit 4781c13
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
Expand Down Expand Up @@ -1630,10 +1631,6 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_replicationThrottle);
}
if (_logDirThrottle != null) {
throttleHelper.setLogDirThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_logDirThrottle);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
Expand All @@ -1655,8 +1652,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
.collect(Collectors.toList());
inProgressTasks.addAll(inExecutionTasks());

if (_replicationThrottle != null || _logDirThrottle != null) {
throttleHelper.clearThrottles(completedTasks, inProgressTasks);
if (_replicationThrottle != null) {
throttleHelper.clearInterBrokerThrottles(completedTasks, inProgressTasks);
}
}

Expand Down Expand Up @@ -1686,20 +1683,28 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
}
}

private void intraBrokerMoveReplicas() {
private void intraBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient);
int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("Starting {} intra-broker partition movements.", numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
Set<Integer> participatingBrokers = Sets.newHashSet();
// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
List<ExecutionTask> tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks();
LOG.info("Executor will execute {} task(s)", tasksToExecute.size());

if (!tasksToExecute.isEmpty()) {
if (_logDirThrottle != null) {
participatingBrokers = throttleHelper.setLogDirThrottles(
tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_logDirThrottle
);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
executeIntraBrokerReplicaMovements(tasksToExecute, _adminClient, _executionTaskManager, _config);
Expand All @@ -1724,6 +1729,11 @@ private void intraBrokerMoveReplicas() {
waitForIntraBrokerReplicaTasksToFinish();
inExecutionTasks = inExecutionTasks();
}

if (_logDirThrottle != null) {
throttleHelper.clearIntraBrokerThrottles(participatingBrokers);
}

if (inExecutionTasks().isEmpty()) {
LOG.info("Intra-broker partition movements finished.");
} else if (_stopSignal.get() != NO_STOP_EXECUTION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ void setReplicationThrottles(List<ExecutionProposal> replicaMovementProposals, l
}
}

void setLogDirThrottles(List<ExecutionProposal> replicaMovementProposals, long throttleRate)
throws ExecutionException, InterruptedException, TimeoutException {
Set<Integer> setLogDirThrottles(List<ExecutionProposal> replicaMovementProposals, long throttleRate)
throws ExecutionException, InterruptedException, TimeoutException {
LOG.info("Setting a log dir throttle of {} bytes/sec", throttleRate);
Set<Integer> participatingBrokers = getParticipatingBrokers(replicaMovementProposals);
for (Integer broker: participatingBrokers) {
setLogDirThrottledRateIfNecessary(broker, throttleRate);
}

return participatingBrokers;
}

// Determines if a candidate task is ready to have its throttles removed.
Expand All @@ -116,7 +118,7 @@ boolean taskIsInProgress(ExecutionTask task) {
}

// clear throttles for a specific list of execution tasks
void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inProgressTasks)
void clearInterBrokerThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inProgressTasks)
throws ExecutionException, InterruptedException, TimeoutException {
List<ExecutionProposal> completedProposals =
completedTasks
Expand Down Expand Up @@ -157,6 +159,13 @@ void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inPr
}
}

void clearIntraBrokerThrottles(Set<Integer> participatingBrokers)
throws ExecutionException, InterruptedException, TimeoutException {
for (int broker : participatingBrokers) {
removeLogDirThrottledRateFromBroker(broker);
}
}

private Set<Integer> getParticipatingBrokers(List<ExecutionProposal> replicaMovementProposals) {
Set<Integer> participatingBrokers = new TreeSet<>();
for (ExecutionProposal proposal : replicaMovementProposals) {
Expand Down Expand Up @@ -351,7 +360,6 @@ private void removeThrottledRatesFromBroker(Integer brokerId)
Config brokerConfigs = getBrokerConfigs(brokerId);
ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE);
ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_THROTTLED_RATE);
ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE);
List<AlterConfigOp> ops = new ArrayList<>();
if (currLeaderThrottle != null) {
if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
Expand All @@ -369,6 +377,20 @@ private void removeThrottledRatesFromBroker(Integer brokerId)
ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE));
}
}
if (!ops.isEmpty()) {
changeBrokerConfigs(brokerId, ops);
}
}

/**
* Remove {@value #LOG_DIR_THROTTLED_RATE} on broker {@param brokerId}
*
* @param brokerId broker to remove {@value #LOG_DIR_THROTTLED_RATE}
*/
private void removeLogDirThrottledRateFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException {
Config brokerConfigs = getBrokerConfigs(brokerId);
ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE);
List<AlterConfigOp> ops = new ArrayList<>();
if (currLogDirThrottle != null) {
if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package com.linkedin.kafka.cruisecontrol.executor;

import com.google.common.collect.Sets;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception {
ExecutionTask mockCompleteTask = prepareMockCompleteTask(proposal);
EasyMock.replay(mockAdminClient);

throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList());
EasyMock.verify(mockAdminClient, mockCompleteTask);

// Case 2: a situation where Topic0 gets deleted after its configs were read.
Expand All @@ -165,7 +166,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception {
EasyMock.replay(mockAdminClient);

// Expect no exception
throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList());
EasyMock.verify(mockAdminClient, mockCompleteTask);
}

Expand Down Expand Up @@ -242,13 +243,20 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception {
assertExpectedLogDirThrottledRateForBroker(1, throttleRate);
assertExpectedLogDirThrottledRateForBroker(2, throttleRate);

// We expect all throttles to be cleaned up
throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList());
// We expect all inter-broker throttles to be cleaned up (not intra-broker throttles)
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList());

for (int i = 0; i < clusterSize(); i++) {
assertExpectedReplicationThrottledRateForBroker(i, null);
}
assertExpectedThrottledReplicas(TOPIC0, "");

assertExpectedLogDirThrottledRateForBroker(0, throttleRate);
assertExpectedLogDirThrottledRateForBroker(1, throttleRate);
assertExpectedLogDirThrottledRateForBroker(2, throttleRate);

// We expect all intra-broker throttles to be cleaned up
throttleHelper.clearIntraBrokerThrottles(Sets.newHashSet(0, 1, 2));
for (int i = 0; i < clusterSize(); i++) {
assertExpectedLogDirThrottledRateForBroker(i, null);
}
Expand Down Expand Up @@ -308,12 +316,12 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception {
// Existing throttled replicas are unchanged for topic 1:
assertExpectedThrottledReplicas(TOPIC1, "1:1");

throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList());

// We expect all throttles related to replica movement to be removed. Specifically,
// any throttles related to partitions which were not moved will remain.
// However, we do expect the broker throttles to be removed.
throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList());
for (int i = 0; i < clusterSize(); i++) {
assertExpectedReplicationThrottledRateForBroker(i, null);
}
Expand Down Expand Up @@ -357,7 +365,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception {
assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK);
assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK);

throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
assertExpectedReplicationThrottledRateForBroker(0, throttleRate);
// we expect broker 1 to be null since all replica movement related to it has completed.
assertExpectedReplicationThrottledRateForBroker(1, null);
Expand All @@ -369,7 +377,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception {
assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK);

// passing an inProgress task that is not complete should have no effect.
throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
assertExpectedReplicationThrottledRateForBroker(0, throttleRate);
// we expect broker 1 to be null since all replica movement related to it has completed.
assertExpectedReplicationThrottledRateForBroker(1, null);
Expand All @@ -382,7 +390,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception {

// Completing the in-progress task and the "*" should not be cleaned up.
inProgressTask.completed(3);
throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList());

for (int i = 0; i < clusterSize(); i++) {
assertExpectedReplicationThrottledRateForBroker(i, null);
Expand Down Expand Up @@ -422,7 +430,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception {
assertExpectedReplicationThrottledRateForBroker(3, throttleRate);
assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3");

throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
assertExpectedReplicationThrottledRateForBroker(0, throttleRate);
// we expect broker 1 to be null since all replica movement related to it has completed.
assertExpectedReplicationThrottledRateForBroker(1, null);
Expand All @@ -432,7 +440,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception {
assertExpectedThrottledReplicas(TOPIC0, "1:0,1:2,1:3");

// passing an inProgress task that is not complete should have no effect.
throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask));
assertExpectedReplicationThrottledRateForBroker(0, throttleRate);
// we expect broker 1 to be null since all replica movement related to it has completed.
assertExpectedReplicationThrottledRateForBroker(1, null);
Expand All @@ -443,7 +451,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception {

// Completing the in-progress task and clearing the throttles should clean everything up.
inProgressTask.completed(3);
throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList());
throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList());

for (int i = 0; i < clusterSize(); i++) {
assertExpectedReplicationThrottledRateForBroker(i, null);
Expand Down

0 comments on commit 4781c13

Please sign in to comment.