Skip to content

Auto-expand replicas when adding or removing nodes #30423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ Rollup::
* Validate timezone in range queries to ensure they match the selected job when
searching ({pull}30338[#30338])


Allocation::

Auto-expand replicas when adding or removing nodes to prevent shard copies from
being dropped and resynced when a data node rejoins the cluster ({pull}30423[#30423])

//[float]
//=== Regressions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,36 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* This class acts as a functional wrapper around the {@code index.auto_expand_replicas} setting.
* This setting or rather it's value is expanded into a min and max value which requires special handling
* based on the number of datanodes in the cluster. This class handles all the parsing and streamlines the access to these values.
*/
final class AutoExpandReplicas {
public final class AutoExpandReplicas {
// the value we recognize in the "max" position to mean all the nodes
private static final String ALL_NODES_VALUE = "all";
public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false", (value) -> {

private static final AutoExpandReplicas FALSE_INSTANCE = new AutoExpandReplicas(0, 0, false);

public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false",
AutoExpandReplicas::parse, Property.Dynamic, Property.IndexScope);

private static AutoExpandReplicas parse(String value) {
final int min;
final int max;
if (Booleans.isFalse(value)) {
return new AutoExpandReplicas(0, 0, false);
return FALSE_INSTANCE;
}
final int dash = value.indexOf('-');
if (-1 == dash) {
Expand All @@ -57,7 +70,7 @@ final class AutoExpandReplicas {
}
}
return new AutoExpandReplicas(min, max, true);
}, Property.Dynamic, Property.IndexScope);
}

private final int minReplicas;
private final int maxReplicas;
Expand All @@ -80,6 +93,24 @@ int getMaxReplicas(int numDataNodes) {
return Math.min(maxReplicas, numDataNodes-1);
}

Optional<Integer> getDesiredNumberOfReplicas(int numDataNodes) {
if (enabled) {
final int min = getMinReplicas();
final int max = getMaxReplicas(numDataNodes);
int numberOfReplicas = numDataNodes - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}

if (numberOfReplicas >= min && numberOfReplicas <= max) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by this . If min<= max, we always match this clause?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, if (only) min <= max. Unfortunately we never validated this condition anywhere (I would have expected this during setting validation, but to my surprise, it was not there), and I didn't feel like making a breaking change in this PR by strengthening the validation logic. For now, just as before, if min > max, the auto-expand replicas will be ignored.

return Optional.of(numberOfReplicas);
}
}
return Optional.empty();
}

@Override
public String toString() {
return enabled ? minReplicas + "-" + maxReplicas : "false";
Expand All @@ -88,6 +119,31 @@ public String toString() {
boolean isEnabled() {
return enabled;
}

/**
* Checks if the are replicas with the auto-expand feature that need to be adapted.
* Returns a map of updates, which maps the indices to be updated to the desired number of replicas.
* The map has the desired number of replicas as key and the indices to update as value, as this allows the result
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
*/
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
// used for translating "all" to a number
final int dataNodeCount = discoveryNodes.getDataNodes().size();

Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();

for (final IndexMetaData indexMetaData : metaData) {
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
}
});
}
}
return nrReplicasChanged;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -42,16 +40,12 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,7 +55,7 @@
/**
* Service responsible for submitting update index settings requests
*/
public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener {
public class MetaDataUpdateSettingsService extends AbstractComponent {

private final ClusterService clusterService;

Expand All @@ -77,87 +71,11 @@ public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterSe
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clusterService.addListener(this);
this.allocationService = allocationService;
this.indexScopedSettings = indexScopedSettings;
this.indicesService = indicesService;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
// update an index with number of replicas based on data nodes if possible
if (!event.state().nodes().isLocalNodeElectedMaster()) {
return;
}
// we will want to know this for translating "all" to a number
final int dataNodeCount = event.state().nodes().getDataNodes().size();

Map<Integer, List<Index>> nrReplicasChanged = new HashMap<>();
// we need to do this each time in case it was changed by update settings
for (final IndexMetaData indexMetaData : event.state().metaData()) {
AutoExpandReplicas autoExpandReplicas = IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetaData.getSettings());
if (autoExpandReplicas.isEnabled()) {
/*
* we have to expand the number of replicas for this index to at least min and at most max nodes here
* so we are bumping it up if we have to or reduce it depending on min/max and the number of datanodes.
* If we change the number of replicas we just let the shard allocator do it's thing once we updated it
* since it goes through the index metadata to figure out if something needs to be done anyway. Do do that
* we issue a cluster settings update command below and kicks off a reroute.
*/
final int min = autoExpandReplicas.getMinReplicas();
final int max = autoExpandReplicas.getMaxReplicas(dataNodeCount);
int numberOfReplicas = dataNodeCount - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}
// same value, nothing to do there
if (numberOfReplicas == indexMetaData.getNumberOfReplicas()) {
continue;
}

if (numberOfReplicas >= min && numberOfReplicas <= max) {

if (!nrReplicasChanged.containsKey(numberOfReplicas)) {
nrReplicasChanged.put(numberOfReplicas, new ArrayList<>());
}

nrReplicasChanged.get(numberOfReplicas).add(indexMetaData.getIndex());
}
}
}

if (nrReplicasChanged.size() > 0) {
// update settings and kick of a reroute (implicit) for them to take effect
for (final Integer fNumberOfReplicas : nrReplicasChanged.keySet()) {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
final List<Index> indices = nrReplicasChanged.get(fNumberOfReplicas);

UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(indices.toArray(new Index[indices.size()])).settings(settings)
.ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here
.masterNodeTimeout(TimeValue.timeValueMinutes(10));

updateSettings(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
for (Index index : indices) {
logger.info("{} auto expanded replicas to [{}]", index, fNumberOfReplicas);
}
}

@Override
public void onFailure(Exception t) {
for (Index index : indices) {
logger.warn("{} fail to auto expand replicas to [{}]", index, fNumberOfReplicas);
}
}
});
}
}
}

public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final Settings normalizedSettings = Settings.builder().put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
Settings.Builder settingsForClosedIndices = Settings.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand All @@ -46,6 +47,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -206,11 +208,12 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
*/
public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolean reroute, String reason) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());

// first, clear from the shards any node id they used to belong to that is now dead
Expand All @@ -220,12 +223,40 @@ public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolea
reroute(allocation);
}

if (allocation.routingNodesChanged() == false) {
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

/**
* Checks if the are replicas with the auto-expand feature that need to be adapted.
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
*/
private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
final Map<Integer, List<String>> autoExpandReplicaChanges =
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
if (autoExpandReplicaChanges.isEmpty()) {
return clusterState;
} else {
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable());
final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
for (Map.Entry<Integer, List<String>> entry : autoExpandReplicaChanges.entrySet()) {
final int numberOfReplicas = entry.getKey();
final String[] indices = entry.getValue().toArray(new String[entry.getValue().size()]);
// we do *not* update the in sync allocation ids as they will be removed upon the first index
// operation which make these copies stale
routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
logger.info("updating number_of_replicas to [{}] for indices {}", numberOfReplicas, indices);
}
final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
.metaData(metaDataBuilder).build();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
return fixedState;
}
}

/**
* Removes delay markers from unassigned shards based on current time stamp.
*/
Expand Down Expand Up @@ -301,6 +332,7 @@ public CommandsResult reroute(final ClusterState clusterState, AllocationCommand
if (retryFailed) {
resetFailedAllocationCounter(allocation);
}

reroute(allocation);
return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
}
Expand All @@ -320,15 +352,17 @@ public ClusterState reroute(ClusterState clusterState, String reason) {
* <p>
* If the same instance of ClusterState is returned, then no change has been made.
*/
protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(debug);
reroute(allocation);
if (allocation.routingNodesChanged() == false) {
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
Expand All @@ -353,6 +387,8 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {

private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
"auto-expand replicas out of sync with number of nodes in the cluster";

// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
Expand Down
Loading