Make StickyRebalanceStrategy topology aware #2944

Open: wants to merge 4 commits into base: helix-gateway-service
CapacityNode.java
@@ -21,23 +21,57 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;

/**
* A CapacityNode records capacity usage for an instance. It has a capacity and knowledge of the
* partitions assigned to it, so it can decide whether it can receive additional partitions.
*/
public class CapacityNode {
public class CapacityNode implements Comparable<CapacityNode> {
private int _currentlyAssigned;
private int _capacity;
private final String _id;
private final String _instanceName;
private final String _logicalId;
private final String _faultZone;
private final Map<String, Set<String>> _partitionMap;

public CapacityNode(String id) {
_partitionMap = new HashMap<>();
_currentlyAssigned = 0;
this._id = id;
/**
* Constructor used for non-topology-aware use case
* @param instanceName The instance name of this node
* @param capacity The capacity of this node
*/
public CapacityNode(String instanceName, int capacity) {
this._instanceName = instanceName;
this._logicalId = null;
this._faultZone = null;
this._partitionMap = new HashMap<>();
this._capacity = capacity;
this._currentlyAssigned = 0;
}

/**
* Constructor used for topology-aware use case
Contributor:
I think this is for topology aware? Also, can we have one constructor call another?

Contributor Author:
Yes, will make this change.

* @param instanceName The instance name of this node
* @param clusterConfig The cluster config for the current Helix cluster
* @param clusterTopologyConfig The cluster topology config for the current Helix cluster
* @param instanceConfig The instance config for the current instance
*/
public CapacityNode(String instanceName, ClusterConfig clusterConfig,
ClusterTopologyConfig clusterTopologyConfig, InstanceConfig instanceConfig) {
this._instanceName = instanceName;
this._logicalId = clusterTopologyConfig != null ? instanceConfig.getLogicalId(
clusterTopologyConfig.getEndNodeType()) : instanceName;
this._faultZone = computeFaultZone(clusterConfig, instanceConfig);
this._partitionMap = new HashMap<>();
this._capacity = clusterConfig.getGlobalMaxPartitionAllowedPerInstance();
this._currentlyAssigned = 0;
}

/**
@@ -80,11 +114,27 @@ public void setCapacity(int capacity) {
}

/**
* Get the ID of this node
* @return The ID of this node
* Get the instance name of this node
* @return The instance name of this node
*/
public String getId() {
return _id;
public String getInstanceName() {
return _instanceName;
}

/**
* Get the logical id of this node
* @return The logical id of this node
*/
public String getLogicalId() {
return _logicalId;
}

/**
* Get the fault zone of this node
* @return The fault zone of this node
*/
public String getFaultZone() {
return _faultZone;
}

/**
@@ -98,8 +148,40 @@ public int getCurrentlyAssigned() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
.append("\ncapacity:").append(_capacity);
sb.append("##########\nname=").append(_instanceName).append("\nassigned:")
.append(_currentlyAssigned).append("\ncapacity:").append(_capacity).append("\nlogicalId:")
.append(_logicalId).append("\nfaultZone:").append(_faultZone);
return sb.toString();
}

@Override
public int compareTo(CapacityNode o) {
if (_logicalId != null) {
return _logicalId.compareTo(o.getLogicalId());
}
return _instanceName.compareTo(o.getInstanceName());
}

/**
* Computes the fault zone id based on the domain and fault zone type when topology is enabled.
* For example, when the domain is "zone=2, instance=testInstance" and the fault zone type is
* "zone", this function returns "2".
* If the fault zone type cannot be found, this function uses the instance name as the fault
* zone id.
Contributor:
Should we use the logical ID as the default fault zone if we can't find the fault zone type?

Contributor Author:
This function is copied from the waged rebalancer: https://github.com/apache/helix/blob/master/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java#L363-L383
Any pointers on why the logical ID is a better candidate here?

Contributor:
In terms of swap, using the logical id will keep the swap-in/out instances in the same fault zone. But I also agree that this should align with the other rebalancers' behavior. What do you think about adding a TODO and considering changing both places later?

* TODO: change the return value to the logical id when no fault zone type is found. Also do the
* same for the waged rebalancer in helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
*/
private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
Contributor:
Can we create a util class with this logic instead of copying it in two places and having to maintain both methods?

Contributor:
It may be worthwhile to create a Node base class (different from the one under the topology package) that extracts common logic from AssignableNode and CapacityNode.

Contributor Author:
This is something I prefer to do in later iterations: consolidate CapacityNode + AssignableNode and apply the changes @xyuanlu mentioned above.

LinkedHashMap<String, String> instanceTopologyMap =
Topology.computeInstanceTopologyMap(clusterConfig, instanceConfig.getInstanceName(),
instanceConfig, true /*earlyQuitTillFaultZone*/);

StringBuilder faultZoneStringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) {
faultZoneStringBuilder.append(entry.getValue());
faultZoneStringBuilder.append('/');
}
faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1);
return faultZoneStringBuilder.toString();
}
}
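To make the fault-zone derivation concrete, here is a minimal, hedged sketch. The cluster and instance names are illustrative, the setters are the standard helix-core ClusterConfig/InstanceConfig ones, and the expected result follows the computeFaultZone Javadoc example above.

```java
ClusterConfig clusterConfig = new ClusterConfig("testCluster"); // illustrative name
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(10); // becomes the node capacity

InstanceConfig instanceConfig = new InstanceConfig("testInstance");
instanceConfig.setDomain("zone=2,instance=testInstance");

CapacityNode node = new CapacityNode("testInstance", clusterConfig,
    ClusterTopologyConfig.createFromClusterConfig(clusterConfig), instanceConfig);
// Per the Javadoc example, node.getFaultZone() should be "2"; node.getLogicalId()
// should resolve to the instance name here unless a logical id is configured.
```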
ResourceControllerDataProvider.java
@@ -46,6 +46,8 @@
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
@@ -190,7 +192,7 @@ public synchronized void refresh(HelixDataAccessor accessor) {

if (getClusterConfig() != null
&& getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
buildSimpleCapacityMap();
// Remove all cached IdealStates because this is a global computation that cannot be performed
// partially for some resources. The computation is also simple and does not take many
// resources to recompute the assignments.
@@ -573,11 +575,16 @@ public WagedInstanceCapacity getWagedInstanceCapacity() {
return _wagedInstanceCapacity;
}

private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
private void buildSimpleCapacityMap() {
Contributor:
This is just a general comment that we don't need to address right away, but we may want to address it before merging this to master.

Should we be building the capacity nodes in the resource controller data provider? It seems like an implementation detail of one specific rebalance strategy. If no resources in the cluster use StickyRebalanceStrategy, then this is not needed. Why don't we move the creation of the simpleCapacitySet to the rebalance strategy?

If this is here because StickyRebalanceStrategy relies on global node capacity, maybe we should implement the StatefulRebalancer interface instead of using single-resource rebalance. That interface has the method computeNewIdealStates for all resources and is intended to be implemented by a global rebalancer.

Contributor Author:
> If this is here because StickyRebalanceStrategy relies on global node capacity

Yes, this is exactly the reason: we need to populate the node usage globally before the rebalance stage. StatefulRebalancer seems to be for rebalancers, but here we need the rebalance strategy. To me the data provider is the glue between the rebalancer and the rebalance strategy.

ClusterConfig clusterConfig = getClusterConfig();
ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
Map<String, InstanceConfig> instanceConfigMap = getAssignableInstanceConfigMap();
_simpleCapacitySet = new HashSet<>();
for (String instance : getEnabledLiveInstances()) {
CapacityNode capacityNode = new CapacityNode(instance);
capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
for (String instanceName : getAssignableInstances()) {
CapacityNode capacityNode =
new CapacityNode(instanceName, clusterConfig, clusterTopologyConfig,
instanceConfigMap.getOrDefault(instanceName, new InstanceConfig(instanceName)));
_simpleCapacitySet.add(capacityNode);
}
}
@@ -591,7 +598,7 @@ public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> simpleCapacityMap = new HashMap<>();
for (CapacityNode node : _simpleCapacitySet) {
simpleCapacityMap.put(node.getId(), node);
simpleCapacityMap.put(node.getInstanceName(), node);
}
for (String resourceName : resourceNameSet) {
// Process current state mapping
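Worth noting: buildSimpleCapacityMap() only runs when getGlobalMaxPartitionAllowedPerInstance() != -1 (see the refresh() hunk above). A hedged sketch of enabling it, mirroring the setGlobalMaxPartitionAllowedPerInstanceInCluster helper this PR removes from the test base further down (zkClient and clusterName are assumed to be in scope):

```java
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
// -1 (the default) disables the simple capacity set entirely
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(10);
configAccessor.setClusterConfig(clusterName, clusterConfig);
```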
StickyRebalanceStrategy.java
@@ -20,7 +20,6 @@
*/

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -74,34 +73,49 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
// Note the liveNodes parameter here might be processed within the rebalancer, e.g. filtered based on tags
Set<CapacityNode> assignableNodeSet = new HashSet<>(clusterData.getSimpleCapacitySet());
Set<String> liveNodesSet = new HashSet<>(liveNodes);
assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId()));
assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getInstanceName()));

// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> assignableNodeMap = assignableNodeSet.stream()
.collect(Collectors.toMap(CapacityNode::getInstanceName, node -> node));

// Populate valid state map given current mapping
Map<String, Set<String>> stateMap =
populateValidAssignmentMapFromCurrentMapping(currentMapping, assignableNodeSet);
populateValidAssignmentMapFromCurrentMapping(currentMapping, assignableNodeMap);

if (logger.isDebugEnabled()) {
logger.debug("currentMapping: {}", currentMapping);
logger.debug("stateMap: {}", stateMap);
}

// Sort the assignable nodes (by logical id when topology-aware, otherwise by instance name)
List<CapacityNode> assignableNodeList =
assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId))
List<CapacityNode> assignableNodeList = assignableNodeSet.stream().sorted()
.collect(Collectors.toList());

// Assign partitions to nodes in order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
Map<String, Integer> currentFaultZoneCountMap = new HashMap<>();
int remainingReplica = _statesReplicaCount;
if (stateMap.containsKey(_partitions.get(i))) {
remainingReplica = remainingReplica - stateMap.get(_partitions.get(i)).size();
Set<String> existingReplicas = stateMap.get(_partitions.get(i));
remainingReplica = remainingReplica - existingReplicas.size();
for (String instanceName : existingReplicas) {
String faultZone = assignableNodeMap.get(instanceName).getFaultZone();
currentFaultZoneCountMap.put(faultZone,
currentFaultZoneCountMap.getOrDefault(faultZone, 0) + 1);
}
}
for (int j = 0; j < remainingReplica; j++) {
while (index - startIndex < assignableNodeList.size()) {
CapacityNode node = assignableNodeList.get(index++ % assignableNodeList.size());
if (node.canAdd(_resourceName, _partitions.get(i))) {
stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>()).add(node.getId());
if (this.canAdd(node, _partitions.get(i), currentFaultZoneCountMap)) {
stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>())
.add(node.getInstanceName());
if (node.getFaultZone() != null) {
currentFaultZoneCountMap.put(node.getFaultZone(),
currentFaultZoneCountMap.getOrDefault(node.getFaultZone(), 0) + 1);
}
break;
}
}
@@ -126,16 +140,13 @@
* Populates a valid state map from the current mapping, filtering out invalid nodes.
*
* @param currentMapping the current mapping of partitions to node states
* @param assignableNodes the list of nodes that can be assigned
* @param assignableNodeMap the map of instance name -> nodes that can be assigned
* @return a map of partitions to valid nodes
*/
private Map<String, Set<String>> populateValidAssignmentMapFromCurrentMapping(
final Map<String, Map<String, String>> currentMapping,
final Set<CapacityNode> assignableNodes) {
final Map<String, CapacityNode> assignableNodeMap) {
Map<String, Set<String>> validAssignmentMap = new HashMap<>();
// Convert the assignableNodes to map for quick lookup
Map<String, CapacityNode> assignableNodeMap =
assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node));
if (currentMapping != null) {
for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) {
String partition = entry.getKey();
@@ -167,4 +178,22 @@ private boolean isValidNodeAssignment(final String partition, final String nodeI
return node != null && (node.hasPartition(_resourceName, partition) || node.canAdd(
_resourceName, partition));
}

/**
* Checks whether it is valid to assign the partition to the node
*
* @param node node to assign partition
* @param partition partition name
* @param currentFaultZoneCountMap the map of fault zones -> count
* @return true if it's valid to assign the partition to node, false otherwise
*/
protected boolean canAdd(CapacityNode node, String partition,
Map<String, Integer> currentFaultZoneCountMap) {
// The assignment is valid when both conditions hold:
// 1. The replica is not in the same fault zone as the partition's other replicas
// 2. The node has spare capacity to hold the replica
return !currentFaultZoneCountMap.containsKey(node.getFaultZone()) && node.canAdd(_resourceName,
partition);
}
}
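To illustrate the two conditions in canAdd in isolation, here is a small self-contained demo. It is a hedged sketch: FaultZoneCheckDemo is a hypothetical class, and the node-level capacity check is reduced to a boolean rather than calling the real CapacityNode.canAdd.

```java
import java.util.HashMap;
import java.util.Map;

public class FaultZoneCheckDemo {
  // Mirrors the two conditions in StickyRebalanceStrategy#canAdd, with the
  // node-level capacity check reduced to a boolean for illustration.
  static boolean canAdd(String faultZone, boolean nodeHasCapacity,
      Map<String, Integer> currentFaultZoneCountMap) {
    return !currentFaultZoneCountMap.containsKey(faultZone) && nodeHasCapacity;
  }

  public static void main(String[] args) {
    Map<String, Integer> zoneCount = new HashMap<>();
    zoneCount.put("zone-1", 1); // the partition already has a replica in zone-1

    System.out.println(canAdd("zone-1", true, zoneCount));  // false: same fault zone
    System.out.println(canAdd("zone-2", true, zoneCount));  // true: new zone and spare capacity
    System.out.println(canAdd("zone-2", false, zoneCount)); // false: no spare capacity
  }
}
```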

@@ -366,14 +366,6 @@ protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
configAccessor.setClusterConfig(clusterName, clusterConfig);
}

protected void setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient,
String clusterName, int maxPartitionAllowed) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed);
configAccessor.setClusterConfig(clusterName, clusterConfig);
}

protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
@@ -53,13 +53,12 @@ public void testAssignmentWithGlobalPartitionLimit() {

Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < 5; i++) {
CapacityNode capacityNode = new CapacityNode("Node-" + i);
capacityNode.setCapacity(1);
CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}

List<String> liveNodes =
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());

List<String> partitions = new ArrayList<>();
for (int i = 0; i < 3; i++) {
@@ -97,13 +96,12 @@ public void testStickyAssignment() {

Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < nNode; i++) {
CapacityNode capacityNode = new CapacityNode("Node-" + i);
capacityNode.setCapacity(1);
CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}

List<String> liveNodes =
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());

List<String> partitions = new ArrayList<>();
for (int i = 0; i < nPartitions; i++) {
@@ -150,40 +148,39 @@ public void testStickyAssignmentMultipleTimes() {

Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < nNode; i++) {
CapacityNode capacityNode = new CapacityNode("Node-" + i);
capacityNode.setCapacity(1);
CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}

List<String> liveNodes =
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());

List<String> partitions = new ArrayList<>();
for (int i = 0; i < nPartitions; i++) {
partitions.add(TEST_RESOURCE_PREFIX + i);
}
when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);

StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy();
greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
StickyRebalanceStrategy stickyRebalanceStrategy = new StickyRebalanceStrategy();
stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
// First round assignment computation:
// 1. Without previous assignment (currentMapping is null)
// 2. Without enough assignable nodes
ZNRecord firstRoundShardAssignment =
greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);
stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);

// Assert only 3 partitions are fulfilled with assignment
Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream()
.filter(e -> e.getValue().size() == nReplicas).count(), 3);

// Add 4 more nodes, which are used in the second round of assignment computation
for (int i = nNode; i < nNode + 4; i++) {
CapacityNode capacityNode = new CapacityNode("Node-" + i);
capacityNode.setCapacity(1);
CapacityNode capacityNode = new CapacityNode("Node-" + i, 1);
capacityNodeSet.add(capacityNode);
}

liveNodes = capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
liveNodes =
capacityNodeSet.stream().map(CapacityNode::getInstanceName).collect(Collectors.toList());

// Populate previous assignment (currentMapping) with first round assignment computation result
Map<String, Map<String, String>> currentMapping = new HashMap<>();
@@ -199,7 +196,7 @@
// 1. With previous assignment (currentMapping)
// 2. With enough assignable nodes
ZNRecord secondRoundShardAssignment =
greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping,
stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping,
clusterDataCache);

// Assert all partitions have been assigned with enough replica
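As a possible follow-up (not part of this PR), a topology-aware variant of the tests above might look like the following hedged sketch. It assumes the same mocked clusterDataCache and constants as the existing tests; the cluster name and domain values are illustrative.

```java
ClusterConfig clusterConfig = new ClusterConfig("testCluster"); // illustrative name
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(1);
ClusterTopologyConfig topologyConfig =
    ClusterTopologyConfig.createFromClusterConfig(clusterConfig);

Set<CapacityNode> capacityNodeSet = new HashSet<>();
for (int i = 0; i < 4; i++) {
  InstanceConfig instanceConfig = new InstanceConfig("Node-" + i);
  // Alternate the nodes between two fault zones: zone 0 and zone 1
  instanceConfig.setDomain("zone=" + (i % 2) + ",instance=Node-" + i);
  capacityNodeSet.add(
      new CapacityNode("Node-" + i, clusterConfig, topologyConfig, instanceConfig));
}
when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
// With two replicas per partition, the fault-zone check in canAdd should place
// each partition's replicas in different zones.
```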