Add (default.)log.dir.throttle parameter

dongjinleekr committed Apr 21, 2024
1 parent 97e6c13 commit 1df4d93

Showing 19 changed files with 160 additions and 24 deletions.
@@ -651,6 +651,8 @@ private static boolean hasProposalsToExecute(Collection<ExecutionProposal> propo
* (if null, use default.replica.movement.strategies).
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing proposals (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing proposals (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker
@@ -667,14 +669,15 @@ public void executeProposals(Set<ExecutionProposal> proposals,
Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor, concurrentInterBrokerPartitionMovements,
maxInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, clusterConcurrentLeaderMovements,
brokerConcurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
logDirThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
} else {
failGeneratingProposalsForExecution(uuid);
}
@@ -700,6 +703,8 @@ public void executeProposals(Set<ExecutionProposal> proposals,
* (if null, use default.replica.movement.strategies).
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing remove operations (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing remove operations (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
*/
@@ -714,13 +719,14 @@ public void executeRemoval(Set<ExecutionProposal> proposals,
Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers,
_loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, 0,
clusterLeaderMovementConcurrency, brokerLeaderMovementConcurrency,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, logDirThrottle,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false);
} else {
failGeneratingProposalsForExecution(uuid);
@@ -206,6 +206,14 @@ public final class ExecutorConfig {
public static final String DEFAULT_REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being "
+ "moved, in bytes per second.";

/**
* <code>default.log.dir.throttle</code>
*/
public static final String DEFAULT_LOG_DIR_THROTTLE_CONFIG = "default.log.dir.throttle";
public static final Long DEFAULT_DEFAULT_LOG_DIR_THROTTLE = null;
public static final String DEFAULT_LOG_DIR_THROTTLE_DOC = "The throttle applied to replicas being moved between "
+ "the log dirs, in bytes per second.";

/**
* <code>replica.movement.strategies</code>
*/
@@ -741,6 +749,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_DEFAULT_REPLICATION_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_REPLICATION_THROTTLE_DOC)
.define(DEFAULT_LOG_DIR_THROTTLE_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_DEFAULT_LOG_DIR_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_LOG_DIR_THROTTLE_DOC)
.define(REPLICA_MOVEMENT_STRATEGIES_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_REPLICA_MOVEMENT_STRATEGIES,
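For reference, the new entry can be exercised the same way as the other Executor defaults. The following stand-alone sketch is not part of this commit; it mirrors the definition added above to show that default.log.dir.throttle parses as a LONG and that leaving it unset resolves to null, i.e. no log-dir throttling. The class name and the 10485760 value (10 MiB/s) are only illustrative.

import java.util.Collections;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;

// Illustrative sketch only: a stand-alone ConfigDef mirroring the entry defined above.
public final class LogDirThrottleConfigExample {
  public static void main(String[] args) {
    ConfigDef def = new ConfigDef().define(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG,
                                           ConfigDef.Type.LONG,
                                           ExecutorConfig.DEFAULT_DEFAULT_LOG_DIR_THROTTLE,
                                           ConfigDef.Importance.MEDIUM,
                                           ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_DOC);
    AbstractConfig throttled =
        new AbstractConfig(def, Collections.singletonMap("default.log.dir.throttle", "10485760"));
    AbstractConfig unthrottled = new AbstractConfig(def, Collections.emptyMap());
    // Prints 10485760 (10 MiB/s) and null (no throttling), respectively.
    System.out.println(throttled.getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG));
    System.out.println(unthrottled.getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG));
  }
}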
@@ -800,6 +800,8 @@ public boolean setConcurrencyAdjusterMinIsrCheck(boolean isMinIsrBasedConcurrenc
* @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing a proposal (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing a proposal (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise.
@@ -818,6 +820,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
Long requestedExecutionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean isKafkaAssignerMode,
@@ -830,7 +833,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
processExecuteProposalsFailure();
throw e;
@@ -918,7 +921,7 @@ public synchronized void executeDemoteProposals(Collection<ExecutionProposal> pr
initProposalExecution(proposals, demotedBrokers, concurrentSwaps, null, 0,
requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency,
requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
processExecuteProposalsFailure();
throw e;
@@ -999,12 +1002,15 @@ private int numExecutionStartedInNonKafkaAssignerMode() {
* @param removedBrokers Brokers to be removed, null if no broker has been removed.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* while moving partitions (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* while moving partitions (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
*/
private void startExecution(LoadMonitor loadMonitor,
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) throws OngoingExecutionException {
_executionStoppedByUser.set(false);
sanityCheckOngoingMovement();
@@ -1040,7 +1046,7 @@ private void startExecution(LoadMonitor loadMonitor,
_numExecutionStartedInNonKafkaAssignerMode.incrementAndGet();
}
_proposalExecutor.execute(
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, isTriggeredByUserRequest));
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest));
}

/**
@@ -1293,6 +1299,7 @@ private class ProposalExecutionRunnable implements Runnable {
private final Set<Integer> _recentlyDemotedBrokers;
private final Set<Integer> _recentlyRemovedBrokers;
private final Long _replicationThrottle;
private final Long _logDirThrottle;
private Throwable _executionException;
private final boolean _isTriggeredByUserRequest;
private long _lastSlowTaskReportingTimeMs;
@@ -1307,6 +1314,7 @@ private class ProposalExecutionRunnable implements Runnable {
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) {
_loadMonitor = loadMonitor;
_demotedBrokers = demotedBrokers;
@@ -1342,6 +1350,7 @@ private class ProposalExecutionRunnable implements Runnable {
_recentlyDemotedBrokers = recentlyDemotedBrokers();
_recentlyRemovedBrokers = recentlyRemovedBrokers();
_replicationThrottle = replicationThrottle;
_logDirThrottle = logDirThrottle;
_isTriggeredByUserRequest = isTriggeredByUserRequest;
_lastSlowTaskReportingTimeMs = -1L;
if (_removedBrokers != null && !_removedBrokers.isEmpty()) {
@@ -1621,6 +1630,10 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_replicationThrottle);
}
if (_logDirThrottle != null) {
throttleHelper.setLogDirThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_logDirThrottle);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
@@ -1642,7 +1655,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
.collect(Collectors.toList());
inProgressTasks.addAll(inExecutionTasks());

if (_replicationThrottle != null) {
if (_replicationThrottle != null || _logDirThrottle != null) {
throttleHelper.clearThrottles(completedTasks, inProgressTasks);
}
}
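The body of the throttle helper's new setLogDirThrottles method is part of this commit but not shown in the excerpt above, so the following is only an assumption-laden sketch of how a log-dir throttle is typically applied: Kafka exposes intra-broker (alter-log-dirs) movement throttling through the dynamic broker config replica.alter.log.dirs.io.max.bytes.per.second, which would be set on the brokers whose replicas move between directories and removed again once the corresponding tasks complete, mirroring the clearThrottles call above. The class and method names below are hypothetical.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Assumption-laden sketch; not the throttle-helper code added by this commit.
final class LogDirThrottleSketch {
  private static final String LOG_DIR_THROTTLE_RATE_CONFIG = "replica.alter.log.dirs.io.max.bytes.per.second";

  // Apply the alter-log-dirs I/O throttle on one broker via the Admin API.
  static void setLogDirThrottle(AdminClient adminClient, int brokerId, long throttleBytesPerSec)
      throws ExecutionException, InterruptedException {
    alterBrokerConfig(adminClient, brokerId,
        new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLE_RATE_CONFIG, Long.toString(throttleBytesPerSec)),
                          AlterConfigOp.OpType.SET));
  }

  // Remove the throttle once the intra-broker movements have completed.
  static void clearLogDirThrottle(AdminClient adminClient, int brokerId)
      throws ExecutionException, InterruptedException {
    alterBrokerConfig(adminClient, brokerId,
        new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLE_RATE_CONFIG, ""), AlterConfigOp.OpType.DELETE));
  }

  private static void alterBrokerConfig(AdminClient adminClient, int brokerId, AlterConfigOp op)
      throws ExecutionException, InterruptedException {
    ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId));
    Map<ConfigResource, Collection<AlterConfigOp>> request =
        Collections.singletonMap(broker, Collections.singleton(op));
    adminClient.incrementalAlterConfigs(request).all().get();
  }
}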
@@ -42,6 +42,7 @@ public class AddBrokersRunnable extends GoalBasedOperationRunnable {
protected final Long _executionProgressCheckIntervalMs;
protected final ReplicaMovementStrategy _replicaMovementStrategy;
protected final Long _replicationThrottle;
protected final Long _logDirThrottle;
protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = false;

/**
@@ -67,6 +68,7 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS;
_replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY;
_replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG);
_logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG);
}

public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,
@@ -84,6 +86,7 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs();
_replicaMovementStrategy = parameters.replicaMovementStrategy();
_replicationThrottle = parameters.replicationThrottle();
_logDirThrottle = parameters.logDirThrottle();
}

@Override
@@ -127,6 +130,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept
_executionProgressCheckIntervalMs,
_replicaMovementStrategy,
_replicationThrottle,
_logDirThrottle,
_isTriggeredByUserRequest,
_uuid,
SKIP_AUTO_REFRESHING_CONCURRENCY);
@@ -18,6 +18,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.*;

@@ -33,6 +34,7 @@ public class FixOfflineReplicasRunnable extends GoalBasedOperationRunnable {
protected final Long _executionProgressCheckIntervalMs;
protected final ReplicaMovementStrategy _replicaMovementStrategy;
protected final Long _replicationThrottle;
protected final Long _logDirThrottle;
protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = false;

/**
@@ -55,6 +57,7 @@ public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS;
_replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY;
_replicationThrottle = kafkaCruiseControl.config().getLong(DEFAULT_REPLICATION_THROTTLE_CONFIG);
_logDirThrottle = kafkaCruiseControl.config().getLong(DEFAULT_LOG_DIR_THROTTLE_CONFIG);
}

public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl,
@@ -70,6 +73,7 @@ public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs();
_replicaMovementStrategy = parameters.replicaMovementStrategy();
_replicationThrottle = parameters.replicationThrottle();
_logDirThrottle = parameters.logDirThrottle();
}

@Override
@@ -111,6 +115,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept
_executionProgressCheckIntervalMs,
_replicaMovementStrategy,
_replicationThrottle,
_logDirThrottle,
_isTriggeredByUserRequest,
_uuid,
SKIP_AUTO_REFRESHING_CONCURRENCY);
@@ -38,6 +38,7 @@ public class RebalanceRunnable extends GoalBasedOperationRunnable {
protected final Long _executionProgressCheckIntervalMs;
protected final ReplicaMovementStrategy _replicaMovementStrategy;
protected final Long _replicationThrottle;
protected final Long _logDirThrottle;
protected final boolean _ignoreProposalCache;
protected final Set<Integer> _destinationBrokerIds;
protected final boolean _isRebalanceDiskMode;
@@ -64,6 +65,7 @@ public RebalanceRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS;
_replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY;
_replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG);
_logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG);
_ignoreProposalCache = SELF_HEALING_IGNORE_PROPOSAL_CACHE;
_destinationBrokerIds = SELF_HEALING_DESTINATION_BROKER_IDS;
_isRebalanceDiskMode = SELF_HEALING_IS_REBALANCE_DISK_MODE;
@@ -83,6 +85,7 @@ public RebalanceRunnable(KafkaCruiseControl kafkaCruiseControl,
_executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs();
_replicaMovementStrategy = parameters.replicaMovementStrategy();
_replicationThrottle = parameters.replicationThrottle();
_logDirThrottle = parameters.logDirThrottle();
_ignoreProposalCache = parameters.ignoreProposalCache();
_destinationBrokerIds = parameters.destinationBrokerIds();
_isRebalanceDiskMode = parameters.isRebalanceDiskMode();
@@ -124,7 +127,7 @@ protected OptimizerResult workWithoutClusterModel() throws KafkaCruiseControlExc
_concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements,
_concurrentIntraBrokerPartitionMovements, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency,
_executionProgressCheckIntervalMs, _replicaMovementStrategy,
_replicationThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY);
_replicationThrottle, _logDirThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY);
}
return result;
}
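All of the runnables above resolve the new throttle the same way, condensed below into a hypothetical ThrottledRunnable with a hypothetical RequestParameters interface (the real constructors take many more arguments): self-healing executions fall back to the cluster-wide default.log.dir.throttle, while user-triggered requests use whatever value the request parameters carry, which may be null and therefore unthrottled.

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;

// Hypothetical condensation of the constructor pattern shared by AddBrokersRunnable,
// FixOfflineReplicasRunnable and RebalanceRunnable in this commit.
final class ThrottledRunnable {
  private final Long _logDirThrottle;

  // Self-healing path: no request parameters, so use the configured default.
  ThrottledRunnable(KafkaCruiseControl kafkaCruiseControl) {
    _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG);
  }

  // User-request path: take the throttle (or null) supplied with the request.
  ThrottledRunnable(RequestParameters parameters) {
    _logDirThrottle = parameters.logDirThrottle();
  }

  interface RequestParameters {
    Long logDirThrottle();
  }

  Long logDirThrottle() {
    return _logDirThrottle;
  }
}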