Skip to content

Commit f0f0da2

Browse files
authored
[7.x] Add telemetry for data tiers (#63031) (#63140)
Backports the following commits to 7.x: Add telemetry for data tiers (#63031)
1 parent 6a9cde2 commit f0f0da2

File tree

11 files changed

+597
-1
lines changed

11 files changed

+597
-1
lines changed

docs/reference/rest-api/info.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ Example response:
154154
"data_streams" : {
155155
"available" : true,
156156
"enabled" : true,
157+
},
158+
"data_tiers" : {
159+
"available" : true,
160+
"enabled" : true,
157161
}
158162
},
159163
"tagline" : "You know, for X"

docs/reference/rest-api/usage.asciidoc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,58 @@ GET /_xpack/usage
295295
"enabled" : true,
296296
"data_streams" : 0,
297297
"indices_count" : 0
298+
},
299+
"data_tiers" : {
300+
"available" : true,
301+
"enabled" : true,
302+
"data_warm" : {
303+
"node_count" : 0,
304+
"index_count" : 0,
305+
"total_shard_count" : 0,
306+
"primary_shard_count" : 0,
307+
"doc_count" : 0,
308+
"total_size_bytes" : 0,
309+
"primary_size_bytes" : 0,
310+
"primary_shard_size_avg_bytes" : 0,
311+
"primary_shard_size_median_bytes" : 0,
312+
"primary_shard_size_mad_bytes" : 0
313+
},
314+
"data_cold" : {
315+
"node_count" : 0,
316+
"index_count" : 0,
317+
"total_shard_count" : 0,
318+
"primary_shard_count" : 0,
319+
"doc_count" : 0,
320+
"total_size_bytes" : 0,
321+
"primary_size_bytes" : 0,
322+
"primary_shard_size_avg_bytes" : 0,
323+
"primary_shard_size_median_bytes" : 0,
324+
"primary_shard_size_mad_bytes" : 0
325+
},
326+
"data_content" : {
327+
"node_count" : 0,
328+
"index_count" : 0,
329+
"total_shard_count" : 0,
330+
"primary_shard_count" : 0,
331+
"doc_count" : 0,
332+
"total_size_bytes" : 0,
333+
"primary_size_bytes" : 0,
334+
"primary_shard_size_avg_bytes" : 0,
335+
"primary_shard_size_median_bytes" : 0,
336+
"primary_shard_size_mad_bytes" : 0
337+
},
338+
"data_hot" : {
339+
"node_count" : 0,
340+
"index_count" : 0,
341+
"total_shard_count" : 0,
342+
"primary_shard_count" : 0,
343+
"doc_count" : 0,
344+
"total_size_bytes" : 0,
345+
"primary_size_bytes" : 0,
346+
"primary_shard_size_avg_bytes" : 0,
347+
"primary_shard_size_median_bytes" : 0,
348+
"primary_shard_size_mad_bytes" : 0
349+
}
298350
}
299351
}
300352
------------------------------------------------------------

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,23 @@
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.cluster.metadata.Template;
1515
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.index.mapper.MapperService;
1617
import org.elasticsearch.plugins.Plugin;
1718
import org.elasticsearch.test.ESIntegTestCase;
1819
import org.elasticsearch.xpack.core.DataTier;
20+
import org.elasticsearch.xpack.core.DataTiersFeatureSetUsage;
1921
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
22+
import org.elasticsearch.xpack.core.XPackFeatureSet;
23+
import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
24+
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
2025

2126
import java.util.Arrays;
2227
import java.util.Collection;
2328
import java.util.Collections;
29+
import java.util.stream.Collectors;
2430

2531
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2633

2734
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0)
2835
public class DataTierIT extends ESIntegTestCase {
@@ -194,6 +201,64 @@ public void testTemplateOverridesDefaults() {
194201
ensureYellow(index);
195202
}
196203

204+
public void testDataTierTelemetry() {
205+
startContentOnlyNode();
206+
startContentOnlyNode();
207+
startHotOnlyNode();
208+
209+
client().admin().indices().prepareCreate(index)
210+
.setSettings(Settings.builder()
211+
.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, "data_hot")
212+
.put("index.number_of_shards", 2)
213+
.put("index.number_of_replicas", 0))
214+
.setWaitForActiveShards(0)
215+
.get();
216+
217+
client().admin().indices().prepareCreate(index + "2")
218+
.setSettings(Settings.builder()
219+
.put("index.number_of_shards", 1)
220+
.put("index.number_of_replicas", 1))
221+
.setWaitForActiveShards(0)
222+
.get();
223+
224+
ensureGreen();
225+
client().prepareIndex(index, MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
226+
client().prepareIndex(index + "2", MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
227+
client().prepareIndex(index + "2", MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
228+
refresh(index, index + "2");
229+
230+
DataTiersFeatureSetUsage usage = getUsage();
231+
// We can't guarantee that internal indices aren't created, so some of these are >= checks
232+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).nodeCount, equalTo(2));
233+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).indexCount, greaterThanOrEqualTo(1));
234+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).totalShardCount, greaterThanOrEqualTo(2));
235+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryShardCount, greaterThanOrEqualTo(1));
236+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).docCount, greaterThanOrEqualTo(2L));
237+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryByteCount, greaterThanOrEqualTo(1L));
238+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryByteCountMedian, greaterThanOrEqualTo(1L));
239+
assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryShardBytesMAD, greaterThanOrEqualTo(0L));
240+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).nodeCount, equalTo(1));
241+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).indexCount, greaterThanOrEqualTo(1));
242+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).totalShardCount, greaterThanOrEqualTo(2));
243+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryShardCount, greaterThanOrEqualTo(2));
244+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).docCount, greaterThanOrEqualTo(1L));
245+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryByteCount, greaterThanOrEqualTo(1L));
246+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryByteCountMedian, greaterThanOrEqualTo(1L));
247+
assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryShardBytesMAD, greaterThanOrEqualTo(0L));
248+
}
249+
250+
private DataTiersFeatureSetUsage getUsage() {
251+
XPackUsageResponse usages = new XPackUsageRequestBuilder(client()).execute().actionGet();
252+
XPackFeatureSet.Usage dtUsage = usages.getUsages().stream()
253+
.filter(u -> u instanceof DataTiersFeatureSetUsage)
254+
.collect(Collectors.toList())
255+
.get(0);
256+
if (dtUsage == null) {
257+
throw new IllegalArgumentException("no data tier usage found");
258+
}
259+
return (DataTiersFeatureSetUsage) dtUsage;
260+
}
261+
197262
public void startDataNode() {
198263
Settings nodeSettings = Settings.builder()
199264
.putList("node.roles", Arrays.asList("master", "data", "ingest"))

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.index.shard.IndexSettingProvider;
1717
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
1818

19+
import java.util.Arrays;
20+
import java.util.HashSet;
1921
import java.util.Set;
2022

2123
/**
@@ -34,6 +36,8 @@ public class DataTier {
3436
public static final String DATA_WARM = "data_warm";
3537
public static final String DATA_COLD = "data_cold";
3638

39+
public static final Set<String> ALL_DATA_TIERS = new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD));
40+
3741
/**
3842
* Returns true if the given tier name is a valid tier
3943
*/
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core;
8+
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
11+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
12+
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
16+
import org.elasticsearch.cluster.routing.RoutingNode;
17+
import org.elasticsearch.cluster.routing.RoutingNodes;
18+
import org.elasticsearch.cluster.routing.ShardRouting;
19+
import org.elasticsearch.cluster.routing.ShardRoutingState;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.inject.Inject;
22+
import org.elasticsearch.index.Index;
23+
import org.elasticsearch.index.store.StoreStats;
24+
import org.elasticsearch.search.aggregations.metrics.TDigestState;
25+
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
import java.util.stream.Collectors;
33+
34+
public class DataTiersFeatureSet implements XPackFeatureSet {
35+
36+
private final Client client;
37+
private final ClusterService clusterService;
38+
39+
@Inject
40+
public DataTiersFeatureSet(Client client, ClusterService clusterService) {
41+
this.client = client;
42+
this.clusterService = clusterService;
43+
}
44+
45+
@Override
46+
public String name() {
47+
return XPackField.DATA_TIERS;
48+
}
49+
50+
@Override
51+
public boolean available() {
52+
return true;
53+
}
54+
55+
@Override
56+
public boolean enabled() {
57+
return true;
58+
}
59+
60+
@Override
61+
public Map<String, Object> nativeCodeInfo() {
62+
return null;
63+
}
64+
65+
@Override
66+
public void usage(ActionListener<Usage> listener) {
67+
final ClusterState state = clusterService.state();
68+
client.admin().cluster().prepareNodesStats()
69+
.all()
70+
.setIndices(CommonStatsFlags.ALL)
71+
.execute(ActionListener.wrap(nodesStatsResponse -> {
72+
final RoutingNodes routingNodes = state.getRoutingNodes();
73+
74+
// First separate the nodes into separate tiers, note that nodes *may* be duplicated
75+
Map<String, List<NodeStats>> tierSpecificNodeStats = separateTiers(nodesStatsResponse);
76+
77+
// Generate tier specific stats for the nodes
78+
Map<String, DataTiersFeatureSetUsage.TierSpecificStats> tierSpecificStats = tierSpecificNodeStats.entrySet()
79+
.stream().collect(Collectors.toMap(Map.Entry::getKey, ns -> calculateStats(ns.getValue(), routingNodes)));
80+
81+
listener.onResponse(new DataTiersFeatureSetUsage(tierSpecificStats));
82+
}, listener::onFailure));
83+
}
84+
85+
// Visible for testing
86+
static Map<String, List<NodeStats>> separateTiers(NodesStatsResponse nodesStatsResponse) {
87+
Map<String, List<NodeStats>> responses = new HashMap<>();
88+
DataTier.ALL_DATA_TIERS.forEach(tier ->
89+
responses.put(tier, nodesStatsResponse.getNodes().stream()
90+
.filter(stats -> stats.getNode().getRoles().stream()
91+
.map(DiscoveryNodeRole::roleName)
92+
.anyMatch(rn -> rn.equals(tier)))
93+
.collect(Collectors.toList())));
94+
return responses;
95+
}
96+
97+
private DataTiersFeatureSetUsage.TierSpecificStats calculateStats(List<NodeStats> nodesStats, RoutingNodes routingNodes) {
98+
int nodeCount = 0;
99+
int indexCount = 0;
100+
int totalShardCount = 0;
101+
long totalByteCount = 0;
102+
long docCount = 0;
103+
final AtomicInteger primaryShardCount = new AtomicInteger(0);
104+
final AtomicLong primaryByteCount = new AtomicLong(0);
105+
final TDigestState valueSketch = new TDigestState(1000);
106+
for (NodeStats nodeStats : nodesStats) {
107+
nodeCount++;
108+
totalByteCount += nodeStats.getIndices().getStore().getSizeInBytes();
109+
docCount += nodeStats.getIndices().getDocs().getCount();
110+
String nodeId = nodeStats.getNode().getId();
111+
final RoutingNode node = routingNodes.node(nodeId);
112+
if (node != null) {
113+
totalShardCount += node.shardsWithState(ShardRoutingState.STARTED).size();
114+
Set<Index> indicesOnNode = node.shardsWithState(ShardRoutingState.STARTED).stream()
115+
.map(ShardRouting::index)
116+
.collect(Collectors.toSet());
117+
indexCount += indicesOnNode.size();
118+
indicesOnNode.forEach(index -> {
119+
nodeStats.getIndices().getShardStats(index).stream()
120+
.filter(shardStats -> shardStats.getPrimary().getStore() != null)
121+
.forEach(shardStats -> {
122+
StoreStats primaryStoreStats = shardStats.getPrimary().getStore();
123+
// If storeStats is null, it means this is not a replica
124+
primaryShardCount.incrementAndGet();
125+
long primarySize = primaryStoreStats.getSizeInBytes();
126+
primaryByteCount.addAndGet(primarySize);
127+
valueSketch.add(primarySize);
128+
});
129+
});
130+
}
131+
}
132+
long primaryShardSizeMedian = (long) valueSketch.quantile(0.5);
133+
long primaryShardSizeMAD = computeMedianAbsoluteDeviation(valueSketch);
134+
return new DataTiersFeatureSetUsage.TierSpecificStats(nodeCount, indexCount, totalShardCount, primaryShardCount.get(), docCount,
135+
totalByteCount, primaryByteCount.get(), primaryShardSizeMedian, primaryShardSizeMAD);
136+
}
137+
138+
// Visible for testing
139+
static long computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
140+
if (valuesSketch.size() == 0) {
141+
return 0;
142+
} else {
143+
final double approximateMedian = valuesSketch.quantile(0.5);
144+
final TDigestState approximatedDeviationsSketch = new TDigestState(valuesSketch.compression());
145+
valuesSketch.centroids().forEach(centroid -> {
146+
final double deviation = Math.abs(approximateMedian - centroid.mean());
147+
approximatedDeviationsSketch.add(deviation, centroid.count());
148+
});
149+
150+
return (long) approximatedDeviationsSketch.quantile(0.5);
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)