Skip to content

Commit 50a2819

Browse files
authored
using asyncrequest instead of direct search (#1535)
Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
1 parent 03b261a commit 50a2819

File tree

4 files changed

+95
-18
lines changed

4 files changed

+95
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1212
### Bug Fixes
1313
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
1414
- fix(forecast): advance past current interval & anchor on now ([#1528](https://github.com/opensearch-project/anomaly-detection/pull/1528))
15+
- Changing search calls on interval calculation ([#1535](https://github.com/opensearch-project/anomaly-detection/pull/1535))
1516

1617
### Infrastructure
1718
### Documentation

src/main/java/org/opensearch/timeseries/constant/CommonMessages.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public static String getTooManyCategoricalFieldErr(int limit) {
7676
+ " characters.";
7777
public static final String INDEX_NOT_FOUND = "index does not exist";
7878
public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s";
79-
public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping";
8079
public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s, ";
81-
80+
public static final String NO_SHARDS_FOUND_IN_INDEX = "No accessible shards found for indices %s "
81+
+ "This could indicate: not enough data in index, connectivity issues, or permission problems.";
8282
public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config";
8383

8484
// ======================================

src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import java.time.Clock;
1010
import java.time.temporal.ChronoUnit;
1111
import java.util.ArrayList;
12+
import java.util.Arrays;
1213
import java.util.List;
14+
import java.util.Locale;
1315
import java.util.Map;
1416
import java.util.concurrent.TimeUnit;
1517
import java.util.stream.Collectors;
@@ -338,8 +340,18 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
338340
.aggregation(AggregationBuilders.max("max_ts").field(TS_FIELD));
339341

340342
SearchRequest boundsReq = new SearchRequest(config.getIndices().toArray(new String[0])).source(boundsSrc);
341-
342-
client.search(boundsReq, ActionListener.wrap(r -> {
343+
logger.debug("Min and max timestamp request: {}", boundsReq);
344+
final ActionListener<SearchResponse> boundsRequestListener = ActionListener.wrap(r -> {
345+
logger.debug("Min and max timestamp response: {}", r);
346+
347+
// Fail earlier if we aren't able to get any bounds from the query
348+
if (r.getTotalShards() == 0 || r.getSuccessfulShards() == 0) {
349+
String errorMsg = String
350+
.format(Locale.ROOT, CommonMessages.NO_SHARDS_FOUND_IN_INDEX, Arrays.toString(config.getIndices().toArray()));
351+
logger.error(errorMsg);
352+
listener.onFailure(new ValidationException(errorMsg, ValidationIssueType.INDICES, ValidationAspect.MODEL));
353+
return;
354+
}
343355

344356
Min minAgg = r.getAggregations().get("min_ts");
345357
Max maxAgg = r.getAggregations().get("max_ts");
@@ -379,8 +391,21 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
379391
}
380392

381393
refineGap(initBucketMins, -1, baseFilter, listener, MIN_BUCKET_WIDTH_MINS, ChronoUnit.MINUTES, TS_FIELD, 0, minMs, maxMs);
382-
383-
}, listener::onFailure));
394+
}, e -> {
395+
logger.error(e.getMessage(), e);
396+
listener.onFailure(e);
397+
398+
});
399+
400+
clientUtil
401+
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
402+
boundsReq,
403+
client::search,
404+
user,
405+
client,
406+
context,
407+
boundsRequestListener
408+
);
384409
}
385410

386411
/* ----------------------------------------------------------------------
@@ -521,11 +546,11 @@ public void refineGap(
521546

522547
SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
523548
logger.debug("Minimum interval search request: {}", searchRequest);
524-
client.search(searchRequest, ActionListener.wrap(resp -> {
525-
logger.debug("Minimum interval search response: {}", resp);
549+
final ActionListener<SearchResponse> minIntervalSearchListener = ActionListener.wrap(r -> {
550+
logger.debug("Minimum interval search response: {}", r);
526551
double gap = Double.NaN;
527552
boolean hasEmptyBuckets = false;
528-
Histogram histogram = resp.getAggregations().get("dyn");
553+
Histogram histogram = r.getAggregations().get("dyn");
529554
if (histogram != null) {
530555
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
531556
int firstNonEmpty = -1;
@@ -614,8 +639,20 @@ public void refineGap(
614639
}
615640

616641
refineGap(nextBucketMins, nextDir, baseFilter, listener, minBucketMins, returnUnit, tsField, depth + 1, sliceMinMs, sliceMaxMs);
617-
618-
}, listener::onFailure));
642+
}, e -> {
643+
logger.error(e.getMessage(), e);
644+
listener.onFailure(e);
645+
});
646+
647+
clientUtil
648+
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
649+
searchRequest,
650+
client::search,
651+
user,
652+
client,
653+
context,
654+
minIntervalSearchListener
655+
);
619656
}
620657

621658
/**
@@ -652,7 +689,8 @@ public void runAutoDate(BoolQueryBuilder filter, ActionListener<IntervalTimeConf
652689
.aggregation(PipelineAggregatorBuilders.minBucket("shortest", "auto>gap"));
653690

654691
SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
655-
client.search(searchRequest, ActionListener.wrap(r -> {
692+
693+
final ActionListener<SearchResponse> autoDateSearchListener = ActionListener.wrap(r -> {
656694
NumericMetricsAggregation.SingleValue v = r.getAggregations().get("shortest");
657695
if (v == null || Double.isNaN(v.value())) {
658696
listener.onResponse(null);
@@ -664,7 +702,21 @@ public void runAutoDate(BoolQueryBuilder filter, ActionListener<IntervalTimeConf
664702
} else {
665703
listener.onResponse(null);
666704
}
667-
}, listener::onFailure));
705+
}, e -> {
706+
logger.error(e.getMessage(), e);
707+
listener.onFailure(e);
708+
});
709+
710+
clientUtil
711+
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
712+
searchRequest,
713+
client::search,
714+
user,
715+
client,
716+
context,
717+
autoDateSearchListener
718+
);
719+
668720
}
669721

670722
/**

src/test/java/org/opensearch/timeseries/indices/rest/handler/IntervalCalculationTests.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,19 @@
2323
import java.time.temporal.ChronoUnit;
2424
import java.util.Arrays;
2525
import java.util.Map;
26+
import java.util.concurrent.ExecutorService;
2627

2728
import org.apache.lucene.search.TotalHits;
2829
import org.junit.Before;
2930
import org.mockito.ArgumentCaptor;
31+
import org.mockito.Mock;
32+
import org.mockito.MockitoAnnotations;
3033
import org.opensearch.action.search.SearchResponse;
3134
import org.opensearch.action.search.SearchResponseSections;
3235
import org.opensearch.action.search.ShardSearchFailure;
36+
import org.opensearch.common.settings.Settings;
3337
import org.opensearch.common.unit.TimeValue;
38+
import org.opensearch.common.util.concurrent.ThreadContext;
3439
import org.opensearch.commons.authuser.User;
3540
import org.opensearch.core.action.ActionListener;
3641
import org.opensearch.index.query.BoolQueryBuilder;
@@ -41,8 +46,11 @@
4146
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
4247
import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation;
4348
import org.opensearch.test.OpenSearchTestCase;
49+
import org.opensearch.threadpool.ThreadPool;
4450
import org.opensearch.timeseries.AnalysisType;
51+
import org.opensearch.timeseries.NodeStateManager;
4552
import org.opensearch.timeseries.TestHelpers;
53+
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
4654
import org.opensearch.timeseries.common.exception.ValidationException;
4755
import org.opensearch.timeseries.constant.CommonMessages;
4856
import org.opensearch.timeseries.feature.SearchFeatureDao;
@@ -61,7 +69,6 @@ public class IntervalCalculationTests extends OpenSearchTestCase {
6169
private IntervalCalculation intervalCalculation;
6270
private Clock clock;
6371
private ActionListener<IntervalTimeConfiguration> mockIntervalListener;
64-
private Client mockClient;
6572
private SecurityClientUtil mockClientUtil;
6673
private User user;
6774
private Map<String, Object> mockTopEntity;
@@ -70,25 +77,42 @@ public class IntervalCalculationTests extends OpenSearchTestCase {
7077
private Config mockConfig;
7178
private SearchFeatureDao searchFeatureDao;
7279

80+
@Mock
81+
private Client client;
82+
83+
@Mock
84+
private ThreadPool threadPool;
85+
7386
@Override
7487
@Before
7588
public void setUp() throws Exception {
7689
super.setUp();
90+
MockitoAnnotations.initMocks(this);
7791
clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
7892
mockIntervalListener = mock(ActionListener.class);
79-
mockClient = mock(Client.class);
80-
mockClientUtil = mock(SecurityClientUtil.class);
93+
mockClientUtil = new SecurityClientUtil(mock(NodeStateManager.class), Settings.EMPTY);
8194
user = TestHelpers.randomUser();
8295
mockTopEntity = mock(Map.class);
8396
mockIntervalConfig = mock(IntervalTimeConfiguration.class);
8497
mockLongBounds = mock(LongBounds.class);
8598
mockConfig = mock(Config.class);
8699
searchFeatureDao = mock(SearchFeatureDao.class);
100+
ExecutorService executorService = mock(ExecutorService.class);
101+
when(threadPool.executor(TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME)).thenReturn(executorService);
102+
doAnswer(invocation -> {
103+
Runnable runnable = invocation.getArgument(0);
104+
runnable.run();
105+
return null;
106+
}).when(executorService).execute(any(Runnable.class));
107+
108+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
109+
when(threadPool.getThreadContext()).thenReturn(threadContext);
110+
when(client.threadPool()).thenReturn(threadPool);
87111

88112
intervalCalculation = new IntervalCalculation(
89113
mockConfig,
90114
mock(TimeValue.class),
91-
mockClient,
115+
client,
92116
mockClientUtil,
93117
user,
94118
AnalysisType.AD,
@@ -158,7 +182,7 @@ public void testRunAutoDateReturnsCorrectInterval() throws IOException {
158182
ActionListener<SearchResponse> listener = invocation.getArgument(1);
159183
listener.onResponse(mockSearchResponse);
160184
return null;
161-
}).when(mockClient).search(any(), any());
185+
}).when(client).search(any(), any());
162186

163187
// Call runAutoDate
164188
intervalCalculation.runAutoDate(new BoolQueryBuilder(), mockIntervalListener, ChronoUnit.MINUTES, "timestamp");

0 commit comments

Comments
 (0)