Skip to content

Commit 532fa06

Browse files
committed
Internal: AsyncShardFetch can hang if there are new nodes in cluster state
The AsyncShardFetch retrieves shard information from the different nodes in order to detirment the best location for unassigned shards. The class uses TransportNodesListGatewayStartedShards and TransportNodesListShardStoreMetaData in order to fetch this information. These actions, inherit from TransportNodesAction and are activated using a list of node ids. Those node ids are extracted from the cluster state that is used to assign shards. If we perform a reroute and adding new news in the same cluster state update task, it is possible that the AsyncShardFetch administration is based on a different cluster state then the one used by TransportNodesAction to resolve nodes. This can cause a problem since TransportNodesAction filters away unkown nodes, causing the administration in AsyncShardFetch to get confused. This commit fixes this allowing to override node resolving in TransportNodesAction and uses the exact node ids transfered by AsyncShardFetch
1 parent 93beea1 commit 532fa06

File tree

6 files changed

+115
-15
lines changed

6 files changed

+115
-15
lines changed

core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
8383
return nodesIds;
8484
}
8585

86+
protected String[] resolveNodes(NodesRequest request, ClusterState clusterState) {
87+
return clusterState.nodes().resolveNodesIds(request.nodesIds());
88+
}
89+
90+
8691
private class AsyncAction {
8792

8893
private final NodesRequest request;
@@ -96,7 +101,7 @@ private AsyncAction(NodesRequest request, ActionListener<NodesResponse> listener
96101
this.request = request;
97102
this.listener = listener;
98103
clusterState = clusterService.state();
99-
String[] nodesIds = clusterState.nodes().resolveNodesIds(request.nodesIds());
104+
String[] nodesIds = resolveNodes(request, clusterState);
100105
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
101106
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
102107
}

core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,29 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData meta
165165
protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) {
166166
if (closed) {
167167
// we are closed, no need to process this async fetch at all
168+
logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
168169
return;
169170
}
171+
logger.trace("{} processing fetched [{}] results", shardId, type);
172+
170173
if (responses != null) {
171174
for (T response : responses) {
172175
NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
173176
// if the entry is there, and not marked as failed already, process it
174-
if (nodeEntry != null && nodeEntry.isFailed() == false) {
177+
if (nodeEntry == null) {
178+
continue;
179+
}
180+
if (nodeEntry.isFailed()) {
181+
logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type, nodeEntry.getFailure());
182+
} else {
183+
logger.trace("{} marking {} as done for [{}]", shardId, nodeEntry.getNodeId(), type);
175184
nodeEntry.doneFetching(response);
176185
}
177186
}
178187
}
179188
if (failures != null) {
180189
for (FailedNodeException failure : failures) {
190+
logger.trace("{} processing failure {} for [{}]", shardId, failure, type);
181191
NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
182192
// if the entry is there, and not marked as failed already, process it
183193
if (nodeEntry != null && nodeEntry.isFailed() == false) {
@@ -253,6 +263,7 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
253263
// visible for testing
254264
void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
255265
IndexMetaData indexMetaData = metaData.index(shardId.getIndex());
266+
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
256267
action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
257268
@Override
258269
public void onResponse(BaseNodesResponse<T> response) {

core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.nodes.*;
2828
import org.elasticsearch.cluster.ClusterName;
2929
import org.elasticsearch.cluster.ClusterService;
30+
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.inject.Inject;
@@ -68,6 +69,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
6869
execute(new Request(shardId, indexMetaData.getUUID(), nodesIds), listener);
6970
}
7071

72+
@Override
73+
protected String[] resolveNodes(Request request, ClusterState clusterState) {
74+
// default implementation may filter out non existent nodes. it's important to keep exactly the ids
75+
// we were given for accounting on the caller
76+
return request.nodesIds();
77+
}
78+
7179
@Override
7280
protected boolean transportCompress() {
7381
return true; // this can become big...

core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.nodes.*;
2828
import org.elasticsearch.cluster.ClusterName;
2929
import org.elasticsearch.cluster.ClusterService;
30+
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.inject.Inject;
@@ -81,6 +82,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
8182
execute(new Request(shardId, false, nodesIds), listener);
8283
}
8384

85+
@Override
86+
protected String[] resolveNodes(Request request, ClusterState clusterState) {
87+
// default implementation may filter out non existent nodes. it's important to keep exactly the ids
88+
// we were given for accounting on the caller
89+
return request.nodesIds();
90+
}
91+
8492
@Override
8593
protected NodeRequest newNodeRequest(String nodeId, Request request) {
8694
return new NodeRequest(nodeId, request);

core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.index.query.QueryBuilders;
3232
import org.elasticsearch.test.ElasticsearchIntegrationTest;
3333
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
34+
import org.elasticsearch.test.junit.annotations.TestLogging;
3435
import org.junit.Test;
3536

3637
import java.util.concurrent.ExecutionException;
@@ -46,6 +47,7 @@
4647
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
4748

4849
@Test
50+
@TestLogging("cluster.service:TRACE,discovery.zen:TRACE,gateway:TRACE,transport.tracer:TRACE")
4951
public void simpleMinimumMasterNodes() throws Exception {
5052

5153
Settings settings = settingsBuilder()

core/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,27 @@
2020
package org.elasticsearch.indices.state;
2121

2222
import com.google.common.collect.ImmutableMap;
23-
23+
import org.elasticsearch.Version;
2424
import org.elasticsearch.action.ActionListener;
2525
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
2626
import org.elasticsearch.action.index.IndexResponse;
27-
import org.elasticsearch.cluster.ClusterInfo;
28-
import org.elasticsearch.cluster.ClusterState;
29-
import org.elasticsearch.cluster.DiskUsage;
27+
import org.elasticsearch.cluster.*;
28+
import org.elasticsearch.cluster.block.ClusterBlocks;
3029
import org.elasticsearch.cluster.metadata.IndexMetaData;
3130
import org.elasticsearch.cluster.metadata.MappingMetaData;
31+
import org.elasticsearch.cluster.metadata.MetaData;
32+
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.cluster.node.DiscoveryNodes;
3334
import org.elasticsearch.cluster.routing.RoutingNodes;
3435
import org.elasticsearch.cluster.routing.RoutingTable;
3536
import org.elasticsearch.cluster.routing.ShardRouting;
37+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3638
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3739
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
3840
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
3941
import org.elasticsearch.common.collect.ImmutableOpenMap;
4042
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.transport.DummyTransportAddress;
4144
import org.elasticsearch.common.unit.TimeValue;
4245
import org.elasticsearch.discovery.DiscoveryModule;
4346
import org.elasticsearch.discovery.DiscoverySettings;
@@ -52,19 +55,12 @@
5255
import org.junit.Test;
5356

5457
import java.io.IOException;
55-
import java.util.Arrays;
56-
import java.util.HashSet;
57-
import java.util.List;
58-
import java.util.Map;
59-
import java.util.Set;
58+
import java.util.*;
6059
import java.util.concurrent.atomic.AtomicReference;
6160

6261
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6362
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
64-
import static org.hamcrest.Matchers.equalTo;
65-
import static org.hamcrest.Matchers.hasItem;
66-
import static org.hamcrest.Matchers.hasSize;
67-
import static org.hamcrest.Matchers.instanceOf;
63+
import static org.hamcrest.Matchers.*;
6864

6965
/**
7066
*/
@@ -102,6 +98,72 @@ public void testUnassignedShardAndEmptyNodesInRoutingTable() throws Exception {
10298
allocator.allocateUnassigned(routingAllocation);
10399
}
104100

101+
@Test
102+
@TestLogging("gateway:TRACE")
103+
public void testAssignmentWithJustAddedNodes() throws Exception {
104+
internalCluster().startNode();
105+
final String index = "index";
106+
prepareCreate(index).setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
107+
ensureGreen(index);
108+
109+
// close to have some unassigned started shards shards..
110+
client().admin().indices().prepareClose(index).get();
111+
112+
113+
final String masterName = internalCluster().getMasterName();
114+
final ClusterService clusterService = internalCluster().clusterService(masterName);
115+
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, masterName);
116+
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
117+
@Override
118+
public ClusterState execute(ClusterState currentState) throws Exception {
119+
// inject a node
120+
ClusterState.Builder builder = ClusterState.builder(currentState);
121+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).put(new DiscoveryNode("_non_existent", DummyTransportAddress.INSTANCE, Version.CURRENT)));
122+
123+
// open index
124+
final IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build();
125+
126+
builder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
127+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
128+
ClusterState updatedState = builder.build();
129+
130+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
131+
routingTable.addAsRecovery(updatedState.metaData().index(index));
132+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable).build();
133+
134+
RoutingAllocation.Result result = allocationService.reroute(updatedState);
135+
return ClusterState.builder(updatedState).routingResult(result).build();
136+
137+
}
138+
139+
@Override
140+
public void onFailure(String source, Throwable t) {
141+
142+
}
143+
});
144+
ensureGreen(index);
145+
// remove the extra node
146+
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
147+
@Override
148+
public ClusterState execute(ClusterState currentState) throws Exception {
149+
// inject a node
150+
ClusterState.Builder builder = ClusterState.builder(currentState);
151+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove("_non_existent"));
152+
153+
currentState = builder.build();
154+
RoutingAllocation.Result result = allocationService.reroute(currentState);
155+
return ClusterState.builder(currentState).routingResult(result).build();
156+
157+
}
158+
159+
@Override
160+
public void onFailure(String source, Throwable t) {
161+
162+
}
163+
});
164+
}
165+
166+
105167
@Test
106168
@TestLogging(value = "cluster.service:TRACE")
107169
public void testDeleteCreateInOneBulk() throws Exception {
@@ -190,6 +252,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
190252
public void onResponse(PutMappingResponse response) {
191253
putMappingResponse.set(response);
192254
}
255+
193256
@Override
194257
public void onFailure(Throwable e) {
195258
putMappingResponse.set(e);
@@ -221,6 +284,7 @@ public void run() {
221284
public void onResponse(IndexResponse response) {
222285
docIndexResponse.set(response);
223286
}
287+
224288
@Override
225289
public void onFailure(Throwable e) {
226290
docIndexResponse.set(e);
@@ -304,6 +368,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
304368
public void onResponse(PutMappingResponse response) {
305369
putMappingResponse.set(response);
306370
}
371+
307372
@Override
308373
public void onFailure(Throwable e) {
309374
putMappingResponse.set(e);
@@ -329,6 +394,7 @@ public void run() {
329394
public void onResponse(IndexResponse response) {
330395
docIndexResponse.set(response);
331396
}
397+
332398
@Override
333399
public void onFailure(Throwable e) {
334400
docIndexResponse.set(e);

0 commit comments

Comments
 (0)