Skip to content

Commit 144456f

Browse files
[7.9] [ML] Fix datafeed start time is incorrect when the job has trailing empty buckets (#71976) (#72145)
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent 329d2fb commit 144456f

File tree

5 files changed

+49
-5
lines changed

5 files changed

+49
-5
lines changed

x-pack/plugins/ml/common/types/anomaly_detection_jobs/summary_job.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export interface MlSummaryJob {
3030
isSingleMetricViewerJob: boolean;
3131
deleting?: boolean;
3232
latestTimestampSortValue?: number;
33+
earliestStartTimestampMs?: number;
3334
}
3435

3536
export interface AuditMessage {

x-pack/plugins/ml/common/util/job_utils.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import {
1818
prefixDatafeedId,
1919
getSafeAggregationName,
2020
getLatestDataOrBucketTimestamp,
21+
getEarliestDatafeedStartTime,
2122
} from './job_utils';
2223
import { CombinedJob, Job } from '../types/anomaly_detection_jobs';
24+
import moment from 'moment';
2325

2426
describe('ML - job utils', () => {
2527
describe('calculateDatafeedFrequencyDefaultSeconds', () => {
@@ -581,4 +583,22 @@ describe('ML - job utils', () => {
581583
expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined);
582584
});
583585
});
586+
587+
// Unit tests for getEarliestDatafeedStartTime: the value used as the earliest
// time a job's datafeed may be restarted from.
describe('getEarliestDatafeedStartTime', () => {
  test('returns expected value when no gap in data at end of bucket processing', () => {
    // Latest record is newer than the latest bucket, so the record timestamp wins.
    expect(getEarliestDatafeedStartTime(1549929594000, 1549928700000)).toBe(1549929594000);
  });

  test('returns expected value when there is a gap in data at end of bucket processing', () => {
    // Trailing empty buckets: the latest bucket is newer than the latest record.
    expect(getEarliestDatafeedStartTime(1549929594000, 1562256600000)).toBe(1562256600000);
  });

  test('returns expected value when bucket span is provided', () => {
    // 1562256600000 + 1h (3600000 ms) = 1562260200000
    expect(
      getEarliestDatafeedStartTime(1549929594000, 1562256600000, moment.duration(1, 'h'))
    ).toBe(1562260200000);
  });

  test('returns expected value when job has not run', () => {
    // Exercise the function under test (the previous version of this case called
    // getLatestDataOrBucketTimestamp by mistake, leaving this path uncovered).
    expect(getEarliestDatafeedStartTime(undefined, undefined)).toBe(undefined);
  });
});
584604
});

x-pack/plugins/ml/common/util/job_utils.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import _ from 'lodash';
88
import semver from 'semver';
9-
import { Duration } from 'moment';
9+
import moment, { Duration } from 'moment';
1010
// @ts-ignore
1111
import numeral from '@elastic/numeral';
1212

@@ -621,6 +621,23 @@ function isValidTimeInterval(value: string | undefined): boolean {
621621
return parseTimeIntervalForJob(value) !== null;
622622
}
623623

624+
/**
 * Returns the earliest time (epoch ms) a datafeed should be started from:
 * max(latest_record_timestamp, latest_bucket.timestamp + bucket_span).
 *
 * @param latestRecordTimestamp - timestamp (ms) of the latest record seen by the job, if any
 * @param latestBucketTimestamp - timestamp (ms) of the latest bucket, if any
 * @param bucketSpan - optional bucket span; when supplied it is added to the latest
 *   bucket timestamp before the comparison
 * @returns the later of the two candidate times when both timestamps are defined;
 *   otherwise whichever timestamp is defined, or `undefined` when neither is
 */
export function getEarliestDatafeedStartTime(
  latestRecordTimestamp: number | undefined,
  latestBucketTimestamp: number | undefined,
  bucketSpan?: Duration | null | undefined
): number | undefined {
  // With only one (or neither) timestamp available there is nothing to compare:
  // hand back whichever one is defined, unadjusted.
  if (latestRecordTimestamp === undefined || latestBucketTimestamp === undefined) {
    return latestRecordTimestamp !== undefined ? latestRecordTimestamp : latestBucketTimestamp;
  }

  // If the bucket span is available (e.g. 15m) add it to the latest bucket timestamp.
  // The span is added as a fixed number of milliseconds rather than through moment's
  // calendar-aware add(): for day-based durations add() moves by local calendar days,
  // which can be an hour off across a daylight-saving change in the local time zone.
  const adjustedBucketStartTime = bucketSpan
    ? latestBucketTimestamp + bucketSpan.asMilliseconds()
    : latestBucketTimestamp;

  return Math.max(latestRecordTimestamp, adjustedBucketStartTime);
}
640+
624641
// Returns the latest of the last source data and last processed bucket timestamp,
625642
// as used for example in setting the end time of results views for cases where
626643
// anomalies might have been raised after the point at which data ingest has stopped.

x-pack/plugins/ml/public/application/jobs/jobs_list/components/start_datafeed_modal/start_datafeed_modal.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,6 @@ StartDatafeedModal.propTypes = {
222222
};
223223

224224
// Returns a moment built from the smallest per-job timestamp in `jobs`.
// This span is a rendered diff: the line after the `225-` marker is the removed
// version (reads latestTimestampSortValue), the line after `225+` is its
// replacement (reads earliestStartTimestampMs, falling back to 0 when it is
// missing or otherwise falsy).
// NOTE(review): Math.min() of an empty list is Infinity, which would give an
// invalid moment - confirm callers always pass at least one job.
function getLowestLatestTime(jobs) {
225-
const times = jobs.map((j) => j.latestTimestampSortValue);
225+
const times = jobs.map((j) => j.earliestStartTimestampMs || 0);
226226
return moment(Math.min(...times));
227227
}

x-pack/plugins/ml/server/models/job_service/jobs.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { i18n } from '@kbn/i18n';
88
import { uniq } from 'lodash';
99
import Boom from 'boom';
1010
import { ILegacyScopedClusterClient } from 'kibana/server';
11+
import { parseTimeIntervalForJob } from '../../../common/util/job_utils';
1112
import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
1213
import {
1314
MlSummaryJob,
@@ -24,11 +25,11 @@ import { resultsServiceProvider } from '../results_service';
2425
import { CalendarManager, Calendar } from '../calendar';
2526
import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils';
2627
import {
28+
getEarliestDatafeedStartTime,
2729
getLatestDataOrBucketTimestamp,
2830
isTimeSeriesViewJob,
2931
} from '../../../common/util/job_utils';
3032
import { groupsProvider } from './groups';
31-
3233
export interface MlJobsResponse {
3334
jobs: Job[];
3435
count: number;
@@ -171,6 +172,11 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
171172
description: job.description || '',
172173
groups: Array.isArray(job.groups) ? job.groups.sort() : [],
173174
processed_record_count: job.data_counts?.processed_record_count,
175+
earliestStartTimestampMs: getEarliestDatafeedStartTime(
176+
dataCounts?.latest_record_timestamp,
177+
dataCounts?.latest_bucket_timestamp,
178+
parseTimeIntervalForJob(job.analysis_config?.bucket_span)
179+
),
174180
memory_status: job.model_size_stats ? job.model_size_stats.memory_status : '',
175181
jobState: job.deleting === true ? deletingStr : job.state,
176182
hasDatafeed,
@@ -182,8 +188,8 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
182188
latestTimestampMs: dataCounts?.latest_record_timestamp,
183189
earliestTimestampMs: dataCounts?.earliest_record_timestamp,
184190
latestResultsTimestampMs: getLatestDataOrBucketTimestamp(
185-
dataCounts?.latest_record_timestamp as number,
186-
dataCounts?.latest_bucket_timestamp as number
191+
dataCounts?.latest_record_timestamp,
192+
dataCounts?.latest_bucket_timestamp
187193
),
188194
isSingleMetricViewerJob: isTimeSeriesViewJob(job),
189195
nodeName: job.node ? job.node.name : undefined,

0 commit comments

Comments
 (0)