Skip to content

Commit

Permalink
Add default config change setting for replica count
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Dec 21, 2022
1 parent d76adf3 commit 65a1d52
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance.USE_FORCE_ZONE_FOR_REPLICA_SETTING;
import static org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance.maxAwarenessAttributes;

/**
* Service responsible for submitting create index requests
Expand Down Expand Up @@ -554,7 +556,6 @@ private ClusterState applyCreateIndexRequestWithV1Templates(
xContentRegistry
)
);

final Settings aggregatedIndexSettings = aggregateIndexSettings(
currentState,
request,
Expand Down Expand Up @@ -862,7 +863,12 @@ static Settings aggregateIndexSettings(
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, INDEX_NUMBER_OF_SHARDS_SETTING.get(settings));
}
if (INDEX_NUMBER_OF_REPLICAS_SETTING.exists(indexSettingsBuilder) == false) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
if(USE_FORCE_ZONE_FOR_REPLICA_SETTING.get(currentState.metadata().settings())) {
int replicaCount = maxAwarenessAttributes(currentState.metadata().settings()) - 1;
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, replicaCount);
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
}
}
if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
Expand Down Expand Up @@ -1194,6 +1200,9 @@ List<String> getIndexSettingsValidationErrors(
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
INDEX_NUMBER_OF_REPLICAS_SETTING.getDefault(Settings.EMPTY)
);
if(INDEX_NUMBER_OF_REPLICAS_SETTING.exists(settings) == false && awarenessReplicaBalance.getUseForceZoneForReplicaSetting()){
replicaCount = awarenessReplicaBalance.maxAwarenessAttributes() - 1;
}
AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings);
Optional<String> error = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica);
if (error.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@

package org.opensearch.cluster.routing.allocation;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.AutoExpandReplicas;
import org.opensearch.cluster.metadata.MetadataCreateIndexService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

import static java.lang.Math.log;
import static java.lang.Math.max;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING;
Expand All @@ -31,19 +33,56 @@
* Helps in balancing shards across all awareness attributes and ensuring high availability of data.
*/
public class AwarenessReplicaBalance {
public static final String SETTING_CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE = "cluster.routing.allocation.awareness.balance";
public static final String SETTING_USE_FORCE_ZONE_FOR_REPLICA = "cluster.use_force_zone_for_replica";
private static final Logger logger = LogManager.getLogger(AwarenessReplicaBalance.class);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING = Setting.boolSetting(
"cluster.routing.allocation.awareness.balance",
SETTING_CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> USE_FORCE_ZONE_FOR_REPLICA_SETTING = Setting.boolSetting(
SETTING_USE_FORCE_ZONE_FOR_REPLICA,
false,
new Setting.Validator<>() {

@Override
public void validate(final Boolean value) {}

@Override
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
final Boolean clusterAwarenessSetting = (Boolean) settings.get(CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING);
if (clusterAwarenessSetting == false && value == true) {
throw new IllegalArgumentException(
"To enable "
+ USE_FORCE_ZONE_FOR_REPLICA_SETTING.getKey()
+ ", "
+ CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey()
+ " should be enabled "
);
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING);
return settings.iterator();
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile List<String> awarenessAttributes;

private volatile Map<String, List<String>> forcedAwarenessAttributes;

private volatile Boolean awarenessBalance;

private volatile Boolean useForceZoneForReplica;

public AwarenessReplicaBalance(Settings settings, ClusterSettings clusterSettings) {
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
Expand All @@ -54,14 +93,22 @@ public AwarenessReplicaBalance(Settings settings, ClusterSettings clusterSetting
);
setAwarenessBalance(CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, this::setAwarenessBalance);

this.useForceZoneForReplica = USE_FORCE_ZONE_FOR_REPLICA_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_FORCE_ZONE_FOR_REPLICA_SETTING, this::setUseForceZoneForReplica);
}

private void setAwarenessBalance(Boolean awarenessBalance) {
this.awarenessBalance = awarenessBalance;
}

private void setForcedAwarenessAttributes(Settings forceSettings) {
this.forcedAwarenessAttributes = getForcedAwarenessAttributes(forceSettings);
}

private void setUseForceZoneForReplica(Boolean useForceZoneForReplica){
this.useForceZoneForReplica = useForceZoneForReplica;
}
public static Map<String, List<String>> getForcedAwarenessAttributes(Settings forceSettings) {
Map<String, List<String>> forcedAwarenessAttributes = new HashMap<>();
Map<String, Settings> forceGroups = forceSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : forceGroups.entrySet()) {
Expand All @@ -70,13 +117,16 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
forcedAwarenessAttributes.put(entry.getKey(), aValues);
}
}
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
return forcedAwarenessAttributes;
}

private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

public Boolean getUseForceZoneForReplicaSetting(){
return this.useForceZoneForReplica;
}
/*
For a cluster having zone as awareness attribute , it will return the size of zones if set it forced awareness attributes
Expand All @@ -102,6 +152,24 @@ public int maxAwarenessAttributes() {
return awarenessAttributes;
}

public static int maxAwarenessAttributes(Settings settings) {
Boolean awarenessBalance = CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.get(settings);
List<String> awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
Map<String, List<String>> forcedAwarenessAttributes = getForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
logger.info(forcedAwarenessAttributes);
logger.info(awarenessAttributes);
logger.info(awarenessBalance);
int defaultAwarenessAttributes = 1;
if (awarenessBalance == false) {
return defaultAwarenessAttributes;
}
for (String awarenessAttribute : awarenessAttributes) {
if (forcedAwarenessAttributes.containsKey(awarenessAttribute)) {
defaultAwarenessAttributes = max(defaultAwarenessAttributes, forcedAwarenessAttributes.get(awarenessAttribute).size());
}
}
return defaultAwarenessAttributes;
}
public Optional<String> validate(int replicaCount, AutoExpandReplicas autoExpandReplica) {
if (autoExpandReplica.isEnabled()) {
if ((autoExpandReplica.getMaxReplicas() != Integer.MAX_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public void apply(Settings value, Settings current, Settings previous) {
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
AwarenessReplicaBalance.USE_FORCE_ZONE_FOR_REPLICA_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
Expand Down

0 comments on commit 65a1d52

Please sign in to comment.