Skip to content

Commit 33204c2

Browse files
committed
Use peer recovery retention leases for indices without soft-deletes (#50351)
Today, the replica allocator uses peer recovery retention leases to select the best-matched copies when allocating replicas of indices with soft-deletes. We can employ this mechanism for indices without soft-deletes because the retaining sequence number of a PRRL is the persisted global checkpoint (plus one) of that copy. If the primary and replica have the same retaining sequence number, then we should be able to perform a noop recovery. The reason is that we must be retaining translog up to the local checkpoint of the safe commit, which is at most the global checkpoint of either copy. The only limitation is that we might not cancel ongoing file-based recoveries with PRRLs for noop recoveries. We can't make the translog retention policy comply with PRRLs. We also have this problem with soft-deletes if a PRRL is about to expire. Relates #45136 Relates #46959
1 parent 1dc98ad commit 33204c2

File tree

11 files changed

+89
-119
lines changed

11 files changed

+89
-119
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,6 +1404,7 @@ public void testOperationBasedRecovery() throws Exception {
14041404
} else {
14051405
ensureGreen(index);
14061406
assertNoFileBasedRecovery(index, n -> true);
1407+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
14071408
}
14081409
}
14091410

@@ -1429,6 +1430,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
14291430
ensureGreen(index);
14301431
flush(index, true);
14311432
assertEmptyTranslog(index);
1433+
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
14321434
}
14331435
}
14341436
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -540,28 +540,6 @@ private static Version indexVersionCreated(final String indexName) throws IOExce
540540
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
541541
}
542542

543-
/**
544-
* Returns the minimum node version among all nodes of the cluster
545-
*/
546-
private static Version minimumNodeVersion() throws IOException {
547-
final Request request = new Request("GET", "_nodes");
548-
request.addParameter("filter_path", "nodes.*.version");
549-
550-
final Response response = client().performRequest(request);
551-
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");
552-
553-
Version minVersion = null;
554-
for (Map.Entry<String, Object> node : nodes.entrySet()) {
555-
@SuppressWarnings("unchecked")
556-
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
557-
if (minVersion == null || minVersion.after(nodeVersion)) {
558-
minVersion = nodeVersion;
559-
}
560-
}
561-
assertNotNull(minVersion);
562-
return minVersion;
563-
}
564-
565543
/**
566544
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
567545
* that the index has started shards.

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -824,9 +824,7 @@ private void maybeSyncGlobalCheckpoints() {
824824
}
825825

826826
private void syncRetentionLeases() {
827-
if (indexSettings.isSoftDeleteEnabled()) {
828-
sync(IndexShard::syncRetentionLeases, "retention lease");
829-
}
827+
sync(IndexShard::syncRetentionLeases, "retention lease");
830828
}
831829

832830
private void sync(final Consumer<IndexShard> sync, final String source) {

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -906,10 +906,10 @@ public ReplicationTracker(
906906
this.pendingInSync = new HashSet<>();
907907
this.routingTable = null;
908908
this.replicationGroup = null;
909-
this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() &&
910-
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
911-
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
912-
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN));
909+
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
910+
(indexSettings.isSoftDeleteEnabled() &&
911+
indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
912+
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
913913
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
914914
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
915915
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1005,10 +1005,7 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
10051005
updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint);
10061006
updateGlobalCheckpointOnPrimary();
10071007

1008-
if (indexSettings.isSoftDeleteEnabled()) {
1009-
addPeerRecoveryRetentionLeaseForSolePrimary();
1010-
}
1011-
1008+
addPeerRecoveryRetentionLeaseForSolePrimary();
10121009
assert invariant();
10131010
}
10141011

@@ -1373,7 +1370,7 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
13731370
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
13741371
*/
13751372
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
1376-
if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
1373+
if (hasAllPeerRecoveryRetentionLeases == false) {
13771374
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
13781375
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
13791376
setHasAllPeerRecoveryRetentionLeases();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1917,10 +1917,10 @@ boolean shouldRollTranslogGeneration() {
19171917
public void onSettingsChanged() {
19181918
Engine engineOrNull = getEngineOrNull();
19191919
if (engineOrNull != null) {
1920-
final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery;
1920+
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
19211921
engineOrNull.onSettingsChanged(
1922-
useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
1923-
useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
1922+
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
1923+
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
19241924
indexSettings.getSoftDeleteRetentionOperations()
19251925
);
19261926
}
@@ -2249,7 +2249,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException {
22492249
public void syncRetentionLeases() {
22502250
assert assertPrimaryMode();
22512251
verifyNotClosed();
2252-
ensureSoftDeletesEnabled("retention leases");
22532252
replicationTracker.renewPeerRecoveryRetentionLeases();
22542253
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
22552254
if (retentionLeases.v1()) {
@@ -2646,7 +2645,7 @@ public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCh
26462645
ActionListener<ReplicationResponse> listener) {
26472646
assert assertPrimaryMode();
26482647
// only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs:
2649-
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
2648+
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == false;
26502649
return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
26512650
}
26522651

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
167167
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
168168
}
169169
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
170-
retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get(
171-
ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
170+
retentionLeaseRef.set(
171+
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
172172
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
173173
shard, cancellableThreads, logger);
174174
final Engine.HistorySource historySource;
175-
if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) {
175+
if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
176176
historySource = Engine.HistorySource.INDEX;
177177
} else {
178178
historySource = Engine.HistorySource.TRANSLOG;
@@ -192,7 +192,7 @@ && isTargetSameHistory()
192192
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
193193
// without having a complete history.
194194

195-
if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
195+
if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
196196
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
197197
retentionLock.close();
198198
logger.trace("history is retained by {}", retentionLeaseRef.get());
@@ -211,7 +211,7 @@ && isTargetSameHistory()
211211
if (isSequenceNumberBasedRecovery) {
212212
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
213213
startingSeqNo = request.startingSeqNo();
214-
if (softDeletesEnabled && retentionLeaseRef.get() == null) {
214+
if (retentionLeaseRef.get() == null) {
215215
createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
216216
} else {
217217
sendFileStep.onResponse(SendFileResult.EMPTY);
@@ -253,36 +253,24 @@ && isTargetSameHistory()
253253
});
254254

255255
final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
256-
if (softDeletesEnabled) {
257-
runUnderPrimaryPermit(() -> {
258-
try {
259-
// If the target previously had a copy of this shard then a file-based recovery might move its global
260-
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
261-
// new one later on in the recovery.
262-
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
263-
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
264-
deleteRetentionLeaseStep, false));
265-
} catch (RetentionLeaseNotFoundException e) {
266-
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
267-
deleteRetentionLeaseStep.onResponse(null);
268-
}
269-
}, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]",
270-
shard, cancellableThreads, logger);
271-
} else {
272-
deleteRetentionLeaseStep.onResponse(null);
273-
}
256+
runUnderPrimaryPermit(() -> {
257+
try {
258+
// If the target previously had a copy of this shard then a file-based recovery might move its global
259+
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
260+
// new one later on in the recovery.
261+
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
262+
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
263+
deleteRetentionLeaseStep, false));
264+
} catch (RetentionLeaseNotFoundException e) {
265+
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
266+
deleteRetentionLeaseStep.onResponse(null);
267+
}
268+
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
269+
shard, cancellableThreads, logger);
274270

275271
deleteRetentionLeaseStep.whenComplete(ignored -> {
276272
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
277-
278-
final Consumer<ActionListener<RetentionLease>> createRetentionLeaseAsync;
279-
if (softDeletesEnabled) {
280-
createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l);
281-
} else {
282-
createRetentionLeaseAsync = l -> l.onResponse(null);
283-
}
284-
285-
phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep);
273+
phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);
286274
}, onFailure);
287275

288276
} catch (final Exception e) {
@@ -454,8 +442,7 @@ static final class SendFileResult {
454442
* segments that are missing. Only segments that have the same size and
455443
* checksum can be reused
456444
*/
457-
void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> createRetentionLease,
458-
IntSupplier translogOps, ActionListener<SendFileResult> listener) {
445+
void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
459446
cancellableThreads.checkForCancel();
460447
final Store store = shard.store();
461448
try {
@@ -529,7 +516,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
529516
sendFileInfoStep.whenComplete(r ->
530517
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
531518

532-
sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure);
519+
sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
533520

534521
createRetentionLeaseStep.whenComplete(retentionLease ->
535522
{
@@ -557,7 +544,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
557544

558545
// but we must still create a retention lease
559546
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
560-
createRetentionLease.accept(createRetentionLeaseStep);
547+
createRetentionLease(startingSeqNo, createRetentionLeaseStep);
561548
createRetentionLeaseStep.whenComplete(retentionLease -> {
562549
final TimeValue took = stopWatch.totalTime();
563550
logger.trace("recovery [phase1]: took [{}]", took);
@@ -593,7 +580,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Reten
593580
// it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
594581
// 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
595582
// recovery as a conservative estimate for the global checkpoint.
596-
assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0);
583+
assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0)
584+
|| shard.indexSettings().isSoftDeleteEnabled() == false;
597585
final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
598586
final long estimatedGlobalCheckpoint = startingSeqNo - 1;
599587
final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(),

server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void testPreferCopyCanPerformNoopRecovery() throws Exception {
7878
assertAcked(
7979
client().admin().indices().prepareCreate(indexName)
8080
.setSettings(Settings.builder()
81-
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
81+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
8282
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
8383
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
8484
.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
@@ -211,7 +211,7 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception {
211211
assertAcked(
212212
client().admin().indices().prepareCreate(indexName)
213213
.setSettings(Settings.builder()
214-
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
214+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
215215
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
216216
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb")
217217
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
@@ -248,7 +248,7 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception {
248248
assertAcked(
249249
client().admin().indices().prepareCreate(indexName)
250250
.setSettings(Settings.builder()
251-
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
251+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
252252
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
253253
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb")
254254
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -329,7 +329,7 @@ public void testPeerRecoveryForClosedIndices() throws Exception {
329329
createIndex(indexName, Settings.builder()
330330
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
331331
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
332-
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
332+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
333333
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
334334
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
335335
.build());

0 commit comments

Comments
 (0)