Skip to content

Commit

Permalink
Fix remote cluster restore for data stream (#10777)
Browse files Browse the repository at this point in the history
* Fix remote cluster restore for data stream, also added integ test

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
(cherry picked from commit b5ef078)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 23, 2023
1 parent d5d130e commit c80563e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public AcknowledgedResponse createDataStream(String name) throws Exception {
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name);
AcknowledgedResponse response = client().admin().indices().createDataStream(request).get();
assertThat(response.isAcknowledged(), is(true));
performRemoteStoreTestAction();
return response;
}

Expand Down Expand Up @@ -67,6 +68,7 @@ public RolloverResponse rolloverDataStream(String name) throws Exception {
RolloverResponse response = client().admin().indices().rolloverIndex(request).get();
assertThat(response.isAcknowledged(), is(true));
assertThat(response.isRolledOver(), is(true));
performRemoteStoreTestAction();
return response;
}

Expand Down Expand Up @@ -109,5 +111,4 @@ public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception {
assertThat(response.isAcknowledged(), is(true));
return response;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -291,6 +292,14 @@ private void validateCurrentMetadata() throws Exception {
});
}

public void testDataStreamPostRemoteStateRestore() throws Exception {
new DataStreamRolloverIT() {
protected boolean triggerRemoteStateRestore() {
return true;
}
}.testDataStreamRollover();
}

public void testFullClusterRestoreGlobalMetadata() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,10 @@ public Metadata getLatestMetadata(String clusterName, String clusterUUID) {
// Fetch Index Metadata
Map<String, IndexMetadata> indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get());

return Metadata.builder(globalMetadata).indices(indices).build();
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); });

return Metadata.builder(globalMetadata).indices(indexMetadataMap).build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,9 +826,9 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {
).getIndices();

assertEquals(indexMetadataMap.size(), 1);
assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName());
assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfShards(), indexMetadata.getNumberOfShards());
assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfReplicas(), indexMetadata.getNumberOfReplicas());
assertEquals(indexMetadataMap.get(index.getName()).getIndex().getName(), index.getName());
assertEquals(indexMetadataMap.get(index.getName()).getNumberOfShards(), indexMetadata.getNumberOfShards());
assertEquals(indexMetadataMap.get(index.getName()).getNumberOfReplicas(), indexMetadata.getNumberOfReplicas());
}

public void testMarkLastStateAsCommittedSuccess() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,18 @@ public void stopAllNodes() {
}
}

/**
* Replace all nodes by stopping all current node and starting new node.
* Used for remote store test cases, where remote state is restored.
*/
public void resetCluster() {
int totalClusterManagerNodes = numClusterManagerNodes();
int totalDataNodes = numDataNodes();
stopAllNodes();
startClusterManagerOnlyNodes(totalClusterManagerNodes);
startDataOnlyNodes(totalDataNodes);
}

private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nodeAndClients) {
if (nodeAndClients.size() > 0) {
final int newClusterManagers = (int) nodeAndClients.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,30 @@ protected Settings featureFlagSettings() {
return featureSettings.build();
}

/**
* Represent if it needs to trigger remote state restore or not.
* For tests with remote store enabled domain, it will be overridden to true.
*
* @return if needs to perform remote state restore or not
*/
protected boolean triggerRemoteStateRestore() {
return false;
}

/**
* For tests with remote cluster state, it will reset the cluster and cluster state will be
* restored from remote.
*/
protected void performRemoteStoreTestAction() {
if (triggerRemoteStateRestore()) {
String clusterUUIDBefore = clusterService().state().metadata().clusterUUID();
internalCluster().resetCluster();
String clusterUUIDAfter = clusterService().state().metadata().clusterUUID();
// assertion that UUID is changed post restore.
assertFalse(clusterUUIDBefore.equals(clusterUUIDAfter));
}
}

/**
* Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
* already exists this method will fail and wipe all the indices created so far.
Expand Down

0 comments on commit c80563e

Please sign in to comment.