Skip to content

Commit

Permalink
[Segment Replication] Add ClusterState utility to identify SEGMENT re…
Browse files Browse the repository at this point in the history
…plication (opensearch-project#9593)

* [Segment Replication] Add ClusterState utility to identify SEGMENT replication

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comment

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored Aug 29, 2023
1 parent 012c4fa commit 61d4d43
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
Expand All @@ -49,12 +48,10 @@
import org.opensearch.index.get.GetResult;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Optional;

/**
* Performs the get operation.
Expand Down Expand Up @@ -92,20 +89,11 @@ protected boolean resolveIndex(GetRequest request) {
return true;
}

static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) {
return Optional.ofNullable(state.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null;
return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
}

@Override
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.Discovery;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -409,6 +410,21 @@ public boolean supersedes(ClusterState other) {

}

/**
* Utility to identify whether input index belongs to SEGMENT replication in established cluster state.
*
* @param indexName Index name
* @return true if index belong SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
return Optional.ofNullable(this.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Metrics for cluster state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.TestCustomMetadata;

Expand All @@ -73,6 +74,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -84,7 +86,7 @@ public void testSupersedes() {
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version);
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version);
final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build();
ClusterName name = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
ClusterName name = CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
ClusterState noClusterManager1 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build();
ClusterState noClusterManager2 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build();
ClusterState withClusterManager1a = ClusterState.builder(name)
Expand Down Expand Up @@ -115,6 +117,39 @@ public void testSupersedes() {
);
}

public void testIsSegmentReplicationEnabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(builder)
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertTrue(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testIsSegmentReplicationDisabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertFalse(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testBuilderRejectsNullCustom() {
final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT);
final String key = randomAlphaOfLength(10);
Expand Down

0 comments on commit 61d4d43

Please sign in to comment.