Skip to content

Commit 45c56b2

Browse files
[ML] Rebalance should not notify listener before update is applied (#88012)
When we are rebalancing trained model assignments, we eventually update cluster state with the new metadata. We only want to notify the listener after we have applied the cluster state update. This commit fixes a bug where we could notify before the update was actually applied resulting in NPE when writing the response of the `CreateTrainedModelAssignmentAction`. This is a fix following the work done in #87366.
1 parent b8c2f44 commit 45c56b2

File tree

3 files changed

+15
-1
lines changed

3 files changed

+15
-1
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ public boolean test(ClusterState clusterState) {
450450
.orElse(null);
451451
if (trainedModelAssignment == null) {
452452
// Something weird happened, it should NEVER be null...
453+
logger.trace(() -> format("[%s] assignment was null while waiting for state [%s]", modelId, waitForState));
453454
return true;
454455
}
455456

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
358358
}
359359

360360
private static ClusterState forceUpdate(ClusterState currentState, TrainedModelAssignmentMetadata.Builder modelAssignments) {
361+
logger.debug(() -> format("updated assignments: %s", modelAssignments.build()));
361362
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
362363
if (currentState.getNodes().getMinNodeVersion().onOrAfter(RENAME_ALLOCATION_TO_ASSIGNMENT_VERSION)) {
363364
metadata.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build())
@@ -389,10 +390,14 @@ private void rebalanceAssignments(
389390
}
390391

391392
submitUnbatchedTask(reason, new ClusterStateUpdateTask() {
393+
394+
private volatile boolean isUpdated;
395+
392396
@Override
393397
public ClusterState execute(ClusterState currentState) {
394398

395399
if (areClusterStatesCompatibleForRebalance(clusterState, currentState)) {
400+
isUpdated = true;
396401
return update(currentState, rebalancedMetadata);
397402
}
398403
rebalanceAssignments(currentState, modelToAdd, reason, listener);
@@ -406,7 +411,9 @@ public void onFailure(Exception e) {
406411

407412
@Override
408413
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
409-
listener.onResponse(TrainedModelAssignmentMetadata.fromState(newState));
414+
if (isUpdated) {
415+
listener.onResponse(TrainedModelAssignmentMetadata.fromState(newState));
416+
}
410417
}
411418
});
412419
});

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.NamedDiff;
1717
import org.elasticsearch.cluster.SimpleDiffable;
1818
import org.elasticsearch.cluster.metadata.Metadata;
19+
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.common.io.stream.StreamInput;
2021
import org.elasticsearch.common.io.stream.StreamOutput;
2122
import org.elasticsearch.xcontent.XContentBuilder;
@@ -149,6 +150,11 @@ public int hashCode() {
149150
return Objects.hash(modelRoutingEntries);
150151
}
151152

153+
@Override
154+
public String toString() {
155+
return Strings.toString(this);
156+
}
157+
152158
public static class Builder {
153159

154160
public static Builder empty() {

0 commit comments

Comments
 (0)