Skip to content

Remove translog retention policy #51417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 2 additions & 59 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ public final class IndexSettings {
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age",
settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
Setting.timeSetting("index.translog.retention.age", settings -> TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);

/**
Expand All @@ -259,9 +258,7 @@ public final class IndexSettings {
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size",
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
Property.Dynamic, Property.IndexScope);
Setting.byteSizeSetting("index.translog.retention.size", settings -> "-1", Property.Dynamic, Property.IndexScope);

/**
* Controls the number of translog files that are no longer needed for persistence reasons will be kept around before being deleted.
Expand Down Expand Up @@ -351,8 +348,6 @@ public final class IndexSettings {
private volatile TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private volatile ByteSizeValue flushAfterMergeThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
Expand Down Expand Up @@ -512,8 +507,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
setTranslogRetentionAge(scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING));
setTranslogRetentionSize(scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING));

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
Expand Down Expand Up @@ -551,8 +544,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
Expand All @@ -578,24 +569,6 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionSize = new ByteSizeValue(-1);
} else {
this.translogRetentionSize = byteSizeValue;
}
}

private void setTranslogRetentionAge(TimeValue age) {
if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionAge = TimeValue.MINUS_ONE;
} else {
this.translogRetentionAge = age;
}
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
Expand Down Expand Up @@ -772,36 +745,6 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() {
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
return translogRetentionSize;
}

/**
* Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept
* around
*/
public TimeValue getTranslogRetentionAge() {
assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
return translogRetentionAge;
}

/**
* Returns the maximum number of translog files that that no longer required for persistence should be kept for peer recovery
* when soft-deletes is disabled.
*/
public int getTranslogRetentionTotalFiles() {
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
}

private static boolean shouldDisableTranslogRetention(Settings settings) {
return INDEX_SOFT_DELETES_SETTING.get(settings)
&& IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0);
}

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -1783,7 +1782,7 @@ public IndexCommit getIndexCommit() {
}
}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
public void onSettingsChanged() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -190,11 +188,7 @@ public InternalEngine(EngineConfig engineConfig) {
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand Down Expand Up @@ -2432,14 +2426,11 @@ final void ensureCanFlush() {
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletes();
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
softDeletesPolicy.setRetentionOperations(config().getIndexSettings().getSoftDeleteRetentionOperations());
}

public MergeStats getMergeStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void trimUnreferencedTranslogFiles() {

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()
);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -612,7 +611,6 @@ public void onFailure(Exception e) {
if (shardRoutings.stream().allMatch(
shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)))) {
useRetentionLeasesInPeerRecovery = true;
turnOffTranslogRetention();
}
}
}
Expand Down Expand Up @@ -1886,35 +1884,10 @@ boolean shouldRollTranslogGeneration() {
public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
engineOrNull.onSettingsChanged(
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);
engineOrNull.onSettingsChanged();
}
}

private void turnOffTranslogRetention() {
logger.debug("turn off the translog retention for the replication group {} " +
"as it starts using retention leases exclusively in peer recoveries", shardId);
// Off to the generic threadPool as pruning the delete tombstones can be expensive.
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to turn off translog retention", e);
}
}

@Override
protected void doRun() {
onSettingsChanged();
trimTranslog();
}
});
}

/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1672,7 +1672,7 @@ public void trimUnreferencedReaders() throws IOException {
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.elasticsearch.Assertions;
import org.elasticsearch.common.lease.Releasable;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class TranslogDeletionPolicy {
public final class TranslogDeletionPolicy {

private final Map<Object, RuntimeException> openTranslogRef;

Expand Down Expand Up @@ -59,16 +57,8 @@ public void assertNoOpenTranslogRefs() {
*/
private long translogGenerationOfLastCommit = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;

private int retentionTotalFiles;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
public TranslogDeletionPolicy() {
if (Assertions.ENABLED) {
openTranslogRef = new ConcurrentHashMap<>();
} else {
Expand All @@ -95,18 +85,6 @@ public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
translogGenerationOfLastCommit = lastGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}

public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed.
Expand Down Expand Up @@ -156,66 +134,9 @@ private synchronized void releaseTranslogGen(long translogGen) {
/**
* returns the minimum translog generation that is still required by the system. Any generation below
* the returned value may be safely deleted
*
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) {
final TranslogReader reader = readers.get(i);
totalSize += reader.sizeInBytes();
minGen = reader.getGeneration();
}
return minGen;
} else {
return Long.MIN_VALUE;
}
}

static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader: readers) {
if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) {
return reader.getGeneration();
}
}
return writer.getGeneration();
} else {
return Long.MIN_VALUE;
}
}

static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
long minGen = writer.generation;
int totalFiles = 1; // for the current writer
for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) {
totalFiles++;
minGen = readers.get(i).generation;
}
return minGen;
}

protected long currentTime() {
return System.currentTimeMillis();
synchronized long minTranslogGenRequired() {
return Math.min(getMinTranslogGenRequiredByLocks(), minTranslogGenerationForRecovery);
}

private long getMinTranslogGenRequiredByLocks() {
Expand Down
Loading