Skip to content

Commit

Permalink
Revert "Filter out remote model auto redeployment (opensearch-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
rbhavna committed Jan 10, 2025
1 parent 80de6e3 commit 447f0b4
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -31,7 +30,6 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.deploy.MLDeployModelAction;
Expand Down Expand Up @@ -186,10 +184,6 @@ private void triggerAutoDeployModels(List<String> addedNodes) {
modelAutoRedeployArrangements.add(modelAutoRedeployArrangement);
});
redeployAModel();
} else {
log.info("Could not find any models in the index, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
}, e -> {
if (e instanceof IndexNotFoundException) {
Expand Down Expand Up @@ -221,7 +215,7 @@ private void triggerUndeployModelsOnDataNodes(List<String> dataNodeIds) {
client.execute(MLUndeployModelAction.INSTANCE, undeployModelNodesRequest, undeployModelListener);
}
}
}, e -> { log.error("Failed to query need undeploy models, no action will be performed", e); });
}, e -> { log.error("Failed to query need undeploy models, no action will be performed"); });
queryRunningModels(listener);
}

Expand All @@ -245,9 +239,7 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {
String[] includes = new String[] {
MLModel.AUTO_REDEPLOY_RETRY_TIMES_FIELD,
MLModel.PLANNING_WORKER_NODES_FIELD,
MLModel.DEPLOY_TO_ALL_NODES_FIELD,
MLModel.FUNCTION_NAME_FIELD,
MLModel.ALGORITHM_FIELD };
MLModel.DEPLOY_TO_ALL_NODES_FIELD };

String[] excludes = new String[] { MLModel.MODEL_CONTENT_FIELD, MLModel.OLD_MODEL_CONTENT_FIELD };
FetchSourceContext fetchContext = new FetchSourceContext(true, includes, excludes);
Expand All @@ -263,34 +255,19 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {

@SuppressWarnings("unchecked")
private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeployArrangement) {
if (modelAutoRedeployArrangement == null) {
log.info("No more models in arrangement, skipping the redeployment");
startCronjobAndClearListener();
return;
}
String modelId = modelAutoRedeployArrangement.getSearchResponse().getId();
List<String> addedNodes = modelAutoRedeployArrangement.getAddedNodes();
Map<String, Object> sourceAsMap = modelAutoRedeployArrangement.getSearchResponse().getSourceAsMap();
String functionName = (String) Optional
.ofNullable(sourceAsMap.get(MLModel.FUNCTION_NAME_FIELD))
.orElse(sourceAsMap.get(MLModel.ALGORITHM_FIELD));
if (functionName == null) {
log
.error(
"Model function_name or algorithm is null, model is not in correct status, please check the model, model id is: {}",
modelId
);
redeployAModel();
return;
}
if (FunctionName.REMOTE == FunctionName.from(functionName)) {
log.info("Skipping redeploying remote model {} as remote model deployment can be done at prediction time.", modelId);
redeployAModel();
return;
}
List<String> planningWorkerNodes = (List<String>) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD);
Integer autoRedeployRetryTimes = (Integer) sourceAsMap.get(MLModel.AUTO_REDEPLOY_RETRY_TIMES_FIELD);
Boolean deployToAllNodes = (Boolean) Optional.ofNullable(sourceAsMap.get(MLModel.DEPLOY_TO_ALL_NODES_FIELD)).orElse(false);
List<String> planningWorkerNodes = (List<String>) modelAutoRedeployArrangement
.getSearchResponse()
.getSourceAsMap()
.get(MLModel.PLANNING_WORKER_NODES_FIELD);
Integer autoRedeployRetryTimes = (Integer) modelAutoRedeployArrangement
.getSearchResponse()
.getSourceAsMap()
.get(MLModel.AUTO_REDEPLOY_RETRY_TIMES_FIELD);
Boolean deployToAllNodes = (Boolean) Optional
.ofNullable(modelAutoRedeployArrangement.getSearchResponse().getSourceAsMap().get(MLModel.DEPLOY_TO_ALL_NODES_FIELD))
.orElse(false);
// calculate node ids.
String[] nodeIds = null;
if (deployToAllNodes || !allowCustomDeploymentPlan) {
Expand All @@ -309,7 +286,6 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy
.info(
"Allow custom deployment plan is true and deploy to all nodes is false and added nodes are not in planning worker nodes list, not to auto redeploy the model to the new nodes!"
);
redeployAModel();
return;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -609,34 +609,6 @@ public void test_redeployAModel_with_needRedeployArray_isEmpty() {
mlModelAutoReDeployer.redeployAModel();
}

public void test_buildAutoReloadArrangement_skippingRemoteModel_success() throws Exception {
Settings settings = Settings
.builder()
.put(ML_COMMONS_ONLY_RUN_ON_ML_NODE.getKey(), true)
.put(ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES.getKey(), 3)
.put(ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE.getKey(), true)
.put(ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN.getKey(), false)
.build();

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.getClusterSettings()).thenReturn(getClusterSettings(settings));
mockClusterDataNodes(clusterService);

mlModelAutoReDeployer = spy(
new MLModelAutoReDeployer(clusterService, client, settings, mlModelManager, searchRequestBuilderFactory)
);

SearchResponse searchResponse = buildDeployToAllNodesTrueSearchResponse("RemoteModelResult.json");
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(0);
listener.onResponse(searchResponse);
return null;
}).when(searchRequestBuilder).execute(isA(ActionListener.class));
mlModelAutoReDeployer.buildAutoReloadArrangement(addedNodes, clusterManagerNodeId);
verify(client, never()).execute(any(MLDeployModelAction.class), any(MLDeployModelRequest.class), any(ActionListener.class));
}

private SearchResponse buildDeployToAllNodesTrueSearchResponse(String file) throws Exception {
MLModel mlModel = buildModelWithJsonFile(file);
return createResponseWithModel(mlModel);
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 447f0b4

Please sign in to comment.