Skip to content

Commit

Permalink
[Remote Store] Add rest endpoint for remote store restore (opensearch…
Browse files Browse the repository at this point in the history
…-project#3576)

* Add rest endpoint for remote store restore

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored and Sachin Kale committed Sep 2, 2022
1 parent 1bddbd5 commit e051d48
Show file tree
Hide file tree
Showing 13 changed files with 441 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.hot_threads",
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards", };
"search_shards",
"remote_store.restore", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"remote_store.restore":{
"documentation":{
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/remote-store#restore",
"description":"Restores from remote store."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_remotestore/_restore",
"methods":[
"POST"
]
}
]
},
"params":{
"cluster_manager_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to cluster-manager node"
},
"wait_for_completion":{
"type":"boolean",
"description":"Should this request wait until the operation has completed before returning",
"default":false
}
},
"body":{
"description":"A comma separated list of index IDs",
"required":true
}
}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
Expand Down Expand Up @@ -261,6 +263,7 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -308,6 +311,7 @@
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
Expand Down Expand Up @@ -657,6 +661,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down Expand Up @@ -842,6 +849,11 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));

// Remote Store APIs
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
registerHandler.accept(new RestRestoreRemoteStoreAction());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionType;

/**
* Restore remote store action
*
* @opensearch.internal
*/
public final class RestoreRemoteStoreAction extends ActionType<RestoreRemoteStoreResponse> {

public static final RestoreRemoteStoreAction INSTANCE = new RestoreRemoteStoreAction();
public static final String NAME = "cluster:admin/remotestore/restore";

private RestoreRemoteStoreAction() {
super(NAME, RestoreRemoteStoreResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.RestoreInfo;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Contains information about remote store restores
*
* @opensearch.internal
*/
public final class RestoreRemoteStoreResponse extends ActionResponse implements ToXContentObject {

@Nullable
private final RestoreInfo restoreInfo;

public RestoreRemoteStoreResponse(@Nullable RestoreInfo restoreInfo) {
this.restoreInfo = restoreInfo;
}

public RestoreRemoteStoreResponse(StreamInput in) throws IOException {
super(in);
restoreInfo = RestoreInfo.readOptionalRestoreInfo(in);
}

/**
* Returns restore information if remote store restore was completed before this method returned, null otherwise
*
* @return restore information or null
*/
public RestoreInfo getRestoreInfo() {
return restoreInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(restoreInfo);
}

public RestStatus status() {
if (restoreInfo == null) {
return RestStatus.ACCEPTED;
}
return restoreInfo.status();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (restoreInfo != null) {
builder.field("remote_store");
restoreInfo.toXContent(builder, params);
} else {
builder.field("accepted", true);
}
builder.endObject();
return builder;
}

public static final ConstructingObjectParser<RestoreRemoteStoreResponse, Void> PARSER = new ConstructingObjectParser<>(
"restore_remote_store",
true,
v -> {
RestoreInfo restoreInfo = (RestoreInfo) v[0];
Boolean accepted = (Boolean) v[1];
assert (accepted == null && restoreInfo != null) || (accepted != null && accepted && restoreInfo == null) : "accepted: ["
+ accepted
+ "], restoreInfo: ["
+ restoreInfo
+ "]";
return new RestoreRemoteStoreResponse(restoreInfo);
}
);

static {
PARSER.declareObject(
optionalConstructorArg(),
(parser, context) -> RestoreInfo.fromXContent(parser),
new ParseField("remote_store")
);
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("accepted"));
}

public static RestoreRemoteStoreResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreRemoteStoreResponse that = (RestoreRemoteStoreResponse) o;
return Objects.equals(restoreInfo, that.restoreInfo);
}

@Override
public int hashCode() {
return Objects.hash(restoreInfo);
}

@Override
public String toString() {
return "RestoreRemoteStoreResponse{" + "restoreInfo=" + restoreInfo + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action for restore remote store operation
*
* @opensearch.internal
*/
public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction<
RestoreRemoteStoreRequest,
RestoreRemoteStoreResponse> {
private final RestoreService restoreService;

@Inject
public TransportRestoreRemoteStoreAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RestoreService restoreService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
RestoreRemoteStoreAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
RestoreRemoteStoreRequest::new,
indexNameExpressionResolver
);
this.restoreService = restoreService;
}

@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
protected RestoreRemoteStoreResponse read(StreamInput in) throws IOException {
return new RestoreRemoteStoreResponse(in);
}

@Override
protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, ClusterState state) {
// Restoring a remote store might change the global state and create/change an index,
// so we need to check for METADATA_WRITE and WRITE blocks
ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (blockException != null) {
return blockException;
}
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);

}

@Override
protected void clusterManagerOperation(
final RestoreRemoteStoreRequest request,
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
) {
restoreService.restoreFromRemoteStore(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
* compatible open source license.
*/

/** Restore Snapshot transport handler. */
/** Restore remote store transport handler. */
package org.opensearch.action.admin.cluster.remotestore.restore;
Loading

0 comments on commit e051d48

Please sign in to comment.