Skip to content

Commit

Permalink
[Backport 2.x] Changes to encapsulate Translog into TranslogManager (#…
Browse files Browse the repository at this point in the history
…4095)

* Changes to encapsulate Translog into TranslogManager. 

Signed-off-by: Satyajit Ganguly <satyajga@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
satyajitg28 and Bukhtawar authored Aug 3, 2022
1 parent 01123f8 commit 98dfdf5
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
Expand Down Expand Up @@ -134,8 +133,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -300,7 +297,7 @@ public void onFailure(String reason, Exception ex) {
logger,
translogDeletionPolicy,
softDeletesPolicy,
translogManager.getTranslog()::getLastSyncedGlobalCheckpoint
translogManager::getLastSyncedGlobalCheckpoint
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
Expand Down Expand Up @@ -334,7 +331,7 @@ public void onFailure(String reason, Exception ex) {
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo())
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getMaxSeqNo())
);
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
Expand All @@ -352,11 +349,7 @@ public void onFailure(String reason, Exception ex) {
success = true;
} finally {
if (success == false) {
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler);
IOUtils.closeWhileHandlingException(writer, translogManagerRef, internalReaderManager, externalReaderManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand Down Expand Up @@ -389,7 +382,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(
translogManager.getTranslog()::getLastSyncedGlobalCheckpoint,
translogManager::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier()
Expand Down Expand Up @@ -543,15 +536,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (translogManager().getPendingTranslogRecovery().get() == false) {
if (translogManager.getPendingTranslogRecovery().get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
} catch (Exception e) {
try {
translogManager().getPendingTranslogRecovery().set(true); // just play safe and never allow commits on this see
// #ensureCanFlush
translogManager.getPendingTranslogRecovery().set(true); // just play safe and never allow commits on this see
// #ensureCanFlush
failEngine("failed to recover from translog", e);
} catch (Exception inner) {
e.addSuppressed(inner);
Expand All @@ -571,7 +564,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
final int opsRecovered;
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translogManager().getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
try (Translog.Snapshot snapshot = translogManager.getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
Expand All @@ -581,38 +574,17 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert translogManager().getPendingTranslogRecovery().get() : "translogRecovery is not pending but should be";
translogManager().getPendingTranslogRecovery().set(false); // we are good - now we can commit
assert translogManager.getPendingTranslogRecovery().get() : "translogRecovery is not pending but should be";
translogManager.getPendingTranslogRecovery().set(false); // we are good - now we can commit
logger.trace(
() -> new ParameterizedMessage(
"flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered,
translogManager().getTranslog().currentFileGeneration()
translogManager.getTranslog().currentFileGeneration()
)
);
flush(false, true);
translogManager().getTranslog().trimUnreferencedReaders();
}

private Translog openTranslog(
EngineConfig engineConfig,
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer
) throws IOException {

final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(),
persistedSequenceNumberConsumer
);
translogManager.trimUnreferencedReaders();
}

// Package private for testing purposes only
Expand All @@ -622,40 +594,35 @@ boolean hasSnapshottedCommits() {

@Override
public boolean isTranslogSyncNeeded() {
return translogManager().getTranslog().syncNeeded();
return translogManager.getTranslog().syncNeeded();
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translogManager().getTranslog().ensureSynced(locations);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
return synced;
return translogManager.ensureTranslogSynced(locations);
}

@Override
public void syncTranslog() throws IOException {
translogManager().getTranslog().sync();
revisitIndexDeletionPolicyOnTranslogSynced();
translogManager.syncTranslog();
}

@Override
public TranslogStats getTranslogStats() {
return translogManager().getTranslog().stats();
return translogManager.getTranslog().stats();
}

@Override
public Translog.Location getTranslogLastWriteLocation() {
return translogManager().getTranslog().getLastWriteLocation();
return translogManager.getTranslogLastWriteLocation();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() {
try {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
translogManager.getTranslog().trimUnreferencedReaders();
translogManager.trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex);
}
Expand Down Expand Up @@ -747,7 +714,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation());
Translog.Operation operation = translogManager.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
Expand Down Expand Up @@ -1052,7 +1019,7 @@ public IndexResult index(Index index) throws IOException {
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
location = translogManager.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
Expand Down Expand Up @@ -1492,7 +1459,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
Expand Down Expand Up @@ -1819,8 +1786,9 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.add(
new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())
);
noOpResult.setTranslogLocation(location);
}
}
Expand Down Expand Up @@ -1999,7 +1967,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {

// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);
translogManager.getTranslog().trimUnreferencedReaders();
translogManager.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
Expand Down Expand Up @@ -2723,7 +2691,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
// the setting will be re-interpreted if it's set to true
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
Expand All @@ -2739,7 +2707,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
return translogManager.getLastSyncedGlobalCheckpoint();
}

public long getProcessedLocalCheckpoint() {
Expand Down
Loading

0 comments on commit 98dfdf5

Please sign in to comment.