Replication operations that try to perform the primary phase on a replica should be retried

In extreme cases a local primary shard can be replaced with a replica while a replication request is in flight and the primary action is applied to the shard (via `acquirePrimaryOperationLock()`). elastic#17044 changed the exception used in that method to something that isn't recognized by `TransportActions.isShardNotAvailableException`, causing the operation to fail immediately instead of being retried. This commit fixes this by checking the primary flag before acquiring the lock. This is safe to do, as an IndexShard is never demoted once it has become a primary.

Closes elastic#17358
bleskes committed Mar 29, 2016
1 parent 833fc84 commit 48b4f08
Showing 3 changed files with 87 additions and 24 deletions.
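
For orientation, here is a minimal, self-contained sketch of the pattern the commit applies: check the routing entry's primary flag before taking the operation lock, and signal the mismatch with an exception that the caller's retry logic recognizes. The class and method names below (RetrySketch, ShardStub, acquirePrimaryReference, isRetryable) are hypothetical stand-ins, not Elasticsearch source.

// Hypothetical sketch (not Elasticsearch source): the primary flag is checked before the
// operation lock is taken, and a dedicated exception type marks the failure as retryable.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class RetrySketch {

    // Stand-in for TransportReplicationAction.RetryOnPrimaryException: "the shard we routed to
    // is not the primary; re-resolve the routing and try again".
    static final class RetryOnPrimaryException extends RuntimeException {
        RetryOnPrimaryException(String message) {
            super(message);
        }
    }

    // Minimal stand-in for an IndexShard. In Elasticsearch a shard is never demoted once it has
    // become a primary, which is what makes checking the flag before taking the lock safe.
    static final class ShardStub {
        final AtomicBoolean primary;
        final ReentrantLock operationLock = new ReentrantLock();

        ShardStub(boolean primary) {
            this.primary = new AtomicBoolean(primary);
        }
    }

    // Mirrors the intent of getIndexShardReferenceOnPrimary(): fail fast with a retryable
    // exception if the locally resolved shard turned out to be a replica, instead of acquiring
    // the lock and failing later with an exception the retry logic does not recognize.
    static ShardStub acquirePrimaryReference(ShardStub shard) {
        if (shard.primary.get() == false) {
            throw new RetryOnPrimaryException("actual shard is not a primary");
        }
        shard.operationLock.lock();
        return shard;
    }

    // Caller-side classification, roughly what retryPrimaryException() is for: only recognized
    // exception types schedule a retry; anything else fails the operation immediately.
    static boolean isRetryable(Throwable t) {
        return t instanceof RetryOnPrimaryException;
    }

    public static void main(String[] args) {
        ShardStub replica = new ShardStub(false); // stale routing sent the request to a replica
        try {
            acquirePrimaryReference(replica);
        } catch (RuntimeException e) {
            System.out.println("retry scheduled: " + isRetryable(e)); // prints: retry scheduled: true
        }
    }
}

In the real code path the retry classification also covers shard-not-available exceptions; the test's randomRetryPrimaryException() below lists ShardNotFoundException, IndexNotFoundException, IndexShardClosedException and EngineClosedException alongside RetryOnPrimaryException, so the reroute phase waits for a fresh cluster state rather than failing the request.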
TransportReplicationAction.java
@@ -549,8 +549,9 @@ public void handleResponse(Response response) {
public void handleException(TransportException exp) {
try {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
(isPrimaryAction && retryPrimaryException(exp.unwrapCause()))) {
final Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
(isPrimaryAction && retryPrimaryException(cause))) {
logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), request);
retry(exp);
} else {
@@ -799,6 +800,12 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
// the replica will take over and a replica will be assigned to the first node.
if (indexShard.routingEntry().primary() == false) {
throw new RetryOnPrimaryException(indexShard.shardId(), "actual shard is not a primary " + indexShard.routingEntry());
}
return IndexShardReferenceImpl.createOnPrimary(indexShard);
}

IndexShard.java
@@ -974,7 +974,6 @@ private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardSta

private void verifyPrimary() {
if (shardRouting.primary() == false) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("shard is not a primary " + shardRouting);
}
}
TransportReplicationActionTests.java
@@ -50,6 +50,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@@ -158,15 +160,15 @@ public void testBlocks() throws ExecutionException, InterruptedException {
ReplicationTask task = maybeTask();

ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
assertPhase(task, "failed");

block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
listener = new PlainActionFuture<>();
reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener);
@@ -181,7 +183,7 @@ public void testBlocks() throws ExecutionException, InterruptedException {
assertPhase(task, "waiting_for_retry");

block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class);
assertIndexShardUninitialized();
@@ -196,7 +198,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
final ShardId shardId = new ShardId(index, "_na_", 0);
// no replicas in order to skip the replication part
setState(clusterService, state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
ReplicationTask task = maybeTask();

logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
@@ -221,7 +223,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
final List<CapturingTransport.CapturedRequest> capturedRequests =
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
@@ -234,7 +236,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept
* before the relocation target, there is a time span where relocation source believes active primary to be on
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
* requests being sent back and forth.
*
* <p>
* This test checks that replication request is not routed back from relocation target to relocation source in case of
* stale index routing table on relocation target.
*/
@@ -271,7 +273,7 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
final List<CapturingTransport.CapturedRequest> capturedRequests =
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
@@ -282,7 +284,7 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
final String index = "test";
// no replicas in order to skip the replication part
setState(clusterService, state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -299,6 +301,61 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
}

public void testStalePrimaryShardOnReroute() throws InterruptedException {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
// no replicas in order to skip the replication part
setState(clusterService, stateWithActivePrimary(index, true, randomInt(3)));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId);
boolean timeout = randomBoolean();
if (timeout) {
request.timeout("0s");
} else {
request.timeout("1h");
}
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();

TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener);
reroutePhase.run();
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
assertPhase(task, "waiting_on_primary");
transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));


if (timeout) {
// we always try at least one more time on timeout
assertThat(listener.isDone(), equalTo(false));
capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
assertPhase(task, "waiting_on_primary");
transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));
assertListenerThrows("must throw index not found exception", listener, ElasticsearchException.class);
assertPhase(task, "failed");
} else {
assertThat(listener.isDone(), equalTo(false));
// generate a CS change
setState(clusterService, clusterService.state());
capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests, arrayWithSize(1));
assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
}
}

private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
return randomFrom(
new ShardNotFoundException(shardId),
new IndexNotFoundException(shardId.getIndex()),
new IndexShardClosedException(shardId),
new EngineClosedException(shardId),
new TransportReplicationAction.RetryOnPrimaryException(shardId, "hello")
);
}

public void testRoutePhaseExecutesRequest() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
@@ -449,7 +506,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase(
task, request, createTransportChannel(listener));
task, request, createTransportChannel(listener));
primaryPhase.run();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0);
@@ -485,7 +542,7 @@ protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Re
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase(
task, request, createTransportChannel(listener));
task, request, createTransportChannel(listener));
primaryPhase.run();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), equalTo(0));
@@ -529,8 +586,8 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti

setState(clusterService, state(index, true, ShardRoutingState.STARTED, replicaStates));
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}",
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
clusterService.state().prettyPrint());
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
clusterService.state().prettyPrint());

final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -646,7 +703,7 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard

TransportChannel channel = createTransportChannel(listener, error::set);
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);

assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
@@ -656,7 +713,7 @@

HashMap<String, Request> nodesSentTo = new HashMap<>();
boolean executeOnReplica =
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
// no duplicate requests
Request replicationRequest = (Request) capturedRequest.request;
@@ -819,7 +876,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
final ShardId shardId = new ShardId(index, "_na_", 0);
// one replica to make sure replication is attempted
setState(clusterService, state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
indexShardRouting.set(primaryShard);
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
@@ -856,7 +913,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
public void testReplicasCounter() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
setState(clusterService, state(shardId.getIndexName(), true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final ReplicationTask task = maybeTask();
@@ -895,7 +952,7 @@ public void testCounterDecrementedIfShardOperationThrowsException() throws Inter
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
setState(clusterService, state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -915,7 +972,7 @@ public void testReroutePhaseRetriedAfterDemotedPrimary() {
final ShardId shardId = new ShardId(index, "_na_", 0);
boolean localPrimary = true;
setState(clusterService, state(index, localPrimary,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
@@ -967,7 +1024,7 @@ protected void resolveRequest(MetaData metaData, String concreteIndex, Request r
// publish a new cluster state
boolean localPrimaryOnRetry = randomBoolean();
setState(clusterService, state(index, localPrimaryOnRetry,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();

// the request should be retried
@@ -1083,8 +1140,8 @@ class Action extends TransportReplicationAction<Request, Request, Response> {
ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, null, threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
}

@Override
