Skip to content

Commit

Permalink
Offline calculation of total shard per node and caching it for weight…
Browse files Browse the repository at this point in the history
… calculation inside LocalShardsBalancer (opensearch-project#14675) (opensearch-project#14689)

(cherry picked from commit 6d0484a)

Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
2 people authored and kkewwei committed Jul 24, 2024
1 parent 35105a5 commit 4ab2ffa
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private int totalShardCount = 0;

public LocalShardsBalancer(
Logger logger,
Expand Down Expand Up @@ -125,8 +126,7 @@ public float avgPrimaryShardsPerNode() {
*/
@Override
public float avgShardsPerNode() {
    // totalShardCount is an int counter and nodes.size() is an int, so dividing them
    // directly would be integer division: the fractional part of the average would be
    // truncated before the result is widened to float (e.g. 7 shards / 2 nodes -> 3.0).
    // Casting one operand to float first forces floating-point division (-> 3.5).
    // It also avoids an ArithmeticException when nodes is empty: float division by
    // zero yields NaN/Infinity instead of throwing.
    return (float) totalShardCount / nodes.size();
}

/**
Expand Down Expand Up @@ -598,13 +598,15 @@ void moveShards() {
final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(shardRouting);
--totalShardCount;
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
allocation.changes()
);
targetNode.addShard(relocatingShards.v2());
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
Expand Down Expand Up @@ -724,6 +726,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
}
Expand Down Expand Up @@ -815,6 +818,7 @@ void allocateUnassigned() {
);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
++totalShardCount;
if (!shard.primary()) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
Expand Down Expand Up @@ -844,6 +848,7 @@ void allocateUnassigned() {
allocation.routingTable()
);
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
++totalShardCount;
} else {
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
Expand Down Expand Up @@ -1011,18 +1016,21 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala
}
final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
maxNode.removeShard(shard);
--totalShardCount;
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

if (decision.type() == Decision.Type.YES) {
/* only allocate on the cluster if we are not throttled */
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
++totalShardCount;
return true;
} else {
/* allocate on the model even if throttled */
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Decision.Type.THROTTLE;
minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
++totalShardCount;
return false;
}
}
Expand Down

0 comments on commit 4ab2ffa

Please sign in to comment.