Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Bug Fixes
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
- fix(forecast): advance past current interval & anchor on now ([#1528](https://github.com/opensearch-project/anomaly-detection/pull/1528))
- Changing search calls on interval calculation ([#1535](https://github.com/opensearch-project/anomaly-detection/pull/1535))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public static String getTooManyCategoricalFieldErr(int limit) {
+ " characters.";
public static final String INDEX_NOT_FOUND = "index does not exist";
public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s";
public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping";
public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s, ";

public static final String NO_SHARDS_FOUND_IN_INDEX = "No accessible shards found for indices %s "
+ "This could indicate: not enough data in index, connectivity issues, or permission problems.";
public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config";

// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import java.time.Clock;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -338,8 +340,18 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
.aggregation(AggregationBuilders.max("max_ts").field(TS_FIELD));

SearchRequest boundsReq = new SearchRequest(config.getIndices().toArray(new String[0])).source(boundsSrc);

client.search(boundsReq, ActionListener.wrap(r -> {
logger.debug("Min and max timestamp request: {}", boundsReq);
final ActionListener<SearchResponse> boundsRequestListener = ActionListener.wrap(r -> {
logger.debug("Min and max timestamp response: {}", r);

// Fail earlier if we aren't able to get any bounds from the query
if (r.getTotalShards() == 0 || r.getSuccessfulShards() == 0) {
String errorMsg = String
.format(Locale.ROOT, CommonMessages.NO_SHARDS_FOUND_IN_INDEX, Arrays.toString(config.getIndices().toArray()));
logger.error(errorMsg);
listener.onFailure(new ValidationException(errorMsg, ValidationIssueType.INDICES, ValidationAspect.MODEL));
return;
}

Min minAgg = r.getAggregations().get("min_ts");
Max maxAgg = r.getAggregations().get("max_ts");
Expand Down Expand Up @@ -379,8 +391,21 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
}

refineGap(initBucketMins, -1, baseFilter, listener, MIN_BUCKET_WIDTH_MINS, ChronoUnit.MINUTES, TS_FIELD, 0, minMs, maxMs);

}, listener::onFailure));
}, e -> {
logger.error(e.getMessage(), e);
listener.onFailure(e);

});

clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
boundsReq,
client::search,
user,
client,
context,
boundsRequestListener
);
}

/* ----------------------------------------------------------------------
Expand Down Expand Up @@ -521,11 +546,11 @@ public void refineGap(

SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
logger.debug("Minimum interval search request: {}", searchRequest);
client.search(searchRequest, ActionListener.wrap(resp -> {
logger.debug("Minimum interval search response: {}", resp);
final ActionListener<SearchResponse> minIntervalSearchListener = ActionListener.wrap(r -> {
logger.debug("Minimum interval search response: {}", r);
double gap = Double.NaN;
boolean hasEmptyBuckets = false;
Histogram histogram = resp.getAggregations().get("dyn");
Histogram histogram = r.getAggregations().get("dyn");
if (histogram != null) {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
int firstNonEmpty = -1;
Expand Down Expand Up @@ -614,8 +639,20 @@ public void refineGap(
}

refineGap(nextBucketMins, nextDir, baseFilter, listener, minBucketMins, returnUnit, tsField, depth + 1, sliceMinMs, sliceMaxMs);

}, listener::onFailure));
}, e -> {
logger.error(e.getMessage(), e);
listener.onFailure(e);
});

clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
user,
client,
context,
minIntervalSearchListener
);
}

/**
Expand Down Expand Up @@ -652,7 +689,8 @@ public void runAutoDate(BoolQueryBuilder filter, ActionListener<IntervalTimeConf
.aggregation(PipelineAggregatorBuilders.minBucket("shortest", "auto>gap"));

SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
client.search(searchRequest, ActionListener.wrap(r -> {

final ActionListener<SearchResponse> autoDateSearchListener = ActionListener.wrap(r -> {
NumericMetricsAggregation.SingleValue v = r.getAggregations().get("shortest");
if (v == null || Double.isNaN(v.value())) {
listener.onResponse(null);
Expand All @@ -664,7 +702,21 @@ public void runAutoDate(BoolQueryBuilder filter, ActionListener<IntervalTimeConf
} else {
listener.onResponse(null);
}
}, listener::onFailure));
}, e -> {
logger.error(e.getMessage(), e);
listener.onFailure(e);
});

clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
user,
client,
context,
autoDateSearchListener
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.apache.lucene.search.TotalHits;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchResponseSections;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -41,8 +46,11 @@
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.feature.SearchFeatureDao;
Expand All @@ -61,7 +69,6 @@ public class IntervalCalculationTests extends OpenSearchTestCase {
private IntervalCalculation intervalCalculation;
private Clock clock;
private ActionListener<IntervalTimeConfiguration> mockIntervalListener;
private Client mockClient;
private SecurityClientUtil mockClientUtil;
private User user;
private Map<String, Object> mockTopEntity;
Expand All @@ -70,25 +77,42 @@ public class IntervalCalculationTests extends OpenSearchTestCase {
private Config mockConfig;
private SearchFeatureDao searchFeatureDao;

@Mock
private Client client;

@Mock
private ThreadPool threadPool;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
MockitoAnnotations.initMocks(this);
clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
mockIntervalListener = mock(ActionListener.class);
mockClient = mock(Client.class);
mockClientUtil = mock(SecurityClientUtil.class);
mockClientUtil = new SecurityClientUtil(mock(NodeStateManager.class), Settings.EMPTY);
user = TestHelpers.randomUser();
mockTopEntity = mock(Map.class);
mockIntervalConfig = mock(IntervalTimeConfiguration.class);
mockLongBounds = mock(LongBounds.class);
mockConfig = mock(Config.class);
searchFeatureDao = mock(SearchFeatureDao.class);
ExecutorService executorService = mock(ExecutorService.class);
when(threadPool.executor(TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME)).thenReturn(executorService);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(executorService).execute(any(Runnable.class));

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);

intervalCalculation = new IntervalCalculation(
mockConfig,
mock(TimeValue.class),
mockClient,
client,
mockClientUtil,
user,
AnalysisType.AD,
Expand Down Expand Up @@ -158,7 +182,7 @@ public void testRunAutoDateReturnsCorrectInterval() throws IOException {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener.onResponse(mockSearchResponse);
return null;
}).when(mockClient).search(any(), any());
}).when(client).search(any(), any());

// Call runAutoDate
intervalCalculation.runAutoDate(new BoolQueryBuilder(), mockIntervalListener, ChronoUnit.MINUTES, "timestamp");
Expand Down
Loading