Skip to content

[7.x] ILM: migrate action configures the _tier_preference setting (#62829) #62860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private enum OpType {
* exist. If no nodes for any of the tiers are available, returns an empty
* {@code Optional<String>}.
*/
static Optional<String> preferredAvailableTier(String prioritizedTiers, DiscoveryNodes nodes) {
public static Optional<String> preferredAvailableTier(String prioritizedTiers, DiscoveryNodes nodes) {
String[] tiers = Strings.tokenizeToStringArray(prioritizedTiers, ",");
return Arrays.stream(tiers).filter(tier -> tierNodesPresent(tier, nodes)).findFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -23,9 +21,9 @@

import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING;
import static org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.getPendingAllocations;
import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo;
Expand Down Expand Up @@ -72,44 +70,49 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName());
return new Result(false, null);
}
String destinationTier = INDEX_ROUTING_PREFER_SETTING.get(idxMeta.getSettings());
String preferredTierConfiguration = INDEX_ROUTING_PREFER_SETTING.get(idxMeta.getSettings());
Optional<String> availableDestinationTier = DataTierAllocationDecider.preferredAvailableTier(preferredTierConfiguration,
clusterState.getNodes());

if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
if (Strings.isEmpty(destinationTier)) {
if (Strings.isEmpty(preferredTierConfiguration)) {
logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().getAction(), index.getName());
} else {
logger.debug("[{}] migration of index [{}] to the [{}] tier cannot progress, as not all shards are active",
getKey().getAction(), index.getName(), destinationTier);
if (availableDestinationTier.isPresent()) {
logger.debug("[{}] migration of index [{}] to the [{}] tier preference cannot progress, as not all shards are active",
getKey().getAction(), index.getName(), preferredTierConfiguration);
} else {
logger.debug("[{}] migration of index [{}] to the next tier cannot progress as there is no available tier for the " +
"configured preferred tiers [{}] and not all shards are active", getKey().getAction(), index.getName(),
preferredTierConfiguration);
}
}
return new Result(false, waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas()));
}

if (Strings.isEmpty(destinationTier)) {
logger.debug("index [{}] has no data tier routing setting configured and all its shards are active. considering the [{}] " +
"step condition met and continuing to the next step", index.getName(), getKey().getName());
if (Strings.isEmpty(preferredTierConfiguration)) {
logger.debug("index [{}] has no data tier routing preference setting configured and all its shards are active. considering " +
"the [{}] step condition met and continuing to the next step", index.getName(), getKey().getName());
// the user removed the tier routing setting and all the shards are active so we'll carry on
return new Result(true, null);
}

int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState);

if (allocationPendingAllShards > 0) {
boolean targetTierNodeFound = false;
for (DiscoveryNode node : clusterState.nodes()) {
for (DiscoveryNodeRole role : node.getRoles()) {
if (role.roleName().equals(DATA_ROLE.roleName()) || role.roleName().equals(destinationTier)) {
targetTierNodeFound = true;
break;
}
}
}
String statusMessage = String.format(Locale.ROOT, "%s lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] " +
"tier" + (targetTierNodeFound ? "" : " but there are currently no [%s] nodes in the cluster"),
index, getKey().getAction(), allocationPendingAllShards, destinationTier, destinationTier);
String statusMessage = availableDestinationTier.map(
s -> String.format(Locale.ROOT, "[%s] lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] tier (tier " +
"migration preference configuration is [%s])", index.getName(), getKey().getAction(), allocationPendingAllShards, s,
preferredTierConfiguration)
).orElseGet(
() -> String.format(Locale.ROOT, "index [%s] has a preference for tiers [%s], but no nodes for any of those tiers are " +
"available in the cluster", index.getName(), preferredTierConfiguration));
logger.debug(statusMessage);
return new Result(false, new AllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true, statusMessage));
} else {
logger.debug("[{}] migration of index [{}] to tier [{}] complete", getKey().getAction(), index, destinationTier);
logger.debug("[{}] migration of index [{}] to tier [{}] (preference [{}]) complete",
getKey().getAction(), index, availableDestinationTier, preferredTierConfiguration);
return new Result(true, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
Expand All @@ -31,6 +32,11 @@ public class MigrateAction implements LifecycleAction {
public static final String NAME = "migrate";
public static final ParseField ENABLED_FIELD = new ParseField("enabled");

// Represents an ordered list of data tiers from cold to hot (or slow to fast).
// The element order is significant: getPreferredTiersConfiguration looks up the position of a tier in this
// list and joins that tier with every entry that follows it, so this order is the order of the generated
// tier preference string.
private static final List<String> COLD_TO_HOT_TIERS = org.elasticsearch.common.collect.List.of(
DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT
);

private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));

Expand Down Expand Up @@ -92,7 +98,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
Settings.Builder migrationSettings = Settings.builder();
String dataTierName = "data_" + phase;
assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName;
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, dataTierName);
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(dataTierName));
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
migrationSettings.build());
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
Expand All @@ -102,6 +108,19 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
}
}

/**
 * Builds the tier preference value for the provided target tier: the target tier itself, followed by every
 * tier listed after it in {@code COLD_TO_HOT_TIERS}, joined with commas.
 * For example, {@code data_cold} yields {@code data_cold,data_warm,data_hot}.
 * The result is usually used as the value of {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}.
 *
 * @param targetTier the name of the tier that should come first in the preference list
 * @return a comma separated list of tier names that starts with {@code targetTier}
 * @throws IllegalArgumentException if {@code targetTier} is not present in {@code COLD_TO_HOT_TIERS}
 */
static String getPreferredTiersConfiguration(String targetTier) {
    final int targetPosition = COLD_TO_HOT_TIERS.indexOf(targetTier);
    if (targetPosition < 0) {
        throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
    }
    // everything from the target tier up to (and including) the hottest tier, in list order
    final List<String> targetAndFollowingTiers = COLD_TO_HOT_TIERS.subList(targetPosition, COLD_TO_HOT_TIERS.size());
    return targetAndFollowingTiers.stream().collect(Collectors.joining(","));
}

@Override
public int hashCode() {
return Objects.hash(enabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testExecuteWithPendingShards() {
DataTierMigrationRoutedStep step = createRandomInstance();
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
"[1] shards to be moved to the [data_warm] tier")
"[1] shards to be moved to the [data_warm] tier (tier migration preference configuration is [data_warm])")
);

Result actualResult = step.isConditionMet(index, clusterState);
Expand All @@ -137,9 +137,8 @@ public void testExecuteWithPendingShardsAndTargetRoleNotPresentInCluster() {
.build();
DataTierMigrationRoutedStep step = createRandomInstance();
Result expectedResult = new Result(false, new AllocationInfo(0, 1, true,
"[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " +
"[1] shards to be moved to the [data_warm] tier but there are currently no [data_warm] nodes in the cluster")
);
"index [" + index.getName() + "] has a preference for tiers [data_warm], but no nodes for any of those tiers are available " +
"in the cluster"));

Result actualResult = step.isConditionMet(index, clusterState);
assertThat(actualResult.isComplete(), is(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE;
import static org.hamcrest.CoreMatchers.is;

public class MigrateActionTests extends AbstractActionTestCase<MigrateAction> {

Expand Down Expand Up @@ -56,4 +65,36 @@ public void testToSteps() {
assertEquals(0, steps.size());
}
}

/**
 * Verifies that every known tier is expanded to itself followed by all hotter tiers, and that an unknown
 * tier name is rejected with a descriptive {@link IllegalArgumentException}.
 */
public void testGetPreferredTiersConfiguration() {
    // expected preference strings, spelled out per target tier
    final String hotPreference = DATA_HOT;
    final String warmPreference = DATA_WARM + "," + DATA_HOT;
    final String coldPreference = DATA_COLD + "," + DATA_WARM + "," + DATA_HOT;

    assertThat(getPreferredTiersConfiguration(DATA_HOT), is(hotPreference));
    assertThat(getPreferredTiersConfiguration(DATA_WARM), is(warmPreference));
    assertThat(getPreferredTiersConfiguration(DATA_COLD), is(coldPreference));

    // a name that is not a data tier must be rejected
    IllegalArgumentException invalidTierFailure = expectThrows(
        IllegalArgumentException.class,
        () -> getPreferredTiersConfiguration("no_tier")
    );
    assertThat(invalidTierFailure.getMessage(), is("invalid data tier [no_tier]"));
}

/**
 * Verifies that the first step generated by the migrate action for the hot, warm and cold phases is an
 * {@link UpdateSettingsStep} whose tier preference setting lists the phase's tier followed by all hotter tiers.
 */
public void testMigrateActionsConfiguresTierPreference() {
    StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
        randomAlphaOfLengthBetween(1, 10));
    MigrateAction action = new MigrateAction();

    // each row pairs a phase with the tier preference its migrate action is expected to configure
    final String[][] phaseAndExpectedPreference = {
        { HOT_PHASE, DATA_HOT },
        { WARM_PHASE, DATA_WARM + "," + DATA_HOT },
        { COLD_PHASE, DATA_COLD + "," + DATA_WARM + "," + DATA_HOT }
    };

    for (String[] expectation : phaseAndExpectedPreference) {
        final String phase = expectation[0];
        final String expectedPreference = expectation[1];

        List<Step> steps = action.toSteps(null, phase, nextStepKey);
        UpdateSettingsStep updateSettingsStep = (UpdateSettingsStep) steps.get(0);
        String configuredPreference =
            DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(updateSettingsStep.getSettings());
        assertThat(configuredPreference, is(expectedPreference));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,19 @@ public static Settings coldNode(final Settings settings) {

public void testIndexDataTierMigration() throws Exception {
internalCluster().startMasterOnlyNodes(1, Settings.EMPTY);
logger.info("starting hot data node");
logger.info("starting 2 hot data nodes");
internalCluster().startNode(hotNode(Settings.EMPTY));
internalCluster().startNode(hotNode(Settings.EMPTY));

// it's important we start one node of each tier as otherwise all phases will be allocated on the 2 available hot nodes (as our
// tier preference configuration will not detect any available warm/cold tier node and will fall back to the available hot tier).
// we want ILM to stop in the check-migration step in the warm and cold phase so we can unblock it manually by starting another
// node in the corresponding tier (so that the index replica is allocated)
logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));

logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap());
Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap());
Expand All @@ -119,7 +130,7 @@ public void testIndexDataTierMigration() throws Exception {
assertAcked(putLifecycleResponse);

Settings settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, policy).build();
.put(SETTING_NUMBER_OF_REPLICAS, 1).put(LifecycleSettings.LIFECYCLE_NAME, policy).build();
CreateIndexResponse res = client().admin().indices().prepareCreate(managedIndex).setSettings(settings).get();
assertTrue(res.isAcknowledged());

Expand All @@ -133,7 +144,7 @@ public void testIndexDataTierMigration() throws Exception {
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});

logger.info("starting warm data node");
logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));
assertBusy(() -> {
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
Expand All @@ -145,7 +156,7 @@ public void testIndexDataTierMigration() throws Exception {
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});

logger.info("starting cold data node");
logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

// wait for lifecycle to complete in the cold phase after the index has been migrated to the cold node
Expand All @@ -162,9 +173,15 @@ public void testIndexDataTierMigration() throws Exception {

public void testUserOptsOutOfTierMigration() throws Exception {
internalCluster().startMasterOnlyNodes(1, Settings.EMPTY);
logger.info("starting hot data node");
logger.info("starting a hot data node");
internalCluster().startNode(hotNode(Settings.EMPTY));

logger.info("starting a warm data node");
internalCluster().startNode(warmNode(Settings.EMPTY));

logger.info("starting a cold data node");
internalCluster().startNode(coldNode(Settings.EMPTY));

Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap());
Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap());
Phase coldPhase = new Phase("cold", TimeValue.ZERO, Collections.emptyMap());
Expand All @@ -188,26 +205,14 @@ public void testUserOptsOutOfTierMigration() throws Exception {
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("warm"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});
assertReplicaIsUnassigned();
}, 30, TimeUnit.SECONDS);

Settings removeTierRoutingSetting = Settings.builder().putNull(DataTierAllocationDecider.INDEX_ROUTING_PREFER).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(managedIndex).settings(removeTierRoutingSetting);
assertAcked(client().admin().indices().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex);
ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE,
explainRequest).get();

IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("warm"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
assertReplicaIsUnassigned();
}, 30, TimeUnit.SECONDS);

internalCluster().startNode(coldNode(Settings.EMPTY));

// the index should successfully allocate
// the index should successfully allocate on any nodes
ensureGreen(managedIndex);

// the index is successfully allocated but the migrate action from the cold phase re-configured the tier migration setting to the
Expand All @@ -223,7 +228,7 @@ public void testUserOptsOutOfTierMigration() throws Exception {
IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex);
assertThat(indexLifecycleExplainResponse.getPhase(), is("cold"));
assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME));
});
}, 30, TimeUnit.SECONDS);

// remove the tier routing setting again
assertAcked(client().admin().indices().updateSettings(updateSettingsRequest).actionGet());
Expand Down