Skip to content

Commit

Permalink
Merge branch '2.x' into backport/backport-13306-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
  • Loading branch information
chishui authored Apr 30, 2024
2 parents d78b7c2 + 610bb05 commit 587aed7
Show file tree
Hide file tree
Showing 31 changed files with 1,632 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))
- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,10 @@ public void testCacheClearanceAfterIndexClosure() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
Expand All @@ -1135,7 +1139,7 @@ public void testCacheClearanceAfterIndexClosure() throws Exception {
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

Expand All @@ -1154,6 +1158,10 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
Expand All @@ -1172,13 +1180,13 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 300;
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10)
Expand All @@ -1193,37 +1201,41 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
setupIndex(client, index1);
setupIndex(client, index2);

// assert cache is empty for index1
assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// create first cache entry in index1
createCacheEntry(client, index1, "hello");
assertCacheState(client, index1, 0, 1);
long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1 > 0);
long memorySizeForIndex1With1Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With1Entries > 0);

// create second cache entry in index1
createCacheEntry(client, index1, "there");
assertCacheState(client, index1, 0, 2);
long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1);
long memorySizeForIndex1With2Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With2Entries > memorySizeForIndex1With1Entries);

// assert cache is empty for index2
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// create first cache entry in index2
createCacheEntry(client, index2, "hello");
assertCacheState(client, index2, 0, 1);
assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0);

// force refresh index 1 so that it creates 2 stale keys
flushAndRefresh(index1);
// create another cache entry in index 1, this should not be cleaned up.
// force refresh both index1 and index2
flushAndRefresh(index1, index2);
// create another cache entry in index 1 same as memorySizeForIndex1With1Entries, this should not be cleaned up.
createCacheEntry(client, index1, "hello");
// record the size of this entry
long memorySizeOfLatestEntryForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes() - finalMemorySizeForIndex1;
// force refresh index 2 so that it creates 1 stale key
flushAndRefresh(index2);
// sleep until cache cleaner would have cleaned up the stale key from index 2
// sleep until cache cleaner would have cleaned up the stale key from index2
assertBusy(() -> {
// cache cleaner should have cleaned up the stale key from index 2
// cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities
assertEquals(memorySizeOfLatestEntryForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities for index1
long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
// assert the memory size of index1 to only contain 1 entry added after flushAndRefresh
assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1);
// cache for index1 should not be empty since there was an item cached after flushAndRefresh
assertTrue(currentMemorySizeInBytesForIndex1 > 0);
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
Expand Down Expand Up @@ -199,4 +200,25 @@ public void setRefreshFrequency(int refreshFrequency) {
this.refreshFrequency = refreshFrequency;
}
}

/**
 * Excludes nodes matching the given attribute/value pair from shard allocation
 * by applying a transient {@code cluster.routing.allocation.exclude._<attr>} setting.
 * Asserts that the cluster-settings update was acknowledged.
 *
 * @param attr  node attribute suffix (e.g. {@code name} or {@code ip}) appended to the exclude key
 * @param value attribute value identifying the node(s) to exclude
 */
public void excludeNodeSet(String attr, String value) {
    // Build the transient allocation-exclusion setting for the requested attribute.
    final Settings.Builder exclusion = Settings.builder().put("cluster.routing.allocation.exclude._" + attr, value);
    final var response = internalCluster().client().admin().cluster().prepareUpdateSettings().setTransientSettings(exclusion).get();
    assertAcked(response);
}

/**
 * Disables automatic shard rebalancing cluster-wide by persistently setting
 * {@code cluster.routing.rebalance.enable} to {@code none}.
 * Asserts that the cluster-settings update was acknowledged.
 */
public void stopShardRebalancing() {
    // Persistent (not transient) so the setting survives full-cluster restarts during the test.
    final Settings disableRebalance = Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build();
    final var response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(disableRebalance).get();
    assertAcked(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -132,8 +133,8 @@ public void testRemotePrimaryDocRepReplica() throws Exception {

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 2 docrep backed data node
- Creates an index with 1 replica
- Starts 1 remote backed data node
- Index some docs
- Move primary copy from docrep to remote through _cluster/reroute
Expand All @@ -145,14 +146,14 @@ public void testRemotePrimaryDocRepReplica() throws Exception {
public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data nodes");
String docrepNodeName = internalCluster().startDataOnlyNode();
logger.info("---> Starting 2 docrep data nodes");
internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
logger.info("---> Creating index with 1 replica");
Settings zeroReplicas = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
Expand Down Expand Up @@ -245,14 +246,26 @@ RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush requ
pollAndCheckRetentionLeases(REMOTE_PRI_DOCREP_REMOTE_REP);
}

/*
Scenario:
- Starts 2 docrep backed data node
- Creates an index with 1 replica
- Starts 1 remote backed data node
- Index some docs
- Move primary copy from docrep to remote through _cluster/reroute
- Starts another remote backed data node
- Expands index to 2 replicas. One replica copy lies in remote backed node and other in docrep backed node
- Index some more docs
- Assert retention lease consistency
*/
public void testMissingRetentionLeaseCreatedOnFailedOverRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting docrep data node");
internalCluster().startDataOnlyNode();
logger.info("---> Starting 2 docrep data nodes");
internalCluster().startDataOnlyNodes(2);

Settings zeroReplicasAndOverridenSyncIntervals = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build();
Expand Down Expand Up @@ -323,25 +336,24 @@ private void pollAndCheckRetentionLeases(String indexName) throws Exception {

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 2 docrep backed data node
- Creates an index with 1 replica
- Starts 1 remote backed data node
- Move primary copy from docrep to remote through _cluster/reroute
- Expands index to 1 replica
- Stops remote enabled node
- Ensure doc count is same after failover
- Index some more docs to ensure working of failed-over primary
*/
public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data nodes");
String docrepNodeName = internalCluster().startDataOnlyNode();
logger.info("---> Starting 2 docrep data nodes");
internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
Settings excludeRemoteNode = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
Settings excludeRemoteNode = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
createIndex(FAILOVER_REMOTE_TO_DOCREP, excludeRemoteNode);
ensureGreen(FAILOVER_REMOTE_TO_DOCREP);
initDocRepToRemoteMigration();
Expand Down Expand Up @@ -376,8 +388,8 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
ensureGreen(FAILOVER_REMOTE_TO_DOCREP);

logger.info("---> Expanding index to 1 replica copy");
Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
logger.info("---> Expanding index to 2 replica copies");
Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build();
assertAcked(
internalCluster().client()
.admin()
Expand Down Expand Up @@ -412,7 +424,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {

logger.info("---> Stop remote store enabled node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName));
ensureStableCluster(2);
ensureStableCluster(3);
ensureYellow(FAILOVER_REMOTE_TO_DOCREP);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap();
Expand All @@ -433,16 +445,16 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_DOCREP);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_DOCREP).setDocs(true).get().asMap();
assertEquals(1, shardStatsMap.size());
assertEquals(2, shardStatsMap.size());
shardStatsMap.forEach(
(shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); }
);
}

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 2 docrep backed data nodes
- Creates an index with 1 replica
- Starts 1 remote backed data node
- Moves primary copy from docrep to remote through _cluster/reroute
- Starts 1 more remote backed data node
Expand All @@ -455,13 +467,13 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data node");
String docrepNodeName = internalCluster().startDataOnlyNode();
logger.info("---> Starting 2 docrep data nodes");
List<String> docrepNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
logger.info("---> Creating index with 1 replica");
createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
initDocRepToRemoteMigration();

Expand All @@ -484,15 +496,17 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE);
asyncIndexingService.startIndexing();

logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1);
String primaryNodeName = primaryNodeName(FAILOVER_REMOTE_TO_REMOTE);
logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", primaryNodeName, remoteNodeName1);
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1))
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, primaryNodeName, remoteNodeName1))
.get()
);
waitForRelocation();
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1);

Expand All @@ -507,7 +521,13 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
.indices()
.prepareUpdateSettings()
.setIndices(FAILOVER_REMOTE_TO_REMOTE)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build())
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
// prevent replica copy from being allocated to the extra docrep node
.put("index.routing.allocation.exclude._name", primaryNodeName)
.build()
)
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
Expand Down Expand Up @@ -536,8 +556,8 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {

logger.info("---> Stop remote store enabled node hosting the primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1));
ensureStableCluster(3);
ensureYellow(FAILOVER_REMOTE_TO_REMOTE);
ensureStableCluster(4);
ensureYellowAndNoInitializingShards(FAILOVER_REMOTE_TO_REMOTE);
DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();

waitUntil(() -> {
Expand Down Expand Up @@ -580,7 +600,6 @@ public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
- Creates an index with 0 replica
- Starts 1 remote backed data node
- Move primary copy from docrep to remote through _cluster/reroute
- Expands index to 1 replica
- Stops remote enabled node
- Ensure doc count is same after failover
- Index some more docs to ensure working of failed-over primary
Expand Down Expand Up @@ -664,7 +683,8 @@ private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch
RemoteSegmentStats remoteSegmentStats = shardStats.getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
} else {
}
if (shardRouting.unassigned() == false && shardRouting.primary() == false) {
boolean remoteNode = nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode();
assertEquals(
"Mismatched doc count. Is this on remote node ? " + remoteNode,
Expand Down
Loading

0 comments on commit 587aed7

Please sign in to comment.