Skip to content

[ML] add duration statistics to forecast stats #31812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions x-pack/docs/en/rest-api/ml/jobcounts.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow
`processing_time_ms`::
(object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total.

`duration_ms`::
(object) Statistics about the forecast duration in milliseconds: minimum, maximum, average and total.

`status`::
(object) Counts per forecast status, for example: {"finished" : 2}.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static class Fields {
public static final String MEMORY = "memory_bytes";
public static final String RUNTIME = "processing_time_ms";
public static final String RECORDS = "records";
public static final String DURATION = "duration_ms";
public static final String STATUSES = "status";
}

Expand All @@ -36,6 +37,7 @@ public static class Fields {
private StatsAccumulator memoryStats;
private StatsAccumulator recordStats;
private StatsAccumulator runtimeStats;
private StatsAccumulator durationStats;
private CountAccumulator statusCounts;

public ForecastStats() {
Expand All @@ -44,19 +46,21 @@ public ForecastStats() {
this.memoryStats = new StatsAccumulator();
this.recordStats = new StatsAccumulator();
this.runtimeStats = new StatsAccumulator();
this.durationStats = new StatsAccumulator();
this.statusCounts = new CountAccumulator();
}

/*
* Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it.
*/
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
CountAccumulator statusCounts) {
StatsAccumulator durationStats, CountAccumulator statusCounts) {
this.total = total;
this.forecastedJobs = total > 0 ? 1 : 0;
this.memoryStats = Objects.requireNonNull(memoryStats);
this.recordStats = Objects.requireNonNull(recordStats);
this.runtimeStats = Objects.requireNonNull(runtimeStats);
this.durationStats = Objects.requireNonNull(durationStats);
this.statusCounts = Objects.requireNonNull(statusCounts);
}

Expand All @@ -66,6 +70,7 @@ public ForecastStats(StreamInput in) throws IOException {
this.memoryStats = new StatsAccumulator(in);
this.recordStats = new StatsAccumulator(in);
this.runtimeStats = new StatsAccumulator(in);
this.durationStats = new StatsAccumulator(in);
this.statusCounts = new CountAccumulator(in);
}

Expand All @@ -78,6 +83,7 @@ public ForecastStats merge(ForecastStats other) {
memoryStats.merge(other.memoryStats);
recordStats.merge(other.recordStats);
runtimeStats.merge(other.runtimeStats);
durationStats.merge(other.durationStats);
statusCounts.merge(other.statusCounts);

return this;
Expand All @@ -98,6 +104,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
builder.field(Fields.MEMORY, memoryStats.asMap());
builder.field(Fields.RECORDS, recordStats.asMap());
builder.field(Fields.RUNTIME, runtimeStats.asMap());
builder.field(Fields.DURATION, durationStats.asMap());
builder.field(Fields.STATUSES, statusCounts.asMap());
}

Expand All @@ -113,6 +120,7 @@ public Map<String, Object> asMap() {
map.put(Fields.MEMORY, memoryStats.asMap());
map.put(Fields.RECORDS, recordStats.asMap());
map.put(Fields.RUNTIME, runtimeStats.asMap());
map.put(Fields.DURATION, durationStats.asMap());
map.put(Fields.STATUSES, statusCounts.asMap());
}

Expand All @@ -126,12 +134,13 @@ public void writeTo(StreamOutput out) throws IOException {
memoryStats.writeTo(out);
recordStats.writeTo(out);
runtimeStats.writeTo(out);
durationStats.writeTo(out);
statusCounts.writeTo(out);
}

@Override
public int hashCode() {
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, durationStats, statusCounts);
}

@Override
Expand All @@ -147,6 +156,7 @@ public boolean equals(Object obj) {
ForecastStats other = (ForecastStats) obj;
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(durationStats, other.durationStats)
&& Objects.equals(statusCounts, other.statusCounts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testEmpty() throws IOException {
assertFalse(properties.containsKey(Fields.RECORDS));
assertFalse(properties.containsKey(Fields.RUNTIME));
assertFalse(properties.containsKey(Fields.STATUSES));
assertFalse(properties.containsKey(Fields.DURATION));
}

public void testMerge() {
Expand All @@ -58,7 +59,12 @@ public void testMerge() {
statusStats.add("finished", 2L);
statusStats.add("failed", 5L);

ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats);
StatsAccumulator durationStats = new StatsAccumulator();
durationStats.add(96);
durationStats.add(192);
durationStats.add(96);

ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, durationStats, statusStats);

StatsAccumulator memoryStats2 = new StatsAccumulator();
memoryStats2.add(10);
Expand All @@ -76,7 +82,11 @@ public void testMerge() {
statusStats2.add("finished", 2L);
statusStats2.add("scheduled", 1L);

ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2);
StatsAccumulator durationStats2 = new StatsAccumulator();
durationStats2.add(192);
durationStats2.add(192);

ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, durationStats2, statusStats2);

forecastStats.merge(forecastStats2);

Expand Down Expand Up @@ -117,6 +127,14 @@ public void testMerge() {
assertEquals(4, mergedCountStats.get("finished").longValue());
assertEquals(5, mergedCountStats.get("failed").longValue());
assertEquals(1, mergedCountStats.get("scheduled").longValue());

@SuppressWarnings("unchecked")
Map<String, Double> mergedDurationStats = (Map<String, Double>) mergedStats.get(Fields.DURATION);

assertTrue(mergedDurationStats != null);
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.AVG), equalTo(153.6));
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MAX), equalTo(192.0));
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MIN), equalTo(96.0));
}

public void testChainedMerge() {
Expand All @@ -135,7 +153,12 @@ public void testChainedMerge() {
CountAccumulator statusStats = new CountAccumulator();
statusStats.add("finished", 2L);
statusStats.add("failed", 5L);
ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats);
StatsAccumulator durationStats = new StatsAccumulator();
durationStats.add(96);
durationStats.add(192);
durationStats.add(96);

ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, durationStats, statusStats);

StatsAccumulator memoryStats2 = new StatsAccumulator();
memoryStats2.add(10);
Expand All @@ -149,7 +172,10 @@ public void testChainedMerge() {
CountAccumulator statusStats2 = new CountAccumulator();
statusStats2.add("finished", 2L);
statusStats2.add("scheduled", 1L);
ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2);
StatsAccumulator durationStats2 = new StatsAccumulator();
durationStats2.add(192);
durationStats2.add(192);
ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, durationStats2, statusStats2);

StatsAccumulator memoryStats3 = new StatsAccumulator();
memoryStats3.add(500);
Expand All @@ -159,7 +185,9 @@ public void testChainedMerge() {
runtimeStats3.add(32);
CountAccumulator statusStats3 = new CountAccumulator();
statusStats3.add("finished", 1L);
ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3);
StatsAccumulator durationStats3 = new StatsAccumulator();
durationStats3.add(282);
ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, durationStats3, statusStats3);

ForecastStats forecastStats4 = new ForecastStats();

Expand Down Expand Up @@ -209,6 +237,14 @@ public void testChainedMerge() {
assertEquals(5, mergedCountStats.get("finished").longValue());
assertEquals(5, mergedCountStats.get("failed").longValue());
assertEquals(1, mergedCountStats.get("scheduled").longValue());

@SuppressWarnings("unchecked")
Map<String, Double> mergedDurationStats = (Map<String, Double>) mergedStats.get(Fields.DURATION);

assertTrue(mergedDurationStats != null);
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.AVG), equalTo(175.0));
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MAX), equalTo(282.0));
assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MIN), equalTo(96.0));
}

public void testUniqueCountOfJobs() {
Expand Down Expand Up @@ -238,7 +274,7 @@ protected Reader<ForecastStats> instanceReader() {

public ForecastStats createForecastStats(long minTotal, long maxTotal) {
ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(),
createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator());
createStatsAccumulator(), createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator());

return forecastStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.stats.CountAccumulator;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats.Fields;
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
Expand Down Expand Up @@ -1135,6 +1137,9 @@ public void getForecastStats(String jobId, Consumer<ForecastStats> handler, Cons
.field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName()));
sourceBuilder.aggregation(
AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName()));
Script durationScript = new Script("doc['" + ForecastRequestStats.END_TIME + "'].value.getMillis() - doc['"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have test coverage of this. There is a JobProviderIT class could you index some forecast stats in that and assert this method.

+ ForecastRequestStats.START_TIME + "'].value.getMillis()");
sourceBuilder.aggregation(AggregationBuilders.stats(Fields.DURATION).script(durationScript));
sourceBuilder.aggregation(
AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName()));
sourceBuilder.size(0);
Expand All @@ -1156,10 +1161,12 @@ public void getForecastStats(String jobId, Consumer<ForecastStats> handler, Cons
.fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RECORDS));
StatsAccumulator runtimeStats = StatsAccumulator
.fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RUNTIME));
StatsAccumulator durationStats = StatsAccumulator
.fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.DURATION));
CountAccumulator statusCount = CountAccumulator
.fromTermsAggregation((StringTerms) aggregationsAsMap.get(ForecastStats.Fields.STATUSES));

ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, statusCount);
ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, durationStats,
statusCount);
handler.accept(forecastStats);
}, errorHandler), client::search);

Expand Down