Skip to content

Commit df8a300

Browse files
committed
Merge pull request #11615 from bleskes/async_fetch_non_existent_nodes
Internal: AsyncShardFetch can hang if there are new nodes in cluster state
2 parents b2ced13 + 532fa06 commit df8a300

File tree

6 files changed

+115
-15
lines changed

6 files changed

+115
-15
lines changed

core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
8383
return nodesIds;
8484
}
8585

86+
protected String[] resolveNodes(NodesRequest request, ClusterState clusterState) {
87+
return clusterState.nodes().resolveNodesIds(request.nodesIds());
88+
}
89+
90+
8691
private class AsyncAction {
8792

8893
private final NodesRequest request;
@@ -96,7 +101,7 @@ private AsyncAction(NodesRequest request, ActionListener<NodesResponse> listener
96101
this.request = request;
97102
this.listener = listener;
98103
clusterState = clusterService.state();
99-
String[] nodesIds = clusterState.nodes().resolveNodesIds(request.nodesIds());
104+
String[] nodesIds = resolveNodes(request, clusterState);
100105
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
101106
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
102107
}

core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,29 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData meta
165165
protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) {
166166
if (closed) {
167167
// we are closed, no need to process this async fetch at all
168+
logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
168169
return;
169170
}
171+
logger.trace("{} processing fetched [{}] results", shardId, type);
172+
170173
if (responses != null) {
171174
for (T response : responses) {
172175
NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
173176
// if the entry is there, and not marked as failed already, process it
174-
if (nodeEntry != null && nodeEntry.isFailed() == false) {
177+
if (nodeEntry == null) {
178+
continue;
179+
}
180+
if (nodeEntry.isFailed()) {
181+
logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type, nodeEntry.getFailure());
182+
} else {
183+
logger.trace("{} marking {} as done for [{}]", shardId, nodeEntry.getNodeId(), type);
175184
nodeEntry.doneFetching(response);
176185
}
177186
}
178187
}
179188
if (failures != null) {
180189
for (FailedNodeException failure : failures) {
190+
logger.trace("{} processing failure {} for [{}]", shardId, failure, type);
181191
NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
182192
// if the entry is there, and not marked as failed already, process it
183193
if (nodeEntry != null && nodeEntry.isFailed() == false) {
@@ -253,6 +263,7 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
253263
// visible for testing
254264
void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
255265
IndexMetaData indexMetaData = metaData.index(shardId.getIndex());
266+
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
256267
action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
257268
@Override
258269
public void onResponse(BaseNodesResponse<T> response) {

core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.nodes.*;
2828
import org.elasticsearch.cluster.ClusterName;
2929
import org.elasticsearch.cluster.ClusterService;
30+
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.inject.Inject;
@@ -68,6 +69,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
6869
execute(new Request(shardId, indexMetaData.getUUID(), nodesIds), listener);
6970
}
7071

72+
@Override
73+
protected String[] resolveNodes(Request request, ClusterState clusterState) {
74+
// default implementation may filter out non existent nodes. it's important to keep exactly the ids
75+
// we were given for accounting on the caller
76+
return request.nodesIds();
77+
}
78+
7179
@Override
7280
protected boolean transportCompress() {
7381
return true; // this can become big...

core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.nodes.*;
2828
import org.elasticsearch.cluster.ClusterName;
2929
import org.elasticsearch.cluster.ClusterService;
30+
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.inject.Inject;
@@ -81,6 +82,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
8182
execute(new Request(shardId, false, nodesIds), listener);
8283
}
8384

85+
@Override
86+
protected String[] resolveNodes(Request request, ClusterState clusterState) {
87+
// default implementation may filter out non existent nodes. it's important to keep exactly the ids
88+
// we were given for accounting on the caller
89+
return request.nodesIds();
90+
}
91+
8492
@Override
8593
protected NodeRequest newNodeRequest(String nodeId, Request request) {
8694
return new NodeRequest(nodeId, request);

core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.index.query.QueryBuilders;
3232
import org.elasticsearch.test.ElasticsearchIntegrationTest;
3333
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
34+
import org.elasticsearch.test.junit.annotations.TestLogging;
3435
import org.junit.Test;
3536

3637
import java.util.concurrent.ExecutionException;
@@ -46,6 +47,7 @@
4647
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
4748

4849
@Test
50+
@TestLogging("cluster.service:TRACE,discovery.zen:TRACE,gateway:TRACE,transport.tracer:TRACE")
4951
public void simpleMinimumMasterNodes() throws Exception {
5052

5153
Settings settings = settingsBuilder()

core/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,27 @@
2020
package org.elasticsearch.indices.state;
2121

2222
import com.google.common.collect.ImmutableMap;
23-
23+
import org.elasticsearch.Version;
2424
import org.elasticsearch.action.ActionListener;
2525
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
2626
import org.elasticsearch.action.index.IndexResponse;
27-
import org.elasticsearch.cluster.ClusterInfo;
28-
import org.elasticsearch.cluster.ClusterState;
29-
import org.elasticsearch.cluster.DiskUsage;
27+
import org.elasticsearch.cluster.*;
28+
import org.elasticsearch.cluster.block.ClusterBlocks;
3029
import org.elasticsearch.cluster.metadata.IndexMetaData;
3130
import org.elasticsearch.cluster.metadata.MappingMetaData;
31+
import org.elasticsearch.cluster.metadata.MetaData;
32+
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.cluster.node.DiscoveryNodes;
3334
import org.elasticsearch.cluster.routing.RoutingNodes;
3435
import org.elasticsearch.cluster.routing.RoutingTable;
3536
import org.elasticsearch.cluster.routing.ShardRouting;
37+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3638
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3739
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
3840
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
3941
import org.elasticsearch.common.collect.ImmutableOpenMap;
4042
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.transport.DummyTransportAddress;
4144
import org.elasticsearch.common.unit.TimeValue;
4245
import org.elasticsearch.discovery.DiscoveryModule;
4346
import org.elasticsearch.discovery.DiscoverySettings;
@@ -52,19 +55,12 @@
5255
import org.junit.Test;
5356

5457
import java.io.IOException;
55-
import java.util.Arrays;
56-
import java.util.HashSet;
57-
import java.util.List;
58-
import java.util.Map;
59-
import java.util.Set;
58+
import java.util.*;
6059
import java.util.concurrent.atomic.AtomicReference;
6160

6261
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6362
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
64-
import static org.hamcrest.Matchers.equalTo;
65-
import static org.hamcrest.Matchers.hasItem;
66-
import static org.hamcrest.Matchers.hasSize;
67-
import static org.hamcrest.Matchers.instanceOf;
63+
import static org.hamcrest.Matchers.*;
6864

6965
/**
7066
*/
@@ -102,6 +98,72 @@ public void testUnassignedShardAndEmptyNodesInRoutingTable() throws Exception {
10298
allocator.allocateUnassigned(routingAllocation);
10399
}
104100

101+
@Test
102+
@TestLogging("gateway:TRACE")
103+
public void testAssignmentWithJustAddedNodes() throws Exception {
104+
internalCluster().startNode();
105+
final String index = "index";
106+
prepareCreate(index).setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
107+
ensureGreen(index);
108+
109+
// close to have some unassigned started shards shards..
110+
client().admin().indices().prepareClose(index).get();
111+
112+
113+
final String masterName = internalCluster().getMasterName();
114+
final ClusterService clusterService = internalCluster().clusterService(masterName);
115+
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, masterName);
116+
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
117+
@Override
118+
public ClusterState execute(ClusterState currentState) throws Exception {
119+
// inject a node
120+
ClusterState.Builder builder = ClusterState.builder(currentState);
121+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).put(new DiscoveryNode("_non_existent", DummyTransportAddress.INSTANCE, Version.CURRENT)));
122+
123+
// open index
124+
final IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build();
125+
126+
builder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
127+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
128+
ClusterState updatedState = builder.build();
129+
130+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
131+
routingTable.addAsRecovery(updatedState.metaData().index(index));
132+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable).build();
133+
134+
RoutingAllocation.Result result = allocationService.reroute(updatedState);
135+
return ClusterState.builder(updatedState).routingResult(result).build();
136+
137+
}
138+
139+
@Override
140+
public void onFailure(String source, Throwable t) {
141+
142+
}
143+
});
144+
ensureGreen(index);
145+
// remove the extra node
146+
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
147+
@Override
148+
public ClusterState execute(ClusterState currentState) throws Exception {
149+
// inject a node
150+
ClusterState.Builder builder = ClusterState.builder(currentState);
151+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove("_non_existent"));
152+
153+
currentState = builder.build();
154+
RoutingAllocation.Result result = allocationService.reroute(currentState);
155+
return ClusterState.builder(currentState).routingResult(result).build();
156+
157+
}
158+
159+
@Override
160+
public void onFailure(String source, Throwable t) {
161+
162+
}
163+
});
164+
}
165+
166+
105167
@Test
106168
@TestLogging(value = "cluster.service:TRACE")
107169
public void testDeleteCreateInOneBulk() throws Exception {
@@ -190,6 +252,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
190252
public void onResponse(PutMappingResponse response) {
191253
putMappingResponse.set(response);
192254
}
255+
193256
@Override
194257
public void onFailure(Throwable e) {
195258
putMappingResponse.set(e);
@@ -221,6 +284,7 @@ public void run() {
221284
public void onResponse(IndexResponse response) {
222285
docIndexResponse.set(response);
223286
}
287+
224288
@Override
225289
public void onFailure(Throwable e) {
226290
docIndexResponse.set(e);
@@ -304,6 +368,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
304368
public void onResponse(PutMappingResponse response) {
305369
putMappingResponse.set(response);
306370
}
371+
307372
@Override
308373
public void onFailure(Throwable e) {
309374
putMappingResponse.set(e);
@@ -329,6 +394,7 @@ public void run() {
329394
public void onResponse(IndexResponse response) {
330395
docIndexResponse.set(response);
331396
}
397+
332398
@Override
333399
public void onFailure(Throwable e) {
334400
docIndexResponse.set(e);

0 commit comments

Comments
 (0)