-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make BrokerSetAwareGoal configurable to accept partition coloring #1864
base: migrate_to_kafka_2_4
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import com.linkedin.kafka.cruisecontrol.config.ReplicaToBrokerSetMappingPolicy; | ||
import com.linkedin.kafka.cruisecontrol.exception.BrokerSetResolutionException; | ||
import com.linkedin.kafka.cruisecontrol.exception.ReplicaToBrokerSetMappingException; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -84,6 +85,7 @@ public class BrokerSetAwareGoal extends AbstractGoal { | |
private ReplicaToBrokerSetMappingPolicy _replicaToBrokerSetMappingPolicy; | ||
private Set<String> _excludedTopics; | ||
private Set<String> _mustHaveTopicLeadersPerBroker; | ||
private boolean _isPartitionLevelBrokerSetAware; | ||
|
||
/** | ||
* Constructor for Broker Set Aware Goal. | ||
|
@@ -123,6 +125,8 @@ public boolean isHardGoal() { | |
*/ | ||
@Override | ||
protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException { | ||
_isPartitionLevelBrokerSetAware = _balancingConstraint.allowPartitionColoring(); | ||
|
||
// This is used to identify brokers not excluded for replica moves. | ||
final HashSet<Integer> brokersAllowedReplicaMove = GoalUtils.aliveBrokersNotExcludedForReplicaMove(clusterModel, optimizationOptions); | ||
if (brokersAllowedReplicaMove.isEmpty()) { | ||
|
@@ -138,7 +142,7 @@ protected void initGoalState(ClusterModel clusterModel, OptimizationOptions opti | |
// Whether the {@link com.linkedin.kafka.cruisecontrol.model.SortedReplicas} tracks only leader replicas or all replicas. | ||
boolean tracksOnlyLeaderReplicas = false; | ||
|
||
_mustHaveTopicLeadersPerBroker = Collections.unmodifiableSet( | ||
_mustHaveTopicLeadersPerBroker = _isPartitionLevelBrokerSetAware ? Collections.emptySet() : Collections.unmodifiableSet( | ||
Utils.getTopicNamesMatchedWithPattern(_balancingConstraint.topicsWithMinLeadersPerBrokerPattern(), clusterModel::topics)); | ||
_excludedTopics = Collections.unmodifiableSet( | ||
Stream.of(_mustHaveTopicLeadersPerBroker, optimizationOptions.excludedTopics()).flatMap(Set::stream).collect(Collectors.toSet())); | ||
|
@@ -178,28 +182,38 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op | |
// Sanity check: No replica should be moved to a broker, which used to host any replica of the same partition on its broken disk. | ||
GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name()); | ||
// Sanity check to confirm that the final distribution is broker set aware. | ||
ensureBrokerSetAware(clusterModel, optimizationOptions); | ||
ensureBrokerSetAware(clusterModel); | ||
if (_provisionResponse.status() != ProvisionStatus.OVER_PROVISIONED) { | ||
_provisionResponse = new ProvisionResponse(ProvisionStatus.RIGHT_SIZED); | ||
} | ||
finish(); | ||
} | ||
|
||
private void ensureBrokerSetAware(ClusterModel clusterModel, OptimizationOptions optimizationOptions) | ||
private void ensureBrokerSetAware(ClusterModel clusterModel) | ||
throws OptimizationFailureException { | ||
// Sanity check to confirm that the final distribution is brokerSet aware. | ||
for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) { | ||
String topicName = partitionsByTopic.getKey(); | ||
if (!_excludedTopics.contains(topicName)) { | ||
List<Partition> partitions = partitionsByTopic.getValue(); | ||
Set<Broker> allBrokersForTopic = partitions.stream() | ||
.map(partition -> partition.partitionBrokers()) | ||
.flatMap(brokers -> brokers.stream()) | ||
.collect(Collectors.toSet()); | ||
// Checks if a topic's brokers do not all live in a single brokerSet | ||
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(allBrokersForTopic))) { | ||
throw new OptimizationFailureException( | ||
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), topicName, allBrokersForTopic)); | ||
if (_isPartitionLevelBrokerSetAware) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of doing this i would recommend introducing inheritance.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could also just introduce the validate() method in th ENUMs (with values PARTITION and TOPIC )
|
||
for (Partition partition: partitions) { | ||
// Checks if a partition's brokers do not all live in a single brokerSet | ||
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(partition.partitionBrokers()))) { | ||
throw new OptimizationFailureException( | ||
String.format("[%s] Partition %s is not brokerSet-aware. brokers (%s).", name(), partition, partition.partitionBrokers())); | ||
} | ||
} | ||
} else { | ||
Set<Broker> allBrokersForTopic = partitions.stream() | ||
.map(Partition::partitionBrokers) | ||
.flatMap(Collection::stream) | ||
.collect(Collectors.toSet()); | ||
// Checks if a topic's brokers do not all live in a single brokerSet | ||
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(allBrokersForTopic))) { | ||
throw new OptimizationFailureException( | ||
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), topicName, allBrokersForTopic)); | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not use word
coloring
since it may be confusing. We have been referring to it asbrokerserAwareness