Skip to content

Commit 9be3ebd

Browse files
committed
Don't swallow exceptions on replication (#31179)
Swallowing these exceptions is dangerous as they can result in replicas going out-of-sync with the primary. Follow-up to #28571
1 parent 50dbd7d commit 9be3ebd

File tree

3 files changed

+24
-43
lines changed

3 files changed

+24
-43
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -487,32 +487,24 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
487487
BulkItemRequest item = request.items()[i];
488488
final Engine.Result operationResult;
489489
DocWriteRequest docWriteRequest = item.request();
490-
try {
491-
switch (replicaItemExecutionMode(item, i)) {
492-
case NORMAL:
493-
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
494-
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
495-
assert operationResult != null : "operation result must never be null when primary response has no failure";
496-
location = syncOperationResultOrThrow(operationResult, location);
497-
break;
498-
case NOOP:
499-
break;
500-
case FAILURE:
501-
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
502-
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
503-
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
504-
assert operationResult != null : "operation result must never be null when primary response has no failure";
505-
location = syncOperationResultOrThrow(operationResult, location);
506-
break;
507-
default:
508-
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
509-
}
510-
} catch (Exception e) {
511-
// if its not an ignore replica failure, we need to make sure to bubble up the failure
512-
// so we will fail the shard
513-
if (!TransportActions.isShardNotAvailableException(e)) {
514-
throw e;
515-
}
490+
switch (replicaItemExecutionMode(item, i)) {
491+
case NORMAL:
492+
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
493+
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
494+
assert operationResult != null : "operation result must never be null when primary response has no failure";
495+
location = syncOperationResultOrThrow(operationResult, location);
496+
break;
497+
case NOOP:
498+
break;
499+
case FAILURE:
500+
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
501+
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
502+
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
503+
assert operationResult != null : "operation result must never be null when primary response has no failure";
504+
location = syncOperationResultOrThrow(operationResult, location);
505+
break;
506+
default:
507+
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
516508
}
517509
}
518510
return location;

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re
121121
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
122122
Translog.Location location = null;
123123
for (Translog.Operation operation : request.getOperations()) {
124-
try {
125-
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
126-
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
127-
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
128-
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
129-
}
130-
location = syncOperationResultOrThrow(operationResult, location);
131-
} catch (Exception e) {
132-
// if its not a failure to be ignored, let it bubble up
133-
if (!TransportActions.isShardNotAvailableException(e)) {
134-
throw e;
135-
}
124+
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
125+
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
126+
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
127+
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
136128
}
129+
location = syncOperationResultOrThrow(operationResult, location);
137130
}
138131
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
139132
replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,7 @@ protected static Location syncOperationResultOrThrow(final Engine.Result operati
7676
// check if any transient write operation failures should be bubbled up
7777
Exception failure = operationResult.getFailure();
7878
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
79-
if (!TransportActions.isShardNotAvailableException(failure)) {
80-
throw failure;
81-
} else {
82-
location = currentLocation;
83-
}
79+
throw failure;
8480
} else {
8581
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
8682
}

0 commit comments

Comments
 (0)