Skip to content

Simplify Snapshot Create Request Handling #37464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,22 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
SnapshotsService.SnapshotRequest snapshotRequest =
new SnapshotsService.SnapshotRequest(request.repository(), snapshotName, "create_snapshot [" + snapshotName + "]")
.indices(request.indices())
.indicesOptions(request.indicesOptions())
.partial(request.partial())
.settings(request.settings())
.includeGlobalState(request.includeGlobalState())
.masterNodeTimeout(request.masterNodeTimeout());
snapshotsService.createSnapshot(snapshotRequest, new SnapshotsService.CreateSnapshotListener() {
snapshotsService.createSnapshot(request, new SnapshotsService.CreateSnapshotListener() {
@Override
public void onResponse() {
public void onResponse(Snapshot snapshotCreated) {
if (request.waitForCompletion()) {
snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() {
@Override
public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
if (snapshotCreated.equals(snapshot)) {
listener.onResponse(new CreateSnapshotResponse(snapshotInfo));
snapshotsService.removeListener(this);
}
}

@Override
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
if (snapshotCreated.equals(snapshot)) {
listener.onFailure(e);
snapshotsService.removeListener(this);
}
Expand Down
219 changes: 19 additions & 200 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -78,7 +78,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -92,8 +91,8 @@
* <p>
* A typical snapshot creating process looks like this:
* <ul>
* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots
* is currently running and registers the new snapshot in cluster state</li>
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, CreateSnapshotListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
Expand Down Expand Up @@ -236,20 +235,20 @@ public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
* @param request snapshot request
* @param listener snapshot creation listener
*/
public void createSnapshot(final SnapshotRequest request, final CreateSnapshotListener listener) {
final String repositoryName = request.repositoryName;
final String snapshotName = request.snapshotName;
public void createSnapshot(final CreateSnapshotRequest request, final CreateSnapshotListener listener) {
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we resolve this twice now, and if you have stuff like now in it, it might resolve to two different values. Perhaps we can wrap the original request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this instead 27ca06b ? Just pass the snapshot back in the callback and get a cleaner equals in the transport action as well? :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();

clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

@Override
public ClusterState execute(ClusterState currentState) {
validate(request, currentState);
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
Expand Down Expand Up @@ -302,16 +301,16 @@ public TimeValue timeout() {
/**
* Validates snapshot request
*
* @param request snapshot request
* @param repositoryName repository name
* @param snapshotName snapshot name
* @param state current cluster state
*/
private void validate(SnapshotRequest request, ClusterState state) {
private void validate(String repositoryName, String snapshotName, ClusterState state) {
RepositoriesMetaData repositoriesMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
final String repository = request.repositoryName;
if (repositoriesMetaData == null || repositoriesMetaData.repository(repository) == null) {
throw new RepositoryMissingException(repository);
if (repositoriesMetaData == null || repositoriesMetaData.repository(repositoryName) == null) {
throw new RepositoryMissingException(repositoryName);
}
validate(repository, request.snapshotName);
validate(repositoryName, snapshotName);
}

private static void validate(final String repositoryName, final String snapshotName) {
Expand Down Expand Up @@ -378,7 +377,7 @@ protected void doRun() {
logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse();
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot);
return;
}
Expand Down Expand Up @@ -466,7 +465,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
// completion listener in this method. For the snapshot completion to work properly, the snapshot
// should still exist when listener is registered.
userCreateSnapshotListener.onResponse();
userCreateSnapshotListener.onResponse(snapshot.snapshot());

// Now that snapshot completion listener is registered we can end the snapshot if needed
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
Expand Down Expand Up @@ -1545,8 +1544,10 @@ public interface CreateSnapshotListener {

/**
* Called when snapshot has successfully started
*
* @param snapshot snapshot that was created
*/
void onResponse();
void onResponse(Snapshot snapshot);

/**
* Called if a snapshot operation couldn't start
Expand Down Expand Up @@ -1576,186 +1577,4 @@ public interface SnapshotCompletionListener {

void onSnapshotFailure(Snapshot snapshot, Exception e);
}

/**
* Snapshot creation request
*/
public static class SnapshotRequest {

private final String cause;

private final String repositoryName;

private final String snapshotName;

private String[] indices;

private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

private boolean partial;

private Settings settings;

private boolean includeGlobalState;

private TimeValue masterNodeTimeout;

/**
* Constructs new snapshot creation request
*
* @param repositoryName repository name
* @param snapshotName snapshot name
* @param cause cause for snapshot operation
*/
public SnapshotRequest(final String repositoryName, final String snapshotName, final String cause) {
this.repositoryName = Objects.requireNonNull(repositoryName);
this.snapshotName = Objects.requireNonNull(snapshotName);
this.cause = Objects.requireNonNull(cause);
}

/**
* Sets the list of indices to be snapshotted
*
* @param indices list of indices
* @return this request
*/
public SnapshotRequest indices(String[] indices) {
this.indices = indices;
return this;
}

/**
* Sets repository-specific snapshot settings
*
* @param settings snapshot settings
* @return this request
*/
public SnapshotRequest settings(Settings settings) {
this.settings = settings;
return this;
}

/**
* Set to true if global state should be stored as part of the snapshot
*
* @param includeGlobalState true if global state should be stored as part of the snapshot
* @return this request
*/
public SnapshotRequest includeGlobalState(boolean includeGlobalState) {
this.includeGlobalState = includeGlobalState;
return this;
}

/**
* Sets master node timeout
*
* @param masterNodeTimeout master node timeout
* @return this request
*/
public SnapshotRequest masterNodeTimeout(TimeValue masterNodeTimeout) {
this.masterNodeTimeout = masterNodeTimeout;
return this;
}

/**
* Sets the indices options
*
* @param indicesOptions indices options
* @return this request
*/
public SnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

/**
* Set to true if partial snapshot should be allowed
*
* @param partial true if partial snapshots should be allowed
* @return this request
*/
public SnapshotRequest partial(boolean partial) {
this.partial = partial;
return this;
}

/**
* Returns cause for snapshot operation
*
* @return cause for snapshot operation
*/
public String cause() {
return cause;
}

/**
* Returns the repository name
*/
public String repositoryName() {
return repositoryName;
}

/**
* Returns the snapshot name
*/
public String snapshotName() {
return snapshotName;
}

/**
* Returns the list of indices to be snapshotted
*
* @return the list of indices
*/
public String[] indices() {
return indices;
}

/**
* Returns indices options
*
* @return indices options
*/
public IndicesOptions indicesOptions() {
return indicesOptions;
}

/**
* Returns repository-specific settings for the snapshot operation
*
* @return repository-specific settings
*/
public Settings settings() {
return settings;
}

/**
* Returns true if global state should be stored as part of the snapshot
*
* @return true if global state should be stored as part of the snapshot
*/
public boolean includeGlobalState() {
return includeGlobalState;
}

/**
* Returns true if partial snapshot should be allowed
*
* @return true if partial snapshot should be allowed
*/
public boolean partial() {
return partial;
}

/**
* Returns master node timeout
*
* @return master node timeout
*/
public TimeValue masterNodeTimeout() {
return masterNodeTimeout;
}

}
}