Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make BrokerSetAwareGoal configurable to accept partition coloring #1864

Open
wants to merge 1 commit into
base: migrate_to_kafka_2_4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class BalancingConstraint {
private final long _fastModePerBrokerMoveTimeoutMs;
private final BrokerSetResolver _brokerSetResolver;
private final ReplicaToBrokerSetMappingPolicy _replicaToBrokerSetMappingPolicy;
private final boolean _allowPartitionColoring;

/**
* Constructor for Balancing Constraint.
Expand Down Expand Up @@ -88,6 +89,8 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
// Replica to Broker Set mapping policy class
_replicaToBrokerSetMappingPolicy = config.getConfiguredInstance(AnalyzerConfig.REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
ReplicaToBrokerSetMappingPolicy.class);
// For BrokerSetAwareGoal: whether to run in topic coloring mode or in partition coloring mode.
_allowPartitionColoring = config.getBoolean(AnalyzerConfig.ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE);
}

Properties setProps(Properties props) {
Expand Down Expand Up @@ -267,6 +270,13 @@ public ReplicaToBrokerSetMappingPolicy replicaToBrokerSetMappingPolicy() {
return _replicaToBrokerSetMappingPolicy;
}

/**
 * Whether {@code BrokerSetAwareGoal} should enforce broker-set awareness at the partition level
 * (each partition's replicas confined to a single broker set) rather than at the topic level
 * (all replicas of a topic confined to a single broker set).
 *
 * @return {@code true} if BrokerSetAwareGoal runs on partition level.
 */
public boolean allowPartitionColoring() {
return _allowPartitionColoring;
}

/**
* Set resource balance percentage for the given resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.kafka.cruisecontrol.config.ReplicaToBrokerSetMappingPolicy;
import com.linkedin.kafka.cruisecontrol.exception.BrokerSetResolutionException;
import com.linkedin.kafka.cruisecontrol.exception.ReplicaToBrokerSetMappingException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class BrokerSetAwareGoal extends AbstractGoal {
private ReplicaToBrokerSetMappingPolicy _replicaToBrokerSetMappingPolicy;
private Set<String> _excludedTopics;
private Set<String> _mustHaveTopicLeadersPerBroker;
private boolean _isPartitionLevelBrokerSetAware;

/**
* Constructor for Broker Set Aware Goal.
Expand Down Expand Up @@ -123,6 +125,8 @@ public boolean isHardGoal() {
*/
@Override
protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
_isPartitionLevelBrokerSetAware = _balancingConstraint.allowPartitionColoring();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not use the word "coloring" since it may be confusing. We have been referring to it as brokerSetAwareness.


// This is used to identify brokers not excluded for replica moves.
final HashSet<Integer> brokersAllowedReplicaMove = GoalUtils.aliveBrokersNotExcludedForReplicaMove(clusterModel, optimizationOptions);
if (brokersAllowedReplicaMove.isEmpty()) {
Expand All @@ -138,7 +142,7 @@ protected void initGoalState(ClusterModel clusterModel, OptimizationOptions opti
// Whether the {@link com.linkedin.kafka.cruisecontrol.model.SortedReplicas} tracks only leader replicas or all replicas.
boolean tracksOnlyLeaderReplicas = false;

_mustHaveTopicLeadersPerBroker = Collections.unmodifiableSet(
_mustHaveTopicLeadersPerBroker = _isPartitionLevelBrokerSetAware ? Collections.emptySet() : Collections.unmodifiableSet(
Utils.getTopicNamesMatchedWithPattern(_balancingConstraint.topicsWithMinLeadersPerBrokerPattern(), clusterModel::topics));
_excludedTopics = Collections.unmodifiableSet(
Stream.of(_mustHaveTopicLeadersPerBroker, optimizationOptions.excludedTopics()).flatMap(Set::stream).collect(Collectors.toSet()));
Expand Down Expand Up @@ -178,28 +182,38 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
// Sanity check: No replica should be moved to a broker, which used to host any replica of the same partition on its broken disk.
GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
// Sanity check to confirm that the final distribution is broker set aware.
ensureBrokerSetAware(clusterModel, optimizationOptions);
ensureBrokerSetAware(clusterModel);
if (_provisionResponse.status() != ProvisionStatus.OVER_PROVISIONED) {
_provisionResponse = new ProvisionResponse(ProvisionStatus.RIGHT_SIZED);
}
finish();
}

private void ensureBrokerSetAware(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
private void ensureBrokerSetAware(ClusterModel clusterModel)
throws OptimizationFailureException {
// Sanity check to confirm that the final distribution is brokerSet aware.
for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) {
String topicName = partitionsByTopic.getKey();
if (!_excludedTopics.contains(topicName)) {
List<Partition> partitions = partitionsByTopic.getValue();
Set<Broker> allBrokersForTopic = partitions.stream()
.map(partition -> partition.partitionBrokers())
.flatMap(brokers -> brokers.stream())
.collect(Collectors.toSet());
// Checks if a topic's brokers do not all live in a single brokerSet
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(allBrokersForTopic))) {
throw new OptimizationFailureException(
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), topicName, allBrokersForTopic));
if (_isPartitionLevelBrokerSetAware) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this i would recommend introducing inheritance.

  1. Introduce a configuration - broker.set.awareness.level=partition
    We can have only two choices - partition and topic
  2. The Level can be picked from an ENUM so that no other level is allowed.
  3. The default value of this configuration can be set to level topic.
  4. Introduce an interface called BrokerSetAwarenessValidator
  5. Implement two implementations TopicLevelBrokerSetAwarenessValidator and PartitionLevelBrokerSetAwarenessValidator
  6. Based on configuration set, the instance of the BrokerSetAwarenessValidator will be chosen.
  7. From here just call brokerSetAwarenessValidator.validate(partitions)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also just introduce the validate() method in th ENUMs (with values PARTITION and TOPIC )

    private enum BrokerSetAwarenessLevel {
        TOPIC {
            public void validate(Partitions p) {
                // do something
            }
        },
        PARTITION {
            public void validate(Partitions p) {
                // do something different
            }
        }

for (Partition partition: partitions) {
// Checks if a partition's brokers do not all live in a single brokerSet
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(partition.partitionBrokers()))) {
throw new OptimizationFailureException(
String.format("[%s] Partition %s is not brokerSet-aware. brokers (%s).", name(), partition, partition.partitionBrokers()));
}
}
} else {
Set<Broker> allBrokersForTopic = partitions.stream()
.map(Partition::partitionBrokers)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
// Checks if a topic's brokers do not all live in a single brokerSet
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(allBrokersForTopic))) {
throw new OptimizationFailureException(
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), topicName, allBrokersForTopic));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,15 @@ public final class AnalyzerConfig {
String.format("The class implements %s interface and is used to generate replica to broker set mapping.",
ReplicaToBrokerSetMappingPolicy.class.getName());

/**
 * <code>allow.partition.level.broker.set.aware</code>
 */
public static final String ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE = "allow.partition.level.broker.set.aware";
public static final boolean DEFAULT_ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE = false;
public static final String ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE_DOC =
    String.format("Whether %s enforces broker set awareness at the partition level instead of the topic level. "
                  + "If true, each partition's replicas must stay within a single broker set; if false (the default), "
                  + "all replicas of a topic must stay within a single broker set.",
                  BrokerSetAwareGoal.class.getName());

private AnalyzerConfig() {
}

Expand Down Expand Up @@ -683,6 +692,10 @@ public static ConfigDef define(ConfigDef configDef) {
.define(REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
ConfigDef.Type.CLASS, DEFAULT_REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS,
ConfigDef.Importance.LOW,
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC);
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC)
.define(ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE,
ConfigDef.Type.BOOLEAN, DEFAULT_ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE,
ConfigDef.Importance.LOW,
ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,25 @@ public static Collection<Object[]> data() {
p.add(params(balancingConstraint, DeterministicCluster.brokerSetUnSatisfiable4(),
Arrays.asList(BrokerSetAwareGoal.class.getName(),
DiskUsageDistributionGoal.class.getName()), verifications, null));

// Broker Set Awareness partition coloring mode
properties.setProperty(AnalyzerConfig.ALLOW_PARTITION_LEVEL_BROKER_SET_AWARE, Boolean.toString(true));
balancingConstraint = new BalancingConstraint(new KafkaCruiseControlConfig(properties));
p.add(params(balancingConstraint, DeterministicCluster.brokerSetSatisfiable1(),
Collections.singletonList(BrokerSetAwareGoal.class.getName()), verifications, null));

p.add(params(balancingConstraint, DeterministicCluster.brokerSetSatisfiable2(),
Collections.singletonList(BrokerSetAwareGoal.class.getName()), verifications, null));

p.add(params(balancingConstraint, DeterministicCluster.brokerSetSatisfiable3(),
Collections.singletonList(BrokerSetAwareGoal.class.getName()), verifications, null));

p.add(params(balancingConstraint, DeterministicCluster.brokerSetSatisfiablePartitionColoring1(),
Collections.singletonList(BrokerSetAwareGoal.class.getName()), verifications, null));

p.add(params(balancingConstraint, DeterministicCluster.brokerSetUnSatisfiable1(),
Collections.singletonList(BrokerSetAwareGoal.class.getName()), verifications, OptimizationFailureException.class));

return p;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,123 @@ public static ClusterModel brokerSetSatisfiable8() {
return cluster;
}

/**
 * A cluster setup that is satisfiable for partition-level broker-set awareness only.
 * 6 brokers, 2 BrokerSets, each BrokerSet has 3 Brokers.
 * 2 topics, each topic has four partitions, each partition has 3 replicas.
 * Each partition is contained within the boundary of a single BrokerSet, but neither topic is bounded
 * by a BrokerSet.
 * E.g. T0_P0, T1_P1, T0_P2, T1_P3 are in the BrokerSet {B0, B1, B2},
 * T1_P0, T0_P1, T1_P2, T0_P3 are in the BrokerSet {B3, B4, B5}.
 *
 * <p>
 * <h3>Replica Distribution</h3>
 * <li>B0: T0_P0_leader, T1_P1_follower_1, T0_P2_follower_2, T1_P3_leader</li>
 * <li>B1: T0_P0_follower_1, T1_P1_leader, T0_P2_follower_1, T1_P3_follower_1</li>
 * <li>B2: T0_P0_follower_2, T1_P1_follower_2, T0_P2_leader, T1_P3_follower_2</li>
 * <li>B3: T1_P0_leader, T0_P1_follower_1, T1_P2_follower_2, T0_P3_leader</li>
 * <li>B4: T1_P0_follower_1, T0_P1_leader, T1_P2_follower_1, T0_P3_follower_1</li>
 * <li>B5: T1_P0_follower_2, T0_P1_follower_2, T1_P2_leader, T0_P3_follower_2</li>
 * </p>
 *
 * @return Cluster model for the tests.
 */
public static ClusterModel brokerSetSatisfiablePartitionColoring1() {
  ClusterModel cluster = getHomogeneousCluster(RACK_BY_BROKER4, TestConstants.BROKER_CAPACITY, null);

  // Broker placement table: brokersByPartition[topicIdx][partitionId][replicaIdx] = broker id.
  // Index 0 of each innermost array is the leader. Each partition's brokers stay within one
  // broker set ({0,1,2} or {3,4,5}); each topic spans both broker sets.
  int[][][] brokersByPartition = {
      // TOPIC0: P0, P1, P2, P3
      {{0, 1, 2}, {4, 3, 5}, {2, 1, 0}, {3, 4, 5}},
      // TOPIC1: P0, P1, P2, P3
      {{3, 4, 5}, {1, 0, 2}, {5, 4, 3}, {0, 1, 2}}
  };
  String[] topics = {TOPIC0, TOPIC1};

  AggregatedMetricValues aggregatedMetricValues =
      getAggregatedMetricValues(TestConstants.TYPICAL_CPU_CAPACITY / 2, TestConstants.LARGE_BROKER_CAPACITY / 2,
                                TestConstants.MEDIUM_BROKER_CAPACITY / 2, TestConstants.LARGE_BROKER_CAPACITY / 2);

  // Create replicas in the same order as the sibling fixtures: all leaders first (replicaIdx 0),
  // then the first followers, then the second followers.
  for (int replicaIdx = 0; replicaIdx < 3; replicaIdx++) {
    for (int topicIdx = 0; topicIdx < topics.length; topicIdx++) {
      for (int partitionId = 0; partitionId < brokersByPartition[topicIdx].length; partitionId++) {
        int brokerId = brokersByPartition[topicIdx][partitionId][replicaIdx];
        TopicPartition tp = new TopicPartition(topics[topicIdx], partitionId);
        cluster.createReplica(RACK_BY_BROKER4.get(brokerId).toString(), brokerId, tp, replicaIdx, replicaIdx == 0);
        cluster.setReplicaLoad(RACK_BY_BROKER4.get(brokerId).toString(), brokerId, tp, aggregatedMetricValues,
                               Collections.singletonList(1L));
      }
    }
  }

  return cluster;
}

/**
* 6 brokers, 2 BrokerSets, each BrokerSet has 3 Brokers
* 2 topics, each topic has four partitions, each partition has 3 replicas.
Expand Down
Loading