diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RemoteStoreRestoreClusterStateListener.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RemoteStoreRestoreClusterStateListener.java deleted file mode 100644 index 7cbbd5e707b3b..0000000000000 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RemoteStoreRestoreClusterStateListener.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.cluster.remotestore.restore; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; -import org.opensearch.cluster.ClusterChangedEvent; -import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.RestoreInProgress; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.collect.ImmutableOpenMap; -import org.opensearch.index.shard.ShardId; -import org.opensearch.snapshots.RestoreInfo; -import org.opensearch.snapshots.RestoreService; - -import static org.opensearch.snapshots.RestoreService.restoreInProgress; - -/** - * Transport listener for cluster state updates - * - * @opensearch.internal - */ -public final class RemoteStoreRestoreClusterStateListener implements ClusterStateListener { - - private static final Logger logger = LogManager.getLogger(RemoteStoreRestoreClusterStateListener.class); - - private final ClusterService clusterService; - private final String uuid; - private final ActionListener listener; - - private RemoteStoreRestoreClusterStateListener( - ClusterService clusterService, - RestoreService.RestoreCompletionResponse response, - ActionListener listener - ) { - this.clusterService = clusterService; - this.uuid = response.getUuid(); - this.listener = listener; - - } - - @Override - public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); - if (prevEntry == null) { - // When there is a cluster-manager failure after a restore has been started, this listener might not be registered - // on the current cluster-manager and as such it might miss some intermediary cluster states due to batching. - // Clean up listener in that case and acknowledge completion of restore operation to client. - clusterService.removeListener(this); - listener.onResponse(new RestoreRemoteStoreResponse((RestoreInfo) null)); - } else if (newEntry == null) { - clusterService.removeListener(this); - ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed remote store restore state but was " + prevEntry.state(); - assert RestoreService.completed(shards) : "expected all restore entries to be completed"; - RestoreInfo ri = new RestoreInfo( - "remote_store", - prevEntry.indices(), - shards.size(), - +shards.size() - RestoreService.failedShards(shards) - ); - RestoreRemoteStoreResponse response = new RestoreRemoteStoreResponse(ri); - logger.debug("restore from remote store completed"); - listener.onResponse(response); - } else { - // restore not completed yet, wait for next cluster state update - } - } - - /** - * Creates a cluster state listener and registers it with the cluster service. The listener passed as a - * parameter will be called when the restore is complete. - */ - public static void createAndRegisterListener( - ClusterService clusterService, - RestoreService.RestoreCompletionResponse response, - ActionListener listener - ) { - clusterService.addListener(new RemoteStoreRestoreClusterStateListener(clusterService, response, listener)); - } -} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java index 957b47478ee53..7304ba25717ac 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java @@ -9,6 +9,7 @@ package org.opensearch.action.admin.cluster.remotestore.restore; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; @@ -78,7 +79,7 @@ protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, Cl } @Override - protected void masterOperation( + protected void clusterManagerOperation( final RestoreRemoteStoreRequest request, final ClusterState state, final ActionListener listener @@ -87,10 +88,11 @@ protected void masterOperation( request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RemoteStoreRestoreClusterStateListener.createAndRegisterListener( + RestoreClusterStateListener.createAndRegisterListener( clusterService, restoreCompletionResponse, - delegatedListener + delegatedListener, + RestoreRemoteStoreResponse::new ); } else { delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo())); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java index 7d2ca99e3dbf5..d0f78e85e26a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionResponse; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.RestoreInProgress; @@ -44,6 +45,8 @@ import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; +import java.util.function.Function; + import static org.opensearch.snapshots.RestoreService.restoreInProgress; /** @@ -51,22 +54,27 @@ * * @opensearch.internal */ -public class RestoreClusterStateListener implements ClusterStateListener { +public class RestoreClusterStateListener implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class); private final ClusterService clusterService; private final String uuid; - private final ActionListener listener; + private final String restoreIdentifier; + private final ActionListener listener; + private final Function actionResponseFactory; private RestoreClusterStateListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { this.clusterService = clusterService; this.uuid = response.getUuid(); + this.restoreIdentifier = response.getSnapshot() != null ? response.getSnapshot().getSnapshotId().getName() : "remote_store"; this.listener = listener; + this.actionResponseFactory = actionResponseFactory; } @Override @@ -78,23 +86,23 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { // on the current cluster-manager and as such it might miss some intermediary cluster states due to batching. // Clean up listener in that case and acknowledge completion of restore operation to client. clusterService.removeListener(this); - listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null)); + listener.onResponse(actionResponseFactory.apply(null)); } else if (newEntry == null) { clusterService.removeListener(this); ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert prevEntry.state().completed() : "expected completed snapshot/remote store restore state but was " + prevEntry.state(); assert RestoreService.completed(shards) : "expected all restore entries to be completed"; RestoreInfo ri = new RestoreInfo( - prevEntry.snapshot().getSnapshotId().getName(), + restoreIdentifier, prevEntry.indices(), shards.size(), shards.size() - RestoreService.failedShards(shards) ); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + T response = actionResponseFactory.apply(ri); + logger.debug("restore of [{}] completed", restoreIdentifier); listener.onResponse(response); } else { - // restore not completed yet, wait for next cluster state update + logger.debug("restore not completed yet, wait for next cluster state update"); } } @@ -102,11 +110,12 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { * Creates a cluster state listener and registers it with the cluster service. The listener passed as a * parameter will be called when the restore is complete. */ - public static void createAndRegisterListener( + public static void createAndRegisterListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { - clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener, actionResponseFactory)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index e7d95b9e40880..c2f79b2a27157 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -109,7 +109,12 @@ protected void clusterManagerOperation( ) { restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreSnapshotResponse::new + ); } else { delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); }