Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
[Remove] TrimUnsafeCommit logic for legacy 6.x indexes (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#2225)

* [Remove] TrimUnsafeCommit logic for legacy 6.x indexes

Multiple txlog commits was introduced in legacy 7.x. Legacy 6.x indexes could
therefore not have a safe commit. Since OpenSearch 2.0 is no longer compatible
with legacy 6.x indexes, the logic to trim these unsafe commits is safely
removed.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>

* fix assertion typo

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>

* rebase and incorporate pr feedback

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize authored Mar 9, 2022
1 parent 9c679cb commit 5a9a114
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,6 @@ public void upgradeAllNodesAndPluginsToNextVersion(List<Provider<RegularFile>> p
writeUnicastHostsFiles();
}

public void fullRestart() {
stop(false);
start();
}

public void nextNodeToNextVersion() {
OpenSearchNode node = upgradeNodeToNextVersion();
node.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,45 +1335,6 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
}
}

public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
if (isRunningAgainstOldCluster()) {
final Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1);
if (minimumNodeVersion().before(Version.V_2_0_0)) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1");
}
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), "1kb");
}
createIndex(index, settings.build());
ensureGreen(index);
int numDocs = randomIntBetween(0, 100);
for (int i = 0; i < numDocs; i++) {
indexDocument(Integer.toString(i));
if (rarely()) {
flush(index, randomBoolean());
}
}
client().performRequest(new Request("POST", "/" + index + "/_refresh"));
if (randomBoolean()) {
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
}
if (randomBoolean()) {
flush(index, randomBoolean());
} else if (randomBoolean()) {
syncedFlush(index, randomBoolean());
}
saveInfoDocument("doc_count", Integer.toString(numDocs));
}
ensureGreen(index);
final int numDocs = Integer.parseInt(loadInfoDocument("doc_count"));
assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}

public void testResize() throws Exception {
int numDocs;
if (isRunningAgainstOldCluster()) {
Expand Down
117 changes: 0 additions & 117 deletions qa/translog-policy/build.gradle

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
Expand All @@ -115,7 +114,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -252,7 +250,7 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
trimUnsafeCommits(engineConfig);
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
final LocalCheckpointTracker tracker = getLocalCheckpointTracker();
assert tracker != null || getTranslog().isOpen() == false;
Expand Down Expand Up @@ -2955,15 +2953,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
return true;
}

private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
final Store store = engineConfig.getStore();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
}

/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
Expand Down
25 changes: 7 additions & 18 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,27 +1597,16 @@ public void ensureIndexHasHistoryUUID() throws IOException {
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
* while the local checkpoint of c2 is 2.
* <p>
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
*/
public void trimUnsafeCommits(
final long lastSyncedGlobalCheckpoint,
final long minRetainedTranslogGen,
final org.opensearch.Version indexVersionCreated
) throws IOException {
public void trimUnsafeCommits(final Path translogPath) throws IOException {
metadataLock.writeLock().lock();
try {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
assert existingCommits.isEmpty() == false : "No index found to trim";
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final IndexCommit startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);

if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) {
throw new IllegalStateException(
Expand All @@ -1628,7 +1617,7 @@ public void trimUnsafeCommits(
+ "]"
);
}
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
if (startingIndexCommit.equals(lastIndexCommit) == false) {
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6094,7 +6094,7 @@ public void testTrimUnsafeCommits() throws Exception {
minTranslogGen = engine.getTranslog().getMinFileGeneration();
}

store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen, config.getIndexSettings().getIndexVersionCreated());
store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath());
long safeMaxSeqNo = commitMaxSeqNo.stream()
.filter(s -> s <= globalCheckpoint.get())
.reduce((s1, s2) -> s2) // get the last one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,7 @@ protected static void createIndex(String name, Settings settings, String mapping
entity += "}";
if (settings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) == false) {
expectSoftDeletesWarning(request, name);
} else if (settings.hasValue(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey())
|| settings.hasValue(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey())) {
expectTranslogRetentionWarning(request);
}
}
request.setJsonEntity(entity);
client().performRequest(request);
}
Expand Down Expand Up @@ -1025,21 +1022,6 @@ protected static void expectSoftDeletesWarning(Request request, String indexName
}
}

protected static void expectTranslogRetentionWarning(Request request) {
final List<String> expectedWarnings = Collections.singletonList(
"Translog retention settings [index.translog.retention.age] "
+ "and [index.translog.retention.size] are deprecated and effectively ignored. They will be removed in a future version."
);
final Builder requestOptions = RequestOptions.DEFAULT.toBuilder();
if (nodeVersions.stream().allMatch(version -> version.onOrAfter(LegacyESVersion.V_7_7_0))) {
requestOptions.setWarningsHandler(warnings -> warnings.equals(expectedWarnings) == false);
request.setOptions(requestOptions);
} else if (nodeVersions.stream().anyMatch(version -> version.onOrAfter(LegacyESVersion.V_7_7_0))) {
requestOptions.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false);
request.setOptions(requestOptions);
}
}

protected static Map<String, Object> getIndexSettings(String index) throws IOException {
Request request = new Request("GET", "/" + index + "/_settings");
request.addParameter("flat_settings", "true");
Expand Down

0 comments on commit 5a9a114

Please sign in to comment.