Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple remote state configuration #11858

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;

/**
* Service responsible for submitting create index requests
Expand Down Expand Up @@ -971,7 +971,7 @@ private static void updateReplicationStrategy(
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
} else if (isRemoteDataAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
Expand All @@ -985,7 +985,7 @@ private static void updateReplicationStrategy(
* @param clusterSettings cluster level settings
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
if (isRemoteDataAttributePresent(clusterSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
Expand Down Expand Up @@ -1577,7 +1577,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
if (isRemoteDataAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,8 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
}

private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
Set<String> repositoryNames = getValidatedRepositoryNames(node);
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
Set<String> repositoryNames = new HashSet<>();

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand All @@ -145,12 +141,36 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
return new RepositoriesMetadata(repositoryMetadataList);
}

private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
Set<String> repositoryNames = new HashSet<>();
if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
return repositoryNames;
}

public static boolean isRemoteStoreAttributePresent(Settings settings) {
soosinha marked this conversation as resolved.
Show resolved Hide resolved
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteDataAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ public void testJoinClusterWithNonRemoteStoreNodeJoining() {
}

public void testJoinClusterWithRemoteStoreNodeJoining() {
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Map<String, String> map = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
DiscoveryNode joiningNode = newDiscoveryNode(map);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(joiningNode).build())
.build();
Expand Down Expand Up @@ -582,12 +583,94 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin
);
assertTrue(
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
|| e.getMessage()
soosinha marked this conversation as resolved.
Show resolved Hide resolved
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
soosinha marked this conversation as resolved.
Show resolved Hide resolved
);

remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
}
}

public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Map<String, String> remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final AllocationService allocationService = mock(AllocationService.class);
Expand Down Expand Up @@ -869,6 +952,23 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
putAll(remoteStateNodeAttributes(clusterStateRepo));
}
};
}

private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
String clusterStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
Expand All @@ -882,14 +982,6 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, clusterStateRepo);
putIfAbsent(clusterStateRepositoryTypeAttributeKey, "s3");
putIfAbsent(clusterStateRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1901,7 +1901,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
Expand Down
Loading