Skip to content

Commit

Permalink
Minor bug fix related to cluster manager throttling settings (#5524)
Browse files Browse the repository at this point in the history
* Minor bug fix related to cluster manager throttling settings

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>

* Incorporated comments

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel authored Dec 16, 2022
1 parent d27c30a commit b782299
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))


### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down Expand Up @@ -113,6 +114,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412))
- Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462))
- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -50,15 +52,18 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final Supplier<Version> minNodeVersionSupplier;

public ClusterManagerTaskThrottler(
final Settings settings,
final ClusterSettings clusterSettings,
final Supplier<Version> minNodeVersionSupplier,
final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener
) {
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
// Required for setting values as per current settings during node bootstrap
updateSetting(THRESHOLD_SETTINGS.get(settings));
}

/**
Expand Down Expand Up @@ -128,10 +133,16 @@ void validateSetting(final Settings settings) {
}
}

void updateSetting(final Settings settings) {
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE));
void updateSetting(final Settings newSettings) {
Map<String, Settings> groups = newSettings.getAsGroups();
Set<String> settingKeys = new HashSet<>();
// Adding keys which are present in new Setting
settingKeys.addAll(groups.keySet());
// Adding existing keys that may need to be set to a default value if that is removed in new setting.
settingKeys.addAll(tasksThreshold.keySet());
for (String key : settingKeys) {
Settings setting = groups.get(key);
updateLimit(key, setting == null ? MIN_THRESHOLD_VALUE : setting.getAsInt("value", MIN_THRESHOLD_VALUE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
);

this.throttlingStats = new ClusterManagerThrottlingStats();
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats);
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
this::getMinNodeVersion,
throttlingStats
);
this.threadPool = threadPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase {

private static ThreadPool threadPool;
private ClusterService clusterService;
private DiscoveryNode localNode;
private DiscoveryNode[] allNodes;
private ClusterManagerThrottlingStats throttlingStats;

@BeforeClass
public static void beforeClass() {
Expand All @@ -56,15 +53,6 @@ public static void beforeClass() {
public void setUp() throws Exception {
super.setUp();
clusterService = ClusterServiceUtils.createClusterService(threadPool);
localNode = new DiscoveryNode(
"local_node",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
Version.V_2_4_0
);
allNodes = new DiscoveryNode[] { localNode };
throttlingStats = new ClusterManagerThrottlingStats();
}

@After
Expand All @@ -82,9 +70,10 @@ public static void afterClass() {
public void testDefaults() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);
throttler.registerClusterManagerTask("create-index", true);
Expand All @@ -103,9 +92,10 @@ public void testValidateSettingsForDifferentVersion() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -135,9 +125,10 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", false);

Expand All @@ -148,6 +139,60 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}

public void testUpdateSettingsForNullValue() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

// set some limit for put-mapping tasks
int newLimit = randomIntBetween(1, 10);
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
clusterSettings.applySettings(newSettings);
assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue());

// set limit to null
Settings nullSettings = Settings.builder().build();
clusterSettings.applySettings(nullSettings);
assertNull(throttler.getThrottlingLimit("put-mapping"));
}

public void testSettingsOnBootstrap() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

// assert that limit is applied on throttler
assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue());
}

public void testValidateSettingsForUnknownTask() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
Expand All @@ -158,14 +203,14 @@ public void testValidateSettingsForUnknownTask() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);

// set some limit for update snapshot tasks
int newLimit = randomIntBetween(1, 10);

Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build();
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}
Expand All @@ -180,9 +225,10 @@ public void testUpdateThrottlingLimitForBasicSanity() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -209,9 +255,10 @@ public void testValidateSettingForLimit() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand All @@ -222,9 +269,10 @@ public void testValidateSettingForLimit() {
public void testUpdateLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
new ClusterManagerThrottlingStats()
);
throttler.registerClusterManagerTask("put-mapping", true);

Expand Down Expand Up @@ -255,9 +303,11 @@ private DiscoveryNode getClusterManagerNode(Version version) {
}

public void testThrottlingForDisabledThrottlingTask() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -275,9 +325,11 @@ public void testThrottlingForDisabledThrottlingTask() {
}

public void testThrottling() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
Settings.EMPTY,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down

0 comments on commit b782299

Please sign in to comment.