Skip to content

Commit

Permalink
Use RestoreClusterStateListener with generic type that extends Action…
Browse files Browse the repository at this point in the history
…Response

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Aug 4, 2022
1 parent dc13f01 commit 5288f3e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 105 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -78,7 +79,7 @@ protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, Cl
}

@Override
protected void masterOperation(
protected void clusterManagerOperation(
final RestoreRemoteStoreRequest request,
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
Expand All @@ -87,10 +88,11 @@ protected void masterOperation(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RemoteStoreRestoreClusterStateListener.createAndRegisterListener(
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionResponse;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.RestoreInProgress;
Expand All @@ -44,29 +45,36 @@
import org.opensearch.snapshots.RestoreInfo;
import org.opensearch.snapshots.RestoreService;

import java.util.function.Function;

import static org.opensearch.snapshots.RestoreService.restoreInProgress;

/**
* Transport listener for cluster state updates
*
* @opensearch.internal
*/
public class RestoreClusterStateListener implements ClusterStateListener {
public class RestoreClusterStateListener<T extends ActionResponse> implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);

private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private final String restoreIdentifier;
private final ActionListener<T> listener;
private final Function<RestoreInfo, T> actionResponseFactory;

private RestoreClusterStateListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<T> listener,
Function<RestoreInfo, T> actionResponseFactory
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.restoreIdentifier = response.getSnapshot() != null ? response.getSnapshot().getSnapshotId().getName() : "remote_store";
this.listener = listener;
this.actionResponseFactory = actionResponseFactory;
}

@Override
Expand All @@ -78,35 +86,36 @@ public void clusterChanged(ClusterChangedEvent changedEvent) {
// on the current cluster-manager and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
listener.onResponse(actionResponseFactory.apply(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert prevEntry.state().completed() : "expected completed snapshot/remote store restore state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
restoreIdentifier,
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
T response = actionResponseFactory.apply(ri);
logger.debug("restore of [{}] completed", restoreIdentifier);
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
logger.debug("restore not completed yet, wait for next cluster state update");
}
}

/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(
public static <T extends ActionResponse> void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<T> listener,
Function<RestoreInfo, T> actionResponseFactory
) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
clusterService.addListener(new RestoreClusterStateListener<T>(clusterService, response, listener, actionResponseFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ protected void clusterManagerOperation(
) {
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreSnapshotResponse::new
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
Expand Down

0 comments on commit 5288f3e

Please sign in to comment.