Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate write failures #23314

Merged
merged 18 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Consolidate possible replica action for bulk item request
depanding on it's primary execution
  • Loading branch information
areek committed Apr 19, 2017
commit 8d4d6469d176dd0f6de4dac8d3c807f4c8d90ebe
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
}

static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) {
/** Result Enum for executing bulk item request on replica */
public enum ShouldExecuteOnReplicaResult {

/**
* When primary execution succeeded
*/
NORMAL,

/**
* When primary execution failed before sequence no was generated
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
*/
NOOP,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an assert that will fail on 7.0.0 that we should remove the NOOP and all handling code for it?


/**
* When primary execution failed after sequence no was generated
*/
FAILURE
}

/**
* Determines whether a bulk item request should be executed on the replica.
* @return {@link ShouldExecuteOnReplicaResult#NORMAL} upon normal primary execution with no failures
* {@link ShouldExecuteOnReplicaResult#FAILURE} upon primary execution failure after sequence no generation
* {@link ShouldExecuteOnReplicaResult#NOOP} upon primary execution failure before sequence no generation or
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
*/
static ShouldExecuteOnReplicaResult shouldExecuteOnReplica(final BulkItemRequest request, final int index) {
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
if (primaryResponse.isFailed()) {
return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO
? ShouldExecuteOnReplicaResult.FAILURE // we have a seq no generated with the failure, replicate as no-op
: ShouldExecuteOnReplicaResult.NOOP; // no seq no generated, ignore replication
} else {
// NOTE: pre-6.0 write requests has unassigned seq no
// and in case of failure, requests don't reach the replica
// so we execute on replica when the primary execution is not a noop
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP;
// NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to
// the replica; we ignore replicatio
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
? ShouldExecuteOnReplicaResult.NORMAL // execution successful on primary
: ShouldExecuteOnReplicaResult.NOOP; // ignore replication
}
}

Expand All @@ -400,17 +430,12 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (shouldExecuteReplicaItem(item, i)) {
DocWriteRequest docWriteRequest = item.request();
final Engine.Result operationResult;
try {
if (item.getPrimaryResponse().isFailed()) {
// execution on primary resulted in a failure
// if primary execution generated a sequence no, execute a noop on the replica engine to record it in the translog
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica);
} else {
final ShouldExecuteOnReplicaResult shouldExecuteOnReplicaResult = shouldExecuteOnReplica(item, i);
final Engine.Result operationResult;
DocWriteRequest docWriteRequest = item.request();
try {
switch (shouldExecuteOnReplicaResult) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
switch (docWriteRequest.opType()) {
case CREATE:
Expand All @@ -422,32 +447,50 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
+ docWriteRequest.opType().getLowercase());
+ docWriteRequest.opType().getLowercase());
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
}
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
}
} else {
location = locationToSync(location, operationResult.getTranslogLocation());
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
location = handleOperationResult(operationResult, location);
break;
case NOOP:
break;
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = handleOperationResult(operationResult, location);
break;
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
}
}
return location;
}

private static Translog.Location handleOperationResult(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
final Translog.Location location;
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
} else {
location = currentLocation;
}
} else {
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
}
return location;
}

private static Translog.Location locationToSync(Translog.Location current,
Translog.Location next) {
/* here we are moving forward in the translog with each operation. Under the hood this might
Expand Down Expand Up @@ -518,8 +561,8 @@ private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request,
}

/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
private static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
// Update the mappings if parsing the documents includes new dynamic updates
final Engine.Index preUpdateOperation;
final Mapping mappingUpdate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -34,14 +33,9 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
Expand All @@ -52,13 +46,8 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.MappingUpdatePerformer;
import org.elasticsearch.action.bulk.BulkItemResultHolder;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -96,26 +85,32 @@ public void testShouldExecuteReplicaItem() throws Exception {
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NORMAL));

// Failed index requests should not be replicated (for now!)
// Failed index requests without sequence no should not be replicated
writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException("i died"))));
assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP));

// Failed index requests with sequence no should be replicated
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException("i died after sequence no was generated"), 1)));
assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.FAILURE));
// NOOP requests should not be replicated
writeRequest = new UpdateRequest("index", "type", "id");
response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
response));
assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP));
}


Expand Down