Skip to content

Commit

Permalink
Decouple remote state configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jan 11, 2024
1 parent 5ccd134 commit d8ab19e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreSegmentOrTranslogAttributePresent;

/**
* Service responsible for submitting create index requests
Expand Down Expand Up @@ -959,7 +959,7 @@ private static void updateReplicationStrategy(
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
} else if (isRemoteStoreSegmentOrTranslogAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
Expand All @@ -973,7 +973,7 @@ private static void updateReplicationStrategy(
* @param clusterSettings cluster level settings
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
if (isRemoteStoreSegmentOrTranslogAttributePresent(clusterSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
Expand Down Expand Up @@ -1565,7 +1565,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
if (isRemoteStoreSegmentOrTranslogAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
Expand Down Expand Up @@ -131,12 +132,16 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
}

private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
validateSegmentAttributes(node);
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
Set<String> repositoryNames = new HashSet<>();

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
Set<String> repositoryNames = Stream.of(
REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
)
.map(repoKey -> node.getAttributes().get(repoKey))
.filter(repositoryName -> Strings.isNullOrEmpty(repositoryName) == false)
.collect(Collectors.toSet());

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand All @@ -145,12 +150,31 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
return new RepositoriesMetadata(repositoryMetadataList);
}

private void validateSegmentAttributes(DiscoveryNode node) {
if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
}
}

public static boolean isRemoteStoreAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteStoreSegmentOrTranslogAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
Expand All @@ -175,8 +199,12 @@ public int hashCode() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,14 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin
);
assertTrue(
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
|| e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);

remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1898,7 +1898,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
Expand Down

0 comments on commit d8ab19e

Please sign in to comment.