Skip to content

[ML] Removing some code that's obsolete for 8.0 #79444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import static org.elasticsearch.test.NodeRoles.onlyRole;
import static org.elasticsearch.test.NodeRoles.removeRoles;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -251,7 +253,7 @@ public void testDedicatedMlNode() throws Exception {
PersistentTask<?> task = tasks.getTask(MlTasks.jobTaskId(jobId));

DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));
assertThat(node.getAttributes(), hasEntry(equalTo(MachineLearning.MACHINE_MEMORY_NODE_ATTR), notNullValue()));
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
assertEquals(JobState.OPENED, jobTaskState.getState());
Expand Down Expand Up @@ -513,7 +515,7 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec
assertNotNull(task.getExecutorNode());
assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes()));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));
assertThat(node.getAttributes(), hasEntry(equalTo(MachineLearning.MACHINE_MEMORY_NODE_ATTR), notNullValue()));

JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
return Collections.singletonMap(InferenceProcessor.TYPE, inferenceFactory);
}

// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
private static final String PRE_V7_ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
// This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
private static final String PRE_V8_MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Expand Down Expand Up @@ -622,7 +621,7 @@ public MachineLearning(Settings settings, Path configPath) {
public static boolean isMlNode(DiscoveryNode node) {
Map<String, String> nodeAttributes = node.getAttributes();
try {
return Integer.parseInt(nodeAttributes.get(MAX_OPEN_JOBS_NODE_ATTR)) > 0;
return Long.parseLong(nodeAttributes.get(MACHINE_MEMORY_NODE_ATTR)) > 0;
} catch (NumberFormatException e) {
return false;
}
Expand Down Expand Up @@ -652,29 +651,24 @@ public List<Setting<?>> getSettings() {
}

public Settings additionalSettings() {
String mlEnabledNodeAttrName = "node.attr." + PRE_V7_ML_ENABLED_NODE_ATTR;
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR;
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + PRE_V8_MAX_OPEN_JOBS_NODE_ATTR;
String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR;

if (enabled == false) {
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName, jvmSizeAttrName);
return Settings.EMPTY;
}

Settings.Builder additionalSettings = Settings.builder();
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.ML_ROLE)) {
// TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below)
// The ML UI will need to be changed to check machineMemoryAttrName instead before this is done
addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes()));
addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(mlEnabledNodeAttrName);
// This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
} else {
disallowMlNodeAttributes(mlEnabledNodeAttrName,
disallowMlNodeAttributes(
maxOpenJobsPerNodeNodeAttrName,
machineMemoryAttrName,
jvmSizeAttrName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private List<MlDataRemover> createDataRemovers(OriginSettingClient client,
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId, jobResultsProvider, auditor),
new UnusedStateRemover(client, clusterService, parentTaskId),
new UnusedStateRemover(client, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId),
new ExpiredAnnotationsRemover(client,
Expand All @@ -226,7 +226,7 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
threadPool, parentTaskId,
jobResultsProvider,
auditor),
new UnusedStateRemover(client, clusterService, parentTaskId),
new UnusedStateRemover(client, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId),
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
boolean nodeIsMemoryAccurate = true;
for (DiscoveryNode node : nodes) {
NodeLoad nodeLoad = nodeLoadDetector.detectNodeLoad(clusterState,
true,
node,
maxOpenJobs,
maxMachineMemoryPercent,
Expand Down Expand Up @@ -859,7 +858,6 @@ Optional<NativeMemoryCapacity> calculateFutureAvailableCapacity(PersistentTasksC
Map<String, Long> freeMemoryByNodeId = new HashMap<>();
for (DiscoveryNode node : mlNodes) {
NodeLoad nodeLoad = nodeLoadDetector.detectNodeLoad(clusterState,
true,
node,
maxOpenJobs,
maxMachineMemoryPercent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ static boolean shouldAllocateModels(final ClusterChangedEvent event) {
}

/**
 * Computes the load of {@code node} through {@code nodeLoadDetector.detectNodeLoad} and hands the result,
 * together with the node id and {@code params}, to {@code handleNodeLoad}, whose return value is returned as is.
 *
 * @param state  cluster state passed to the load detector
 * @param params task parameters of the trained model deployment, forwarded to {@code handleNodeLoad}
 * @param node   the node whose load is evaluated
 * @return whatever {@code handleNodeLoad} returns; presumably a present value explains why the node lacks
 *         capacity and an empty value means it has capacity - TODO confirm against {@code handleNodeLoad}
 */
Optional<String> nodeHasCapacity(ClusterState state, StartTrainedModelDeploymentAction.TaskParams params, DiscoveryNode node) {
// NOTE(review): the two consecutive declarations of "load" look like the before/after lines of a diff
// (the second drops the boolean "true" argument); a compilable file would contain only one of them.
NodeLoad load = nodeLoadDetector.detectNodeLoad(state, true, node, maxOpenJobs, maxMemoryPercentage, useAuto);
NodeLoad load = nodeLoadDetector.detectNodeLoad(state, node, maxOpenJobs, maxMemoryPercentage, useAuto);
return handleNodeLoad(load, node.getId(), params);
}

Expand All @@ -460,7 +460,6 @@ Optional<String> nodeHasCapacity(
NodeLoad load = nodeLoadDetector.detectNodeLoad(
state,
builder.build(),
true,
node,
maxOpenJobs,
maxMemoryPercentage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public Tuple<NativeMemoryCapacity, Long> perceivedCapacityAndMaxFreeMemory(int m
long mostAvailableMemory = capableNodes.stream()
.map(n -> nodeLoadDetector.detectNodeLoad(
clusterState,
true,
n,
maxOpenJobs,
maxMachineMemoryPercent,
Expand Down Expand Up @@ -171,7 +170,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(Long estimatedMemoryF
}
NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
clusterState,
true, // Remove in 8.0.0
node,
dynamicMaxOpenJobs,
maxMachineMemoryPercent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,13 @@ public MlMemoryTracker getMlMemoryTracker() {
}

public NodeLoad detectNodeLoad(ClusterState clusterState,
boolean allNodesHaveDynamicMaxWorkers,
DiscoveryNode node,
int dynamicMaxOpenJobs,
int maxMachineMemoryPercent,
boolean useAutoMachineMemoryCalculation) {
return detectNodeLoad(
clusterState,
TrainedModelAllocationMetadata.fromState(clusterState),
allNodesHaveDynamicMaxWorkers,
node,
dynamicMaxOpenJobs,
maxMachineMemoryPercent,
Expand All @@ -61,25 +59,13 @@ public NodeLoad detectNodeLoad(ClusterState clusterState,

public NodeLoad detectNodeLoad(ClusterState clusterState,
TrainedModelAllocationMetadata allocationMetadata,
boolean allNodesHaveDynamicMaxWorkers,
DiscoveryNode node,
int dynamicMaxOpenJobs,
int maxNumberOfOpenJobs,
int maxMachineMemoryPercent,
boolean useAutoMachineMemoryCalculation) {
PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Map<String, String> nodeAttributes = node.getAttributes();
List<String> errors = new ArrayList<>();
int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
// TODO: remove this in 8.0.0
if (allNodesHaveDynamicMaxWorkers == false) {
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
errors.add(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer");
maxNumberOfOpenJobs = -1;
}
}
OptionalLong maxMlMemory = NativeMemoryCalculator.allowedBytesForMl(node,
maxMachineMemoryPercent,
useAutoMachineMemoryCalculation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
Expand Down Expand Up @@ -52,13 +50,10 @@ public class UnusedStateRemover implements MlDataRemover {
private static final Logger LOGGER = LogManager.getLogger(UnusedStateRemover.class);

private final OriginSettingClient client;
private final ClusterService clusterService;
private final TaskId parentTaskId;

/**
 * Creates the remover, storing each argument in the field of the same name after checking it with
 * {@link Objects#requireNonNull(Object)}, so a {@code null} argument fails fast with a
 * {@link NullPointerException}.
 *
 * NOTE(review): two constructor signatures and a {@code clusterService} assignment appear here; they look
 * like the before/after lines of a diff in which the {@code ClusterService} dependency is dropped. A
 * compilable file would contain only one signature - confirm which side is intended.
 */
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService,
TaskId parentTaskId) {
public UnusedStateRemover(OriginSettingClient client, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.parentTaskId = Objects.requireNonNull(parentTaskId);
}

Expand Down Expand Up @@ -111,10 +106,6 @@ private Set<String> getJobIds() {
private Set<String> getAnomalyDetectionJobIds() {
Set<String> jobIds = new HashSet<>();

// TODO Once at 8.0, we can stop searching for jobs in cluster state
// and remove cluster service as a member altogether.
jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet());

DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, MlConfigIndex.indexName(),
QueryBuilders.termQuery(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,7 @@ private void givenJobs(List<Job> jobs, List<GetJobsStatsAction.Response.JobStats
private ClusterState givenNodeCount(int nodeCount) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (int i = 0; i < nodeCount; i++) {
Map<String, String> attrs = new HashMap<>();
attrs.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(20));
Map<String, String> attrs = Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
Set<DiscoveryNodeRole> roles = new HashSet<>();
roles.add(DiscoveryNodeRole.DATA_ROLE);
roles.add(DiscoveryNodeRole.MASTER_ROLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,7 @@ public void testNoAttributes_givenSameAndMlEnabled() {

public void testNoAttributes_givenClash() {
Settings.Builder builder = Settings.builder();
if (randomBoolean()) {
builder.put("node.attr.ml.enabled", randomBoolean());
} else {
builder.put("node.attr.ml.max_open_jobs", randomIntBetween(13, 15));
}
builder.put("node.attr.ml.max_open_jobs", randomIntBetween(13, 15));
MachineLearning machineLearning = createMachineLearning(builder.put("path.home", createTempDir()).build());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, machineLearning::additionalSettings);
assertThat(e.getMessage(), startsWith("Directly setting [node.attr.ml."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static DiscoveryNode createNode(int i, boolean isMlNode, Version nodeVer
"_node_name" + i,
"_node_id" + i,
new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i),
Map.of("ml.max_open_jobs", isMlNode ? "10" : "0", "ml.machine_memory", String.valueOf(ByteSizeValue.ofGb(1).getBytes())),
isMlNode ? Map.of("ml.machine_memory", String.valueOf(ByteSizeValue.ofGb(1).getBytes())) : Map.of(),
Collections.emptySet(),
nodeVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setup() {
when(mlMemoryTracker.getTrainedModelAllocationMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE);
nodeLoadDetector = mock(NodeLoadDetector.class);
when(nodeLoadDetector.getMlMemoryTracker()).thenReturn(mlMemoryTracker);
when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean()))
when(nodeLoadDetector.detectNodeLoad(any(), any(), anyInt(), anyInt(), anyBoolean()))
.thenReturn(NodeLoad.builder("any")
.setUseMemory(true)
.incAssignedJobMemory(ByteSizeValue.ofGb(1).getBytes())
Expand Down Expand Up @@ -704,9 +704,8 @@ private static List<DiscoveryNode> withMlNodes(String... nodeName) {
MapBuilder.<String, String>newMapBuilder()
.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, String.valueOf(DEFAULT_NODE_SIZE))
.put(MachineLearning.MAX_JVM_SIZE_NODE_ATTR, String.valueOf(DEFAULT_JVM_SIZE))
.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, String.valueOf(10))
.map(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE)),
new HashSet<>(List.of(DiscoveryNodeRole.MASTER_ROLE)),
Version.CURRENT))
.collect(Collectors.toList());
}
Expand Down
Loading