-
Notifications
You must be signed in to change notification settings - Fork 25.3k
[ML] Add processor autoscaling decider #89645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4d0f16a
40f6d1a
8dc4ac3
0b750e4
04d5d81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 89645 | ||
summary: Add processor autoscaling decider | ||
area: Machine Learning | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.ml.autoscaling; | ||
|
||
import org.elasticsearch.common.unit.Processors; | ||
|
||
public record MlProcessorAutoscalingCapacity(Processors nodeProcessors, Processors tierProcessors, String reason) { | ||
|
||
public static Builder builder(Processors nodeProcessors, Processors tierProcessors) { | ||
return new Builder(nodeProcessors, tierProcessors); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "MlProcessorAutoscalingCapacity{" | ||
+ "nodeProcessors=" | ||
+ nodeProcessors | ||
+ ", tierProcessors=" | ||
+ tierProcessors | ||
+ ", reason='" | ||
+ reason | ||
+ '\'' | ||
+ '}'; | ||
} | ||
|
||
public static class Builder { | ||
|
||
private Processors nodeProcessors; | ||
private Processors tierProcessors; | ||
private String reason; | ||
|
||
public Builder(Processors nodeProcessors, Processors tierProcessors) { | ||
this.nodeProcessors = nodeProcessors; | ||
this.tierProcessors = tierProcessors; | ||
} | ||
|
||
public Builder setReason(String reason) { | ||
this.reason = reason; | ||
return this; | ||
} | ||
|
||
MlProcessorAutoscalingCapacity build() { | ||
return new MlProcessorAutoscalingCapacity(nodeProcessors, tierProcessors, reason); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.ml.autoscaling; | ||
|
||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.Processors; | ||
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.logging.LogManager; | ||
import org.elasticsearch.logging.Logger; | ||
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; | ||
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment; | ||
import org.elasticsearch.xpack.ml.MachineLearning; | ||
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata; | ||
|
||
import java.time.Instant; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static java.time.Instant.ofEpochMilli; | ||
import static org.elasticsearch.common.xcontent.XContentElasticsearchExtension.DEFAULT_FORMATTER; | ||
import static org.elasticsearch.core.Strings.format; | ||
|
||
class MlProcessorAutoscalingDecider { | ||
|
||
private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class); | ||
|
||
private final ScaleTimer scaleTimer; | ||
private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper; | ||
|
||
MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) { | ||
this.scaleTimer = Objects.requireNonNull(scaleTimer); | ||
this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper); | ||
} | ||
|
||
public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) { | ||
TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state()); | ||
|
||
if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) { | ||
logger.debug(() -> "Computing required capacity as there are partially allocated deployments"); | ||
scaleTimer.resetScaleDownCoolDown(); | ||
return computeRequiredCapacity(trainedModelAssignmentMetadata).setReason( | ||
"requesting scale up as there are unsatisfied deployments" | ||
).build(); | ||
} | ||
|
||
final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes); | ||
|
||
final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build(); | ||
|
||
if (requiredCapacity.tierProcessors().roundUp() == currentCapacity.tierProcessors().roundUp()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line means that if we have 2.5 processors and need 3 then we won't scale up. Even if it's not fixed in this PR I think it's worth adding a comment to call that out. Maybe we can fix it in a followup for 8.6. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we get here we are definitely not in a scenario where we want to scale up. We know all our model deployments are satisfied. The reason I added that check is to provide a nicer message why we don't downscale nor upscale if we're right where we need to be. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
But the question is whether 2.5 processors should "satisfy" a model that wanted 3. In Anyway, I'm happy to leave it as-is for the first version. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, I see the point now. OK, I'll merge as-is and revisit later. |
||
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) | ||
.setReason("passing currently perceived capacity as it is fully used") | ||
.build(); | ||
} | ||
|
||
if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu( | ||
trainedModelAssignmentMetadata.modelAssignments().values(), | ||
mlContext.mlNodes | ||
)) { | ||
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) | ||
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors") | ||
.build(); | ||
} | ||
|
||
long msLeftToScale = scaleTimer.markDownScaleAndGetMillisLeftFromDelay(configuration); | ||
if (msLeftToScale <= 0) { | ||
return MlProcessorAutoscalingCapacity.builder(requiredCapacity.nodeProcessors(), requiredCapacity.tierProcessors()) | ||
.setReason("requesting scale down as tier and/or node size could be smaller") | ||
.build(); | ||
} | ||
|
||
TimeValue downScaleDelay = MlAutoscalingDeciderService.DOWN_SCALE_DELAY.get(configuration); | ||
logger.debug( | ||
() -> format( | ||
"not scaling down as the current scale down delay [%s] is not satisfied." | ||
+ " The last time scale down was detected [%s]. Calculated scaled down capacity [%s] ", | ||
downScaleDelay.getStringRep(), | ||
DEFAULT_FORMATTER.format(ofEpochMilli(scaleTimer.downScaleDetectedMillis())), | ||
requiredCapacity | ||
) | ||
); | ||
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) | ||
.setReason( | ||
String.format( | ||
Locale.ROOT, | ||
"Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] " | ||
+ "last detected scale down event [%s]. Will request scale down in approximately [%s]", | ||
downScaleDelay.getStringRep(), | ||
XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(scaleTimer.downScaleDetectedMillis())), | ||
TimeValue.timeValueMillis(msLeftToScale).getStringRep() | ||
) | ||
) | ||
.build(); | ||
} | ||
|
||
private boolean hasUnsatisfiedDeployments(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata, List<DiscoveryNode> mlNodes) { | ||
final Set<String> mlNodeIds = mlNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); | ||
return trainedModelAssignmentMetadata.modelAssignments() | ||
.values() | ||
.stream() | ||
.anyMatch(deployment -> deployment.isSatisfied(mlNodeIds) == false); | ||
} | ||
|
||
private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata) { | ||
int maxThreadsPerAllocation = 0; | ||
int processorCount = 0; | ||
for (TrainedModelAssignment assignment : trainedModelAssignmentMetadata.modelAssignments().values()) { | ||
int threadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation(); | ||
maxThreadsPerAllocation = Math.max(maxThreadsPerAllocation, threadsPerAllocation); | ||
processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation; | ||
} | ||
|
||
final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1); | ||
if (numMlAvailabilityZones > 1) { | ||
// We assume cloud provides what we ask for tier processors for each availability zone. | ||
// Thus we need to devide the total processor count required by the number of ML availability zones. | ||
processorCount = (processorCount - 1) / numMlAvailabilityZones + 1; | ||
} | ||
processorCount = Math.max(processorCount, maxThreadsPerAllocation); | ||
|
||
return MlProcessorAutoscalingCapacity.builder( | ||
maxThreadsPerAllocation > 0 ? Processors.of(Double.valueOf(maxThreadsPerAllocation)) : Processors.ZERO, | ||
processorCount > 0 ? Processors.of(Double.valueOf(processorCount)) : Processors.ZERO | ||
); | ||
} | ||
|
||
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) { | ||
Processors maxNodeProcessors = Processors.ZERO; | ||
Processors tierProcessors = Processors.ZERO; | ||
for (DiscoveryNode node : mlNodes) { | ||
Processors nodeProcessors = getProcessors(node); | ||
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) { | ||
maxNodeProcessors = nodeProcessors; | ||
} | ||
tierProcessors = tierProcessors.plus(nodeProcessors); | ||
} | ||
return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build(); | ||
} | ||
|
||
private Processors getProcessors(DiscoveryNode node) { | ||
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); | ||
if (allocatedProcessorsString == null) { | ||
return Processors.ZERO; | ||
} | ||
try { | ||
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString); | ||
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO; | ||
} catch (NumberFormatException e) { | ||
assert e == null | ||
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR | ||
+ " should parse because we set it internally: invalid value was [" | ||
+ allocatedProcessorsString | ||
+ "]"; | ||
return Processors.ZERO; | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.