diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java deleted file mode 100644 index 317337ecf72c7..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.Constants; -import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; -import org.opensearch.action.admin.indices.stats.ShardStats; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.SearchPhaseExecutionException; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.common.UUIDs; -import org.opensearch.common.collect.Tuple; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.node.resource.tracker.ResourceTrackerSettings; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; -import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings; -import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.After; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; - -import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; -import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT; -import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) -public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase { - - public static final Settings settings = Settings.builder() - .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) - .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) - .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000)) - .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) - .build(); - - private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class); - - public static final String INDEX_NAME = "test_index"; - - @Before - public void init() { - assertAcked( - prepareCreate( - INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - ) - ); - ensureGreen(INDEX_NAME); - } - - @After - public void cleanup() { - assertAcked( - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().putNull("*")) - .setTransientSettings(Settings.builder().putNull("*")) - ); - client().admin().indices().prepareDelete(INDEX_NAME).get(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build(); - } - - public void testAdmissionControlRejectionOnEnforced() throws Exception { - Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); - String primaryName = primaryReplicaNodeNames.v1(); - String replicaName = primaryReplicaNodeNames.v2(); - String coordinatingOnlyNode = getCoordinatingOnlyNode(); - AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); - AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); - Thread.sleep(6000); - final BulkRequest bulkRequest = new BulkRequest(); - for (int i = 0; i < 3; ++i) { - IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) - .source(Collections.singletonMap("key", randomAlphaOfLength(50))); - bulkRequest.add(request); - } - BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); - Map admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - assertEquals( - admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount.get( - AdmissionControlActionType.INDEXING.getType() - ).longValue(), - 1 - ); - Arrays.stream(res.getItems()).forEach(bulkItemResponse -> { - assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException")); - }); - SearchResponse searchResponse; - try { - searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); - } catch (Exception exception) { - assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); - } - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - Map admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - long primaryRejectionCount = admissionControlPrimaryStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - long replicaRejectionCount = admissionControlReplicaStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - - updateSettingsRequest.transientSettings( - Settings.builder() - .put(settings) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - if (Constants.LINUX) { - assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); - } else { - LOGGER.info(res.buildFailureMessage()); - assertFalse(res.hasFailures()); - } - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - assertEquals( - admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount.get( - AdmissionControlActionType.INDEXING.getType() - ).longValue(), - 1 - ); - if (Constants.LINUX) { - Arrays.stream(res.getItems()).forEach(bulkItemResponse -> { - assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException")); - }); - } - try { - searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); - } catch (Exception exception) { - assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); - } - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - primaryRejectionCount = admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - replicaRejectionCount = admissionControlReplicaStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - assertTrue(primaryRejectionCount == 1 && replicaRejectionCount == 1); - if (Constants.LINUX) { - primaryRejectionCount = admissionControlPrimaryStats.get( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - replicaRejectionCount = admissionControlReplicaStats.get( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - assertTrue(primaryRejectionCount == 1 && replicaRejectionCount == 1); - } else { - assertNull(admissionControlPrimaryStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - assertNull(admissionControlReplicaStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - } - - public void testAdmissionControlEnforcedOnNonACEnabledActions() throws Exception { - String coordinatingOnlyNode = getCoordinatingOnlyNode(); - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - Thread.sleep(6000); - updateSettingsRequest.transientSettings( - Settings.builder() - .put( - CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); - nodesStatsRequest.clear() - .indices(true) - .addMetrics( - NodesStatsRequest.Metric.JVM.metricName(), - NodesStatsRequest.Metric.OS.metricName(), - NodesStatsRequest.Metric.FS.metricName(), - NodesStatsRequest.Metric.PROCESS.metricName(), - NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName() - ); - NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet(); - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet(); - assertEquals(200, clusterHealthResponse.status().getStatus()); - assertFalse(nodesStatsResponse.hasFailures()); - updateSettingsRequest = new ClusterUpdateSettingsRequest(); - - updateSettingsRequest.transientSettings( - Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - nodesStatsRequest = new NodesStatsRequest(); - nodesStatsRequest.clear() - .indices(true) - .addMetrics( - NodesStatsRequest.Metric.JVM.metricName(), - NodesStatsRequest.Metric.OS.metricName(), - NodesStatsRequest.Metric.FS.metricName(), - NodesStatsRequest.Metric.PROCESS.metricName(), - NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName() - ); - nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet(); - clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet(); - assertEquals(200, clusterHealthResponse.status().getStatus()); - assertFalse(nodesStatsResponse.hasFailures()); - } - - public void testAdmissionControlRejectionOnMonitor() throws Exception { - Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); - String primaryName = primaryReplicaNodeNames.v1(); - String replicaName = primaryReplicaNodeNames.v2(); - String coordinatingOnlyNode = getCoordinatingOnlyNode(); - - AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); - AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); - Thread.sleep(6000); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - - updateSettingsRequest.transientSettings( - Settings.builder() - .put( - CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.MONITOR.getMode() - ) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - final BulkRequest bulkRequest = new BulkRequest(); - for (int i = 0; i < 3; ++i) { - IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) - .source(Collections.singletonMap("key", randomAlphaOfLength(50))); - bulkRequest.add(request); - } - BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - assertFalse(res.hasFailures()); - Map admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - Map admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - long primaryRejectionCount = admissionControlPrimaryStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L); - long replicaRejectionCount = admissionControlReplicaStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L); - assertEquals(primaryRejectionCount, 1L); - assertEquals(replicaRejectionCount, 0L); - SearchResponse searchResponse; - searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - primaryRejectionCount = admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); - replicaRejectionCount = admissionControlReplicaStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); - assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); - assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1); - - updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings( - Settings.builder() - .put(settings) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - try { - searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); - } catch (Exception exception) { - assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); - } - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - primaryRejectionCount = admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - replicaRejectionCount = admissionControlReplicaStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).rejectionCount - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1); - if (Constants.LINUX) { - primaryRejectionCount = admissionControlPrimaryStats.get( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - replicaRejectionCount = admissionControlReplicaStats.get( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); - - assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); - } else { - assertNull(admissionControlPrimaryStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - assertNull(admissionControlReplicaStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - } - - public void testAdmissionControlRejectionOnDisabled() throws Exception { - Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); - String primaryName = primaryReplicaNodeNames.v1(); - String replicaName = primaryReplicaNodeNames.v2(); - String coordinatingOnlyNode = getCoordinatingOnlyNode(); - - AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); - AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); - Thread.sleep(6000); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - - updateSettingsRequest.transientSettings( - Settings.builder() - .put( - CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.DISABLED.getMode() - ) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - final BulkRequest bulkRequest = new BulkRequest(); - for (int i = 0; i < 3; ++i) { - IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) - .source(Collections.singletonMap("key", randomAlphaOfLength(50))); - bulkRequest.add(request); - } - BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); - assertFalse(res.hasFailures()); - Map admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - Map admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - long primaryRejectionCount = admissionControlPrimaryStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.INDEXING.getType(), new AtomicLong(0).longValue()); - long replicaRejectionCount = admissionControlReplicaStats.get( - CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER - ).rejectionCount.getOrDefault(AdmissionControlActionType.INDEXING.getType(), new AtomicLong(0).longValue()); - assertEquals(primaryRejectionCount, 0); - assertEquals(replicaRejectionCount, 0); - SearchResponse searchResponse; - searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); - admissionControlPrimaryStats = admissionControlServicePrimary.getStats(); - admissionControlReplicaStats = admissionControlServiceReplica.getStats(); - primaryRejectionCount = admissionControlPrimaryStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); - replicaRejectionCount = admissionControlReplicaStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); - assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0); - } - - private Tuple getPrimaryReplicaNodeNames(String indexName) { - IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get(); - String primaryId = Stream.of(response.getShards()) - .map(ShardStats::getShardRouting) - .filter(ShardRouting::primary) - .findAny() - .get() - .currentNodeId(); - String replicaId = Stream.of(response.getShards()) - .map(ShardStats::getShardRouting) - .filter(sr -> sr.primary() == false) - .findAny() - .get() - .currentNodeId(); - DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); - String primaryName = nodes.get(primaryId).getName(); - String replicaName = nodes.get(replicaId).getName(); - return new Tuple<>(primaryName, replicaName); - } - - private String getCoordinatingOnlyNode() { - return client().admin() - .cluster() - .prepareState() - .get() - .getState() - .nodes() - .getCoordinatingOnlyNodes() - .values() - .iterator() - .next() - .getName(); - } -} diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java index 546ae07cde221..621f90e80454c 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java @@ -8,6 +8,7 @@ package org.opensearch.node.resource.tracker; +import org.apache.lucene.util.Constants; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -69,6 +70,9 @@ public IoUsageStats getIoUsageStats() { * Checks if all of the resource usage trackers are ready */ public boolean isReady() { + if (Constants.LINUX) { + return memoryUsageTracker.isReady() && cpuUsageTracker.isReady() && ioUsageTracker.isReady(); + } return memoryUsageTracker.isReady() && cpuUsageTracker.isReady(); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java index 67f626696838a..835f88c7a7315 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java @@ -92,18 +92,18 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { // check if cluster state is ready if (clusterService.state() != null && clusterService.state().nodes() != null) { - long maxIoLimit = this.getIoRejectionThreshold(admissionControlActionType); + long ioUsageThreshold = this.getIoRejectionThreshold(admissionControlActionType); Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( this.clusterService.state().nodes().getLocalNodeId() ); if (nodePerformanceStatistics.isPresent()) { double ioUsage = nodePerformanceStatistics.get().getIoUsageStats().getIoUtilisationPercent(); - if (ioUsage >= maxIoLimit) { + if (ioUsage >= ioUsageThreshold) { LOGGER.warn( "IoBasedAdmissionController limit reached as the current IO " + "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]", ioUsage, - maxIoLimit, + ioUsageThreshold, actionName, this.settings.getTransportLayerAdmissionControllerMode() ); diff --git a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java index f2ee0e61c4953..6dd90784ab65f 100644 --- a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java @@ -14,24 +14,21 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.transport.TransportAddress; -import org.opensearch.node.resource.tracker.NodeResourceUsageTracker; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.test.OpenSearchSingleNodeTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; import org.junit.After; -import org.junit.Before; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; /** @@ -39,61 +36,50 @@ * are working as expected */ public class ResourceUsageCollectorServiceTests extends OpenSearchSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } - private ClusterService clusterService; - private ResourceUsageCollectorService collector; - private ThreadPool threadpool; - NodeResourceUsageTracker tracker; - - @Before - public void setUp() throws Exception { - super.setUp(); - - threadpool = new TestThreadPool("resource_usage_collector_tests"); - - clusterService = createClusterService(threadpool); - - Settings settings = Settings.builder() - .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), new TimeValue(500, TimeUnit.MILLISECONDS)) + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) .build(); - tracker = new NodeResourceUsageTracker( - null, - threadpool, - settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ); - collector = new ResourceUsageCollectorService(tracker, clusterService, threadpool); - tracker.start(); - collector.start(); } @After - public void tearDown() throws Exception { - super.tearDown(); - threadpool.shutdownNow(); - clusterService.close(); - collector.stop(); - tracker.stop(); - collector.close(); - tracker.close(); + public void cleanup() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); } public void testResourceUsageStats() { - collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); - Map nodeStats = collector.getAllNodeStatistics(); + ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); + resourceUsageCollectorService.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); + Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertEquals(99.0, nodeStats.get("node1").cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeStats.get("node1").memoryUtilizationPercent, 0.0); assertEquals(98, nodeStats.get("node1").getIoUsageStats().getIoUtilisationPercent(), 0.0); - Optional nodeResourceUsageStatsOptional = collector.getNodeStatistics("node1"); + Optional nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node1"); assertNotNull(nodeResourceUsageStatsOptional.get()); assertEquals(99.0, nodeResourceUsageStatsOptional.get().cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeResourceUsageStatsOptional.get().memoryUtilizationPercent, 0.0); assertEquals(98, nodeResourceUsageStatsOptional.get().getIoUsageStats().getIoUtilisationPercent(), 0.0); - nodeResourceUsageStatsOptional = collector.getNodeStatistics("node2"); + nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node2"); assertTrue(nodeResourceUsageStatsOptional.isEmpty()); } @@ -101,26 +87,29 @@ public void testScheduler() throws Exception { /** * Wait for cluster state to be ready so that localNode().getId() is ready and we add the values to the map */ - assertBusy(() -> assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent()), 1, TimeUnit.MINUTES); - assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent()); + ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + assertBusy(() -> assertEquals(1, resourceUsageCollectorService.getAllNodeStatistics().size())); + /** * Wait for memory utilization to be reported greater than 0 */ assertBusy( () -> assertThat( - collector.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(), + resourceUsageCollectorService.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(), greaterThan(0.0) ), 5, TimeUnit.SECONDS ); - assertTrue(collector.getNodeStatistics("Invalid").isEmpty()); + assertTrue(resourceUsageCollectorService.getNodeStatistics("Invalid").isEmpty()); } /* * Test that concurrently adding values and removing nodes does not cause exceptions */ public void testConcurrentAddingAndRemovingNodes() throws Exception { + ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); String[] nodes = new String[] { "a", "b", "c", "d" }; final CountDownLatch latch = new CountDownLatch(5); @@ -134,9 +123,9 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } for (int i = 0; i < randomIntBetween(100, 200); i++) { if (randomBoolean()) { - collector.removeNodeResourceUsageStats(randomFrom(nodes)); + resourceUsageCollectorService.removeNodeResourceUsageStats(randomFrom(nodes)); } - collector.collectNodeResourceUsageStats( + resourceUsageCollectorService.collectNodeResourceUsageStats( randomFrom(nodes), System.currentTimeMillis(), randomIntBetween(1, 100), @@ -161,7 +150,7 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { t3.join(); t4.join(); - final Map nodeStats = collector.getAllNodeStatistics(); + final Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { assertThat(nodeStats.get(nodeId).memoryUtilizationPercent, greaterThan(0.0)); @@ -172,14 +161,15 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } public void testNodeRemoval() { - collector.collectNodeResourceUsageStats( + ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); + resourceUsageCollectorService.collectNodeResourceUsageStats( "node1", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100), new IoUsageStats(randomIntBetween(1, 100)) ); - collector.collectNodeResourceUsageStats( + resourceUsageCollectorService.collectNodeResourceUsageStats( "node2", System.currentTimeMillis(), randomIntBetween(1, 100), @@ -199,8 +189,8 @@ public void testNodeRemoval() { .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); - collector.clusterChanged(event); - final Map nodeStats = collector.getAllNodeStatistics(); + resourceUsageCollectorService.clusterChanged(event); + final Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertFalse(nodeStats.containsKey("node2")); } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java index bd209b3d2b183..86acbe414979f 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -45,6 +45,8 @@ */ public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { + public static final String INDEX_NAME = "test_index"; + @Override protected boolean resetNodeAfterTest() { return true; @@ -52,6 +54,7 @@ protected boolean resetNodeAfterTest() { @After public void cleanup() { + client().admin().indices().prepareDelete(INDEX_NAME).get(); assertAcked( client().admin() .cluster() @@ -77,11 +80,10 @@ protected Settings nodeSettings() { public void testAdmissionControlRejectionEnforcedMode() throws Exception { ensureGreen(); assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); - Thread.sleep(6000); - client().admin().indices().prepareCreate("index").execute().actionGet(); + client().admin().indices().prepareCreate(INDEX_NAME).execute().actionGet(); BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); @@ -108,10 +110,10 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception { } else { assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); } - client().admin().indices().prepareRefresh("index").get(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); // verify search request hits 429 - SearchRequest searchRequest = new SearchRequest("index"); + SearchRequest searchRequest = new SearchRequest(INDEX_NAME); try { client().search(searchRequest).actionGet(); } catch (Exception e) { @@ -147,7 +149,7 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception { bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } res = client().bulk(bulk.request()).actionGet(); if (Constants.LINUX) { @@ -171,10 +173,10 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception { } else { assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); } - client().admin().indices().prepareRefresh("index").get(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); // verify search request hits 429 - searchRequest = new SearchRequest("index"); + searchRequest = new SearchRequest(INDEX_NAME); try { client().search(searchRequest).actionGet(); } catch (Exception e) { @@ -201,8 +203,6 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception { public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); - Thread.sleep(6000); - // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -216,7 +216,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // verify bulk request success but admission control having rejections stats BulkResponse res = client().bulk(bulk.request()).actionGet(); @@ -229,10 +229,10 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { .getRejectionCount() .get(AdmissionControlActionType.INDEXING.getType()) ); - client().admin().indices().prepareRefresh("index").get(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); // verify search request success but admission control having rejections stats - SearchRequest searchRequest = new SearchRequest("index"); + SearchRequest searchRequest = new SearchRequest(INDEX_NAME); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); acStats = admissionControlService.getStats(); @@ -255,7 +255,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // verify bulk request success but admission control having rejections stats res = client().bulk(bulk.request()).actionGet(); @@ -277,7 +277,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { } else { assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); } - searchRequest = new SearchRequest("index"); + searchRequest = new SearchRequest(INDEX_NAME); searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); acStats = admissionControlService.getStats(); @@ -301,8 +301,6 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { public void testAdmissionControlRejectionDisabledMode() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); - Thread.sleep(6000); - // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -314,7 +312,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // verify bulk request success and no rejections BulkResponse res = client().bulk(bulk.request()).actionGet(); @@ -323,10 +321,10 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { Map acStats = admissionControlService.getStats(); assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - client().admin().indices().prepareRefresh("index").get(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); // verify search request success and no rejections - SearchRequest searchRequest = new SearchRequest("index"); + SearchRequest searchRequest = new SearchRequest(INDEX_NAME); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); acStats = admissionControlService.getStats(); @@ -343,7 +341,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // verify bulk request success but admission control having rejections stats res = client().bulk(bulk.request()).actionGet(); @@ -356,7 +354,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); } - searchRequest = new SearchRequest("index"); + searchRequest = new SearchRequest(INDEX_NAME); searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); acStats = admissionControlService.getStats(); @@ -370,8 +368,6 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { public void testAdmissionControlWithinLimits() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); - Thread.sleep(6000); - // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -389,7 +385,7 @@ public void testAdmissionControlWithinLimits() throws Exception { BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); } // verify bulk request success and no rejections BulkResponse res = client().bulk(bulk.request()).actionGet(); @@ -402,10 +398,10 @@ public void testAdmissionControlWithinLimits() throws Exception { } else { assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); } - client().admin().indices().prepareRefresh("index").get(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); // verify search request success and no rejections - SearchRequest searchRequest = new SearchRequest("index"); + SearchRequest searchRequest = new SearchRequest(INDEX_NAME); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); acStats = admissionControlService.getStats();