Search only replicas (scale to zero) with Reader/Writer Separation #17299
Merged (+4,826 −76)
30 commits
5c32f43 Scale to Zero (prudhvigodithi)
1d71948 Scale to zero 2nd interation (prudhvigodithi)
e82f050 Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
e89b812 Upstream fetch (prudhvigodithi)
db5212b Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
97b4d0e Upstream fetch (prudhvigodithi)
6e2bb1d Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
becf2c0 Upstream Fetch (prudhvigodithi)
408bf24 Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
1a2b5ad Fix the error with replication checkpoint (prudhvigodithi)
57ae4a4 Upstream Fetch (prudhvigodithi)
ba9d45d Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
084956f Upstream Fetch (prudhvigodithi)
eae2518 Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
c50adcb Code refactor and update based on comments (prudhvigodithi)
3d2a21f Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
11956cd Upstream fetch and resolve conflicts (prudhvigodithi)
aca79b1 Fix tests and address comments (prudhvigodithi)
f709020 Code clean up and address the suggestions (prudhvigodithi)
929fd75 Merge remote-tracking branch 'upstream/main' into searchonly-2 (prudhvigodithi)
ec7d969 Updated ScaleIndex IT tests (prudhvigodithi)
5b1aac4 Updated CHANGELOG.md (prudhvigodithi)
e8baa59 Updated CHANGELOG.md (prudhvigodithi)
ca161bb Fix conflicts (prudhvigodithi)
d7dbfa1 Update code based on comments and suggestions (prudhvigodithi)
3091ea1 Fix and add new tests (prudhvigodithi)
d891a8e Fix conflicts (prudhvigodithi)
4f3b2a5 use acquireAllPrimaryOperationsPermits (prudhvigodithi)
096672c Fix CHANGELOG.md conflict (prudhvigodithi)
ec3cde7 Fix CHANGELOG.md conflict (prudhvigodithi)
366 changes: 366 additions & 0 deletions
...alClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleIndexIT.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,366 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.action.admin.indices.scale.searchonly; | ||
|
|
||
| import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; | ||
| import org.opensearch.action.index.IndexResponse; | ||
| import org.opensearch.action.search.SearchResponse; | ||
| import org.opensearch.action.support.WriteRequest; | ||
| import org.opensearch.cluster.ClusterState; | ||
| import org.opensearch.cluster.metadata.IndexMetadata; | ||
| import org.opensearch.cluster.routing.IndexRoutingTable; | ||
| import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
| import org.opensearch.cluster.routing.ShardRouting; | ||
| import org.opensearch.common.settings.Settings; | ||
| import org.opensearch.common.util.FeatureFlags; | ||
| import org.opensearch.core.rest.RestStatus; | ||
| import org.opensearch.indices.replication.common.ReplicationType; | ||
| import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; | ||
| import org.opensearch.test.InternalTestCluster; | ||
| import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
|
||
| import java.util.HashSet; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; | ||
| import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; | ||
| import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; | ||
| import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; | ||
| import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
| import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
|
|
||
| @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
| public class ScaleIndexIT extends RemoteStoreBaseIntegTestCase { | ||
|
|
||
| private static final String TEST_INDEX = "test_scale_index"; | ||
|
|
||
| @Override | ||
| protected Settings featureFlagSettings() { | ||
| return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); | ||
| } | ||
|
|
||
| @Override | ||
| public Settings indexSettings() { | ||
| return Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); | ||
| } | ||
|
|
||
| /** | ||
| * Tests the full lifecycle of scaling an index down to search-only mode, | ||
| * scaling search replicas while in search-only mode, verifying cluster health in | ||
| * various states, and then scaling back up to normal mode. | ||
| */ | ||
| public void testFullSearchOnlyReplicasFullLifecycle() throws Exception { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNodes(3); | ||
|
|
||
| Settings specificSettings = Settings.builder() | ||
| .put(indexSettings()) | ||
| .put(SETTING_NUMBER_OF_SHARDS, 1) | ||
| .put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
| .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
| .build(); | ||
|
|
||
| createIndex(TEST_INDEX, specificSettings); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| for (int i = 0; i < 10; i++) { | ||
| IndexResponse indexResponse = client().prepareIndex(TEST_INDEX) | ||
| .setId(Integer.toString(i)) | ||
| .setSource("field1", "value" + i) | ||
| .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
| .get(); | ||
| assertEquals(RestStatus.CREATED, indexResponse.status()); | ||
| } | ||
|
|
||
| assertBusy(() -> { | ||
| SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get(); | ||
| assertHitCount(searchResponse, 10); | ||
| assertSearchNodeDocCounts(10, TEST_INDEX); | ||
| }, 30, TimeUnit.SECONDS); | ||
|
|
||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Scale down to search-only mode | ||
| assertAcked(client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, true).get()); | ||
|
|
||
| // Verify search-only setting is enabled | ||
| GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get(); | ||
| assertEquals("true", settingsResponse.getSetting(TEST_INDEX, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey())); | ||
|
|
||
| // Verify that write operations are blocked during scale-down | ||
| assertBusy(() -> { | ||
| try { | ||
| // Attempt to index a document while scale-down is in progress | ||
| client().prepareIndex(TEST_INDEX) | ||
| .setId("sample-write-after-search-only-block") | ||
| .setSource("field1", "value1") | ||
| .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
| .get(); | ||
| fail("Write operation should be blocked during scale-down"); | ||
| } catch (Exception e) { | ||
| assertTrue( | ||
| "Exception should indicate index scaled down", | ||
| e.getMessage().contains("blocked by: [FORBIDDEN/20/index scaled down]") | ||
| ); | ||
| } | ||
| }, 10, TimeUnit.SECONDS); | ||
|
|
||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Verify search still works on all search nodes | ||
| assertSearchNodeDocCounts(10, TEST_INDEX); | ||
|
|
||
| // Scale up search replicas while in search-only mode | ||
| assertAcked( | ||
| client().admin() | ||
| .indices() | ||
| .prepareUpdateSettings(TEST_INDEX) | ||
| .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 3).build()) | ||
| .get() | ||
| ); | ||
|
|
||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Verify search still works on all search nodes | ||
| assertBusy(() -> { assertSearchNodeDocCounts(10, TEST_INDEX); }, 30, TimeUnit.SECONDS); | ||
|
|
||
| // Scale down search replicas while still in search-only mode | ||
| assertAcked( | ||
| client().admin() | ||
| .indices() | ||
| .prepareUpdateSettings(TEST_INDEX) | ||
| .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) | ||
| .get() | ||
| ); | ||
|
|
||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Verify search still works on all search nodes | ||
| assertBusy(() -> { assertSearchNodeDocCounts(10, TEST_INDEX); }, 30, TimeUnit.SECONDS); | ||
|
|
||
| // Test cluster health when one search replica is down | ||
| internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodesWithSearchOnlyReplicas()[0])); | ||
|
|
||
| assertEquals( | ||
| "Index health should be YELLOW with one search replica down", | ||
| "YELLOW", | ||
| client().admin().cluster().prepareHealth(TEST_INDEX).get().getStatus().name() | ||
| ); | ||
|
|
||
| // Start a replacement search node and wait for recovery | ||
| internalCluster().startSearchOnlyNode(); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Scale back up to normal mode | ||
| assertAcked(client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, false).get()); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| // Verify search-only setting is disabled | ||
| settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get(); | ||
| assertNotEquals("true", settingsResponse.getSetting(TEST_INDEX, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey())); | ||
|
|
||
| // Verify search still works after scale-up | ||
| assertBusy(() -> { | ||
| SearchResponse response = client().prepareSearch(TEST_INDEX).get(); | ||
| assertHitCount(response, 10); | ||
| assertSearchNodeDocCounts(10, TEST_INDEX); | ||
| }, 30, TimeUnit.SECONDS); | ||
|
|
||
| // Verify writes work again after scale-up | ||
| IndexResponse indexResponse = client().prepareIndex(TEST_INDEX) | ||
| .setId("new-doc") | ||
| .setSource("field1", "new-value") | ||
| .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
| .get(); | ||
| assertEquals(RestStatus.CREATED, indexResponse.status()); | ||
|
|
||
| // Verify new document is searchable | ||
| assertBusy(() -> { | ||
| SearchResponse finalResponse = client().prepareSearch(TEST_INDEX).get(); | ||
| assertHitCount(finalResponse, 11); | ||
| assertSearchNodeDocCounts(11, TEST_INDEX); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that scaling an index down to search-only mode is rejected when the index has no search replicas. | ||
| */ | ||
| public void testScaleDownValidationWithoutSearchReplicas() { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNode(); | ||
|
|
||
| Settings specificSettings = Settings.builder() | ||
| .put(indexSettings()) | ||
| .put(SETTING_NUMBER_OF_SHARDS, 2) | ||
| .put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
| .build(); | ||
|
|
||
| createIndex(TEST_INDEX, specificSettings); | ||
| ensureYellow(TEST_INDEX); | ||
|
|
||
| IllegalArgumentException exception = expectThrows( | ||
| IllegalArgumentException.class, | ||
| () -> client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, true).get() | ||
| ); | ||
|
|
||
| assertTrue( | ||
| "Expected error about missing search replicas", | ||
| exception.getMessage().contains("Cannot scale to zero without search replicas for index:") | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Scenario 1: Tests search-only replicas recovery with persistent data directory | ||
| * and cluster.remote_store.state.enabled=false | ||
| */ | ||
| public void testSearchOnlyRecoveryWithPersistentData() throws Exception { | ||
| internalCluster().startClusterManagerOnlyNode(); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNode(); | ||
|
|
||
| Settings specificSettings = Settings.builder() | ||
| .put(indexSettings()) | ||
| .put(SETTING_NUMBER_OF_SHARDS, 2) | ||
| .put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
| .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
| .build(); | ||
|
|
||
| createIndex(TEST_INDEX, specificSettings); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| assertAcked(client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, true).get()); | ||
|
|
||
| assertBusy(() -> { | ||
| ClusterState state = client().admin().cluster().prepareState().get().getState(); | ||
| IndexRoutingTable routingTable = state.routingTable().index(TEST_INDEX); | ||
|
|
||
| for (IndexShardRoutingTable shardTable : routingTable) { | ||
| assertNull("Primary should be null", shardTable.primaryShard()); | ||
| assertTrue("No writer replicas should exist", shardTable.writerReplicas().isEmpty()); | ||
| assertEquals( | ||
| "One search replica should be active", | ||
| 1, | ||
| shardTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count() | ||
| ); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Scenario 2: Tests behavior with cluster.remote_store.state.enabled=true | ||
| * but without data directory preservation | ||
| */ | ||
| public void testClusterRemoteStoreStateEnabled() throws Exception { | ||
| Settings remoteStoreSettings = Settings.builder().put(nodeSettings(0)).put("cluster.remote_store.state.enabled", true).build(); | ||
|
|
||
| internalCluster().startClusterManagerOnlyNode(remoteStoreSettings); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNode(); | ||
|
|
||
| Settings specificSettings = Settings.builder() | ||
| .put(indexSettings()) | ||
| .put(SETTING_NUMBER_OF_SHARDS, 2) | ||
| .put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
| .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
| .build(); | ||
|
|
||
| createIndex(TEST_INDEX, specificSettings); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| assertAcked(client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, true).get()); | ||
|
|
||
| internalCluster().stopAllNodes(); | ||
|
|
||
| internalCluster().startClusterManagerOnlyNode(remoteStoreSettings); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNode(); | ||
|
|
||
| assertBusy(() -> { | ||
| ClusterState state = client().admin().cluster().prepareState().get().getState(); | ||
| IndexRoutingTable routingTable = state.routingTable().index(TEST_INDEX); | ||
|
|
||
| for (IndexShardRoutingTable shardTable : routingTable) { | ||
| assertTrue( | ||
| "At least one search replica should be active", | ||
| shardTable.searchOnlyReplicas().stream().anyMatch(ShardRouting::active) | ||
| ); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Scenario 3: Tests recovery with persistent data directory and remote store state | ||
| */ | ||
| public void testRecoveryWithPersistentDataAndRemoteStore() throws Exception { | ||
| Settings remoteStoreSettings = Settings.builder().put(nodeSettings(0)).put("cluster.remote_store.state.enabled", true).build(); | ||
|
|
||
| internalCluster().startClusterManagerOnlyNode(remoteStoreSettings); | ||
| internalCluster().startDataOnlyNodes(2); | ||
| internalCluster().startSearchOnlyNode(); | ||
|
|
||
| Settings specificSettings = Settings.builder() | ||
| .put(indexSettings()) | ||
| .put(SETTING_NUMBER_OF_SHARDS, 2) | ||
| .put(SETTING_NUMBER_OF_REPLICAS, 1) | ||
| .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) | ||
| .build(); | ||
|
|
||
| createIndex(TEST_INDEX, specificSettings); | ||
| ensureGreen(TEST_INDEX); | ||
|
|
||
| internalCluster().fullRestart(); | ||
|
|
||
| ensureGreen(TEST_INDEX); | ||
| assertAcked(client().admin().indices().prepareScaleSearchOnly(TEST_INDEX, true).get()); | ||
|
|
||
| assertBusy(() -> { assertEquals("Exactly one node should host active search replicas", 1, findNodesWithSearchOnlyReplicas().length); }); | ||
| } | ||
|
|
||
| /** | ||
| * Helper method to find all nodes that contain active search-only replica shards | ||
| * @return Array of node names that have active search-only replicas | ||
| */ | ||
| private String[] findNodesWithSearchOnlyReplicas() { | ||
| ClusterState state = client().admin().cluster().prepareState().get().getState(); | ||
| IndexRoutingTable indexRoutingTable = state.routingTable().index(TEST_INDEX); | ||
|
|
||
| // Use a set to avoid duplicates if multiple shards are on the same node | ||
| Set<String> nodeNames = new HashSet<>(); | ||
|
|
||
| for (IndexShardRoutingTable shardTable : indexRoutingTable) { | ||
| for (ShardRouting searchReplica : shardTable.searchOnlyReplicas()) { | ||
| if (searchReplica.active()) { | ||
| String nodeId = searchReplica.currentNodeId(); | ||
| nodeNames.add(state.nodes().get(nodeId).getName()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (nodeNames.isEmpty()) { | ||
| throw new AssertionError("Could not find any node with active search-only replica"); | ||
| } | ||
|
|
||
| return nodeNames.toArray(new String[0]); | ||
| } | ||
|
|
||
| /** | ||
| * Assert that documents are accessible and have the expected count across all search nodes | ||
| * @param expectedDocCount Expected number of documents in the index | ||
| * @param index The index name to search | ||
| */ | ||
| protected void assertSearchNodeDocCounts(int expectedDocCount, String index) { | ||
| // Check on all nodes that have search-only replicas | ||
| String[] searchNodes = findNodesWithSearchOnlyReplicas(); | ||
| for (String node : searchNodes) { | ||
| assertHitCount(client(node).prepareSearch(index).setSize(0).get(), expectedDocCount); | ||
| } | ||
| } | ||
| } | ||
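
For readers skimming the diff, the behavior these tests pin down can be summarized with a small toy model. This is only a sketch under stated assumptions: `ToyIndex` and all of its methods are hypothetical stand-ins, not OpenSearch classes. The two message strings are the ones the assertions above match on. The index-name suffix on the validation message and the use of `IllegalStateException` for blocked writes are assumptions made to keep the example dependency-free.

```java
// Toy model only. ToyIndex and its methods are hypothetical stand-ins,
// not OpenSearch classes; they mirror what the assertions in ScaleIndexIT expect.
final class ToyIndex {
    private final String name;
    private int searchReplicas;
    private boolean searchOnly;
    private int docCount;

    ToyIndex(String name, int searchReplicas) {
        this.name = name;
        this.searchReplicas = searchReplicas;
    }

    // Stand-in for prepareScaleSearchOnly(index, enable) as the tests call it.
    void scaleSearchOnly(boolean enable) {
        if (enable && searchReplicas == 0) {
            // The prefix is the text the test matches; appending the index name is an assumption.
            throw new IllegalArgumentException("Cannot scale to zero without search replicas for index: " + name);
        }
        searchOnly = enable;
    }

    // Stand-in for an index request. The test only inspects the exception message,
    // so a plain IllegalStateException is used here instead of the real exception type.
    void indexDocument() {
        if (searchOnly) {
            throw new IllegalStateException("blocked by: [FORBIDDEN/20/index scaled down]");
        }
        docCount++;
    }

    // The lifecycle test changes the search replica count while in search-only mode (1 -> 3 -> 2).
    void setSearchReplicas(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("search replica count must be non-negative");
        }
        searchReplicas = count;
    }

    boolean isSearchOnly() {
        return searchOnly;
    }

    // Searches keep working in both modes; the hit count is simply the number of indexed documents.
    int searchHitCount() {
        return docCount;
    }

    public static void main(String[] args) {
        ToyIndex index = new ToyIndex("test_scale_index", 1);
        for (int i = 0; i < 10; i++) {
            index.indexDocument();
        }
        index.scaleSearchOnly(true);
        try {
            index.indexDocument();
        } catch (IllegalStateException e) {
            System.out.println("write rejected: " + e.getMessage());
        }
        index.setSearchReplicas(3);
        index.scaleSearchOnly(false);
        index.indexDocument();
        System.out.println("hits after scale-up: " + index.searchHitCount());
    }
}
```

The model deliberately leaves out shard routing, cluster health, and node restarts, which the integration tests cover against a real test cluster.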