Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/91659.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91659
summary: Avoid NPE when disassociateDeadNodes is executed for a node present in the
desired balance
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -415,24 +415,30 @@ private DiscoveryNode findRelocationTarget(
) {
for (final var nodeId : desiredNodeIds) {
// TODO consider ignored nodes here too?
if (nodeId.equals(shardRouting.currentNodeId()) == false) {
final var currentNode = routingNodes.node(nodeId);
final var decision = canAllocateDecider.apply(shardRouting, currentNode);
logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision);
if (decision.type() == Decision.Type.YES) {
return currentNode.node();
}
if (nodeId.equals(shardRouting.currentNodeId())) {
continue;
}
final var node = routingNodes.node(nodeId);
if (node == null) { // node left the cluster while reconciliation is still in progress
continue;
}
final var decision = canAllocateDecider.apply(shardRouting, node);
logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision);
if (decision.type() == Decision.Type.YES) {
return node.node();
}
}

return null;
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
assert target != null : "Target node is not found";
return allocation.deciders().canAllocate(shardRouting, target, allocation);
}

private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, RoutingNode target) {
assert target != null : "Target node is not found";
return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
Expand Down Expand Up @@ -78,25 +79,22 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;

public class DesiredBalanceReconcilerTests extends ESTestCase {

public void testNoChangesOnEmptyDesiredBalance() {
final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3);
final var routingAllocation = new RoutingAllocation(
new AllocationDeciders(List.of()),
clusterState.mutableRoutingNodes(),
clusterState,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
0L
);
final var routingAllocation = createRoutingAllocationFrom(clusterState);

reconcile(routingAllocation, new DesiredBalance(1, Map.of()));
assertFalse(routingAllocation.routingNodesChanged());
Expand Down Expand Up @@ -319,10 +317,6 @@ public void testUnassignedShardsInterleaving() {

final var stateWithInitializingPrimaries = startInitializingShardsAndReroute(allocationService, clusterState);
for (final var indexRoutingTable : stateWithInitializingPrimaries.routingTable()) {
for (int i = 0; i < indexRoutingTable.size(); i++) {
final var indexShardRoutingTable = indexRoutingTable.shard(i);
}

for (int i = 0; i < indexRoutingTable.size(); i++) {
final var indexShardRoutingTable = indexRoutingTable.shard(i);
assertTrue(indexShardRoutingTable.primaryShard().initializing());
Expand Down Expand Up @@ -1021,6 +1015,49 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
assertThat(reroutedState.getRoutingNodes().node("node-1").shardsWithState(ShardRoutingState.RELOCATING), hasSize(1));
}

public void testDoNotRebalanceToTheNodeThatNoLongerExists() {

var indexMetadata = IndexMetadata.builder("index-1")
.settings(
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
)
.system(randomBoolean())
.build();
final var index = indexMetadata.getIndex();
final var shardId = new ShardId(index, 0);

final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
// data-node-1 left the cluster
.localNodeId("data-node-2")
.masterNodeId("data-node-2")
.add(new DiscoveryNode("data-node-2", buildNewFakeTransportAddress(), Version.CURRENT))
)
.metadata(Metadata.builder().put(indexMetadata, true))
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addShard(newShardRouting(shardId, "data-node-2", true, STARTED)))
)
.build();

final var allocation = createRoutingAllocationFrom(clusterState);
final var balance = new DesiredBalance(
1,
Map.of(shardId, new ShardAssignment(Set.of("data-node-1"), 1, 0, 0)) // shard is assigned to the node that has left
);

reconcile(allocation, balance);

assertThat(allocation.routingNodes().node("data-node-1"), nullValue());
assertThat(allocation.routingNodes().node("data-node-2"), notNullValue());
// shard is kept wherever until balance is recalculated
assertThat(allocation.routingNodes().node("data-node-2").getByShardId(shardId), notNullValue());
}

private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
new DesiredBalanceReconciler(desiredBalance, routingAllocation, new NodeAllocationOrdering()).run();
}
Expand All @@ -1037,6 +1074,17 @@ private static AllocationService createTestAllocationService(
);
}

private static RoutingAllocation createRoutingAllocationFrom(ClusterState clusterState) {
return new RoutingAllocation(
new AllocationDeciders(List.of()),
clusterState.mutableRoutingNodes(),
clusterState,
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
0L
);
}

private static AllocationService createTestAllocationService(
Consumer<RoutingAllocation> allocationConsumer,
ClusterInfoService clusterInfoService,
Expand Down Expand Up @@ -1116,19 +1164,16 @@ private static DesiredBalance desiredBalance(ClusterState clusterState, BiPredic
private static DiscoveryNodes discoveryNodes(int nodeCount) {
final var discoveryNodes = DiscoveryNodes.builder();
for (var i = 0; i < nodeCount; i++) {
final var transportAddress = buildNewFakeTransportAddress();
final var discoveryNode = new DiscoveryNode(
"node-" + i,
"node-" + i,
UUIDs.randomBase64UUID(random()),
transportAddress.address().getHostString(),
transportAddress.getAddress(),
transportAddress,
Map.of(),
Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
discoveryNodes.add(
new DiscoveryNode(
"node-" + i,
"node-" + i,
buildNewFakeTransportAddress(),
Map.of(),
Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
)
);
discoveryNodes.add(discoveryNode);
}
discoveryNodes.masterNodeId("node-0").localNodeId("node-0");
return discoveryNodes.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
Expand Down Expand Up @@ -377,14 +376,10 @@ public void onFailure(Exception e) {
}

private static DiscoveryNode createDiscoveryNode(String nodeId) {
var transportAddress = buildNewFakeTransportAddress();
return new DiscoveryNode(
nodeId,
nodeId,
UUIDs.randomBase64UUID(random()),
transportAddress.address().getHostString(),
transportAddress.getAddress(),
transportAddress,
buildNewFakeTransportAddress(),
Map.of(),
Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
Expand Down