Skip to content

Commit 85177b6

Browse files
author
Hendrik Muhs
committed
[ML] Return statistics about forecasts as part of the jobsstats and usage API (#31647)
This change adds stats about forecasts, to the jobstats api as well as xpack/_usage. The following information is collected: _xpack/ml/anomaly_detectors/{jobid|_all}/_stats: - total number of forecasts - memory statistics (mean/min/max) - runtime statistics - record statistics - counts by status _xpack/usage - collected by job status as well as overall (_all): - total number of forecasts - number of jobs that have at least 1 forecast - memory, runtime, record statistics - counts by status Fixes #31395
1 parent 1ff7066 commit 85177b6

File tree

18 files changed

+1071
-160
lines changed

18 files changed

+1071
-160
lines changed

x-pack/docs/en/rest-api/ml/jobcounts.asciidoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ progress of a job.
2020
(object) An object that provides information about the size and contents of the model.
2121
See <<ml-modelsizestats,model size stats objects>>
2222

23+
`forecasts_stats`::
24+
(object) An object that provides statistical information about forecasts
25+
of this job. See <<ml-forecastsstats, forecasts stats objects>>
26+
2327
`node`::
2428
(object) For open jobs only, contains information about the node where the
2529
job runs. See <<ml-stats-node,node object>>.
@@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti
177181
`timestamp`::
178182
(date) The timestamp of the `model_size_stats` according to the timestamp of the data.
179183

184+
[float]
185+
[[ml-forecastsstats]]
186+
==== Forecasts Stats Objects
187+
188+
The `forecasts_stats` object shows statistics about forecasts. It has the following properties:
189+
190+
`total`::
191+
(long) The number of forecasts currently available for this model.
192+
193+
`forecasted_jobs`::
194+
(long) The number of jobs that have at least one forecast.
195+
196+
`memory_bytes`::
197+
(object) Statistics about the memory usage: minimum, maximum, average and total.
198+
199+
`records`::
200+
(object) Statistics about the number of forecast records: minimum, maximum, average and total.
201+
202+
`processing_time_ms`::
203+
(object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total.
204+
205+
`status`::
206+
(object) Counts per forecast status, for example: {"finished" : 2}.
207+
208+
NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
209+
these fields are ommitted.
210+
180211
[float]
181212
[[ml-stats-node]]
182213
==== Node Objects

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
2222
public static final String DATAFEEDS_FIELD = "datafeeds";
2323
public static final String COUNT = "count";
2424
public static final String DETECTORS = "detectors";
25+
public static final String FORECASTS = "forecasts";
2526
public static final String MODEL_SIZE = "model_size";
2627

2728
private final Map<String, Object> jobsUsage;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.xpack.core.ml.job.config.JobState;
3232
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3333
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
34+
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
3435
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3536

3637
import java.io.IOException;
@@ -46,6 +47,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
4647

4748
private static final String DATA_COUNTS = "data_counts";
4849
private static final String MODEL_SIZE_STATS = "model_size_stats";
50+
private static final String FORECASTS_STATS = "forecasts_stats";
4951
private static final String STATE = "state";
5052
private static final String NODE = "node";
5153

@@ -159,18 +161,22 @@ public static class JobStats implements ToXContentObject, Writeable {
159161
@Nullable
160162
private ModelSizeStats modelSizeStats;
161163
@Nullable
164+
private ForecastStats forecastStats;
165+
@Nullable
162166
private TimeValue openTime;
163167
private JobState state;
164168
@Nullable
165169
private DiscoveryNode node;
166170
@Nullable
167171
private String assignmentExplanation;
168172

169-
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
170-
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
173+
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
174+
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
175+
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
171176
this.jobId = Objects.requireNonNull(jobId);
172177
this.dataCounts = Objects.requireNonNull(dataCounts);
173178
this.modelSizeStats = modelSizeStats;
179+
this.forecastStats = forecastStats;
174180
this.state = Objects.requireNonNull(state);
175181
this.node = node;
176182
this.assignmentExplanation = assignmentExplanation;
@@ -185,6 +191,9 @@ public JobStats(StreamInput in) throws IOException {
185191
node = in.readOptionalWriteable(DiscoveryNode::new);
186192
assignmentExplanation = in.readOptionalString();
187193
openTime = in.readOptionalTimeValue();
194+
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
195+
forecastStats = in.readOptionalWriteable(ForecastStats::new);
196+
}
188197
}
189198

190199
public String getJobId() {
@@ -198,6 +207,10 @@ public DataCounts getDataCounts() {
198207
public ModelSizeStats getModelSizeStats() {
199208
return modelSizeStats;
200209
}
210+
211+
public ForecastStats getForecastStats() {
212+
return forecastStats;
213+
}
201214

202215
public JobState getState() {
203216
return state;
@@ -231,6 +244,10 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
231244
if (modelSizeStats != null) {
232245
builder.field(MODEL_SIZE_STATS, modelSizeStats);
233246
}
247+
if (forecastStats != null) {
248+
builder.field(FORECASTS_STATS, forecastStats);
249+
}
250+
234251
builder.field(STATE, state.toString());
235252
if (node != null) {
236253
builder.startObject(NODE);
@@ -264,11 +281,14 @@ public void writeTo(StreamOutput out) throws IOException {
264281
out.writeOptionalWriteable(node);
265282
out.writeOptionalString(assignmentExplanation);
266283
out.writeOptionalTimeValue(openTime);
284+
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
285+
out.writeOptionalWriteable(forecastStats);
286+
}
267287
}
268288

269289
@Override
270290
public int hashCode() {
271-
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime);
291+
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
272292
}
273293

274294
@Override
@@ -283,6 +303,7 @@ public boolean equals(Object obj) {
283303
return Objects.equals(jobId, other.jobId)
284304
&& Objects.equals(this.dataCounts, other.dataCounts)
285305
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
306+
&& Objects.equals(this.forecastStats, other.forecastStats)
286307
&& Objects.equals(this.state, other.state)
287308
&& Objects.equals(this.node, other.node)
288309
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.ml.stats;
8+
9+
import org.elasticsearch.common.io.stream.StreamInput;
10+
import org.elasticsearch.common.io.stream.StreamOutput;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
13+
14+
import java.io.IOException;
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
import java.util.Objects;
18+
import java.util.Map.Entry;
19+
import java.util.stream.Collectors;
20+
import java.util.stream.Stream;
21+
22+
/**
23+
* An accumulator for simple counts where statistical measures
24+
* are not of interest.
25+
*/
26+
public class CountAccumulator implements Writeable {
27+
28+
private Map<String, Long> counts;
29+
30+
public CountAccumulator() {
31+
this.counts = new HashMap<String, Long>();
32+
}
33+
34+
private CountAccumulator(Map<String, Long> counts) {
35+
this.counts = counts;
36+
}
37+
38+
public CountAccumulator(StreamInput in) throws IOException {
39+
this.counts = in.readMap(StreamInput::readString, StreamInput::readLong);
40+
}
41+
42+
public void merge(CountAccumulator other) {
43+
counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream())
44+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y));
45+
}
46+
47+
public void add(String key, Long count) {
48+
counts.put(key, counts.getOrDefault(key, 0L) + count);
49+
}
50+
51+
public Map<String, Long> asMap() {
52+
return counts;
53+
}
54+
55+
public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) {
56+
return new CountAccumulator(termsAggregation.getBuckets().stream()
57+
.collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount())));
58+
}
59+
60+
public void writeTo(StreamOutput out) throws IOException {
61+
out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong);
62+
}
63+
64+
@Override
65+
public int hashCode() {
66+
return Objects.hash(counts);
67+
}
68+
69+
@Override
70+
public boolean equals(Object obj) {
71+
if (obj == null) {
72+
return false;
73+
}
74+
75+
if (getClass() != obj.getClass()) {
76+
return false;
77+
}
78+
79+
CountAccumulator other = (CountAccumulator) obj;
80+
return Objects.equals(counts, other.counts);
81+
}
82+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.ml.stats;
8+
9+
import org.elasticsearch.common.io.stream.StreamInput;
10+
import org.elasticsearch.common.io.stream.StreamOutput;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.common.xcontent.ToXContentObject;
13+
import org.elasticsearch.common.xcontent.XContentBuilder;
14+
15+
import java.io.IOException;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import java.util.Objects;
19+
20+
/**
21+
* A class to hold statistics about forecasts.
22+
*/
23+
public class ForecastStats implements ToXContentObject, Writeable {
24+
25+
public static class Fields {
26+
public static final String TOTAL = "total";
27+
public static final String FORECASTED_JOBS = "forecasted_jobs";
28+
public static final String MEMORY = "memory_bytes";
29+
public static final String RUNTIME = "processing_time_ms";
30+
public static final String RECORDS = "records";
31+
public static final String STATUSES = "status";
32+
}
33+
34+
private long total;
35+
private long forecastedJobs;
36+
private StatsAccumulator memoryStats;
37+
private StatsAccumulator recordStats;
38+
private StatsAccumulator runtimeStats;
39+
private CountAccumulator statusCounts;
40+
41+
public ForecastStats() {
42+
this.total = 0;
43+
this.forecastedJobs = 0;
44+
this.memoryStats = new StatsAccumulator();
45+
this.recordStats = new StatsAccumulator();
46+
this.runtimeStats = new StatsAccumulator();
47+
this.statusCounts = new CountAccumulator();
48+
}
49+
50+
/*
51+
* Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it.
52+
*/
53+
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
54+
CountAccumulator statusCounts) {
55+
this.total = total;
56+
this.forecastedJobs = total > 0 ? 1 : 0;
57+
this.memoryStats = Objects.requireNonNull(memoryStats);
58+
this.recordStats = Objects.requireNonNull(recordStats);
59+
this.runtimeStats = Objects.requireNonNull(runtimeStats);
60+
this.statusCounts = Objects.requireNonNull(statusCounts);
61+
}
62+
63+
public ForecastStats(StreamInput in) throws IOException {
64+
this.total = in.readLong();
65+
this.forecastedJobs = in.readLong();
66+
this.memoryStats = new StatsAccumulator(in);
67+
this.recordStats = new StatsAccumulator(in);
68+
this.runtimeStats = new StatsAccumulator(in);
69+
this.statusCounts = new CountAccumulator(in);
70+
}
71+
72+
public ForecastStats merge(ForecastStats other) {
73+
if (other == null) {
74+
return this;
75+
}
76+
total += other.total;
77+
forecastedJobs += other.forecastedJobs;
78+
memoryStats.merge(other.memoryStats);
79+
recordStats.merge(other.recordStats);
80+
runtimeStats.merge(other.runtimeStats);
81+
statusCounts.merge(other.statusCounts);
82+
83+
return this;
84+
}
85+
86+
@Override
87+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
88+
builder.startObject();
89+
doXContentBody(builder, params);
90+
return builder.endObject();
91+
}
92+
93+
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
94+
builder.field(Fields.TOTAL, total);
95+
builder.field(Fields.FORECASTED_JOBS, forecastedJobs);
96+
97+
if (total > 0) {
98+
builder.field(Fields.MEMORY, memoryStats.asMap());
99+
builder.field(Fields.RECORDS, recordStats.asMap());
100+
builder.field(Fields.RUNTIME, runtimeStats.asMap());
101+
builder.field(Fields.STATUSES, statusCounts.asMap());
102+
}
103+
104+
return builder;
105+
}
106+
107+
public Map<String, Object> asMap() {
108+
Map<String, Object> map = new HashMap<>();
109+
map.put(Fields.TOTAL, total);
110+
map.put(Fields.FORECASTED_JOBS, forecastedJobs);
111+
112+
if (total > 0) {
113+
map.put(Fields.MEMORY, memoryStats.asMap());
114+
map.put(Fields.RECORDS, recordStats.asMap());
115+
map.put(Fields.RUNTIME, runtimeStats.asMap());
116+
map.put(Fields.STATUSES, statusCounts.asMap());
117+
}
118+
119+
return map;
120+
}
121+
122+
@Override
123+
public void writeTo(StreamOutput out) throws IOException {
124+
out.writeLong(total);
125+
out.writeLong(forecastedJobs);
126+
memoryStats.writeTo(out);
127+
recordStats.writeTo(out);
128+
runtimeStats.writeTo(out);
129+
statusCounts.writeTo(out);
130+
}
131+
132+
@Override
133+
public int hashCode() {
134+
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
135+
}
136+
137+
@Override
138+
public boolean equals(Object obj) {
139+
if (obj == null) {
140+
return false;
141+
}
142+
143+
if (getClass() != obj.getClass()) {
144+
return false;
145+
}
146+
147+
ForecastStats other = (ForecastStats) obj;
148+
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
149+
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
150+
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
151+
}
152+
}

0 commit comments

Comments
 (0)