Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 39 additions & 99 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type AggResultsResponse = { key?: number } & {
};
};

const TIME_RANGE_PADDING = 10;

/**
* Mapping for result types and corresponding score fields.
*/
Expand All @@ -63,43 +65,6 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
};
};

const getCommonScriptedFields = () => {
return {
start: {
script: {
lang: 'painless',
source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()-((doc["bucket_span"].value * 1000)
* params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`,
params: {
padding: 10,
},
},
},
end: {
script: {
lang: 'painless',
source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()+((doc["bucket_span"].value * 1000)
* params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`,
params: {
padding: 10,
},
},
},
timestamp_epoch: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value.getMillis()/1000',
},
},
timestamp_iso8601: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value',
},
},
};
};

/**
* Builds an agg query based on the requested result type.
* @param resultType
Expand All @@ -110,9 +75,9 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
severity: number,
useInitialScore?: boolean
) => {
const influencerScoreField = `${useInitialScore ? 'initial_' : ''}influencer_score`;
const recordScoreField = `${useInitialScore ? 'initial_' : ''}record_score`;
const bucketScoreField = `${useInitialScore ? 'initial_' : ''}anomaly_score`;
const influencerScoreField = getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore);
const recordScoreField = getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore);
const bucketScoreField = getScoreFields(ANOMALY_RESULT_TYPE.BUCKET, useInitialScore);

return {
influencer_results: {
Expand Down Expand Up @@ -140,27 +105,13 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'influencer_field_name',
'influencer_field_value',
'influencer_score',
'initial_influencer_score',
'is_interim',
'job_id',
'bucket_span',
],
},
size: 3,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${influencerScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source:
'doc["timestamp"].value + "_" + doc["influencer_field_name"].value + "_" + doc["influencer_field_value"].value',
},
},
},
},
},
},
Expand Down Expand Up @@ -188,6 +139,7 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'result_type',
'timestamp',
'record_score',
'initial_record_score',
'is_interim',
'function',
'field_name',
Expand All @@ -199,24 +151,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'partition_field_value',
'job_id',
'detector_index',
'bucket_span',
],
},
size: 3,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${recordScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value + "_" + doc["function"].value',
},
},
},
},
},
},
Expand Down Expand Up @@ -247,25 +185,12 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'result_type',
'timestamp',
'anomaly_score',
'initial_anomaly_score',
'is_interim',
'bucket_span',
],
},
size: 1,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${bucketScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value',
},
},
},
},
},
},
Expand All @@ -282,6 +207,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
return source.job_id;
};

const getScoreFields = (resultType: AnomalyResultType, useInitialScore?: boolean) => {
return `${useInitialScore ? 'initial_' : ''}${resultTypeScoreMapping[resultType]}`;
};

const getRecordKey = (source: AnomalyRecordDoc): string => {
let alertInstanceKey = `${source.job_id}_${source.timestamp}`;

Expand All @@ -294,18 +223,23 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
return alertInstanceKey;
};

const getResultsFormatter = (resultType: AnomalyResultType) => {
/**
* Returns a callback for formatting elasticsearch aggregation response
* to the alert context.
* @param resultType
*/
const getResultsFormatter = (resultType: AnomalyResultType, useInitialScore: boolean = false) => {
const resultsLabel = getAggResultsLabel(resultType);
return (v: AggResultsResponse): AlertExecutionResult | undefined => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
if (aggTypeResults.doc_count === 0) {
return;
}

const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
const timestamp = topAnomaly._source.timestamp;
const bucketSpanInSeconds = topAnomaly._source.bucket_span;

return {
count: aggTypeResults.doc_count,
Expand All @@ -315,26 +249,32 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
timestamp,
timestampIso8601: new Date(timestamp).toISOString(),
timestampEpoch: timestamp / 1000,
score: Math.floor(topAnomaly._source[getScoreFields(resultType, useInitialScore)]),
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
start: new Date(
timestamp - bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING
).toISOString(),
end: new Date(timestamp + bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING).toISOString(),
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
score: Math.floor(
h._source[getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore)]
),
unique_key: getRecordKey(h._source),
};
}) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
score: Math.floor(
h._source[getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore)]
),
unique_key: `${h._source.timestamp}_${h._source.influencer_field_name}_${h._source.influencer_field_value}`,
};
}) as InfluencerAnomalyAlertDoc[],
};
Expand Down Expand Up @@ -447,7 +387,7 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da

const resultsLabel = getAggResultsLabel(params.resultType);

const formatter = getResultsFormatter(params.resultType);
const formatter = getResultsFormatter(params.resultType, !!previewTimeInterval);

return (previewTimeInterval
? (result as {
Expand Down