Skip to content

Commit eb8c4ba

Browse files
authored
Keep track of desired nodes status in cluster state (#87474)
This commit adds desired nodes status tracking to the cluster state. Previously status was tracked in-memory by DesiredNodesMembershipService this approach had certain limitations, and made the consumer code more complex. This takes a simpler approach to keep the status updated when the desired nodes are updated or when a new node joins, storing the status in the cluster state, this allows to consume that information easily where it is necessary. Additionally, this commit moves test code from depending directly of DesiredNodes which can be seen as an internal data structure to rely more on UpdateDesiredNodesRequest. Relates #84165
1 parent 43b415f commit eb8c4ba

File tree

25 files changed

+1311
-731
lines changed

25 files changed

+1311
-731
lines changed

docs/changelog/87474.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 87474
2+
summary: Keep track of desired nodes status in cluster state
3+
area: Autoscaling
4+
type: enhancement
5+
issues: []

libs/x-content/src/main/java/org/elasticsearch/xcontent/XContentBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,13 +977,17 @@ public XContentBuilder stringListField(String name, Collection<String> values) t
977977
}
978978

979979
public XContentBuilder xContentList(String name, Collection<? extends ToXContent> values) throws IOException {
980+
return xContentList(name, values, ToXContent.EMPTY_PARAMS);
981+
}
982+
983+
public XContentBuilder xContentList(String name, Collection<? extends ToXContent> values, ToXContent.Params params) throws IOException {
980984
field(name);
981985
if (values == null) {
982986
return nullValue();
983987
}
984988
startArray();
985989
for (ToXContent value : values) {
986-
value(value);
990+
value(value, params);
987991
}
988992
endArray();
989993
return this;

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DesiredNodesUpgradeIT.java

Lines changed: 103 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,25 @@
99
package org.elasticsearch.upgrades;
1010

1111
import org.elasticsearch.Version;
12+
import org.elasticsearch.action.admin.cluster.desirednodes.UpdateDesiredNodesRequest;
1213
import org.elasticsearch.client.Request;
13-
import org.elasticsearch.client.Response;
1414
import org.elasticsearch.client.ResponseException;
1515
import org.elasticsearch.cluster.metadata.DesiredNode;
16+
import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus;
17+
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.unit.ByteSizeValue;
20+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
21+
import org.elasticsearch.xcontent.json.JsonXContent;
1622

23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
1729
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.greaterThan;
1831
import static org.hamcrest.Matchers.is;
1932

2033
public class DesiredNodesUpgradeIT extends AbstractRollingTestCase {
@@ -25,93 +38,119 @@ public void testUpgradeDesiredNodes() throws Exception {
2538
}
2639

2740
switch (CLUSTER_TYPE) {
28-
case OLD -> {
29-
var response = updateDesiredNodes(1, desiredNodesWithIntegerProcessor());
30-
var statusCode = response.getStatusLine().getStatusCode();
31-
assertThat(statusCode, equalTo(200));
32-
}
41+
case OLD -> addClusterNodesToDesiredNodesWithIntegerProcessors(1);
3342
case MIXED -> {
34-
final var historyVersion = FIRST_MIXED_ROUND ? 2 : 3;
43+
int version = FIRST_MIXED_ROUND ? 2 : 3;
3544
if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
36-
var response = updateDesiredNodes(historyVersion, desiredNodesWithRangeOrFloatProcessors());
37-
var statusCode = response.getStatusLine().getStatusCode();
38-
assertThat(statusCode, equalTo(200));
45+
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version);
3946
} else {
4047
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
4148
final var responseException = expectThrows(
4249
ResponseException.class,
43-
() -> updateDesiredNodes(historyVersion, desiredNodesWithRangeOrFloatProcessors())
50+
() -> addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version)
4451
);
45-
var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
52+
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
4653
assertThat(statusCode, is(equalTo(400)));
4754
}
4855
}
4956
case UPGRADED -> {
50-
var response = updateDesiredNodes(4, desiredNodesWithRangeOrFloatProcessors());
51-
var statusCode = response.getStatusLine().getStatusCode();
52-
assertThat(statusCode, equalTo(200));
57+
assertAllDesiredNodesAreActualized();
58+
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(4);
5359
}
5460
}
5561

5662
final var getDesiredNodesRequest = new Request("GET", "/_internal/desired_nodes/_latest");
57-
Response response = client().performRequest(getDesiredNodesRequest);
63+
final var response = client().performRequest(getDesiredNodesRequest);
5864
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
5965
}
6066

61-
private Response updateDesiredNodes(int version, String body) throws Exception {
62-
final var updateDesiredNodesRequest = new Request("PUT", "/_internal/desired_nodes/history/" + version);
63-
updateDesiredNodesRequest.setJsonEntity(body);
64-
return client().performRequest(updateDesiredNodesRequest);
67+
private void assertAllDesiredNodesAreActualized() throws Exception {
68+
final var request = new Request("GET", "_cluster/state/metadata");
69+
final var response = client().performRequest(request);
70+
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
71+
Map<String, Object> responseMap = responseAsMap(response);
72+
List<Map<String, Object>> nodes = extractValue(responseMap, "metadata.desired_nodes.latest.nodes");
73+
assertThat(nodes.size(), is(greaterThan(0)));
74+
for (Map<String, Object> desiredNode : nodes) {
75+
final int status = extractValue(desiredNode, "status");
76+
assertThat((short) status, is(equalTo(DesiredNodeWithStatus.Status.ACTUALIZED.getValue())));
77+
}
6578
}
6679

67-
private String desiredNodesWithRangeOrFloatProcessors() {
80+
private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(int version) throws Exception {
81+
final List<DesiredNode> nodes;
6882
if (randomBoolean()) {
69-
return """
70-
{
71-
"nodes" : [
72-
{
73-
"settings" : {
74-
"node.name" : "instance-000187"
75-
},
76-
"processors_range" : {"min": 9.0, "max": 10.0},
77-
"memory" : "58gb",
78-
"storage" : "1tb",
79-
"node_version" : "99.1.0"
80-
}
81-
]
82-
}""";
83+
nodes = getNodeNames().stream()
84+
.map(
85+
nodeName -> new DesiredNode(
86+
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
87+
0.5f,
88+
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
89+
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
90+
Version.CURRENT
91+
)
92+
)
93+
.toList();
8394
} else {
84-
return """
85-
{
86-
"nodes" : [
87-
{
88-
"settings" : {
89-
"node.name" : "instance-000187"
90-
},
91-
"processors" : 9.5,
92-
"memory" : "58gb",
93-
"storage" : "1tb",
94-
"node_version" : "99.1.0"
95-
}
96-
]
97-
}""";
95+
nodes = getNodeNames().stream()
96+
.map(
97+
nodeName -> new DesiredNode(
98+
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
99+
new DesiredNode.ProcessorsRange(randomIntBetween(1, 10), (float) randomIntBetween(20, 30)),
100+
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
101+
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
102+
Version.CURRENT
103+
)
104+
)
105+
.toList();
98106
}
107+
updateDesiredNodes(nodes, version);
108+
}
109+
110+
private void addClusterNodesToDesiredNodesWithIntegerProcessors(int version) throws Exception {
111+
final var nodes = getNodeNames().stream()
112+
.map(
113+
nodeName -> new DesiredNode(
114+
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
115+
randomIntBetween(1, 24),
116+
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
117+
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
118+
Version.CURRENT
119+
)
120+
)
121+
.toList();
122+
updateDesiredNodes(nodes, version);
123+
}
124+
125+
private void updateDesiredNodes(List<DesiredNode> nodes, int version) throws IOException {
126+
final var request = new Request("PUT", "/_internal/desired_nodes/history/" + version);
127+
try (var builder = JsonXContent.contentBuilder()) {
128+
builder.startObject();
129+
builder.xContentList(UpdateDesiredNodesRequest.NODES_FIELD.getPreferredName(), nodes);
130+
builder.endObject();
131+
request.setJsonEntity(Strings.toString(builder));
132+
final var response = client().performRequest(request);
133+
final var statusCode = response.getStatusLine().getStatusCode();
134+
assertThat(statusCode, equalTo(200));
135+
}
136+
}
137+
138+
private List<String> getNodeNames() throws Exception {
139+
final var request = new Request("GET", "/_nodes");
140+
final var response = client().performRequest(request);
141+
Map<String, Object> responseMap = responseAsMap(response);
142+
Map<String, Map<String, Object>> nodes = extractValue(responseMap, "nodes");
143+
final List<String> nodeNames = new ArrayList<>();
144+
for (Map.Entry<String, Map<String, Object>> nodeInfoEntry : nodes.entrySet()) {
145+
final String nodeName = extractValue(nodeInfoEntry.getValue(), "name");
146+
nodeNames.add(nodeName);
147+
}
148+
149+
return nodeNames;
99150
}
100151

101-
private String desiredNodesWithIntegerProcessor() {
102-
return """
103-
{
104-
"nodes" : [
105-
{
106-
"settings" : {
107-
"node.name" : "instance-000187"
108-
},
109-
"processors" : 9,
110-
"memory" : "58gb",
111-
"storage" : "1tb",
112-
"node_version" : "99.1.0"
113-
}
114-
]
115-
}""";
152+
@SuppressWarnings("unchecked")
153+
private static <T> T extractValue(Map<String, Object> map, String path) {
154+
return (T) XContentMapValues.extractValue(path, map);
116155
}
117156
}

0 commit comments

Comments
 (0)