[ML] Do not make autoscaling decision when memory is undetermined #90259

Merged
@@ -25,9 +25,10 @@

import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.function.LongSupplier;

import static org.elasticsearch.core.Strings.format;

public class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {

private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);
@@ -120,9 +121,19 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
}

MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
if (memoryCapacity.isUndetermined()) {
// If we cannot determine memory capacity we shouldn't make any autoscaling decision,
// as it could lead to undesired capacity. For example, the processor decider might decide
// to scale down the cluster, but as soon as we can determine memory requirements again we
// would need to scale back up.
return new AutoscalingDeciderResult(
null,
reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build()
);
}
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
reasonBuilder.setSimpleReason(
String.format(Locale.ROOT, "[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
);

return new AutoscalingDeciderResult(
@@ -153,8 +164,7 @@ private AutoscalingDeciderResult downscaleToZero(
return new AutoscalingDeciderResult(
context.currentCapacity(),
reasonBuilder.setSimpleReason(
String.format(
Locale.ROOT,
format(
"Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] "
+ "last detected scale down event [%s]. Will request scale down in approximately [%s]",
DOWN_SCALE_DELAY.get(configuration).getStringRep(),
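In isolation, the guard added to scale() above boils down to the control flow sketched here. This is a minimal, self-contained sketch with hypothetical stand-in types, not the actual MlAutoscalingDeciderService or AutoscalingDeciderResult API; in particular, the real result carries an AutoscalingCapacity object rather than a raw byte count.

// Hypothetical stand-ins; only the shape of the undetermined-memory guard is the point.
class UndeterminedMemoryGuardSketch {

    record MemoryCapacity(Long nodeSizeBytes, Long tierSizeBytes, String reason) {
        boolean isUndetermined() {
            // Mirrors MlMemoryAutoscalingCapacity.isUndetermined(): neither a node size nor a tier size.
            return nodeSizeBytes == null && tierSizeBytes == null;
        }
    }

    // A null required size stands for "no autoscaling decision".
    record DeciderResult(Long requiredTierBytes, String reason) {}

    static DeciderResult scale(MemoryCapacity memory, long processorRequiredBytes) {
        if (memory.isUndetermined()) {
            // With no memory estimate, make no decision rather than risk a processor-driven
            // scale-down that would have to be reverted once memory is known again.
            return new DeciderResult(null, "[memory_decider] " + memory.reason());
        }
        // Illustrative only: the real decider combines memory and processor capacities differently.
        long required = Math.max(memory.tierSizeBytes(), processorRequiredBytes);
        return new DeciderResult(required, "[memory_decider] " + memory.reason() + "; [processor_decider] ...");
    }
}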
@@ -25,13 +25,18 @@ public String toString() {
return "MlMemoryAutoscalingCapacity{" + "nodeSize=" + nodeSize + ", tierSize=" + tierSize + ", reason='" + reason + '\'' + '}';
}

public boolean isUndetermined() {
return nodeSize == null && tierSize == null;
}

public static class Builder {

private ByteSizeValue nodeSize;
private ByteSizeValue tierSize;
private String reason;

public Builder(ByteSizeValue nodeSize, ByteSizeValue tierSize) {
assert (nodeSize == null) == (tierSize == null) : "nodeSize " + nodeSize + " tierSize " + tierSize;
this.nodeSize = nodeSize;
this.tierSize = tierSize;
}
@@ -212,8 +212,9 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci

long maxTaskMemoryBytes = maxMemoryBytes(mlContext);

// This state is invalid, but may occur due to complex bugs that have slipped through testing.
// We could have tasks where the required job memory is 0, which should be impossible.
// This should rarely happen and could imply a bug. However, it can happen
// if there are persistent tasks that do not have matching configs stored.
// Also, we could have tasks where the required job memory is 0, which should be impossible.
// This can also happen if a job that is awaiting assignment ceases to have the AWAITING_LAZY_ASSIGNMENT
// assignment explanation, for example because some other explanation overrides it. (This second situation
// arises because, for example, anomalyDetectionTasks contains a task that is waiting but waitingAnomalyJobs
@@ -346,8 +347,11 @@ private long maxMemoryBytes(MlAutoscalingContext mlContext) {
// Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used
.mapToLong(t -> {
Long mem = getAnomalyMemoryRequirement(t);
if (mem == null) {
logger.warn("unexpected null for anomaly detection memory requirement for [{}]", MlTasks.jobId(t.getId()));
}
assert mem != null : "unexpected null for anomaly memory requirement after recent stale check";
return mem;
return mem == null ? 0 : mem;
})
.max()
.orElse(0L),
@@ -356,8 +360,11 @@ private long maxMemoryBytes(MlAutoscalingContext mlContext) {
// Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used
.mapToLong(t -> {
Long mem = getAnomalyMemoryRequirement(t);
if (mem == null) {
logger.warn("unexpected null for snapshot upgrade memory requirement for [{}]", MlTasks.jobId(t.getId()));
}
assert mem != null : "unexpected null for anomaly memory requirement after recent stale check";
return mem;
return mem == null ? 0 : mem;
})
.max()
.orElse(0L)
@@ -369,8 +376,11 @@ private long maxMemoryBytes(MlAutoscalingContext mlContext) {
// Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used
.mapToLong(t -> {
Long mem = this.getAnalyticsMemoryRequirement(t);
if (mem == null) {
logger.warn("unexpected null for analytics memory requirement for [{}]", MlTasks.dataFrameAnalyticsId(t.getId()));
}
assert mem != null : "unexpected null for analytics memory requirement after recent stale check";
return mem;
return mem == null ? 0 : mem;
})
.max()
.orElse(0L)
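The three hunks above make the same change: instead of asserting that a per-task memory requirement is non-null, they log a warning and let the task contribute 0 bytes. A minimal, self-contained sketch of that pattern follows; the names are hypothetical and java.util.logging stands in for the Log4j logger used in the real code.

import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

class NullTolerantMaxSketch {

    private static final Logger logger = Logger.getLogger(NullTolerantMaxSketch.class.getName());

    // Largest per-task memory requirement, warning on (but tolerating) tasks whose requirement is unknown.
    static long maxRequiredBytes(List<String> taskIds, Function<String, Long> memoryRequirement) {
        return taskIds.stream().mapToLong(id -> {
            Long mem = memoryRequirement.apply(id);
            if (mem == null) {
                logger.warning("unexpected null memory requirement for [" + id + "]");
            }
            return mem == null ? 0 : mem; // default to 0 instead of failing an assertion
        }).max().orElse(0L);
    }
}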
@@ -262,6 +262,7 @@ public Long getDataFrameAnalyticsJobMemoryRequirement(String id) {
*/
public Long getTrainedModelAssignmentMemoryRequirement(String modelId) {
if (isMaster == false) {
logger.warn("Request to get trained model assignment memory not on master node; modelId was [{}]", modelId);
return null;
}

@@ -282,6 +283,7 @@ public Long getTrainedModelAssignmentMemoryRequirement(String modelId) {
public Long getJobMemoryRequirement(String taskName, String id) {

if (isMaster == false) {
logger.warn("Request to get job memory not on master node; taskName [{}], id [{}]", taskName, id);
return null;
}

@@ -353,7 +355,9 @@ public boolean asyncRefresh() {
public void refreshAnomalyDetectorJobMemoryAndAllOthers(String jobId, ActionListener<Long> listener) {

if (isMaster == false) {
listener.onFailure(new NotMasterException("Request to refresh anomaly detector memory requirements on non-master node"));
String msg = "Request to refresh anomaly detector memory requirements on non-master node";
logger.warn(msg);
listener.onFailure(new NotMasterException(msg));
return;
}

@@ -377,7 +381,9 @@ public void refreshAnomalyDetectorJobMemoryAndAllOthers(String jobId, ActionList
public void addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(String id, long mem, ActionListener<Void> listener) {

if (isMaster == false) {
listener.onFailure(new NotMasterException("Request to put data frame analytics memory requirement on non-master node"));
String msg = "Request to put data frame analytics memory requirement on non-master node";
logger.warn(msg);
listener.onFailure(new NotMasterException(msg));
return;
}

@@ -517,7 +523,9 @@ private void refreshAllDataFrameAnalyticsJobTasks(
*/
public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener<Long> listener) {
if (isMaster == false) {
listener.onFailure(new NotMasterException("Request to refresh anomaly detector memory requirement on non-master node"));
String msg = "Request to refresh anomaly detector memory requirement on non-master node";
logger.warn(msg);
listener.onFailure(new NotMasterException(msg));
return;
}

@@ -11,6 +11,7 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -21,14 +22,17 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoad;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutorTests;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.junit.Before;

@@ -43,6 +47,8 @@
import static org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator.STATIC_JVM_UPPER_THRESHOLD;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -208,6 +214,52 @@ public void testScale_GivenNoML_AndNoMLNodes() {
assertThat(result.reason().summary(), containsString("Passing currently perceived capacity as no scaling changes are necessary"));
}

public void testScale_GivenUndeterminedMemory_ShouldReturnNullCapacity() {
MlAutoscalingDeciderService service = buildService();
service.onMaster();

String jobId = "a_job";
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
OpenJobPersistentTasksExecutorTests.addJobTask(jobId, randomFrom("ml-1", "ml-2"), JobState.OPENED, tasksBuilder);
Metadata.Builder metadata = Metadata.builder();
metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build());

ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
.nodes(
DiscoveryNodes.builder()
.add(buildNode("ml-1", ByteSizeValue.ofGb(4), 8))
.add(buildNode("ml-2", ByteSizeValue.ofGb(4), 8))
.build()
)
.metadata(metadata)
.build();

// Making the memory tracker return 0 for an AD job's memory requirement results in the decider
// returning undetermined capacity.
when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(0L);

AutoscalingDeciderResult result = service.scale(
Settings.EMPTY,
new DeciderContext(
clusterState,
new AutoscalingCapacity(
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofGb(8), null),
new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofGb(4), null)
)
)
);

assertThat(
result.reason().summary(),
containsString(
"[memory_decider] Passing currently perceived capacity as there are running analytics "
+ "and anomaly jobs or deployed models, but their assignment explanations are unexpected or their "
+ "memory usage estimates are inaccurate."
)
);
assertThat(result.requiredCapacity(), is(nullValue()));
}

private DiscoveryNode buildNode(String id, ByteSizeValue machineMemory, int allocatedProcessors) {
return new DiscoveryNode(
id,