[ML] Add Missing data checking class #35310
Merged: benwtrent merged 10 commits into elastic:master from benwtrent:feature/ml-missing-data-check on Nov 7, 2018
Commits (10)
8aa9fe3  ML: Adding missing data check class (benwtrent)
8e9b0a0  Merge branch 'master' into feature/ml-missing-data-check (benwtrent)
feb1908  reverting bad change (benwtrent)
d8332be  Adding bucket + missing data object for returns (benwtrent)
555b42d  reverting unnecessary change (benwtrent)
d973f21  adding license header (benwtrent)
b5b73dd  Merge branch 'master' into feature/ml-missing-data-check (benwtrent)
1754f95  Make client calls synchronous, akin to DatafeedJob (benwtrent)
75673e4  Fixing line length (benwtrent)
3101167  Renaming things, addressing PR comments (benwtrent)
...ode-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java (234 additions, 0 deletions)
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData;
import org.junit.After;
import org.junit.Before;

import java.util.Collections;
import java.util.Date;
import java.util.List;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.equalTo;

public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {

    private String index = "delayed-data";
    private long now = System.currentTimeMillis();
    private long numDocs;

    @Before
    public void putDataintoIndex() {
        client().admin().indices().prepareCreate(index)
            .addMapping("type", "time", "type=date", "value", "type=long")
            .get();
        numDocs = randomIntBetween(32, 128);
        long oneDayAgo = now - 86400000;
        writeData(logger, index, numDocs, oneDayAgo, now);
    }

    @After
    public void cleanUpTest() {
        cleanUp();
    }

    public void testMissingDataDetection() throws Exception {
        final String jobId = "delayed-data-detection-job";
        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);

        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
        registerJob(job);
        putJob(job);
        openJob(job.getId());

        registerDatafeed(datafeedConfig);
        putDatafeed(datafeedConfig);
        startDatafeed(datafeedConfig.getId(), 0L, now);
        waitUntilJobIsClosed(jobId);

        // Get the latest finalized bucket
        Bucket lastBucket = getLatestFinalizedBucket(jobId);

        DelayedDataDetector delayedDataDetector =
            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

        List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));

        long missingDocs = randomIntBetween(32, 128);
        // Add data inside the current delayed data detection window; the choice of 43100000 is arbitrary but sits just
        // within the 12-hour (43200000 ms) window configured for the DelayedDataDetector
        writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);

        response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs));
    }

    public void testMissingDataDetectionInSpecificBucket() throws Exception {
        final String jobId = "delayed-data-detection-job-missing-test-specific-bucket";
        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);

        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
        registerJob(job);
        putJob(job);
        openJob(job.getId());

        registerDatafeed(datafeedConfig);
        putDatafeed(datafeedConfig);

        startDatafeed(datafeedConfig.getId(), 0L, now);
        waitUntilJobIsClosed(jobId);

        // Get the latest finalized bucket
        Bucket lastBucket = getLatestFinalizedBucket(jobId);

        DelayedDataDetector delayedDataDetector =
            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

        long missingDocs = randomIntBetween(1, 10);

        // Write our missing data in the bucket right before the last finalized bucket
        writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000);
        List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);

        boolean hasBucketWithMissing = false;
        for (BucketWithMissingData bucketWithMissingData : response) {
            if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) {
                assertThat(bucketWithMissingData.getMissingDocumentCount(), equalTo(missingDocs));
                hasBucketWithMissing = true;
            }
        }
        assertThat(hasBucketWithMissing, equalTo(true));
    }

    public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception {
        TimeValue bucketSpan = TimeValue.timeValueMinutes(10);
        final String jobId = "delayed-data-detection-job-aggs-no-missing-test";
        Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count");

        MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value");
        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
            job.getId(),
            Collections.singletonList(index));
        datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
            AggregationBuilders.histogram("time")
                .subAggregation(maxTime)
                .subAggregation(avgAggregationBuilder)
                .field("time")
                .interval(TimeValue.timeValueMinutes(5).millis())));
        datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
        datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
        registerJob(job);
        putJob(job);
        openJob(job.getId());

        registerDatafeed(datafeedConfig);
        putDatafeed(datafeedConfig);
        startDatafeed(datafeedConfig.getId(), 0L, now);
        waitUntilJobIsClosed(jobId);

        // Get the latest finalized bucket
        Bucket lastBucket = getLatestFinalizedBucket(jobId);

        DelayedDataDetector delayedDataDetector =
            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

        List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));

        long missingDocs = numDocs;
        // Add data inside the current delayed data detection window; the choice of 43100000 is arbitrary but sits just
        // within the 12-hour (43200000 ms) window configured for the DelayedDataDetector
        writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);

        response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
        assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2));
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
        return createJob(id, bucketSpan, function, field, null);
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
        dataDescription.setTimeField("time");
        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

        Detector.Builder d = new Detector.Builder(function, field);
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
        analysisConfig.setBucketSpan(bucketSpan);
        analysisConfig.setSummaryCountFieldName(summaryCountField);

        Job.Builder builder = new Job.Builder();
        builder.setId(id);
        builder.setAnalysisConfig(analysisConfig);
        builder.setDataDescription(dataDescription);
        return builder;
    }

    private void writeData(Logger logger, String index, long numDocs, long start, long end) {
        int maxDelta = (int) (end - start - 1);
        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
        for (int i = 0; i < numDocs; i++) {
            IndexRequest indexRequest = new IndexRequest(index, "type");
            long timestamp = start + randomIntBetween(0, maxDelta);
            assert timestamp >= start && timestamp < end;
            indexRequest.source("time", timestamp, "value", i);
            bulkRequestBuilder.add(indexRequest);
        }
        BulkResponse bulkResponse = bulkRequestBuilder
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .get();
        if (bulkResponse.hasFailures()) {
            int failures = 0;
            for (BulkItemResponse itemResponse : bulkResponse) {
                if (itemResponse.isFailed()) {
                    failures++;
                    logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
                }
            }
            fail("Bulk response contained " + failures + " failures");
        }
        logger.info("Indexed [{}] documents", numDocs);
    }

    private Bucket getLatestFinalizedBucket(String jobId) {
        GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
        getBucketsRequest.setExcludeInterim(true);
        getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
        getBucketsRequest.setDescending(true);
        getBucketsRequest.setPageParams(new PageParams(0, 1));
        return getBuckets(getBucketsRequest).get(0);
    }
}
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java (158 additions, 0 deletions)
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.datafeed;

import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.joda.time.DateTime;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;

/**
 * This class will search the buckets and indices over a given window to determine if any data is missing
 */
public class DelayedDataDetector {

    private static final String DATE_BUCKETS = "date_buckets";
    private final long bucketSpan;
    private final long window;
    private final DatafeedConfig datafeedConfig;
    private final Client client;
    private final Job job;

    public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) {
        this.job = job;
        this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis();
        this.datafeedConfig = datafeedConfig;
        long windowMillis = window.millis();
        if (windowMillis < bucketSpan) {
            throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]");
        }
        if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) {
            throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]");
        }
        this.window = windowMillis;
        this.client = client;
    }
    /**
     * This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}.
     *
     * It is done synchronously and can block for a considerable amount of time; it should only be executed within the appropriate
     * thread pool.
     *
     * @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds; signifies the end of the time window check
     * @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs
     */
    public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketMs) {
        final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan);
        final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan);
        List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
        Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
        return finalizedBuckets.stream()
            // We only care about the situation when data is added to the indices
            // Older data could have been removed from the indices, and should not be considered "missing data"
            .filter(bucket -> calculateMissing(indexedData, bucket) > 0)
            .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket))
            .collect(Collectors.toList());
    }

    private List<Bucket> checkBucketEvents(long start, long end) {
        GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
        request.setStart(Long.toString(start));
        request.setEnd(Long.toString(end));
        request.setExcludeInterim(true);
        request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan)));

        try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
            GetBucketsAction.Response response = client.execute(GetBucketsAction.INSTANCE, request).actionGet();
            return response.getBuckets().results();
        }
    }

    private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
        String timeField = job.getDataDescription().getTimeField();
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .size(0)
            .aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField))
            .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end));

        SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder);
        try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
            SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
            List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();
            Map<Long, Long> hashMap = new HashMap<>(buckets.size());
            for (Histogram.Bucket bucket : buckets) {
                long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
                if (bucketTime < 0) {
                    throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp");
                }
                hashMap.put(bucketTime, bucket.getDocCount());
            }
            return hashMap;
        }
    }

    private static long toHistogramKeyToEpoch(Object key) {
        if (key instanceof DateTime) {
            return ((DateTime)key).getMillis();
        } else if (key instanceof Double) {
            return ((Double)key).longValue();
        } else if (key instanceof Long){
            return (Long)key;
        } else {
            return -1L;
        }
    }

    private static long calculateMissing(Map<Long, Long> indexedData, Bucket bucket) {
        return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount();
    }

    public static class BucketWithMissingData {

        private final long missingDocumentCount;
        private final Bucket bucket;

        static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
            return new BucketWithMissingData(missingDocumentCount, bucket);
        }

        private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
            this.missingDocumentCount = missingDocumentCount;
            this.bucket = bucket;
        }

        public Bucket getBucket() {
            return bucket;
        }

        public long getMissingDocumentCount() {
            return missingDocumentCount;
        }
    }
}
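For reference, a minimal sketch of how the new class is exercised, mirroring the integration test above: the detector is built from an existing Job and DatafeedConfig with a 12-hour window, and detectMissingData is called with the timestamp of the latest finalized bucket. The job, datafeedConfig, lastBucket, and client() references are placeholders for objects obtained as in DelayedDataDetectorIT, not new API; per the Javadoc the call is synchronous and blocking, so it should run on an appropriate thread pool.

    // Sketch only; assumes job, datafeedConfig, lastBucket and client() are set up as in DelayedDataDetectorIT
    DelayedDataDetector detector =
        new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

    // The end of the checked window is the latest finalized bucket, converted from epoch seconds to milliseconds
    List<DelayedDataDetector.BucketWithMissingData> missing = detector.detectMissingData(lastBucket.getEpoch() * 1000);

    // Sum the missing documents across the returned buckets, as the integration test does
    long totalMissing = missing.stream()
        .mapToLong(DelayedDataDetector.BucketWithMissingData::getMissingDocumentCount)
        .sum();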