Skip to content

Don't schedule a new snapshot for an SLM policy if one is already running #90105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleStats;
import org.elasticsearch.xpack.ilm.LifecyclePolicySecurityClient;
Expand All @@ -36,9 +37,12 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.core.Strings.format;

Expand All @@ -50,6 +54,11 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
private final ClusterService clusterService;
private final SnapshotHistoryStore historyStore;

/**
* Set of all currently running {@link SnapshotLifecyclePolicy} ids, used to prevent starting multiple snapshots for the same policy.
*/
private final Set<String> runningPolicies = Collections.synchronizedSet(new HashSet<>());

public SnapshotLifecycleTask(final Client client, final ClusterService clusterService, final SnapshotHistoryStore historyStore) {
this.client = client;
this.clusterService = clusterService;
Expand All @@ -59,21 +68,14 @@ public SnapshotLifecycleTask(final Client client, final ClusterService clusterSe
@Override
public void triggered(SchedulerEngine.Event event) {
    logger.debug("snapshot lifecycle policy task triggered from job [{}]", event.getJobName());

    // Route through the 5-arg overload with the task-level runningPolicies set so that a
    // policy with a snapshot already in flight is skipped. The public 4-arg overload
    // allocates a fresh HashSet per call, so going through it would make the
    // "one running snapshot per policy" guard a no-op on the scheduled path.
    maybeTakeSnapshot(event.getJobName(), client, clusterService, historyStore, runningPolicies).ifPresentOrElse(
        name -> logger.info(
            "snapshot lifecycle policy job [{}] issued new snapshot creation for [{}] successfully",
            event.getJobName(),
            name
        ),
        // Empty Optional: policy no longer exists, or a snapshot for it is already running.
        () -> logger.warn("snapshot lifecycle policy job [{}] did not issue new snapshot creation", event.getJobName())
    );
}

/**
Expand All @@ -87,9 +89,38 @@ public static Optional<String> maybeTakeSnapshot(
final Client client,
final ClusterService clusterService,
final SnapshotHistoryStore historyStore
) {
return maybeTakeSnapshot(jobId, client, clusterService, historyStore, new HashSet<>());
}

/**
* For the given job id (a combination of policy id and version), issue a create snapshot
* request. On a successful or failed create snapshot issuing the state is stored in the cluster
* state in the policy's metadata.
*
* The set of running policies is used as a mutex, so that if there's a snapshot already running
* for a given policy we won't run an additional one.
*
* @return An optional snapshot name if the request was issued successfully
*/
private static Optional<String> maybeTakeSnapshot(
final String jobId,
final Client client,
final ClusterService clusterService,
final SnapshotHistoryStore historyStore,
final Set<String> runningPolicies
) {
Optional<SnapshotLifecyclePolicyMetadata> maybeMetadata = getSnapPolicyMetadata(jobId, clusterService.state());
String snapshotName = maybeMetadata.map(policyMetadata -> {
if (runningPolicies.add(policyMetadata.getPolicy().getId()) == false) {
// policy is already running, log that that's the case and don't schedule a new snapshot for it
logger.info(
"snapshot lifecycle policy [{}] current has a snapshot in progress, skipping",
policyMetadata.getPolicy().getId()
);
return null;
}

// don't time out on this request to not produce failed SLM runs in case of a temporarily slow master node
CreateSnapshotRequest request = policyMetadata.getPolicy().toRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
final LifecyclePolicySecurityClient clientWithHeaders = new LifecyclePolicySecurityClient(
Expand All @@ -105,6 +136,7 @@ public static Optional<String> maybeTakeSnapshot(
clientWithHeaders.admin().cluster().createSnapshot(request, new ActionListener<>() {
@Override
public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
runningPolicies.remove(policyMetadata.getPolicy().getId());
logger.debug(
"snapshot response for [{}]: {}",
policyMetadata.getPolicy().getId(),
Expand All @@ -114,12 +146,13 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
// Check that there are no failed shards, since the request may not entirely
// fail, but may still have failures (such as in the case of an aborted snapshot)
if (snapInfo.failedShards() == 0) {
long snapshotStartTime = snapInfo.startTime();
final long timestamp = Instant.now().toEpochMilli();
long startTime = snapInfo.startTime();
long endTime = snapInfo.endTime();
submitUnbatchedTask(
clusterService,
"slm-record-success-" + policyMetadata.getPolicy().getId(),
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), snapshotStartTime, timestamp)
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), startTime, endTime)
);
historyStore.putAsync(
SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(), request.snapshot())
Expand All @@ -139,6 +172,7 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) {

@Override
public void onFailure(Exception e) {
runningPolicies.remove(policyMetadata.getPolicy().getId());
logger.error("failed to create snapshot for snapshot lifecycle policy [{}]: {}", policyMetadata.getPolicy().getId(), e);
final long timestamp = Instant.now().toEpochMilli();
submitUnbatchedTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
* is triggered for execution. It constructs a snapshot request and runs it as the user who originally set up the policy. The bulk of this
* logic is contained in the
* {@link org.elasticsearch.xpack.slm.SnapshotLifecycleTask#maybeTakeSnapshot(String, Client, ClusterService,
* SnapshotHistoryStore)} method. After a snapshot request has been completed, it persists the result (success or failure) in a history
* store (an index), caching the latest success and failure information in the cluster state.
*/
package org.elasticsearch.xpack.slm;

Expand Down