Skip to content

Commit

Permalink
Addressing Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Mar 14, 2024
1 parent 1985da4 commit 4626c01
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand Down Expand Up @@ -63,16 +61,18 @@ public AdmissionControlService(
this.clusterService = clusterService;
this.settings = settings;
this.resourceUsageCollectorService = resourceUsageCollectorService;
this.initialise();
this.initialize();
}

/**
* Initialise and Register all the admissionControllers
*/
private void initialise() {
private void initialize() {
// Initialise different type of admission controllers
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER);
if (Constants.LINUX) {
registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER);
}
}

/**
Expand Down Expand Up @@ -143,29 +143,11 @@ public AdmissionControlStats stats() {
List<AdmissionControllerStats> statsList = new ArrayList<>();
if (!this.admissionControllers.isEmpty()) {
this.admissionControllers.forEach((controllerName, admissionController) -> {
if (controllerName.equals(IO_BASED_ADMISSION_CONTROLLER)) {
if (Constants.LINUX) {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
if (admissionControllerStats.rejectionCount != null) {
statsList.add(admissionControllerStats);
}
}
} else {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
statsList.add(admissionControllerStats);
}
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
statsList.add(admissionControllerStats);
});
return new AdmissionControlStats(statsList);
}
return null;
}

// used for testing
Map<String, AdmissionControllerStats> getStats() {
Map<String, AdmissionControllerStats> acStats = new HashMap<>();
for (AdmissionControllerStats admissionControllerStats : this.stats().getAdmissionControllerStatsList()) {
acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
}
return acStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;

import java.util.Locale;
Expand Down Expand Up @@ -47,14 +45,6 @@ public IoBasedAdmissionController(
this.settings = new IoBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
}

@Override
public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlMode) {
if (Constants.LINUX) {
return super.isEnabledForTransportLayer(admissionControlMode);
}
return false;
}

/**
* Apply admission control based on the resource usage for an action
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.lucene.util.Constants;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -48,13 +49,21 @@ public void tearDown() throws Exception {

public void testWhenAdmissionControllerRegistered() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
if (Constants.LINUX) {
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
} else {
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
}
}

public void testRegisterInvalidAdmissionController() {
String test = "TEST";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
if (Constants.LINUX) {
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
} else {
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
}
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> admissionControlService.registerAdmissionController(test)
Expand All @@ -66,7 +75,11 @@ public void testAdmissionControllerSettings() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 2);
if (Constants.LINUX) {
assertEquals(admissionControllerList.size(), 2);
} else {
assertEquals(admissionControllerList.size(), 1);
}
CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService
.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER);
assertEquals(
Expand Down Expand Up @@ -132,7 +145,11 @@ public void testApplyAdmissionControllerEnabled() {
.build();
clusterService.getClusterSettings().applySettings(settings);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 2);
if (Constants.LINUX) {
assertEquals(admissionControllerList.size(), 2);
} else {
assertEquals(admissionControllerList.size(), 1);
}
}

public void testApplyAdmissionControllerEnforced() {
Expand All @@ -153,6 +170,10 @@ public void testApplyAdmissionControllerEnforced() {
.build();
clusterService.getClusterSettings().applySettings(settings);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 2);
if (Constants.LINUX) {
assertEquals(admissionControllerList.size(), 2);
} else {
assertEquals(admissionControllerList.size(), 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.junit.After;

import java.util.HashMap;
import java.util.Map;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception {
BulkResponse res = client().bulk(bulk.request()).actionGet();
assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
Map<String, AdmissionControllerStats> acStats = admissionControlService.getStats();
Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand All @@ -119,7 +120,7 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception {
} catch (Exception e) {
assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
}
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand Down Expand Up @@ -156,7 +157,7 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception {
assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
}
admissionControlService = getInstanceFromNode(AdmissionControlService.class);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand All @@ -182,7 +183,7 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception {
} catch (Exception e) {
assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
}
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand Down Expand Up @@ -222,7 +223,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
BulkResponse res = client().bulk(bulk.request()).actionGet();
assertFalse(res.hasFailures());
AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
Map<String, AdmissionControllerStats> acStats = admissionControlService.getStats();
Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand All @@ -235,7 +236,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(3, searchResponse.getHits().getHits().length);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand All @@ -260,7 +261,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
// verify bulk request success but admission control having rejections stats
res = client().bulk(bulk.request()).actionGet();
assertFalse(res.hasFailures());
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand All @@ -280,7 +281,7 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
searchRequest = new SearchRequest(INDEX_NAME);
searchResponse = client().search(searchRequest).actionGet();
assertEquals(3, searchResponse.getHits().getHits().length);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(
1,
(long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
Expand Down Expand Up @@ -318,7 +319,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception {
BulkResponse res = client().bulk(bulk.request()).actionGet();
assertFalse(res.hasFailures());
AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
Map<String, AdmissionControllerStats> acStats = admissionControlService.getStats();
Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);

assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
client().admin().indices().prepareRefresh(INDEX_NAME).get();
Expand All @@ -327,7 +328,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(3, searchResponse.getHits().getHits().length);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(
Expand All @@ -346,7 +347,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception {
// verify bulk request success but admission control having rejections stats
res = client().bulk(bulk.request()).actionGet();
assertFalse(res.hasFailures());
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
if (Constants.LINUX) {
assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
Expand All @@ -357,7 +358,7 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception {
searchRequest = new SearchRequest(INDEX_NAME);
searchResponse = client().search(searchRequest).actionGet();
assertEquals(3, searchResponse.getHits().getHits().length);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
if (Constants.LINUX) {
assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
Expand Down Expand Up @@ -391,7 +392,7 @@ public void testAdmissionControlWithinLimits() throws Exception {
BulkResponse res = client().bulk(bulk.request()).actionGet();
assertFalse(res.hasFailures());
AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
Map<String, AdmissionControllerStats> acStats = admissionControlService.getStats();
Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
if (Constants.LINUX) {
assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
Expand All @@ -404,12 +405,20 @@ public void testAdmissionControlWithinLimits() throws Exception {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(3, searchResponse.getHits().getHits().length);
acStats = admissionControlService.getStats();
acStats = this.getAdmissionControlStats(admissionControlService);
assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
if (Constants.LINUX) {
assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
} else {
assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
}
}

Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlService admissionControlService) {
Map<String, AdmissionControllerStats> acStats = new HashMap<>();
for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) {
acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
}
return acStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.ratelimitting.admissioncontrol.controllers;

import org.apache.lucene.util.Constants;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -77,15 +76,7 @@ public void testCheckUpdateSettings() {
assertEquals(admissionController.getName(), IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER);
assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
if (Constants.LINUX) {
assertTrue(
admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())
);
} else {
assertFalse(
admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())
);
}
assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()));
}

public void testApplyControllerWithDefaultSettings() {
Expand Down Expand Up @@ -117,15 +108,7 @@ public void testApplyControllerWhenSettingsEnabled() throws Exception {
clusterService,
settings
);
if (Constants.LINUX) {
assertTrue(
admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())
);
} else {
assertFalse(
admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())
);
}
assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()));
assertTrue(
admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode())
);
Expand Down

0 comments on commit 4626c01

Please sign in to comment.