Skip to content

Commit b52c6f7

Browse files
Add node write load to the ClusterInfo (#130411)
Sets up ClusterInfoService to collect node write load and pass it into ClusterInfo. The node write load stats are not yet supplied, they'll be zero/empty in the ClusterInfo for now. Relates ES-11990
1 parent b50ea91 commit b52c6f7

File tree

30 files changed

+446
-28
lines changed

30 files changed

+446
-28
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
*/
99
package org.elasticsearch.index.shard;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.apache.lucene.index.DirectoryReader;
1214
import org.apache.lucene.store.LockObtainFailedException;
1315
import org.apache.lucene.util.SetOnce;
@@ -22,13 +24,16 @@
2224
import org.elasticsearch.cluster.EstimatedHeapUsage;
2325
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
2426
import org.elasticsearch.cluster.InternalClusterInfoService;
27+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
28+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
2529
import org.elasticsearch.cluster.metadata.IndexMetadata;
2630
import org.elasticsearch.cluster.node.DiscoveryNode;
2731
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2832
import org.elasticsearch.cluster.routing.RecoverySource;
2933
import org.elasticsearch.cluster.routing.ShardRouting;
3034
import org.elasticsearch.cluster.routing.ShardRoutingState;
3135
import org.elasticsearch.cluster.routing.UnassignedInfo;
36+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
3237
import org.elasticsearch.cluster.service.ClusterService;
3338
import org.elasticsearch.common.Strings;
3439
import org.elasticsearch.common.UUIDs;
@@ -73,6 +78,7 @@
7378
import org.elasticsearch.test.ESSingleNodeTestCase;
7479
import org.elasticsearch.test.IndexSettingsModule;
7580
import org.elasticsearch.test.InternalSettingsPlugin;
81+
import org.elasticsearch.threadpool.ThreadPool;
7682
import org.elasticsearch.xcontent.XContentType;
7783
import org.junit.Assert;
7884

@@ -85,6 +91,7 @@
8591
import java.util.Arrays;
8692
import java.util.Collection;
8793
import java.util.Collections;
94+
import java.util.HashMap;
8895
import java.util.List;
8996
import java.util.Locale;
9097
import java.util.Map;
@@ -117,14 +124,20 @@
117124
import static org.hamcrest.Matchers.either;
118125
import static org.hamcrest.Matchers.equalTo;
119126
import static org.hamcrest.Matchers.greaterThan;
127+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
120128
import static org.hamcrest.Matchers.instanceOf;
121129
import static org.hamcrest.Matchers.lessThanOrEqualTo;
122130

123131
public class IndexShardIT extends ESSingleNodeTestCase {
132+
private static final Logger logger = LogManager.getLogger(IndexShardIT.class);
124133

125134
@Override
126135
protected Collection<Class<? extends Plugin>> getPlugins() {
127-
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
136+
return pluginList(
137+
InternalSettingsPlugin.class,
138+
BogusEstimatedHeapUsagePlugin.class,
139+
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
140+
);
128141
}
129142

130143
public void testLockTryingToDelete() throws Exception {
@@ -295,6 +308,53 @@ public void testHeapUsageEstimateIsPresent() {
295308
}
296309
}
297310

311+
public void testNodeWriteLoadsArePresent() {
312+
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
313+
ClusterInfoServiceUtils.refresh(clusterInfoService);
314+
Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolStats = clusterInfoService.getClusterInfo()
315+
.getNodeUsageStatsForThreadPools();
316+
assertNotNull(nodeThreadPoolStats);
317+
/** Not collecting stats yet because allocation write load stats collection is disabled by default.
318+
* see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */
319+
assertTrue(nodeThreadPoolStats.isEmpty());
320+
321+
// Enable collection for node write loads.
322+
updateClusterSettings(
323+
Settings.builder()
324+
.put(
325+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
326+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
327+
)
328+
.build()
329+
);
330+
try {
331+
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
332+
ClusterInfoServiceUtils.refresh(clusterInfoService);
333+
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
334+
335+
/** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation
336+
* generates random usage values */
337+
ClusterState state = getInstanceFromNode(ClusterService.class).state();
338+
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
339+
for (DiscoveryNode node : state.nodes()) {
340+
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
341+
NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId());
342+
assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId()));
343+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools
344+
.threadPoolUsageStatsMap()
345+
.get(ThreadPool.Names.WRITE);
346+
assertNotNull(writeThreadPoolStats);
347+
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
348+
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
349+
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
350+
}
351+
} finally {
352+
updateClusterSettings(
353+
Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
354+
);
355+
}
356+
}
357+
298358
public void testIndexCanChangeCustomDataPath() throws Exception {
299359
final String index = "test-custom-data-path";
300360
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@@ -875,4 +935,61 @@ public ClusterService getClusterService() {
875935
return clusterService.get();
876936
}
877937
}
938+
939+
/**
940+
* A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random
941+
* {@link NodeUsageStatsForThreadPools} for each node in the cluster.
942+
* <p>
943+
* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the
944+
* plugin system can pick it up and use it for the test set-up.
945+
*/
946+
public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {
947+
948+
private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;
949+
950+
public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
951+
this.plugin = plugin;
952+
}
953+
954+
@Override
955+
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
956+
ActionListener.completeWith(
957+
listener,
958+
() -> plugin.getClusterService()
959+
.state()
960+
.nodes()
961+
.stream()
962+
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())))
963+
);
964+
}
965+
966+
private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
967+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
968+
randomNonNegativeInt(),
969+
randomFloat(),
970+
randomNonNegativeLong()
971+
);
972+
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
973+
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
974+
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
975+
}
976+
}
977+
978+
/**
979+
* Make a plugin to gain access to the {@link ClusterService} instance.
980+
*/
981+
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {
982+
983+
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
984+
985+
@Override
986+
public Collection<?> createComponents(PluginServices services) {
987+
clusterService.set(services.clusterService());
988+
return List.of();
989+
}
990+
991+
public ClusterService getClusterService() {
992+
return clusterService.get();
993+
}
994+
}
878995
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the "Elastic License
4+
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
# Public License v 1"; you may not use this file except in compliance with, at
6+
# your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
# License v3.0 only", or the "Server Side Public License, v 1".
8+
#
9+
10+
org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ static TransportVersion def(int id) {
337337
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00);
338338
public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00);
339339
public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00);
340+
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
340341

341342
/*
342343
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5858
final Map<NodeAndShard, String> dataPath;
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
6060
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
61+
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
6162

6263
protected ClusterInfo() {
63-
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
64+
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
6465
}
6566

6667
/**
@@ -73,6 +74,7 @@ protected ClusterInfo() {
7374
* @param dataPath the shard routing to datapath mapping
7475
* @param reservedSpace reserved space per shard broken down by node and data path
7576
* @param estimatedHeapUsages estimated heap usage broken down by node
77+
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
7678
* @see #shardIdentifierFromRouting
7779
*/
7880
public ClusterInfo(
@@ -82,7 +84,8 @@ public ClusterInfo(
8284
Map<ShardId, Long> shardDataSetSizes,
8385
Map<NodeAndShard, String> dataPath,
8486
Map<NodeAndPath, ReservedSpace> reservedSpace,
85-
Map<String, EstimatedHeapUsage> estimatedHeapUsages
87+
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
88+
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
8689
) {
8790
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
8891
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -91,6 +94,7 @@ public ClusterInfo(
9194
this.dataPath = Map.copyOf(dataPath);
9295
this.reservedSpace = Map.copyOf(reservedSpace);
9396
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
97+
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
9498
}
9599

96100
public ClusterInfo(StreamInput in) throws IOException {
@@ -107,6 +111,11 @@ public ClusterInfo(StreamInput in) throws IOException {
107111
} else {
108112
this.estimatedHeapUsages = Map.of();
109113
}
114+
if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
115+
this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new);
116+
} else {
117+
this.nodeUsageStatsForThreadPools = Map.of();
118+
}
110119
}
111120

112121
@Override
@@ -124,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException {
124133
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
125134
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
126135
}
136+
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
137+
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
138+
}
127139
}
128140

129141
/**
@@ -204,8 +216,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
204216
return builder.endObject(); // NodeAndPath
205217
}),
206218
endArray() // end "reserved_sizes"
207-
// NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid
208-
// committing to API payloads until the feature is settled
219+
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid
220+
// committing to API payloads until the features are settled
209221
);
210222
}
211223

@@ -220,6 +232,13 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
220232
return estimatedHeapUsages;
221233
}
222234

235+
/**
236+
* Returns a map containing thread pool usage stats for each node, keyed by node ID.
237+
*/
238+
public Map<String, NodeUsageStatsForThreadPools> getNodeUsageStatsForThreadPools() {
239+
return nodeUsageStatsForThreadPools;
240+
}
241+
223242
/**
224243
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
225244
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
@@ -311,12 +330,21 @@ public boolean equals(Object o) {
311330
&& shardSizes.equals(that.shardSizes)
312331
&& shardDataSetSizes.equals(that.shardDataSetSizes)
313332
&& dataPath.equals(that.dataPath)
314-
&& reservedSpace.equals(that.reservedSpace);
333+
&& reservedSpace.equals(that.reservedSpace)
334+
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools);
315335
}
316336

317337
@Override
318338
public int hashCode() {
319-
return Objects.hash(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, reservedSpace);
339+
return Objects.hash(
340+
leastAvailableSpaceUsage,
341+
mostAvailableSpaceUsage,
342+
shardSizes,
343+
shardDataSetSizes,
344+
dataPath,
345+
reservedSpace,
346+
nodeUsageStatsForThreadPools
347+
);
320348
}
321349

322350
@Override

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ClusterInfoSimulator {
3434
private final Map<ShardId, Long> shardDataSetSizes;
3535
private final Map<NodeAndShard, String> dataPath;
3636
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
37+
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
3738

3839
public ClusterInfoSimulator(RoutingAllocation allocation) {
3940
this.allocation = allocation;
@@ -43,6 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4344
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4445
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4546
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
47+
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
4648
}
4749

4850
/**
@@ -156,7 +158,8 @@ public ClusterInfo getClusterInfo() {
156158
shardDataSetSizes,
157159
dataPath,
158160
Map.of(),
159-
estimatedHeapUsages
161+
estimatedHeapUsages,
162+
nodeThreadPoolUsageStats
160163
);
161164
}
162165
}

0 commit comments

Comments
 (0)