Skip to content

Fail engine if hit document failure on replicas #43523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 14, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,11 @@ public IndexResult index(Index index) throws IOException {
}
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
} else {
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
Expand Down Expand Up @@ -1055,7 +1059,8 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
}
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
if (ex instanceof AlreadyClosedException == false &&
indexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
Expand All @@ -1076,6 +1081,16 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
}
}

/**
* Whether we should treat any document failure as tragic error.
* If we hit any failure while processing an indexing on a replica, we should treat that error as tragic and fail the engine.
* However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary.
*/
private boolean treatDocumentFailureAsTragicError(Index index) {
// TODO: can we enable this all origins except primary on the leader?
return index.origin() == Operation.Origin.REPLICA;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add TODO here that we want to also investigate this for other origins.

}

/**
* returns true if the indexing operation may have already be processed by this engine.
* Note that it is OK to rarely return true even if this is not the case. However a `false`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5902,4 +5902,28 @@ private Map<BytesRef, DeleteVersionValue> tombstonesInVersionMap(InternalEngine
.filter(e -> e.getValue() instanceof DeleteVersionValue)
.collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
}

public void testHandleDocumentFailureOnReplica() throws Exception {
AtomicReference<IOException> addDocException = new AtomicReference<>();
IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
final IOException ex = addDocException.getAndSet(null);
if (ex != null) {
throw ex;
}
return super.addDocument(doc);
}
};
try (Store store = createStore();
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
randomNonNegativeLong(), null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
addDocException.set(new IOException("simulated"));
expectThrows(IOException.class, () -> engine.index(index));
assertTrue(engine.isClosed.get());
assertNotNull(engine.failedEngine.get());
}
}
}