Skip to content

Commit

Permalink
Integrated NRTReplicationEngine with TranslogManager
Browse files Browse the repository at this point in the history
Signed-off-by: Satyajit Ganguly <satyajga@amazon.com>
  • Loading branch information
Satyajit Ganguly committed Aug 1, 2022
1 parent 0535963 commit cf972a0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -49,18 +52,19 @@
*
* @opensearch.internal
*/
public class NRTReplicationEngine extends Engine {
public class NRTReplicationEngine extends Engine implements LifecycleAware {

private volatile SegmentInfos lastCommittedSegmentInfos;
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final Translog translog;
private final TranslogManager translogManager;

public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
TranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
Expand All @@ -71,18 +75,49 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
this.translog = openTranslog(
engineConfig,
getTranslogDeletionPolicy(engineConfig),
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
engineConfig.getTranslogConfig(),
engineConfig.getPrimaryTermSupplier(),
engineConfig.getGlobalCheckpointSupplier(),
localCheckpointTracker::markSeqNoAsPersisted
getTranslogDeletionPolicy(engineConfig),
shardId,
readLock,
this::getLocalCheckpointTracker,
translogUUID,
new TranslogEventListener() {
@Override
public void onFailure(String reason, Exception ex) {
failEngine(reason, ex);
}

@Override
public void onAfterTranslogSync() {
try {
translogManager.getTranslog().trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
}
}
},
this
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager);
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}

public TranslogManager translogManager() {
return translogManager;
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
readerManager.updateSegments(infos);
Expand All @@ -91,7 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
rollTranslogGeneration();
translogManager.rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
Expand Down Expand Up @@ -125,7 +160,7 @@ public boolean isThrottled() {
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
translogManager.getTranslog().trimOperations(belowTerm, aboveSeqNo);
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
Expand All @@ -140,7 +175,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translog.add(new Translog.Index(index, indexResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand All @@ -152,7 +187,7 @@ public IndexResult index(Index index) throws IOException {
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -164,7 +199,8 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand All @@ -184,22 +220,22 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search

@Override
public boolean isTranslogSyncNeeded() {
return translog.syncNeeded();
return translogManager.getTranslog().syncNeeded();
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
boolean synced = translog.ensureSynced(locations);
boolean synced = translogManager.getTranslog().ensureSynced(locations);
if (synced) {
translog.trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
}
return synced;
}

@Override
public void syncTranslog() throws IOException {
translog.sync();
translog.trimUnreferencedReaders();
translogManager.getTranslog().sync();
translogManager.getTranslog().trimUnreferencedReaders();
}

@Override
Expand Down Expand Up @@ -242,12 +278,12 @@ public long getMinRetainedSeqNo() {

@Override
public TranslogStats getTranslogStats() {
return translog.stats();
return translogManager.getTranslog().stats();
}

@Override
public Translog.Location getTranslogLastWriteLocation() {
return translog.getLastWriteLocation();
return translogManager.getTranslog().getLastWriteLocation();
}

@Override
Expand All @@ -266,7 +302,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translog.getLastSyncedGlobalCheckpoint();
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
Expand Down Expand Up @@ -302,7 +338,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
Expand All @@ -315,15 +351,15 @@ public void trimUnreferencedTranslogFiles() throws EngineException {

@Override
public boolean shouldRollTranslogGeneration() {
return translog.shouldRollGeneration();
return translogManager.getTranslog().shouldRollGeneration();
}

@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
translogManager.getTranslog().rollGeneration();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
Expand Down Expand Up @@ -370,7 +406,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translog, store::decRef);
IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down Expand Up @@ -421,12 +457,12 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}

public Translog getTranslog() {
return translog;
return translogManager.getTranslog();
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
Expand Down Expand Up @@ -469,21 +505,4 @@ private Translog openTranslog(
persistedSequenceNumberConsumer
);
}

private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {
// recover a new engine from the nrtEngine's xlog.
nrtEngine.syncTranslog();
try (InternalEngine engine = new InternalEngine(nrtEngine.config())) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertEquals(getDocIds(engine, true), docs);
}
assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testUpdateSegments() throws Exception {

assertEquals(
nrtEngine.getTranslog().getGeneration().translogFileGeneration,
engine.getTranslog().getGeneration().translogFileGeneration
engine.translogManager().getTranslog().getGeneration().translogFileGeneration
);

try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
Expand Down

0 comments on commit cf972a0

Please sign in to comment.