Skip to content

Commit 6b42137

Browse files
committed
Use unwrapped cause to determine if node is closing (#39723)
We need to unwrap and use the actual cause when determining if the node with primary shard is shutting down because TransportService will throw a TransportException wrapped in a SendRequestTransportException. Relates #39584
1 parent 517d03c commit 6b42137

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,9 @@ public void onFailure(Exception replicaException) {
199199
}
200200

201201
private void onNoLongerPrimary(Exception failure) {
202-
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
203-
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
202+
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
203+
final boolean nodeIsClosing = cause instanceof NodeClosedException
204+
|| (cause instanceof TransportException && "TransportService is closed stopped can't send request".equals(cause.getMessage()));
204205
final String message;
205206
if (nodeIsClosing) {
206207
message = String.format(Locale.ROOT,

server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.shard.ShardId;
4444
import org.elasticsearch.node.NodeClosedException;
4545
import org.elasticsearch.test.ESTestCase;
46+
import org.elasticsearch.transport.SendRequestTransportException;
4647
import org.elasticsearch.transport.TransportException;
4748

4849
import java.util.ArrayList;
@@ -203,7 +204,9 @@ public void testNoLongerPrimary() throws Exception {
203204
if (randomBoolean()) {
204205
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
205206
} else if (randomBoolean()) {
206-
shardActionFailure = new TransportException("TransportService is closed stopped can't send request");
207+
shardActionFailure = new SendRequestTransportException(
208+
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), "internal:cluster/shard/failure",
209+
new TransportException("TransportService is closed stopped can't send request"));
207210
} else {
208211
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
209212
}

server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@
6666
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
6767
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6868
import static org.hamcrest.Matchers.equalTo;
69+
import static org.hamcrest.Matchers.everyItem;
6970
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
7071
import static org.hamcrest.Matchers.is;
72+
import static org.hamcrest.Matchers.isIn;
7173
import static org.hamcrest.Matchers.isOneOf;
7274
import static org.hamcrest.Matchers.not;
7375

@@ -452,8 +454,10 @@ public void testRestartNodeWhileIndexing() throws Exception {
452454
for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) {
453455
String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName();
454456
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
455-
IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId());
456-
assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs));
457+
IndexShard shard = indicesService.getShardOrNull(shardRouting.shardId());
458+
Set<String> docs = IndexShardTestCase.getShardDocUIDs(shard);
459+
assertThat("shard [" + shard.routingEntry() + "] docIds [" + docs + "] vs " + " acked docIds [" + ackedDocs + "]",
460+
ackedDocs, everyItem(isIn(docs)));
457461
}
458462
}
459463
}

0 commit comments

Comments
 (0)