Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add async implementation of getFeaturesForPeriod. #15

Merged
merged 1 commit into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,68 @@ public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long sta
.orElse(new SinglePointFeatures(currentPoint, Optional.empty()));
}

/**
* Returns to listener unprocessed features and processed features (such as shingle) for the current data point.
*
* @param detector anomaly detector for which the features are returned
* @param startTime start time of the data point in epoch milliseconds
* @param endTime end time of the data point in epoch milliseconds
* @param listener onResponse is called with unprocessed features and processed features for the current data point
*/
public void getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ActionListener<SinglePointFeatures> listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you replace the deprecated getCurrentFeatures method with this one ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewriting caller code will be a much larger change in a separate pr.


Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles
.computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque<Entry<Long, double[]>>(shingleSize));
if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) {
searchFeatureDao
.getFeaturesForPeriod(
detector,
startTime,
endTime,
ActionListener
.wrap(point -> updateUnprocessedFeatures(point, shingle, detector, endTime, listener), listener::onFailure)
);
} else {
getProcessedFeatures(shingle, detector, endTime, listener);
}
}

private void updateUnprocessedFeatures(
Optional<double[]> point,
Deque<Entry<Long, double[]>> shingle,
AnomalyDetector detector,
long endTime,
ActionListener<SinglePointFeatures> listener
) {
if (point.isPresent()) {
if (shingle.size() == shingleSize) {
shingle.remove();
}
shingle.add(new SimpleImmutableEntry<>(endTime, point.get()));
getProcessedFeatures(shingle, detector, endTime, listener);
} else {
listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty()));
}
}

private void getProcessedFeatures(
Deque<Entry<Long, double[]>> shingle,
AnomalyDetector detector,
long endTime,
ActionListener<SinglePointFeatures> listener
) {

double[][] currentPoints = filterAndFill(shingle, endTime, detector);
Optional<double[]> currentPoint = Optional.ofNullable(shingle.peekLast()).map(Entry::getValue);
listener
.onResponse(
Optional
.ofNullable(currentPoints)
.map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0])))
.orElse(new SinglePointFeatures(currentPoint, Optional.empty()))
);
}

private double[][] filterAndFill(Deque<Entry<Long, double[]>> shingle, long endTime, AnomalyDetector detector) {
long intervalMilli = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis();
double[][] result = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,40 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {
/**
* Gets features for the given time period.
*
* @deprecated use getFeaturesForPeriod with listener instead.
*
* @param detector info about indices, documents, feature query
* @param startTime epoch milliseconds at the beginning of the period
* @param endTime epoch milliseconds at the end of the period
* @throws IllegalStateException when unexpected failures happen
* @return features from search results, empty when no data found
*/
@Deprecated
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty());
return clientUtil
.<SearchRequest, SearchResponse>timedRequest(searchRequest, logger, client::search)
.flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds()));
}

/**
* Returns to listener features for the given time period.
*
* @param detector info about indices, feature query
* @param startTime epoch milliseconds at the beginning of the period
* @param endTime epoch milliseconds at the end of the period
* @param listener onResponse is called with features for the given time period.
*/
public void getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ActionListener<Optional<double[]>> listener) {
SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty());
client
.search(
searchRequest,
ActionListener
.wrap(response -> listener.onResponse(parseResponse(response, detector.getEnabledFeatureIds())), listener::onFailure)
);
}

private Optional<double[]> parseResponse(SearchResponse response, List<String> featureIds) {
return Optional
.ofNullable(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand All @@ -85,11 +86,14 @@
import static org.junit.Assert.assertTrue;

import static org.mockito.Mockito.mock;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@PowerMockIgnore("javax.management.*")
Expand Down Expand Up @@ -132,6 +136,7 @@ public class SearchFeatureDaoTests {
private AnomalyDetector detector;

private SearchSourceBuilder featureQuery = new SearchSourceBuilder();
private Map<String, Object> searchRequestParams;
private SearchRequest searchRequest;
private SearchSourceBuilder searchSourceBuilder;
private MultiSearchRequest multiSearchRequest;
Expand All @@ -155,6 +160,7 @@ public void setup() throws Exception {

searchSourceBuilder = SearchSourceBuilder
.fromXContent(XContentType.JSON.xContent().createParser(xContent, LoggingDeprecationHandler.INSTANCE, "{}"));
searchRequestParams = new HashMap<>();
searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]));
aggsMap = new HashMap<>();
aggsList = new ArrayList<>();
Expand Down Expand Up @@ -316,6 +322,71 @@ public void getFeaturesForPeriod_throwIllegalState_forUnknownAggregation(
getFeaturesForPeriod_returnExpected_givenData(aggs, featureIds, expected);
}

@Test
@Parameters(method = "getFeaturesForPeriodData")
@SuppressWarnings("unchecked")
public void getFeaturesForPeriod_returnExpectedToListener(List<Aggregation> aggs, List<String> featureIds, double[] expected)
throws Exception {

long start = 100L;
long end = 200L;
when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder);
when(searchResponse.getAggregations()).thenReturn(new Aggregations(aggs));
when(detector.getEnabledFeatureIds()).thenReturn(featureIds);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener.onResponse(searchResponse);
return null;
}).when(client).search(eq(searchRequest), any(ActionListener.class));

ActionListener<Optional<double[]>> listener = mock(ActionListener.class);
searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener);

ArgumentCaptor<Optional<double[]>> captor = ArgumentCaptor.forClass(Optional.class);
verify(listener).onResponse(captor.capture());
Optional<double[]> result = captor.getValue();
assertTrue(Arrays.equals(expected, result.orElse(null)));
}

@Test
@SuppressWarnings("unchecked")
public void getFeaturesForPeriod_throwToListener_whenSearchFails() throws Exception {

long start = 100L;
long end = 200L;
when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener.onFailure(new RuntimeException());
return null;
}).when(client).search(eq(searchRequest), any(ActionListener.class));

ActionListener<Optional<double[]>> listener = mock(ActionListener.class);
searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener);

verify(listener).onFailure(any(Exception.class));
}

@Test
@SuppressWarnings("unchecked")
public void getFeaturesForPeriod_throwToListener_whenResponseParsingFails() throws Exception {

long start = 100L;
long end = 200L;
when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder);
when(detector.getEnabledFeatureIds()).thenReturn(null);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener.onResponse(searchResponse);
return null;
}).when(client).search(eq(searchRequest), any(ActionListener.class));

ActionListener<Optional<double[]>> listener = mock(ActionListener.class);
searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener);

verify(listener).onFailure(any(Exception.class));
}

private Object[] getFeatureSamplesForPeriodsData() {
String maxName = "max";
double maxValue = 2;
Expand Down