Skip to content

[ML] Add processor autoscaling decider #89645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/89645.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Release-notes changelog entry for PR #89645.
pr: 89645
summary: Add processor autoscaling decider
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -767,7 +768,7 @@ public Settings additionalSettings() {
Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes())
);
addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Integer.toString(getAllocatedProcessors()));
addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count()));
// This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
} else {
Expand All @@ -785,8 +786,8 @@ private void addMlNodeAttribute(Settings.Builder additionalSettings, String attr
}
}

private int getAllocatedProcessors() {
return EsExecutors.allocatedProcessors(settings);
// Resolves this node's allocated processors from settings. Returns Processors rather than
// an int because the allocation may be fractional (callers round up where a whole count is needed).
private Processors getAllocatedProcessors() {
    return EsExecutors.nodeProcessors(settings);
}

private void disallowMlNodeAttributes(String... mlNodeAttributes) {
Expand Down Expand Up @@ -1448,7 +1449,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings unused) {
ScalingExecutorBuilder pytorchComms = new ScalingExecutorBuilder(
NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME,
3,
getAllocatedProcessors() * 3,
getAllocatedProcessors().roundUp() * 3,
TimeValue.timeValueMinutes(1),
false,
"xpack.ml.native_inference_comms_thread_pool"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L

private final ScaleTimer scaleTimer;
private final MlMemoryAutoscalingDecider memoryDecider;
private final MlProcessorAutoscalingDecider processorDecider;

private volatile boolean isMaster;

Expand Down Expand Up @@ -66,6 +67,7 @@ public MlAutoscalingDeciderService(
nodeLoadDetector,
scaleTimer
);
this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
clusterService.addLocalNodeMasterListener(this);
}

Expand All @@ -91,12 +93,21 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
.setCurrentMlCapacity(
new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.tierSize(), null),
new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.nodeSize(), null)
new AutoscalingCapacity.AutoscalingResources(
null,
currentMemoryCapacity.tierSize(),
currentProcessorCapacity.tierProcessors()
),
new AutoscalingCapacity.AutoscalingResources(
null,
currentMemoryCapacity.nodeSize(),
currentProcessorCapacity.nodeProcessors()
)
)
)
.setPassedConfiguration(configuration);
Expand All @@ -109,12 +120,15 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
}

MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
reasonBuilder.setSimpleReason(memoryCapacity.reason());
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
reasonBuilder.setSimpleReason(
String.format(Locale.ROOT, "[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
);

return new AutoscalingDeciderResult(
new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), null),
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), null)
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), processorCapacity.tierProcessors()),
new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), processorCapacity.nodeProcessors())
),
reasonBuilder.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -185,8 +186,6 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci

final List<String> partiallyAllocatedModels = mlContext.findPartiallyAllocatedModels();

// TODO for autoscaling by memory, we only care about if the model is allocated to at least one node (see above)
// We should do this check in our autoscaling by processor count service, which will be a separate decider for readability's sake
if (mlContext.waitingAnalyticsJobs.isEmpty() == false
|| mlContext.waitingSnapshotUpgrades.isEmpty() == false
|| mlContext.waitingAnomalyJobs.isEmpty() == false
Expand Down Expand Up @@ -257,7 +256,8 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
if (capacity == null) {
return null;
}
// TODO we should remove this when we can auto-scale (down and up) via a new CPU auto-scaling decider
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
Expand Down Expand Up @@ -822,7 +822,8 @@ static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAss
int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
try {
return Integer.parseInt(allocatedProcessorsString);
double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString);
return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0;
} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.unit.Processors;

/**
 * The processor capacity an ML autoscaling decision asks for, at node and tier level,
 * together with a human-readable explanation of how it was decided.
 */
public record MlProcessorAutoscalingCapacity(Processors nodeProcessors, Processors tierProcessors, String reason) {

    public static Builder builder(Processors nodeProcessors, Processors tierProcessors) {
        return new Builder(nodeProcessors, tierProcessors);
    }

    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("MlProcessorAutoscalingCapacity{");
        description.append("nodeProcessors=").append(nodeProcessors);
        description.append(", tierProcessors=").append(tierProcessors);
        description.append(", reason='").append(reason).append('\'');
        description.append('}');
        return description.toString();
    }

    /** Carries the capacity values up front; the reason is attached later, once it is known. */
    public static class Builder {

        private final Processors nodeProcessors;
        private final Processors tierProcessors;
        private String reason;

        public Builder(Processors nodeProcessors, Processors tierProcessors) {
            this.nodeProcessors = nodeProcessors;
            this.tierProcessors = tierProcessors;
        }

        public Builder setReason(String reason) {
            this.reason = reason;
            return this;
        }

        MlProcessorAutoscalingCapacity build() {
            return new MlProcessorAutoscalingCapacity(nodeProcessors, tierProcessors, reason);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;

import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.time.Instant.ofEpochMilli;
import static org.elasticsearch.common.xcontent.XContentElasticsearchExtension.DEFAULT_FORMATTER;
import static org.elasticsearch.core.Strings.format;

class MlProcessorAutoscalingDecider {

private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class);

private final ScaleTimer scaleTimer;
private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;

/**
 * @param scaleTimer shared timer used to honour the scale-down cool-down window
 * @param nodeAvailabilityZoneMapper provides the number of ML availability zones so the
 *        required tier capacity can be split across zones
 */
MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) {
    this.scaleTimer = Objects.requireNonNull(scaleTimer);
    this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper);
}

public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());

if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
logger.debug(() -> "Computing required capacity as there are partially allocated deployments");
scaleTimer.resetScaleDownCoolDown();
return computeRequiredCapacity(trainedModelAssignmentMetadata).setReason(
"requesting scale up as there are unsatisfied deployments"
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

if (requiredCapacity.tierProcessors().roundUp() == currentCapacity.tierProcessors().roundUp()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line means that if we have 2.5 processors and need 3 then we won't scale up.

Even if it's not fixed in this PR I think it's worth adding a comment to call that out. Maybe we can fix it in a followup for 8.6.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get here we are definitely not in a scenario where we want to scale up. We know all our model deployments are satisfied. The reason I added that check is to provide a nicer message why we don't downscale nor upscale if we're right where we need to be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We know all our model deployments are satisfied.

But the question is whether 2.5 processors should "satisfy" a model that wanted 3. In TrainedModelAssignmentRebalancer we call getNodeAllocatedProcessors(discoveryNode).roundUp(), so it will happily allocate a model that wants 3 processors to a node that has 2.5. I think it's a followup for the future to consider whether we should scale up in that scenario. Maybe the code should be changed in TrainedModelAssignmentRebalancer rather than here though.

Anyway, I'm happy to leave it as-is for the first version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I see the point now. OK, I'll merge as-is and revisit later.

return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("passing currently perceived capacity as it is fully used")
.build();
}

if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.modelAssignments().values(),
mlContext.mlNodes
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
.build();
}

long msLeftToScale = scaleTimer.markDownScaleAndGetMillisLeftFromDelay(configuration);
if (msLeftToScale <= 0) {
return MlProcessorAutoscalingCapacity.builder(requiredCapacity.nodeProcessors(), requiredCapacity.tierProcessors())
.setReason("requesting scale down as tier and/or node size could be smaller")
.build();
}

TimeValue downScaleDelay = MlAutoscalingDeciderService.DOWN_SCALE_DELAY.get(configuration);
logger.debug(
() -> format(
"not scaling down as the current scale down delay [%s] is not satisfied."
+ " The last time scale down was detected [%s]. Calculated scaled down capacity [%s] ",
downScaleDelay.getStringRep(),
DEFAULT_FORMATTER.format(ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
requiredCapacity
)
);
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason(
String.format(
Locale.ROOT,
"Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] "
+ "last detected scale down event [%s]. Will request scale down in approximately [%s]",
downScaleDelay.getStringRep(),
XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
TimeValue.timeValueMillis(msLeftToScale).getStringRep()
)
)
.build();
}

/**
 * Returns {@code true} if any trained model deployment cannot be fully allocated
 * on the currently available ML nodes.
 */
private boolean hasUnsatisfiedDeployments(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata, List<DiscoveryNode> mlNodes) {
    final Set<String> nodeIds = mlNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
    return trainedModelAssignmentMetadata.modelAssignments()
        .values()
        .stream()
        .allMatch(assignment -> assignment.isSatisfied(nodeIds)) == false;
}

/**
 * Computes the processor capacity needed to fully satisfy every trained model deployment.
 * <p>
 * The node-level requirement is the largest {@code threads_per_allocation} of any deployment,
 * since a single allocation cannot span nodes. The tier-level requirement is the total
 * processor count over all allocations, split across ML availability zones when there is
 * more than one.
 */
private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata) {
    int maxThreadsPerAllocation = 0;
    int processorCount = 0;
    for (TrainedModelAssignment assignment : trainedModelAssignmentMetadata.modelAssignments().values()) {
        int threadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
        maxThreadsPerAllocation = Math.max(maxThreadsPerAllocation, threadsPerAllocation);
        processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation;
    }

    final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1);
    if (numMlAvailabilityZones > 1) {
        // We assume cloud provides what we ask for tier processors for each availability zone.
        // Thus we need to divide the total processor count required by the number of ML availability zones.
        processorCount = (processorCount - 1) / numMlAvailabilityZones + 1; // ceiling division
    }
    // A node must be able to host at least one allocation of the most demanding deployment.
    processorCount = Math.max(processorCount, maxThreadsPerAllocation);

    return MlProcessorAutoscalingCapacity.builder(
        maxThreadsPerAllocation > 0 ? Processors.of((double) maxThreadsPerAllocation) : Processors.ZERO,
        processorCount > 0 ? Processors.of((double) processorCount) : Processors.ZERO
    );
}

/**
 * Derives the currently provisioned capacity from the ML nodes' processor attributes:
 * the largest single node (node capacity) and the sum over all nodes (tier capacity).
 */
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
    Processors largestNode = Processors.ZERO;
    Processors total = Processors.ZERO;
    for (DiscoveryNode mlNode : mlNodes) {
        final Processors current = getProcessors(mlNode);
        total = total.plus(current);
        if (current.compareTo(largestNode) > 0) {
            largestNode = current;
        }
    }
    return MlProcessorAutoscalingCapacity.builder(largestNode, total).build();
}

/**
 * Reads the node's allocated processors from its ML node attribute.
 * Returns {@link Processors#ZERO} when the attribute is missing or non-positive.
 * The value may be fractional, hence parsing as a double.
 */
private Processors getProcessors(DiscoveryNode node) {
    String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
    if (allocatedProcessorsString == null) {
        return Processors.ZERO;
    }
    try {
        double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
        return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
    } catch (NumberFormatException e) {
        // The attribute is set internally, so a parse failure indicates a bug: the assert
        // (always false when reached) trips in test builds, while production safely falls back.
        assert e == null
            : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
                + " should parse because we set it internally: invalid value was ["
                + allocatedProcessorsString
                + "]";
        return Processors.ZERO;
    }
}
}
Loading