Skip to content

Commit cb3e0cb

Browse files
authored
Fail engine if hit document failure on replicas (#43523)
An indexing on a replica should never fail after it was successfully indexed on a primary. Hence, we should fail an engine if we hit any failure (document level or tragic failure) when processing an indexing on a replica. Relates #43228 Closes #40435
1 parent 797628e commit cb3e0cb

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,11 @@ public IndexResult index(Index index) throws IOException {
929929
}
930930
} catch (RuntimeException | IOException e) {
931931
try {
932-
maybeFailEngine("index", e);
932+
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
933+
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
934+
} else {
935+
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
936+
}
933937
} catch (Exception inner) {
934938
e.addSuppressed(inner);
935939
}
@@ -1055,7 +1059,8 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10551059
}
10561060
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
10571061
} catch (Exception ex) {
1058-
if (indexWriter.getTragicException() == null) {
1062+
if (ex instanceof AlreadyClosedException == false &&
1063+
indexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
10591064
/* There is no tragic event recorded so this must be a document failure.
10601065
*
10611066
* The handling inside IW doesn't guarantee that an tragic / aborting exception
@@ -1076,6 +1081,16 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10761081
}
10771082
}
10781083

1084+
/**
1085+
* Whether we should treat any document failure as tragic error.
1086+
* If we hit any failure while processing an indexing on a replica, we should treat that error as tragic and fail the engine.
1087+
* However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary.
1088+
*/
1089+
private boolean treatDocumentFailureAsTragicError(Index index) {
1090+
// TODO: can we enable this all origins except primary on the leader?
1091+
return index.origin() == Operation.Origin.REPLICA;
1092+
}
1093+
10791094
/**
10801095
* returns true if the indexing operation may have already be processed by this engine.
10811096
* Note that it is OK to rarely return true even if this is not the case. However a `false`

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5902,4 +5902,28 @@ private Map<BytesRef, DeleteVersionValue> tombstonesInVersionMap(InternalEngine
59025902
.filter(e -> e.getValue() instanceof DeleteVersionValue)
59035903
.collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
59045904
}
5905+
5906+
public void testHandleDocumentFailureOnReplica() throws Exception {
5907+
AtomicReference<IOException> addDocException = new AtomicReference<>();
5908+
IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) {
5909+
@Override
5910+
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
5911+
final IOException ex = addDocException.getAndSet(null);
5912+
if (ex != null) {
5913+
throw ex;
5914+
}
5915+
return super.addDocument(doc);
5916+
}
5917+
};
5918+
try (Store store = createStore();
5919+
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
5920+
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
5921+
Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
5922+
randomNonNegativeLong(), null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
5923+
addDocException.set(new IOException("simulated"));
5924+
expectThrows(IOException.class, () -> engine.index(index));
5925+
assertTrue(engine.isClosed.get());
5926+
assertNotNull(engine.failedEngine.get());
5927+
}
5928+
}
59055929
}

0 commit comments

Comments
 (0)