Skip to content

Commit

Permalink
Fixing node join flow with repository metadata updated after node joi…
Browse files Browse the repository at this point in the history
…ned and a node drops and joins back with older node attributes

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot committed Sep 7, 2023
1 parent f82142d commit 2c7b244
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
}

protected void deleteRepo() {
logger.info("--> Deleting all the indices");
internalCluster().wipeIndices("_all");
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
logger.info("--> Deleting the repository={}", TRANSLOG_REPOSITORY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Settings indexSettings() {

@After
public void teardown() {
internalCluster().wipeIndices("_all");
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void setup() {

@After
public void teardown() {
internalCluster().wipeIndices("_all");
assertAcked(clusterAdmin().prepareDeleteRepository(BASE_REMOTE_REPO));
}

Expand Down Expand Up @@ -442,7 +443,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionException, InterruptedException {
public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
Expand Down Expand Up @@ -492,22 +493,7 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

createRepository(BASE_REMOTE_REPO, "fs", absolutePath2);

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.get();

assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() > 0);

ensureRed(restoredIndexName1);

client().admin().indices().close(Requests.closeIndexRequest(restoredIndexName1)).get();
client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void teardown() {
clusterSettingsSuppliedByTest = false;
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
internalCluster().wipeIndices("_all");
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void setup() {

@After
public void teardown() {
internalCluster().wipeIndices("_all");
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void setup() {

@After
public void teardown() {
internalCluster().wipeIndices("_all");
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,15 @@ public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String nam
}

public void testRateLimitedRemoteUploads() throws Exception {
clusterSettingsSuppliedByTest = true;
overrideBuildRepositoryMetadata = true;
internalCluster().startNode();
Settings.Builder clusterSettings = Settings.builder()
.put(remoteStoreClusterSettings(REPOSITORY_NAME, repositoryLocation, REPOSITORY_2_NAME, repositoryLocation));
clusterSettings.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, REPOSITORY_NAME),
MockFsRepositoryPlugin.TYPE
);
internalCluster().startNode(clusterSettings.build());
Client client = client();
logger.info("--> updating repository");
assertAcked(
Expand All @@ -100,7 +107,6 @@ public void testRateLimitedRemoteUploads() throws Exception {
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(
Settings.builder()
.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true)
.put("location", repositoryLocation)
.put("compress", compress)
.put("max_remote_upload_bytes_per_sec", "1kb")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -133,10 +134,12 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
if (newRepositoryMetadata.equalsIgnoreGenerations(existingRepositoryMetadata)) {
try {
newRepositoryMetadata = repositoriesService.get()
.ensureValidSystemRepositoryUpdate(newRepositoryMetadata, existingRepositoryMetadata);
repositoryAlreadyPresent = true;
break;
} else {
} catch (RepositoryException e) {
throw new IllegalStateException(
"new repository metadata ["
+ newRepositoryMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public ClusterState execute(ClusterState currentState) {
}
ensureCryptoSettingsAreSame(repositoryMetadata, request);
found = true;
repositoriesMetadata.add(ensureValidSystemRepositoryUpdate(repositoryMetadata, newRepositoryMetadata));
repositoriesMetadata.add(ensureValidSystemRepositoryUpdate(newRepositoryMetadata, repositoryMetadata));
} else {
repositoriesMetadata.add(repositoryMetadata);
}
Expand Down Expand Up @@ -804,8 +804,11 @@ private static boolean isSystemRepositorySettingPresent(Settings repositoryMetad
return SYSTEM_REPOSITORY_SETTING.get(repositoryMetadataSettings);
}

private static boolean nullOrEqual(String newValue, String currentValue) {
if (newValue != null && newValue.equals(currentValue) == false) {
private static boolean isValueEqual(String newValue, String currentValue) {
if (newValue == null) {
throw new IllegalArgumentException("new value cannot be empty, " + "current value [" + currentValue + "]");
}
if (newValue.equals(currentValue) == false) {
throw new IllegalArgumentException(
"trying to modify an unmodifiable attribute of system repository from "
+ "current value ["
Expand All @@ -818,14 +821,14 @@ private static boolean nullOrEqual(String newValue, String currentValue) {
return true;
}

private RepositoryMetadata ensureValidSystemRepositoryUpdate(
RepositoryMetadata currentRepositoryMetadata,
RepositoryMetadata newRepositoryMetadata
public RepositoryMetadata ensureValidSystemRepositoryUpdate(
RepositoryMetadata newRepositoryMetadata,
RepositoryMetadata currentRepositoryMetadata
) {
if (isSystemRepositorySettingPresent(currentRepositoryMetadata.settings())) {
Settings.Builder updatedSettings = Settings.builder().put(newRepositoryMetadata.settings());
try {
nullOrEqual(newRepositoryMetadata.type(), currentRepositoryMetadata.name());
isValueEqual(newRepositoryMetadata.type(), currentRepositoryMetadata.type());

Repository repository = repositories.get(currentRepositoryMetadata.name());
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Expand All @@ -837,10 +840,14 @@ private RepositoryMetadata ensureValidSystemRepositoryUpdate(
.collect(Collectors.toList());

for (String restrictedSettingKey : restrictedSettings) {
nullOrEqual(
newRepositoryMetadataSettings.get(restrictedSettingKey),
currentRepositoryMetadataSettings.get(restrictedSettingKey)
);
String newSettingValue = newRepositoryMetadataSettings.get(restrictedSettingKey);
String currentSettingValue = currentRepositoryMetadataSettings.get(restrictedSettingKey);

isValueEqual(newSettingValue, currentSettingValue);

if (currentSettingValue != null) {
updatedSettings.put(restrictedSettingKey, currentSettingValue);
}
}
} catch (IllegalArgumentException e) {
throw new RepositoryException(currentRepositoryMetadata.name(), e.getMessage());
Expand Down

0 comments on commit 2c7b244

Please sign in to comment.