Skip to content

Commit 54ccdc7

Browse files
authored
Do not create engine under IndexShard#mutex (#45263)
Today we create new engines under IndexShard#mutex. This is not ideal because it can block the cluster state updates which also execute under the same mutex. We can avoid this problem by creating new engines under a separate mutex. Closes #43699
1 parent 361d637 commit 54ccdc7

File tree

4 files changed

+167
-65
lines changed

4 files changed

+167
-65
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 88 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
213213

214214
protected volatile ShardRouting shardRouting;
215215
protected volatile IndexShardState state;
216+
// ensure happens-before relation between addRefreshListener() and postRecovery()
217+
private final Object postRecoveryMutex = new Object();
216218
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
217-
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
219+
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
220+
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
218221
final EngineFactory engineFactory;
219222

220223
private final IndexingOperationListener indexingOperationListeners;
@@ -1192,20 +1195,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
11921195
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
11931196
*/
11941197
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
1198+
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
11951199
Engine.IndexCommitRef indexCommit = null;
11961200
store.incRef();
11971201
try {
1198-
Engine engine;
1199-
synchronized (mutex) {
1202+
synchronized (engineMutex) {
12001203
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
1201-
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
1202-
// That can be done out of mutex, since the engine can be closed half way.
1203-
engine = getEngineOrNull();
1204-
if (engine == null) {
1204+
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1205+
synchronized (mutex) {
1206+
final Engine engine = getEngineOrNull();
1207+
if (engine != null) {
1208+
indexCommit = engine.acquireLastIndexCommit(false);
1209+
}
1210+
}
1211+
if (indexCommit == null) {
12051212
return store.getMetadata(null, true);
12061213
}
12071214
}
1208-
indexCommit = engine.acquireLastIndexCommit(false);
12091215
return store.getMetadata(indexCommit.getIndexCommit());
12101216
} finally {
12111217
store.decRef();
@@ -1334,23 +1340,24 @@ public void close(String reason, boolean flushEngine) throws IOException {
13341340
}
13351341
}
13361342

1337-
public IndexShard postRecovery(String reason)
1338-
throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
1339-
synchronized (mutex) {
1340-
if (state == IndexShardState.CLOSED) {
1341-
throw new IndexShardClosedException(shardId);
1342-
}
1343-
if (state == IndexShardState.STARTED) {
1344-
throw new IndexShardStartedException(shardId);
1345-
}
1343+
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
1344+
synchronized (postRecoveryMutex) {
13461345
// we need to refresh again to expose all operations that were indexed until now. Otherwise
13471346
// we may not expose operations that were indexed with a refresh listener that was immediately
1348-
// responded to in addRefreshListener.
1347+
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
1348+
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
13491349
getEngine().refresh("post_recovery");
1350-
recoveryState.setStage(RecoveryState.Stage.DONE);
1351-
changeState(IndexShardState.POST_RECOVERY, reason);
1350+
synchronized (mutex) {
1351+
if (state == IndexShardState.CLOSED) {
1352+
throw new IndexShardClosedException(shardId);
1353+
}
1354+
if (state == IndexShardState.STARTED) {
1355+
throw new IndexShardStartedException(shardId);
1356+
}
1357+
recoveryState.setStage(RecoveryState.Stage.DONE);
1358+
changeState(IndexShardState.POST_RECOVERY, reason);
1359+
}
13521360
}
1353-
return this;
13541361
}
13551362

13561363
/**
@@ -1583,6 +1590,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
15831590
}
15841591

15851592
private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
1593+
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
15861594
if (state != IndexShardState.RECOVERING) {
15871595
throw new IndexShardNotRecoveringException(shardId, state);
15881596
}
@@ -1595,16 +1603,24 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
15951603
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
15961604
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
15971605
+ "] but got " + getRetentionLeases();
1598-
synchronized (mutex) {
1599-
verifyNotClosed();
1600-
assert currentEngineReference.get() == null : "engine is running";
1606+
synchronized (engineMutex) {
16011607
// we must create a new engine under engineMutex (see IndexShard#snapshotStoreMetadata).
16021608
final Engine newEngine = engineFactory.newReadWriteEngine(config);
1603-
onNewEngine(newEngine);
1604-
currentEngineReference.set(newEngine);
1605-
// We set active because we are now writing operations to the engine; this way,
1606-
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
1607-
active.set(true);
1609+
synchronized (mutex) {
1610+
try {
1611+
verifyNotClosed();
1612+
assert currentEngineReference.get() == null : "engine is running";
1613+
onNewEngine(newEngine);
1614+
currentEngineReference.set(newEngine);
1615+
// We set active because we are now writing operations to the engine; this way,
1616+
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
1617+
active.set(true);
1618+
} finally {
1619+
if (currentEngineReference.get() != newEngine) {
1620+
newEngine.close();
1621+
}
1622+
}
1623+
}
16081624
}
16091625
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
16101626
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1627,6 +1643,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
16271643
}
16281644

16291645
private void onNewEngine(Engine newEngine) {
1646+
assert Thread.holdsLock(engineMutex);
16301647
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
16311648
}
16321649

@@ -2675,7 +2692,13 @@ private DocumentMapperForType docMapper(String type) {
26752692
}
26762693

26772694
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
2678-
Sort indexSort = indexSortSupplier.get();
2695+
final Sort indexSort = indexSortSupplier.get();
2696+
final Engine.Warmer warmer = reader -> {
2697+
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
2698+
if (this.warmer != null) {
2699+
this.warmer.warm(reader);
2700+
}
2701+
};
26792702
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
26802703
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
26812704
mapperService != null ? mapperService.indexAnalyzer() : null,
@@ -3237,10 +3260,10 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
32373260
if (isReadAllowed()) {
32383261
readAllowed = true;
32393262
} else {
3240-
// check again under mutex. this is important to create a happens before relationship
3263+
// check again under postRecoveryMutex. this is important to create a happens before relationship
32413264
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
32423265
// to a listener before a refresh actually happened that contained that operation.
3243-
synchronized (mutex) {
3266+
synchronized (postRecoveryMutex) {
32443267
readAllowed = isReadAllowed();
32453268
}
32463269
}
@@ -3305,6 +3328,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
33053328
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
33063329
*/
33073330
void resetEngineToGlobalCheckpoint() throws IOException {
3331+
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
33083332
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
33093333
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
33103334
sync(); // persist the global checkpoint to disk
@@ -3316,23 +3340,28 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
33163340
SetOnce<Engine> newEngineReference = new SetOnce<>();
33173341
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
33183342
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
3319-
synchronized (mutex) {
3320-
verifyNotClosed();
3321-
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
3343+
synchronized (engineMutex) {
3344+
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
33223345
// acquireXXXCommit and close work.
33233346
final Engine readOnlyEngine =
33243347
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
33253348
@Override
33263349
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
3327-
synchronized (mutex) {
3350+
synchronized (engineMutex) {
3351+
if (newEngineReference.get() == null) {
3352+
throw new AlreadyClosedException("engine was closed");
3353+
}
33283354
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
33293355
return newEngineReference.get().acquireLastIndexCommit(false);
33303356
}
33313357
}
33323358

33333359
@Override
33343360
public IndexCommitRef acquireSafeIndexCommit() {
3335-
synchronized (mutex) {
3361+
synchronized (engineMutex) {
3362+
if (newEngineReference.get() == null) {
3363+
throw new AlreadyClosedException("engine was closed");
3364+
}
33363365
return newEngineReference.get().acquireSafeIndexCommit();
33373366
}
33383367
}
@@ -3349,9 +3378,28 @@ public void close() throws IOException {
33493378
IOUtils.close(super::close, newEngine);
33503379
}
33513380
};
3352-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
3353-
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
3354-
onNewEngine(newEngineReference.get());
3381+
synchronized (mutex) {
3382+
try {
3383+
verifyNotClosed();
3384+
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
3385+
} finally {
3386+
if (currentEngineReference.get() != readOnlyEngine) {
3387+
readOnlyEngine.close();
3388+
}
3389+
}
3390+
}
3391+
final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
3392+
synchronized (mutex) {
3393+
try {
3394+
verifyNotClosed();
3395+
newEngineReference.set(newReadWriteEngine);
3396+
onNewEngine(newReadWriteEngine);
3397+
} finally {
3398+
if (newEngineReference.get() != newReadWriteEngine) {
3399+
newReadWriteEngine.close(); // shard was closed
3400+
}
3401+
}
3402+
}
33553403
}
33563404
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
33573405
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,12 @@
7979
import org.elasticsearch.env.NodeEnvironment;
8080
import org.elasticsearch.index.IndexSettings;
8181
import org.elasticsearch.index.VersionType;
82+
import org.elasticsearch.index.codec.CodecService;
8283
import org.elasticsearch.index.engine.CommitStats;
8384
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
8485
import org.elasticsearch.index.engine.Engine;
8586
import org.elasticsearch.index.engine.Engine.DeleteResult;
87+
import org.elasticsearch.index.engine.EngineConfig;
8688
import org.elasticsearch.index.engine.EngineTestCase;
8789
import org.elasticsearch.index.engine.InternalEngine;
8890
import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -4131,4 +4133,39 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
41314133
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
41324134
closeShards(readonlyShard);
41334135
}
4136+
4137+
public void testCloseShardWhileEngineIsWarming() throws Exception {
4138+
CountDownLatch warmerStarted = new CountDownLatch(1);
4139+
CountDownLatch warmerBlocking = new CountDownLatch(1);
4140+
IndexShard shard = newShard(true, Settings.EMPTY, config -> {
4141+
Engine.Warmer warmer = reader -> {
4142+
try {
4143+
warmerStarted.countDown();
4144+
warmerBlocking.await();
4145+
config.getWarmer().warm(reader);
4146+
} catch (InterruptedException e) {
4147+
throw new AssertionError(e);
4148+
}
4149+
};
4150+
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
4151+
config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(),
4152+
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
4153+
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
4154+
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
4155+
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
4156+
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
4157+
return new InternalEngine(configWithWarmer);
4158+
});
4159+
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
4160+
recoveryThread.start();
4161+
try {
4162+
warmerStarted.await();
4163+
shard.close("testing", false);
4164+
assertThat(shard.state, equalTo(IndexShardState.CLOSED));
4165+
} finally {
4166+
warmerBlocking.countDown();
4167+
}
4168+
recoveryThread.join();
4169+
shard.store().close();
4170+
}
41344171
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,34 +1130,38 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio
11301130
}
11311131

11321132
public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException {
1133-
if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) {
1134-
return;
1133+
if (engine instanceof InternalEngine) {
1134+
try {
1135+
engine.refresh("test");
1136+
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
1137+
assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader());
1138+
}
1139+
} catch (AlreadyClosedException ignored) {
1140+
// engine was closed
1141+
}
11351142
}
1136-
try {
1137-
engine.refresh("test");
1138-
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
1139-
DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader());
1140-
Set<Long> seqNos = new HashSet<>();
1141-
for (LeafReaderContext leaf : reader.leaves()) {
1142-
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
1143-
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
1144-
int docId;
1145-
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
1146-
assertTrue(seqNoDocValues.advanceExact(docId));
1147-
long seqNo = seqNoDocValues.longValue();
1148-
assertThat(seqNo, greaterThanOrEqualTo(0L));
1149-
if (primaryTermDocValues.advanceExact(docId)) {
1150-
if (seqNos.add(seqNo) == false) {
1151-
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
1152-
leaf.reader().document(docId, idFieldVisitor);
1153-
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
1154-
}
1155-
}
1143+
}
1144+
1145+
public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings indexSettings,
1146+
DirectoryReader reader) throws IOException {
1147+
Set<Long> seqNos = new HashSet<>();
1148+
final DirectoryReader wrappedReader = indexSettings.isSoftDeleteEnabled() ? Lucene.wrapAllDocsLive(reader) : reader;
1149+
for (LeafReaderContext leaf : wrappedReader.leaves()) {
1150+
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
1151+
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
1152+
int docId;
1153+
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
1154+
assertTrue(seqNoDocValues.advanceExact(docId));
1155+
long seqNo = seqNoDocValues.longValue();
1156+
assertThat(seqNo, greaterThanOrEqualTo(0L));
1157+
if (primaryTermDocValues.advanceExact(docId)) {
1158+
if (seqNos.add(seqNo) == false) {
1159+
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
1160+
leaf.reader().document(docId, idFieldVisitor);
1161+
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
11561162
}
11571163
}
11581164
}
1159-
} catch (AlreadyClosedException ignored) {
1160-
11611165
}
11621166
}
11631167

0 commit comments

Comments
 (0)