Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offline calculation of total shard across all node and caching it for weight calculation inside LocalShardBalancer #14675

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private int totalShardCount = 0;

public LocalShardsBalancer(
Logger logger,
Expand Down Expand Up @@ -127,8 +128,7 @@ public float avgPrimaryShardsPerNode() {
*/
@Override
public float avgShardsPerNode() {
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
return totalShardCount / nodes.size();
}

/**
Expand Down Expand Up @@ -600,13 +600,15 @@ void moveShards() {
final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(shardRouting);
--totalShardCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improvement is definitely good. I have just few concerns around maintaining inc/dec counts of total shards along with every remove/add shard operation.

  1. Can you please move the two operations (add/remove and inc/dec) to a single method since both are necessary now? Something like this.
  2. Also, it looks like there is a need of an invariant assertion now to be invoked on start and end of the moveShards method. Ref CP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can you please check and confirm if tests are covering all add/remove shards scenarios?

Copy link
Member

@shwetathareja shwetathareja Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vikasvb90 what would you assert on in moveShards? I agree we can add more unit test to cover possible cases for addShard/ removeShard combination (in case those are missing)

Copy link
Contributor

@vikasvb90 vikasvb90 Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to ensure tests break if a addShard or removeShard is used without changing totalShardCount.

Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
allocation.changes()
);
targetNode.addShard(relocatingShards.v2());
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
Expand Down Expand Up @@ -726,6 +728,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
}
Expand Down Expand Up @@ -816,6 +819,7 @@ void allocateUnassigned() {
);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
++totalShardCount;
if (!shard.primary()) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
Expand Down Expand Up @@ -845,6 +849,7 @@ void allocateUnassigned() {
allocation.routingTable()
);
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
++totalShardCount;
} else {
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
Expand Down Expand Up @@ -1012,18 +1017,21 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala
}
final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
maxNode.removeShard(shard);
--totalShardCount;
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

if (decision.type() == Decision.Type.YES) {
/* only allocate on the cluster if we are not throttled */
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
++totalShardCount;
return true;
} else {
/* allocate on the model even if throttled */
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Decision.Type.THROTTLE;
minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
++totalShardCount;
return false;
}
}
Expand Down
Loading