Skip to content

Commit e38ce4f

Browse files
committed
Do not warm up searcher in engine constructor (#48605)
With this change, we won't warm up searchers until we externally refresh an engine. We explicitly refresh before allowing reading from a shard (i.e., move to post_recovery state) and during resetting. These guarantees that we have warmed up the engine before exposing the external searcher. Another prerequisite for #47186.
1 parent d949125 commit e38ce4f

File tree

4 files changed

+112
-32
lines changed

4 files changed

+112
-32
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
670670
}
671671
Releasable releasable = store::decRef;
672672
try {
673+
assert assertSearcherIsWarmedUp(source, scope);
673674
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
674675
final ElasticsearchDirectoryReader acquire = referenceManager.acquire();
675676
AtomicBoolean released = new AtomicBoolean(false);
@@ -705,6 +706,10 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
705706

706707
protected abstract ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope);
707708

709+
boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
710+
return true;
711+
}
712+
708713
public enum SearcherScope {
709714
EXTERNAL, INTERNAL
710715
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -320,18 +320,13 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
320320
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
321321
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
322322
private final ElasticsearchReaderManager internalReaderManager;
323+
private boolean isWarmedUp; //guarded by refreshLock
323324

324325
ExternalReaderManager(ElasticsearchReaderManager internalReaderManager,
325326
BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener) throws IOException {
326327
this.refreshListener = refreshListener;
327328
this.internalReaderManager = internalReaderManager;
328-
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
329-
try {
330-
incrementAndNotify(acquire, null);
331-
current = acquire;
332-
} finally {
333-
internalReaderManager.release(acquire);
334-
}
329+
this.current = internalReaderManager.acquire(); // steal the reference without warming up
335330
}
336331

337332
@Override
@@ -340,26 +335,25 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea
340335
// it's a save operation since we acquire the reader which incs it's reference but then down the road
341336
// steal it by calling incRef on the "stolen" reader
342337
internalReaderManager.maybeRefreshBlocking();
343-
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
344-
try {
345-
if (acquire == referenceToRefresh) {
346-
// nothing has changed - both ref managers share the same instance so we can use reference equality
347-
return null;
348-
} else {
349-
incrementAndNotify(acquire, referenceToRefresh);
350-
return acquire;
338+
final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
339+
if (isWarmedUp == false || newReader != referenceToRefresh) {
340+
boolean success = false;
341+
try {
342+
refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
343+
isWarmedUp = true;
344+
success = true;
345+
} finally {
346+
if (success == false) {
347+
internalReaderManager.release(newReader);
348+
}
351349
}
352-
} finally {
353-
internalReaderManager.release(acquire);
354350
}
355-
}
356-
357-
private void incrementAndNotify(ElasticsearchDirectoryReader reader,
358-
ElasticsearchDirectoryReader previousReader) throws IOException {
359-
reader.incRef(); // steal the reference
360-
try (Closeable c = reader::decRef) {
361-
refreshListener.accept(reader, previousReader);
362-
reader.incRef(); // double inc-ref if we were successful
351+
// nothing has changed - both ref managers share the same instance so we can use reference equality
352+
if (referenceToRefresh == newReader) {
353+
internalReaderManager.release(newReader);
354+
return null;
355+
} else {
356+
return newReader; // steal the reference
363357
}
364358
}
365359

@@ -374,7 +368,24 @@ protected int getRefCount(ElasticsearchDirectoryReader reference) {
374368
}
375369

376370
@Override
377-
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { reference.decRef(); }
371+
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
372+
reference.decRef();
373+
}
374+
}
375+
376+
@Override
377+
final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
378+
if (scope == SearcherScope.EXTERNAL) {
379+
switch (source) {
380+
// we can access segment_stats while a shard is still in the recovering state.
381+
case "segments":
382+
case "segments_stats":
383+
break;
384+
default:
385+
assert externalReaderManager.isWarmedUp : "searcher was not warmed up yet for source[" + source + "]";
386+
}
387+
}
388+
return true;
378389
}
379390

380391
@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2699,6 +2699,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
26992699
final Sort indexSort = indexSortSupplier.get();
27002700
final Engine.Warmer warmer = reader -> {
27012701
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
2702+
assert reader != null;
27022703
if (this.warmer != null) {
27032704
this.warmer.warm(reader);
27042705
}
@@ -3410,6 +3411,7 @@ public void close() throws IOException {
34103411
// TODO: add a dedicate recovery stats for the reset translog
34113412
});
34123413
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
3414+
newEngineReference.get().refresh("reset_engine");
34133415
synchronized (mutex) {
34143416
verifyNotClosed();
34153417
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@
204204
import static org.hamcrest.Matchers.hasItem;
205205
import static org.hamcrest.Matchers.hasKey;
206206
import static org.hamcrest.Matchers.hasSize;
207+
import static org.hamcrest.Matchers.in;
207208
import static org.hamcrest.Matchers.isIn;
208209
import static org.hamcrest.Matchers.lessThanOrEqualTo;
209210
import static org.hamcrest.Matchers.not;
@@ -215,6 +216,7 @@
215216
public class InternalEngineTests extends EngineTestCase {
216217

217218
public void testVersionMapAfterAutoIDDocument() throws IOException {
219+
engine.refresh("warm_up");
218220
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
219221
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
220222
Engine.Index operation = randomBoolean() ?
@@ -926,6 +928,7 @@ public void testConcurrentGetAndFlush() throws Exception {
926928
}
927929

928930
public void testSimpleOperations() throws Exception {
931+
engine.refresh("warm_up");
929932
Engine.Searcher searchResult = engine.acquireSearcher("test");
930933
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
931934
searchResult.close();
@@ -1103,6 +1106,7 @@ public void testSimpleOperations() throws Exception {
11031106
}
11041107

11051108
public void testSearchResultRelease() throws Exception {
1109+
engine.refresh("warm_up");
11061110
Engine.Searcher searchResult = engine.acquireSearcher("test");
11071111
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
11081112
searchResult.close();
@@ -2175,7 +2179,7 @@ public void testVersioningPromotedReplica() throws IOException {
21752179
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
21762180
final long currentSeqNo = getSequenceID(replicaEngine,
21772181
new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
2178-
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
2182+
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
21792183
final TotalHitCountCollector collector = new TotalHitCountCollector();
21802184
searcher.search(new MatchAllDocsQuery(), collector);
21812185
if (collector.getTotalHits() > 0) {
@@ -2740,7 +2744,7 @@ public void testEnableGcDeletes() throws Exception {
27402744
}
27412745

27422746
public void testExtractShardId() {
2743-
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
2747+
try (Engine.Searcher test = this.engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
27442748
ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
27452749
assertNotNull(shardId);
27462750
assertEquals(shardId, engine.config().getShardId());
@@ -3015,7 +3019,7 @@ public void testSkipTranslogReplay() throws IOException {
30153019
engine.close();
30163020
try (InternalEngine engine = new InternalEngine(config)) {
30173021
engine.skipTranslogRecovery();
3018-
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
3022+
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
30193023
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
30203024
assertThat(topDocs.totalHits.value, equalTo(0L));
30213025
}
@@ -3058,6 +3062,7 @@ public void testTranslogReplay() throws IOException {
30583062
// we need to reuse the engine config unless the parser.mappingModified won't work
30593063
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
30603064
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
3065+
engine.refresh("warm_up");
30613066

30623067
assertVisibleCount(engine, numDocs, false);
30633068
assertEquals(numDocs, translogHandler.appliedOperations());
@@ -3071,6 +3076,7 @@ public void testTranslogReplay() throws IOException {
30713076
engine.close();
30723077
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
30733078
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
3079+
engine.refresh("warm_up");
30743080
assertVisibleCount(engine, numDocs, false);
30753081
assertEquals(0, translogHandler.appliedOperations());
30763082

@@ -3100,6 +3106,7 @@ public void testTranslogReplay() throws IOException {
31003106
engine.close();
31013107
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
31023108
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
3109+
engine.refresh("warm_up");
31033110
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
31043111
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs + 1);
31053112
assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L));
@@ -4491,7 +4498,7 @@ private void index(final InternalEngine engine, final int id) throws IOException
44914498
* second is the primary term.
44924499
*/
44934500
private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws EngineException {
4494-
try (Engine.Searcher searcher = engine.acquireSearcher("get")) {
4501+
try (Engine.Searcher searcher = engine.acquireSearcher("get", Engine.SearcherScope.INTERNAL)) {
44954502
final long primaryTerm;
44964503
final long seqNo;
44974504
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), get.uid());
@@ -4673,7 +4680,7 @@ public void testRefreshScopedSearcher() throws IOException {
46734680
InternalEngine engine =
46744681
// disable merges to make sure that the reader doesn't change unexpectedly during the test
46754682
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
4676-
4683+
engine.refresh("warm_up");
46774684
try (Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
46784685
Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
46794686
assertSameReader(getSearcher, searchSearcher);
@@ -5536,7 +5543,7 @@ protected void doRun() throws Exception {
55365543

55375544
public void testAcquireSearcherOnClosingEngine() throws Exception {
55385545
engine.close();
5539-
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
5546+
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL));
55405547
}
55415548

55425549
public void testNoOpOnClosingEngine() throws Exception {
@@ -6195,4 +6202,59 @@ public void afterRefresh(boolean didRefresh) {
61956202
}
61966203
}
61976204
}
6205+
6206+
public void testNotWarmUpSearcherInEngineCtor() throws Exception {
6207+
try (Store store = createStore()) {
6208+
List<ElasticsearchDirectoryReader> warmedUpReaders = new ArrayList<>();
6209+
Engine.Warmer warmer = reader -> {
6210+
assertNotNull(reader);
6211+
assertThat(reader, not(in(warmedUpReaders)));
6212+
warmedUpReaders.add(reader);
6213+
};
6214+
EngineConfig config = engine.config();
6215+
final TranslogConfig translogConfig = new TranslogConfig(config.getTranslogConfig().getShardId(),
6216+
createTempDir(), config.getTranslogConfig().getIndexSettings(), config.getTranslogConfig().getBigArrays());
6217+
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
6218+
config.getIndexSettings(), warmer, store, config.getMergePolicy(), config.getAnalyzer(),
6219+
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
6220+
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
6221+
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
6222+
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
6223+
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
6224+
try (InternalEngine engine = createEngine(configWithWarmer)) {
6225+
assertThat(warmedUpReaders, empty());
6226+
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),
6227+
equalTo("searcher was not warmed up yet for source[test]"));
6228+
int times = randomIntBetween(1, 10);
6229+
for (int i = 0; i < times; i++) {
6230+
engine.refresh("test");
6231+
}
6232+
assertThat(warmedUpReaders, hasSize(1));
6233+
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
6234+
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
6235+
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
6236+
assertSame(warmedUpReaders.get(0), externalSearcher.getDirectoryReader());
6237+
}
6238+
}
6239+
index(engine, randomInt());
6240+
if (randomBoolean()) {
6241+
engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
6242+
assertThat(warmedUpReaders, hasSize(1));
6243+
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
6244+
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
6245+
assertNotSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
6246+
}
6247+
}
6248+
}
6249+
engine.refresh("test");
6250+
assertThat(warmedUpReaders, hasSize(2));
6251+
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
6252+
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
6253+
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
6254+
assertSame(warmedUpReaders.get(1), externalSearcher.getDirectoryReader());
6255+
}
6256+
}
6257+
}
6258+
}
6259+
}
61986260
}

0 commit comments

Comments
 (0)