Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate write failures #23314

Merged
merged 18 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix failure handling logic for bwc
  • Loading branch information
areek committed Apr 19, 2017
commit 90e2419c18765e2324f7b97222301501f4091fd5
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ static ShouldExecuteOnReplicaResult shouldExecuteOnReplica(final BulkItemRequest
: ShouldExecuteOnReplicaResult.NOOP; // no seq no generated, ignore replication
} else {
// NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to
// the replica; we ignore replicatio
// the replica; we ignore replication
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
? ShouldExecuteOnReplicaResult.NORMAL // execution successful on primary
: ShouldExecuteOnReplicaResult.NOOP; // ignore replication
Expand Down Expand Up @@ -434,7 +434,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
+ docWriteRequest.opType().getLowercase());
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = handleOperationResult(operationResult, location);
location = syncOperationResultOrThrow(operationResult, location);
break;
case NOOP:
break;
Expand All @@ -443,7 +443,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = handleOperationResult(operationResult, location);
location = syncOperationResultOrThrow(operationResult, location);
break;
}
} catch (Exception e) {
Expand All @@ -457,8 +457,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
return location;
}

private static Translog.Location handleOperationResult(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
/** Syncs operation result to the translog or throws a shard not available failure */
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
final Translog.Location location;
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,14 +614,15 @@ public IndexResult index(Index index) throws IOException {
indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
&& indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (indexResult.hasFailure() == false) {
location = translog.add(new Translog.Index(index, indexResult));
} else {
} else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(plan.seqNoForIndexing, index.primaryTerm(), indexResult.getFailure().getMessage()));
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
Expand Down Expand Up @@ -905,14 +906,15 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion,
plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
&& deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (deleteResult.hasFailure() == false) {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else {
location = translog.add(new Translog.NoOp(plan.seqNoOfDeletion,
} else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
} else {
location = null;
}
deleteResult.setTranslogLocation(location);
}
Expand Down