Skip to content

Commit 7673092

Browse files
committed
[ENGINE] Make translog fully private to the engine
This commit moves the translog creation into the InternalEngine to ensure the transaction log is created after we acquired the write lock on the index. This also prevents races when ShadowEngines are shutting down due to node restarts where another node already takes over the not yet fully synced transaction log.
1 parent 7e5c238 commit 7673092

18 files changed

+110
-105
lines changed

src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {
278278
}
279279

280280
/** returns the translog for this engine */
281-
public abstract Translog translog();
281+
public abstract Translog getTranslog();
282282

283283
protected void ensureOpen() {
284284
if (isClosed.get()) {

src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.common.unit.ByteSizeUnit;
3030
import org.elasticsearch.common.unit.ByteSizeValue;
3131
import org.elasticsearch.common.unit.TimeValue;
32+
import org.elasticsearch.common.util.BigArray;
33+
import org.elasticsearch.common.util.BigArrays;
3234
import org.elasticsearch.common.util.concurrent.EsExecutors;
3335
import org.elasticsearch.index.codec.CodecService;
3436
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
@@ -39,9 +41,12 @@
3941
import org.elasticsearch.index.shard.ShardId;
4042
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
4143
import org.elasticsearch.index.store.Store;
44+
import org.elasticsearch.index.translog.fs.FsTranslog;
4245
import org.elasticsearch.indices.IndicesWarmer;
4346
import org.elasticsearch.threadpool.ThreadPool;
4447

48+
import java.io.IOException;
49+
import java.nio.file.Path;
4550
import java.util.concurrent.TimeUnit;
4651

4752
/*
@@ -77,6 +82,8 @@ public final class EngineConfig {
7782
private final boolean ignoreUnknownTranslog;
7883
private final QueryCache filterCache;
7984
private final QueryCachingPolicy filterCachingPolicy;
85+
private final BigArrays bigArrays;
86+
private final Path translogPath;
8087

8188
/**
8289
* Index setting for index concurrency / number of threadstates in the indexwriter.
@@ -139,10 +146,10 @@ public final class EngineConfig {
139146
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
140147
*/
141148
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
142-
IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
143-
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
144-
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
145-
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy) {
149+
IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
150+
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
151+
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
152+
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, BigArrays bigArrays, Path translogPath) {
146153
this.shardId = shardId;
147154
this.threadPool = threadPool;
148155
this.indexingService = indexingService;
@@ -156,6 +163,8 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
156163
this.similarity = similarity;
157164
this.codecService = codecService;
158165
this.failedEngineListener = failedEngineListener;
166+
this.bigArrays = bigArrays;
167+
this.translogPath = translogPath;
159168
Settings indexSettings = indexSettingsService.getSettings();
160169
this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
161170
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
@@ -421,4 +430,25 @@ public QueryCache getFilterCache() {
421430
public QueryCachingPolicy getFilterCachingPolicy() {
422431
return filterCachingPolicy;
423432
}
433+
434+
/**
435+
* Returns a BigArrays instance for this engine
436+
*/
437+
public BigArrays getBigArrays() {
438+
return bigArrays;
439+
}
440+
441+
/**
442+
* Returns the translog path for this engine
443+
*/
444+
public Path getTranslogPath() {
445+
return translogPath;
446+
}
447+
448+
/**
449+
* Returns the {@link org.elasticsearch.index.settings.IndexSettingsService} for this engine.
450+
*/
451+
public IndexSettingsService getIndesSettingService() {
452+
return indexSettingsService;
453+
}
424454
}

src/main/java/org/elasticsearch/index/engine/EngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*/
2626
public interface EngineFactory {
2727

28-
public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery);
28+
public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery);
2929

3030
public Engine newReadOnlyEngine(EngineConfig config);
3131
}

src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,18 @@ public class InternalEngine extends Engine {
104104

105105
private final IndexThrottle throttle;
106106

107-
public InternalEngine(EngineConfig engineConfig, FsTranslog translog, boolean skipInitialTranslogRecovery) throws EngineException {
107+
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
108108
super(engineConfig);
109-
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
110109
this.versionMap = new LiveVersionMap();
111110
store.incRef();
112111
IndexWriter writer = null;
112+
FsTranslog translog = null;
113113
SearcherManager manager = null;
114114
boolean success = false;
115115
try {
116116
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
117117
this.indexingService = engineConfig.getIndexingService();
118118
this.warmer = engineConfig.getWarmer();
119-
this.translog = translog;
120119
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
121120
this.mergeScheduler = engineConfig.getMergeScheduler();
122121
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
@@ -130,10 +129,12 @@ public InternalEngine(EngineConfig engineConfig, FsTranslog translog, boolean sk
130129
try {
131130
writer = createWriter();
132131
indexWriter = writer;
132+
translog = new FsTranslog(engineConfig.getShardId(), engineConfig.getIndesSettingService(), engineConfig.getBigArrays(), engineConfig.getTranslogPath(), engineConfig.getThreadPool());
133133
committedTranslogId = loadCommittedTranslogId(writer, translog);
134134
} catch (IOException e) {
135135
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
136136
}
137+
this.translog = translog;
137138
manager = createSearcherManager();
138139
this.searcherManager = manager;
139140
this.versionMap.setManager(searcherManager);
@@ -154,7 +155,7 @@ public InternalEngine(EngineConfig engineConfig, FsTranslog translog, boolean sk
154155
success = true;
155156
} finally {
156157
if (success == false) {
157-
IOUtils.closeWhileHandlingException(writer, manager);
158+
IOUtils.closeWhileHandlingException(writer, translog, manager);
158159
versionMap.clear();
159160
if (isClosed.get() == false) {
160161
// failure we need to dec the store reference
@@ -166,7 +167,7 @@ public InternalEngine(EngineConfig engineConfig, FsTranslog translog, boolean sk
166167
}
167168

168169
@Override
169-
public Translog translog() {
170+
public Translog getTranslog() {
170171
ensureOpen();
171172
return translog;
172173
}
@@ -913,7 +914,6 @@ protected final void closeNoLock(String reason) {
913914
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
914915
try {
915916
this.versionMap.clear();
916-
logger.trace("close searcherManager");
917917
try {
918918
IOUtils.close(searcherManager);
919919
} catch (Throwable t) {

src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222

2323
public class InternalEngineFactory implements EngineFactory {
2424
@Override
25-
public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery) {
26-
return new InternalEngine(config, translog, skipTranslogRecovery);
25+
public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) {
26+
return new InternalEngine(config, skipTranslogRecovery);
2727
}
2828

2929
@Override

src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ public GetResult get(Get get) throws EngineException {
169169
}
170170

171171
@Override
172-
public Translog translog() {
173-
throw new UnsupportedOperationException("shard engines don't have translogs");
172+
public Translog getTranslog() {
173+
throw new UnsupportedOperationException("shadow engines don't have translogs");
174174
}
175175

176176
@Override

src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
102102
import org.elasticsearch.index.translog.Translog;
103103
import org.elasticsearch.index.translog.TranslogStats;
104-
import org.elasticsearch.index.translog.fs.FsTranslog;
105104
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
106105
import org.elasticsearch.index.warmer.WarmerStats;
107106
import org.elasticsearch.indices.IndicesLifecycle;
@@ -259,8 +258,8 @@ public boolean canIndex() {
259258
return true;
260259
}
261260

262-
public Translog translog() {
263-
return engine().translog();
261+
public Translog.View newTranslogView() {
262+
return engine().getTranslog().newView();
264263
}
265264

266265
public ShardIndexingService indexingService() {
@@ -655,7 +654,7 @@ public IdCacheStats idCacheStats() {
655654
}
656655

657656
public TranslogStats translogStats() {
658-
return engine().translog().stats();
657+
return engine().getTranslog().stats();
659658
}
660659

661660
public SuggestStats suggestStats() {
@@ -817,12 +816,18 @@ private Map<String, Mapping> internalPerformTranslogRecovery(boolean skipTranslo
817816
* After the store has been recovered, we need to start the engine. This method starts a new engine but skips
818817
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
819818
* a remote peer.
819+
*
820+
* @param wipeTranslogs if set to <code>true</code> all skipped / uncommitted translogs are removed.
820821
*/
821-
public void skipTranslogRecovery() {
822+
public void skipTranslogRecovery(boolean wipeTranslogs) throws IOException {
822823
assert engineUnsafe() == null : "engine was already created";
823824
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true);
824825
assert recoveredTypes.isEmpty();
825826
assert recoveryState.getTranslog().recoveredOperations() == 0;
827+
if (wipeTranslogs) {
828+
final Translog translog = engine().getTranslog();
829+
translog.markCommitted(translog.currentId());
830+
}
826831
}
827832

828833
/** called if recovery has to be restarted after network error / delay ** */
@@ -964,7 +969,7 @@ public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValu
964969
}
965970
Engine engine = engineUnsafe();
966971
if (engine != null) {
967-
engine.translog().updateBuffer(shardTranslogBufferSize);
972+
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
968973
}
969974
}
970975

@@ -1218,21 +1223,7 @@ private void createNewEngine(boolean skipTranslogRecovery, EngineConfig config)
12181223
}
12191224

12201225
protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) {
1221-
final FsTranslog translog;
1222-
try {
1223-
translog = new FsTranslog(shardId, indexSettingsService, bigArrays, path, threadPool);
1224-
} catch (IOException e) {
1225-
throw new EngineCreationFailureException(shardId, "failed to create translog", e);
1226-
}
1227-
Engine engine = null;
1228-
try {
1229-
engine = engineFactory.newReadWriteEngine(config, translog, skipTranslogRecovery);
1230-
} finally {
1231-
if (engine == null) {
1232-
IOUtils.closeWhileHandlingException(translog);
1233-
}
1234-
}
1235-
return engine;
1226+
return engineFactory.newReadWriteEngine(config, skipTranslogRecovery);
12361227
}
12371228

12381229
/**
@@ -1293,6 +1284,6 @@ protected void operationProcessed() {
12931284
};
12941285
return new EngineConfig(shardId,
12951286
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyProvider, mergeScheduler,
1296-
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy());
1287+
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy(), bigArrays, shardPath().resolveTranslog());
12971288
}
12981289
}

src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,4 @@ protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig con
121121
public boolean allowsPrimaryPromotion() {
122122
return false;
123123
}
124-
125-
@Override
126-
@Nullable
127-
public Translog translog() {
128-
throw new UnsupportedOperationException("shadow shards don't have a translog");
129-
}
130124
}

src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ public void restore(final RecoveryState recoveryState) {
125125
snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
126126
}
127127
indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState);
128-
indexShard.skipTranslogRecovery();
129-
indexShard.translog().markCommitted(indexShard.translog().currentId());
128+
indexShard.skipTranslogRecovery(true);
130129
indexShard.finalizeRecovery();
131130
indexShard.postRecovery("restore done");
132131
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);

src/main/java/org/elasticsearch/index/translog/TranslogService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void run() {
142142
return;
143143
}
144144

145-
if (indexShard.translog() == null) {
145+
if (indexShard.engine().getTranslog() == null) {
146146
reschedule();
147147
return;
148148
}

0 commit comments

Comments
 (0)