-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Wait for snapshot completion in SLM snapshot invocation #47051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
import org.elasticsearch.common.xcontent.ToXContent; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.json.JsonXContent; | ||
import org.elasticsearch.snapshots.SnapshotException; | ||
import org.elasticsearch.snapshots.SnapshotInfo; | ||
import org.elasticsearch.xpack.core.ClientHelper; | ||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; | ||
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord; | ||
|
@@ -91,16 +93,32 @@ public static Optional<String> maybeTakeSnapshot(final String jobId, final Clien | |
public void onResponse(CreateSnapshotResponse createSnapshotResponse) { | ||
logger.debug("snapshot response for [{}]: {}", | ||
policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse)); | ||
final long timestamp = Instant.now().toEpochMilli(); | ||
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), | ||
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); | ||
historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), | ||
request.snapshot())); | ||
final SnapshotInfo snapInfo = createSnapshotResponse.getSnapshotInfo(); | ||
|
||
// Check that there are no failed shards, since the request may not entirely | ||
// fail, but may still have failures (such as in the case of an aborted snapshot) | ||
if (snapInfo.failedShards() == 0) { | ||
final long timestamp = Instant.now().toEpochMilli(); | ||
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(), | ||
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp)); | ||
historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), | ||
request.snapshot())); | ||
} else { | ||
int failures = snapInfo.failedShards(); | ||
int total = snapInfo.totalShards(); | ||
final SnapshotException e = new SnapshotException(request.repository(), request.snapshot(), | ||
"failed to create snapshot successfully, " + failures + " out of " + total + " total shards failed"); | ||
// Add each failed shard's exception as suppressed, the exception contains | ||
// information about which shard failed | ||
snapInfo.shardFailures().forEach(failure -> e.addSuppressed(failure.getCause())); | ||
// Call the failure handler to register this as a failure and persist it | ||
onFailure(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given what Armin was saying on #46988 about partial snapshots sometimes being more like successes, treating them here as a failure may be awkward for some users. That said, I think we can go with it for now and tweak the behavior later if we get strong feedback about it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in order to better handle it, we may have to change the history store to store all the states that a snapshot can be in, that way we could still tell if there were errors (PARTIAL snapshots). For now though, I think it's safer to treat PARTIAL snapshots as failures. |
||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.error("failed to issue create snapshot request for snapshot lifecycle policy [{}]: {}", | ||
logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}", | ||
policyMetadata.getPolicy().getId(), e); | ||
final long timestamp = Instant.now().toEpochMilli(); | ||
clusterService.submitStateUpdateTask("slm-record-failure-" + policyMetadata.getPolicy().getId(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -324,6 +324,7 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, | |
List<SnapshotInfo> snapshots = entry.getValue(); | ||
for (SnapshotInfo info : snapshots) { | ||
final String policyId = getPolicyId(info); | ||
final long deleteStartTime = nowNanoSupplier.getAsLong(); | ||
deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> { | ||
deleted.incrementAndGet(); | ||
if (acknowledgedResponse.isAcknowledged()) { | ||
|
@@ -349,13 +350,15 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, | |
})); | ||
// Check whether we have exceeded the maximum time allowed to spend deleting | ||
// snapshots, if we have, short-circuit the rest of the deletions | ||
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); | ||
logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); | ||
if (elapsedDeletionTime.compareTo(maximumTime) > 0) { | ||
long finishTime = nowNanoSupplier.getAsLong(); | ||
TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime); | ||
logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime); | ||
TimeValue totalDeletionTime = TimeValue.timeValueNanos(finishTime - startTime); | ||
if (totalDeletionTime.compareTo(maximumTime) > 0) { | ||
logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + | ||
" maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", | ||
elapsedDeletionTime, maximumTime, deleted, count, failed); | ||
slmStats.deletionTime(elapsedDeletionTime); | ||
totalDeletionTime, maximumTime, deleted, count, failed); | ||
slmStats.deletionTime(totalDeletionTime); | ||
Reviewer comment: These changes aren't really related to the main purpose of this PR. They're very minor, so I think it's fine, but going forward I think we should try to keep PRs focused on one conceptual change — this case bugs me a little because this PR otherwise doesn't really touch retention.

Author reply: Yeah, it bugs me a little too; I'll separate them in the future. |
||
slmStats.retentionTimedOut(); | ||
return; | ||
} | ||
|
@@ -387,8 +390,8 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { | |
} else { | ||
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot); | ||
} | ||
listener.onResponse(acknowledgedResponse); | ||
slmStats.snapshotDeleted(slmPolicy); | ||
listener.onResponse(acknowledgedResponse); | ||
} | ||
|
||
@Override | ||
|
Uh oh!
There was an error while loading. Please reload this page.