Skip to content

Commit

Permalink
Minor bug fix related to cluster manager throttling settings
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Dec 13, 2022
1 parent bceb40c commit a5998a8
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366))
- Added experimental extensions to main ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347))
- Add minor bug fix for cluster manager throttling settings ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -50,15 +52,18 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final Supplier<Version> minNodeVersionSupplier;

public ClusterManagerTaskThrottler(
final Settings settings,
final ClusterSettings clusterSettings,
final Supplier<Version> minNodeVersionSupplier,
final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener
) {
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
// Required for setting values as per current settings during node bootstrap
updateSetting(THRESHOLD_SETTINGS.get(settings));
}

/**
Expand Down Expand Up @@ -128,10 +133,21 @@ void validateSetting(final Settings settings) {
}
}

void updateSetting(final Settings settings) {
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE));
void updateSetting(final Settings newSettings) {
Map<String, Settings> groups = newSettings.getAsGroups();
Set<String> settingKeys = new HashSet<>();
// Adding keys which are present in new Setting
settingKeys.addAll(groups.keySet());
// Adding already configured keys as well,
// we might need to set it to default value if it's value is null in new setting
settingKeys.addAll(tasksThreshold.keySet());
for (String key : settingKeys) {
Settings setting = groups.get(key);
if (setting == null) {
updateLimit(key, MIN_THRESHOLD_VALUE);
} else {
updateLimit(key, setting.getAsInt("value", MIN_THRESHOLD_VALUE));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,12 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
);

this.throttlingStats = new ClusterManagerThrottlingStats();
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats);
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
this::getMinNodeVersion,
throttlingStats
);
this.threadPool = threadPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase {
private DiscoveryNode localNode;
private DiscoveryNode[] allNodes;
private ClusterManagerThrottlingStats throttlingStats;
private Settings settings = Settings.EMPTY;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -82,6 +83,7 @@ public static void afterClass() {
public void testDefaults() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -103,6 +105,7 @@ public void testValidateSettingsForDifferentVersion() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down Expand Up @@ -135,6 +138,7 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -148,6 +152,60 @@ public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}

public void testUpdateSettingsForNullValue() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);
throttler.registerClusterManagerTask("put-mapping", true);

// set some limit for put-mapping tasks
int newLimit = randomIntBetween(1, 10);
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
clusterSettings.applySettings(newSettings);
assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue());

// set limit to null
Settings nullSettings = Settings.builder().build();
clusterSettings.applySettings(nullSettings);
assertNull(throttler.getThrottlingLimit("put-mapping"));
}

public void testSettingsOnBootstrap() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);
throttler.registerClusterManagerTask("put-mapping", true);

// assert that limit is applied on throttler
assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue());
}

public void testValidateSettingsForUnknownTask() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
Expand All @@ -158,14 +216,14 @@ public void testValidateSettingsForUnknownTask() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);

// set some limit for update snapshot tasks
int newLimit = randomIntBetween(1, 10);

Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build();
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
}
Expand All @@ -180,6 +238,7 @@ public void testUpdateThrottlingLimitForBasicSanity() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down Expand Up @@ -209,6 +268,7 @@ public void testValidateSettingForLimit() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -222,6 +282,7 @@ public void testValidateSettingForLimit() {
public void testUpdateLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down Expand Up @@ -258,6 +319,7 @@ public void testThrottlingForDisabledThrottlingTask() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand All @@ -278,6 +340,7 @@ public void testThrottling() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
settings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
Expand Down

0 comments on commit a5998a8

Please sign in to comment.