Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Allocation and rebalancing based on average primary shard count per index #6422

Merged
Prev Previous commit
Next Next commit
Add average primary shard count constraint for allocation and rebalan…
…cing operation

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Mar 1, 2023
commit b1d432745177ec4660318cbb202dd7a117d6f930

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.node.DiscoveryNodeFilters;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.ImmutableOpenIntMap;
Expand Down Expand Up @@ -81,7 +80,6 @@
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -91,7 +89,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
Expand Down Expand Up @@ -916,27 +913,6 @@ public Index getResizeSourceIndex() {
: null;
}

public boolean isSegRepEnabled() {
return INDEX_REPLICATION_TYPE_SETTING.exists(settings)
? settings.get(INDEX_REPLICATION_TYPE_SETTING.getKey()).equals(ReplicationType.SEGMENT.toString())
: false;
}

private static final Comparator<ShardRouting> PRIMARY_FIRST = Comparator.comparing(ShardRouting::primary).reversed();

public Iterable<ShardRouting> getIndexShardRoutingIterator(Stream<ShardRouting> routingStream) {
if (this.isSegRepEnabled()) {
routingStream = routingStream.sorted(PRIMARY_FIRST);
}
return routingStream::iterator;
}

// For segrep shards, move primary shard first in place of replica/random shard, to have better primary balance
// This can be applied for all shard types but can be baked initially with segrep enabled indices only
public boolean movePrimaryFirst(ShardRouting shardRouting) {
return this.isSegRepEnabled() && shardRouting.primary();
}

ImmutableOpenMap<String, DiffableStringMap> getCustomData() {
return this.customData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,49 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
return shards;
}


/**
* Determine the shards of an index with a specific state
* @param index id of the index
* @param states set of states which should be listed
* @return a list of shards
*/
public List<ShardRouting> primaryShardsWithState(String index, ShardRoutingState... states) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
List<ShardRouting> shards = new ArrayList<>();

if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
for (ShardRouting shardEntry : initializingShards) {
if (shardEntry.primary() == false || shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
} else if (states[0] == ShardRoutingState.RELOCATING) {
for (ShardRouting shardEntry : relocatingShards) {
if (shardEntry.primary() == false || shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
}
}

for (ShardRouting shardEntry : this) {
if (!shardEntry.getIndexName().equals(index) || shardEntry.primary() == false) {
continue;
}
for (ShardRoutingState state : states) {
if (shardEntry.state() == state) {
shards.add(shardEntry);
}
}
}
return shards;
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,55 @@
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID;
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_WEIGHT;
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isIndexPrimaryShardsPerNodeBreached;

/**
* Allocation constraints specify conditions which, if breached, reduce the
* priority of a node for receiving shard allocations.
* priority of a node for receiving unassigned shard allocations.
*
* @opensearch.internal
*/
public class AllocationConstraints {
public final long CONSTRAINT_WEIGHT = 1000000L;
private List<Predicate<ConstraintParams>> constraintPredicates;

public AllocationConstraints() {
this.constraintPredicates = new ArrayList<>(1);
this.constraintPredicates.add(isIndexShardsPerNodeBreached());
}

class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
private String index;

ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
this.balancer = balancer;
this.node = node;
this.index = index;
}
}
public final static long INDEX_SHARD_PER_NODE_BREACH_WEIGHT = 1000000L;

/**
* Evaluates configured allocation constraint predicates for given node - index
* combination; and returns a weight value based on the number of breached
* constraints.
*
* Constraint weight should be added to the weight calculated via weight
* function, to reduce priority of allocating on nodes with breached
* constraints.
*
* This weight function is used only in case of unassigned shards to avoid overloading a newly added node.
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function.
* This constraint is only applied for unassigned shards to avoid overloading a newly added node.
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint.
*/
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint";
private Map<String, Constraint> constraintSet;
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved

public void updateAllocationConstraint(String constraint, boolean enable) {
this.constraintSet.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
int constraintsBreached = 0;
ConstraintParams params = new ConstraintParams(balancer, node, index);
for (Predicate<ConstraintParams> predicate : constraintPredicates) {
if (predicate.test(params)) {
constraintsBreached++;
}
}
return constraintsBreached * CONSTRAINT_WEIGHT;
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraintSet);
}

public AllocationConstraints() {
this.constraintSet = new HashMap<>();
this.constraintSet.putIfAbsent(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached(), INDEX_SHARD_PER_NODE_BREACH_WEIGHT)
);
this.constraintSet.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
isIndexPrimaryShardsPerNodeBreached(),
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_WEIGHT
)
);
}

/**
Expand All @@ -76,12 +73,11 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
* This constraint is breached when balancer attempts to allocate more than
* average shards per index per node.
*/
private Predicate<ConstraintParams> isIndexShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
return (params) -> {
int currIndexShardsOnNode = params.node.numShards(params.index);
int allowedIndexShardsPerNode = (int) Math.ceil(params.balancer.avgShardsPerNode(params.index));
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

/**
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
* re-balancing target used in {@link RebalanceConstraints}
*
* @opensearch.internal
*/
public class Constraint implements Predicate<Constraint.ConstraintParams> {
private String name;

private long weight;

boolean enable;
private Predicate<ConstraintParams> predicate;

public Constraint(String name, Predicate<ConstraintParams> constraintPredicate, long weight) {
this.name = name;
this.weight = weight;
this.predicate = constraintPredicate;
this.enable = false;
}

@Override
public boolean test(ConstraintParams constraintParams) {
return this.enable && predicate.test(constraintParams);
}

public String getName() {
return name;
}

public long getWeight() {
return weight;
}

public void setEnable(boolean enable) {
this.enable = enable;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Constraint that = (Constraint) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
private String index;

ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
this.balancer = balancer;
this.node = node;
this.index = index;
}

public ShardsBalancer getBalancer() {
return balancer;
}

public BalancedShardsAllocator.ModelNode getNode() {
return node;
}

public String getIndex() {
return index;
}

/**
* Evaluates configured allocation constraint predicates for given node - index
* combination; and returns a weight value based on the number of breached
* constraints.
* <p>
* Constraint weight should be added to the weight calculated via weight
* function, to reduce priority of allocating on nodes with breached
* constraints.
* </p>
*/
public long weight(Map<String, Constraint> constraintSet) {
long totalConstraintWeight = 0;
for (Constraint constraint : constraintSet.values()) {
if (constraint.test(this)) {
totalConstraintWeight += constraint.getWeight();
}
}
return totalConstraintWeight;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;

public class RebalanceConstraints {
public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PRIMARY_SHARD_BALANCE.getKey();
public final static long PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_WEIGHT = 10000L;
private Map<String, Constraint> constraintSet;

public RebalanceConstraints() {
this.constraintSet = new HashMap<>();
this.constraintSet.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
isIndexPrimaryShardsPerNodeBreached(),
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_WEIGHT
)
);
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
this.constraintSet.get(constraint).setEnable(enable);
}

public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraintSet);
}

/**
* When primary balance is preferred, add node constraint of average primary shards per node to give the node a
* higher weight resulting in lesser chances of being target of unassigned shard allocation or rebalancing target node
*/
public static Predicate<Constraint.ConstraintParams> isIndexPrimaryShardsPerNodeBreached() {
return (params) -> {
int currPrimaryShardsOnNode = params.getNode().numPrimaryShards(params.getIndex());
int allowedPrimaryShardsPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
return currPrimaryShardsOnNode > allowedPrimaryShardsPerNode;
};
}
}
Loading