[ML] prevent accidentally asking for more resources when scaling down and improve scaling size estimations #74691

Merged

@@ -35,7 +35,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;

public class AutoscalingIT extends MlNativeAutodetectIntegTestCase {
@@ -52,7 +52,7 @@ public void testMLAutoscalingCapacity() {
Settings.builder().put(MlAutoscalingDeciderService.DOWN_SCALE_DELAY.getKey(), TimeValue.ZERO).build());
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
"ml_test",
new TreeSet<>(Arrays.asList("ml")),
new TreeSet<>(Arrays.asList("master","data","ingest","ml")),
deciders
);
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());
@@ -151,8 +151,8 @@ private void assertMlCapacity(GetAutoscalingCapacityAction.Response capacity, St

AutoscalingDeciderResult autoscalingDeciderResult = autoscalingDeciderResults.results().get("ml");
assertThat(autoscalingDeciderResult.reason().summary(), containsString(reason));
assertThat(autoscalingDeciderResult.requiredCapacity().total().memory().getBytes(), equalTo(tierBytes));
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), equalTo(nodeBytes));
assertThat(autoscalingDeciderResult.requiredCapacity().total().memory().getBytes(), greaterThanOrEqualTo(tierBytes - 1L));
assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), greaterThanOrEqualTo(nodeBytes - 1L));
}

private void putJob(String jobId, long limitMb) {

@@ -71,6 +71,8 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
private static final Duration DEFAULT_MEMORY_REFRESH_RATE = Duration.ofMinutes(15);
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale";
private static final long NO_SCALE_DOWN_POSSIBLE = -1L;
// If ensureScaleDown changes the calculation by more than this much, log a warning
private static final long ACCEPTABLE_DIFFERENCE = ByteSizeValue.ofMb(1).getBytes();

public static final String NAME = "ml";
public static final Setting<Integer> NUM_ANOMALY_JOBS_IN_QUEUE = Setting.intSetting("num_anomaly_jobs_in_queue", 0, 0);
@@ -359,6 +361,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider

final List<DiscoveryNode> nodes = getNodes(clusterState);
final NativeMemoryCapacity currentScale = currentScale(nodes);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder()
.setWaitingAnomalyJobs(waitingAnomalyJobs)
.setWaitingAnalyticsJobs(waitingAnalyticsJobs)
@@ -527,9 +530,18 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.build()));
}

final Optional<AutoscalingDeciderResult> scaleDownDecision = checkForScaleDown(nodeLoads, largestJob, currentScale, reasonBuilder);
final Optional<AutoscalingDeciderResult> maybeScaleDown = checkForScaleDown(nodeLoads, largestJob, currentScale, reasonBuilder)
// Due to weird rounding errors, it may be that a scale down result COULD cause a scale up
// Ensuring the scale down here forces the scale down result to never exceed the current capacity.
// This is safe as we know that ALL jobs are assigned at the current capacity
.map(result -> new AutoscalingDeciderResult(
ensureScaleDown(result.requiredCapacity(), context.currentCapacity()), result.reason()
));

if (maybeScaleDown.isPresent()) {
final AutoscalingDeciderResult scaleDownDecisionResult = maybeScaleDown.get();

if (scaleDownDecision.isPresent()) {
context.currentCapacity();
// Given maxOpenJobs, could we scale down to just one node?
// We have no way of saying "we need X nodes"
if (nodeLoads.size() > 1) {
@@ -546,22 +558,22 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
MAX_OPEN_JOBS_PER_NODE.getKey());
logger.info(() -> new ParameterizedMessage("{} Calculated potential scaled down capacity [{}] ",
msg,
scaleDownDecision.get().requiredCapacity()));
scaleDownDecisionResult.requiredCapacity()));
return new AutoscalingDeciderResult(context.currentCapacity(), reasonBuilder.setSimpleReason(msg).build());
}
}

long msLeftToScale = msLeftToDownScale(configuration);
if (msLeftToScale <= 0) {
return scaleDownDecision.get();
return scaleDownDecisionResult;
}
TimeValue downScaleDelay = DOWN_SCALE_DELAY.get(configuration);
logger.debug(() -> new ParameterizedMessage(
"not scaling down as the current scale down delay [{}] is not satisfied." +
" The last time scale down was detected [{}]. Calculated scaled down capacity [{}] ",
downScaleDelay.getStringRep(),
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected),
scaleDownDecision.get().requiredCapacity()));
scaleDownDecisionResult.requiredCapacity()));
return new AutoscalingDeciderResult(
context.currentCapacity(),
reasonBuilder
@@ -586,6 +598,28 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.build()));
}

static AutoscalingCapacity ensureScaleDown(AutoscalingCapacity scaleDownResult, AutoscalingCapacity currentCapacity) {
Contributor:

The fact that this method is needed suggests there are bugs in the other calculations. It's good to have it as a safety net, but I think it would be good if it could log a warning if it changes the answer by more than the few bytes that could be attributed to rounding errors.

Member (author):

> but I think it would be good if it could log a warning if it changes the answer by more than the few bytes that could be attributed to rounding errors.

100%, I will add a log line.

I do think we need this method as this has been an issue in a couple known scenarios and it can be exceptionally frustrating to troubleshoot.

AutoscalingCapacity newCapacity = new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(
currentCapacity.total().storage(),
ByteSizeValue.ofBytes(Math.min(scaleDownResult.total().memory().getBytes(), currentCapacity.total().memory().getBytes()))
),
new AutoscalingCapacity.AutoscalingResources(
currentCapacity.node().storage(),
ByteSizeValue.ofBytes(Math.min(scaleDownResult.node().memory().getBytes(), currentCapacity.node().memory().getBytes()))
)
);
if (scaleDownResult.node().memory().getBytes() - newCapacity.node().memory().getBytes() > ACCEPTABLE_DIFFERENCE
|| scaleDownResult.total().memory().getBytes() - newCapacity.total().memory().getBytes() > ACCEPTABLE_DIFFERENCE) {
logger.warn(
"scale down accidentally requested a scale up, auto-corrected; initial scaling [{}], corrected [{}]",
scaleDownResult,
newCapacity
);
}
return newCapacity;
}
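
A minimal illustration of the clamp (an editor's sketch, not part of the diff; it assumes a caller in the same package, since ensureScaleDown is package-private, and uses arbitrary storage and memory figures): if rounding pushes the proposed node requirement a few bytes above the current node size, the corrected result is capped at the current values, and the corrected capacity carries over the current capacity's storage.

    // Current capacity: a 16 GB tier made of 8 GB nodes.
    AutoscalingCapacity current = new AutoscalingCapacity(
        new AutoscalingCapacity.AutoscalingResources(ByteSizeValue.ofGb(100), ByteSizeValue.ofGb(16)),
        new AutoscalingCapacity.AutoscalingResources(ByteSizeValue.ofGb(100), ByteSizeValue.ofGb(8)));
    // A "scale down" whose node requirement overshot the current node size by 3 bytes.
    AutoscalingCapacity proposed = new AutoscalingCapacity(
        new AutoscalingCapacity.AutoscalingResources(ByteSizeValue.ofGb(100), ByteSizeValue.ofGb(12)),
        new AutoscalingCapacity.AutoscalingResources(ByteSizeValue.ofGb(100),
            ByteSizeValue.ofBytes(ByteSizeValue.ofGb(8).getBytes() + 3)));
    AutoscalingCapacity corrected = MlAutoscalingDeciderService.ensureScaleDown(proposed, current);
    // corrected: tier memory stays at 12 GB (already below current), node memory is capped at 8 GB.
    // The 3-byte overshoot is far below ACCEPTABLE_DIFFERENCE (1 MB), so no warning is logged.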

AutoscalingDeciderResult noScaleResultOrRefresh(MlScalingReason.Builder reasonBuilder,
boolean memoryTrackingStale,
AutoscalingDeciderResult potentialResult) {
@@ -842,8 +876,11 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<NodeLoad> nodeLoads,
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
if (currentlyNecessaryTier < currentCapacity.getTier() || currentlyNecessaryNode < currentCapacity.getNode()) {
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity(
currentlyNecessaryTier,
currentlyNecessaryNode,
// Since we are in the `scaleDown` branch, we know jobs are running and we could be smaller
// If we have some weird rounding errors, it may be that the `currentlyNecessary` values are larger than
// current capacity. We never want to accidentally say "scale up" via a scale down.
Math.min(currentlyNecessaryTier, currentCapacity.getTier()),
Math.min(currentlyNecessaryNode, currentCapacity.getNode()),
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
currentlyNecessaryNode == currentCapacity.getNode() ? currentCapacity.getJvmSize() : null);
AutoscalingCapacity requiredCapacity = nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto);

@@ -12,6 +12,9 @@
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;

import java.util.Objects;
import java.util.Optional;

import static org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator.dynamicallyCalculateJvmSizeFromNativeMemorySize;

// Used for storing native memory capacity and then transforming it into an autoscaling capacity
// which takes into account the whole node size
@@ -49,22 +52,26 @@ NativeMemoryCapacity merge(NativeMemoryCapacity nativeMemoryCapacity) {
return this;
}

AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
public AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) {
// We calculate the JVM size here first to ensure it stays the same given the rest of the calculations
final Long jvmSize = useAuto ?
Optional.ofNullable(this.jvmSize).orElse(dynamicallyCalculateJvmSizeFromNativeMemorySize(node)) :
null;
// We first need to calculate the actual node size given the current native memory size.
// This way we can accurately determine the required node size AND what the overall memory percentage will be
long actualNodeSize = NativeMemoryCalculator.calculateApproxNecessaryNodeSize(node, jvmSize, maxMemoryPercent, useAuto);
// We make the assumption that the JVM size is the same across the entire tier
// This simplifies calculating the tier as it means that each node in the tier
// will have the same dynamic memory calculation. And thus the tier is simply the sum of the memory necessary
// times that scaling factor.
int memoryPercentForMl = (int)Math.floor(NativeMemoryCalculator.modelMemoryPercent(
double memoryPercentForMl = NativeMemoryCalculator.modelMemoryPercent(
actualNodeSize,
jvmSize,
maxMemoryPercent,
useAuto
));
);
double inverseScale = memoryPercentForMl <= 0 ? 0 : 100.0 / memoryPercentForMl;
long actualTier = (long)Math.ceil(tier * inverseScale);
long actualTier = Math.round(tier * inverseScale);
return new AutoscalingCapacity(
// Tier should always be AT LEAST the largest node size.
// This Math.max catches any strange rounding errors or weird input.
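
To make the tier arithmetic concrete, here is a worked sketch by the editor (not part of the PR). It assumes auto mode, a tier consisting of a single node that needs 1336 MB of ML native memory, and that modelMemoryPercent in auto mode evaluates to roughly (nodeSize - jvmSize - overhead) / nodeSize * 100; the literal 30 stands in for the max machine memory percent setting, which the auto paths do not use.

    long node = ByteSizeValue.ofMb(1336).getBytes();
    long jvm = NativeMemoryCalculator.dynamicallyCalculateJvmSizeFromNativeMemorySize(node);       // 512 MB
    long nodeSize = NativeMemoryCalculator.calculateApproxNecessaryNodeSize(node, jvm, 30, true);  // 1336 + 512 + 200 = 2048 MB
    double mlPercent = NativeMemoryCalculator.modelMemoryPercent(nodeSize, jvm, 30, true);         // ~65.2
    long tierBytes = Math.round(node * (100.0 / mlPercent));                                       // ~2048 MB, i.e. one such node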

@@ -23,6 +23,7 @@

public final class NativeMemoryCalculator {

private static final long STATIC_JVM_UPPER_THRESHOLD = ByteSizeValue.ofGb(2).getBytes();
static final long MINIMUM_AUTOMATIC_NODE_SIZE = ByteSizeValue.ofGb(1).getBytes();
private static final long OS_OVERHEAD = ByteSizeValue.ofMb(200L).getBytes();

@@ -80,15 +81,11 @@ public static long calculateApproxNecessaryNodeSize(long nativeMachineMemory, Lo
if (useAuto) {
// TODO utilize official ergonomic JVM size calculations when available.
jvmSize = jvmSize == null ? dynamicallyCalculateJvmSizeFromNativeMemorySize(nativeMachineMemory) : jvmSize;
// We use a Math.floor here to ensure we have AT LEAST enough memory given rounding.
int modelMemoryPercent = (int)Math.floor(modelMemoryPercent(
nativeMachineMemory + jvmSize + OS_OVERHEAD,
jvmSize,
maxMemoryPercent,
true));
// We calculate the inverse percentage of `nativeMachineMemory + OS_OVERHEAD` as `OS_OVERHEAD` is always present
// on the native memory side and we need to account for it when we invert the model memory percentage
return Math.max((long)Math.ceil((100.0/modelMemoryPercent) * (nativeMachineMemory + OS_OVERHEAD)), MINIMUM_AUTOMATIC_NODE_SIZE);
// We haven't reached our 90% threshold, so, simply summing up the values is adequate
if ((jvmSize + OS_OVERHEAD)/(double)nativeMachineMemory > 0.1) {
return Math.max(nativeMachineMemory + jvmSize + OS_OVERHEAD, MINIMUM_AUTOMATIC_NODE_SIZE);
}
return Math.round((nativeMachineMemory/0.9));
}
return (long) ((100.0/maxMemoryPercent) * nativeMachineMemory);
}
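
For larger nodes the new branch matters more: once the JVM plus OS overhead is at most 10% of the requested native memory, the node is sized so that ML native memory comes out at roughly 90% of the machine instead of using the plain sum. A rough editor's sketch (the 58 GB figure is arbitrary; 30 again stands in for the unused max memory percent):

    long nativeMem = ByteSizeValue.ofGb(58).getBytes();   // roughly what a 64 GB machine yields at 90%
    long jvm = ByteSizeValue.ofGb(2).getBytes();          // the static JVM ceiling for large nodes
    // (2 GB + 200 MB) / 58 GB is about 0.038 <= 0.1, so the 90% branch applies:
    long nodeSize = NativeMemoryCalculator.calculateApproxNecessaryNodeSize(nativeMem, jvm, 30, true);
    // nodeSize == Math.round(58 GB / 0.9), about 64.4 GB, rather than the plain sum 58 + 2 + 0.2 = 60.2 GB.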
@@ -118,18 +115,11 @@ public static double modelMemoryPercent(long machineMemory, Long jvmSize, int ma
return maxMemoryPercent;
}

public static int modelMemoryPercent(long machineMemory, int maxMemoryPercent, boolean useAuto) {
return (int)Math.ceil(modelMemoryPercent(machineMemory,
null,
maxMemoryPercent,
useAuto));
}

private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) {
static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) {
if (useAuto && jvmSize != null) {
// It is conceivable that there is a machine smaller than 200MB.
// If the administrator wants to use the auto configuration, the node should be larger.
if (machineMemory - jvmSize < OS_OVERHEAD || machineMemory == 0) {
if (machineMemory - jvmSize <= OS_OVERHEAD || machineMemory == 0) {
return machineMemory / 100;
}
// This calculation is dynamic and designed to maximally take advantage of the underlying machine for machine learning
Expand All @@ -139,8 +129,8 @@ private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxM
// 2GB node -> 66%
// 16GB node -> 87%
// 64GB node -> 90%
long memoryPercent = Math.min(90, (int)Math.ceil(((machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory) * 100.0D));
return (long)(machineMemory * (memoryPercent / 100.0));
double memoryProportion = Math.min(0.90, (machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory);
return Math.round(machineMemory * memoryProportion);
}

return (long)(machineMemory * (maxMemoryPercent / 100.0));
@@ -154,30 +144,33 @@ public static long allowedBytesForMl(long machineMemory, int maxMemoryPercent, b
}

// TODO replace with official ergonomic calculation
private static long dynamicallyCalculateJvmSizeFromNodeSize(long nodeSize) {
if (nodeSize < ByteSizeValue.ofGb(2).getBytes()) {
return (long) (nodeSize * 0.40);
public static long dynamicallyCalculateJvmSizeFromNodeSize(long nodeSize) {
// While the original idea here was to predicate on 2Gb, it has been found that the knot points of
// 2GB and 8GB cause weird issues where the JVM size will "jump the gap" from one to the other when
// considering true tier sizes in elastic cloud.
if (nodeSize < ByteSizeValue.ofMb(1280).getBytes()) {
Contributor:

It would be good to add a comment here that the reason this is different to the value used in the JVM heap size formula is that that formula does not have a well-defined inverse, so we pick a value here that makes the inverse pick the correct value for some node sizes we know Cloud uses, in particular 2GB.

return (long)(nodeSize * 0.40);
}
if (nodeSize < ByteSizeValue.ofGb(8).getBytes()) {
return (long) (nodeSize * 0.25);
return (long)(nodeSize * 0.25);
}
return ByteSizeValue.ofGb(2).getBytes();
return STATIC_JVM_UPPER_THRESHOLD;
}
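
As a concrete check of the bands above (an editor's sketch, assuming 1 GB and 2 GB machines, 2 GB being the size the review comment calls out):

    // 1 GB machine: 1024 MB < 1280 MB, so JVM = 0.40 * 1 GB, about 410 MB
    long jvm1g = NativeMemoryCalculator.dynamicallyCalculateJvmSizeFromNodeSize(ByteSizeValue.ofGb(1).getBytes());
    // 2 GB machine: 2048 MB >= 1280 MB, so JVM = 0.25 * 2 GB = 512 MB
    long jvm2g = NativeMemoryCalculator.dynamicallyCalculateJvmSizeFromNodeSize(ByteSizeValue.ofGb(2).getBytes());

Feeding the corresponding native sizes (1 GB - 410 MB - 200 MB, about 414 MB, and 2 GB - 512 MB - 200 MB = 1336 MB) into the inverse below returns roughly 410 MB and exactly 512 MB, so for these machine sizes the forward and inverse calculations agree.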

private static long dynamicallyCalculateJvmSizeFromNativeMemorySize(long nativeMachineMemory) {
public static long dynamicallyCalculateJvmSizeFromNativeMemorySize(long nativeMachineMemory) {
// See dynamicallyCalculateJvmSizeFromNodeSize: the following calculations are the arithmetic inverses of that JVM calculation
//
// Example: For a < 2GB node, the JVM is 0.4 * total_node_size. This means the rest is 0.6 * total_node_size.
// So, `nativeAndOverhead` == 0.6 * total_node_size => total_node_size = nativeAndOverhead / 0.6
// Consequently jvmSize = (nativeAndOverhead / 0.6) * 0.4 = nativeAndOverhead * 2/3
long nativeAndOverhead = nativeMachineMemory + OS_OVERHEAD;
if (nativeAndOverhead < (ByteSizeValue.ofGb(2).getBytes() * 0.60)) {
return (long) Math.ceil(nativeAndOverhead * (2.0 / 3.0));
return Math.round((nativeAndOverhead / 0.6) * 0.4);
}
if (nativeAndOverhead < (ByteSizeValue.ofGb(8).getBytes() * 0.75)) {
return (long) Math.ceil(nativeAndOverhead / 3.0);
return Math.round((nativeAndOverhead / 0.75) * 0.25);
}
return ByteSizeValue.ofGb(2).getBytes();
return STATIC_JVM_UPPER_THRESHOLD;
}
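
Spelling out the middle band the same way (an editor's note): for machines in the [1280 MB, 8 GB) band the JVM is 0.25 * node, so nativeAndOverhead = 0.75 * node; inverting, node = nativeAndOverhead / 0.75 and jvmSize = (nativeAndOverhead / 0.75) * 0.25 = nativeAndOverhead / 3, which is what the previous Math.ceil(nativeAndOverhead / 3.0) computed, up to the direction of rounding.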

}