Skip to content

Commit cb65261

Browse files
Disable request cache for streaming and other fixes (#19520)
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
1 parent 0c3a313 commit cb65261

File tree

10 files changed

+160
-9
lines changed

10 files changed

+160
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5353
- Query planning to determine flush mode for streaming aggregations ([#19488](https://github.com/opensearch-project/OpenSearch/pull/19488))
5454
- Harden the circuit breaker and failure handle logic in query result consumer ([#19396](https://github.com/opensearch-project/OpenSearch/pull/19396))
5555
- Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484))
56+
- Disable request cache for streaming aggregation queries ([#19520](https://github.com/opensearch-project/OpenSearch/pull/19520))
5657

5758
### Changed
5859
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1910,6 +1910,12 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
19101910
return false;
19111911
}
19121912

1913+
// Currently, we cannot cache stream search results as we never compute and reduce full resultset per
1914+
// shard at data nodes and let coordinator handle the reduce from batched results from shards.
1915+
if (context.isStreamSearch()) {
1916+
return false;
1917+
}
1918+
19131919
// We cannot cache with DFS because results depend not only on the content of the index but also
19141920
// on the overridden statistics. So if you ran two queries on the same index with different stats
19151921
// (because an other shard was updated) you would get wrong results because of the scores

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ public Aggregator build(
120120
}
121121
if (execution == null) {
122122
// Check if streaming is enabled and flush mode allows it (null means not yet evaluated)
123-
FlushMode flushMode = context.getFlushMode();
124-
if (context.isStreamSearch() && (flushMode == null || flushMode == FlushMode.PER_SEGMENT)) {
123+
if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
125124
return createStreamStringTermsAggregator(
126125
name,
127126
factories,
@@ -231,8 +230,7 @@ public Aggregator build(
231230
}
232231
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
233232
}
234-
FlushMode flushMode = context.getFlushMode();
235-
if (context.isStreamSearch() && (flushMode == null || flushMode == FlushMode.PER_SEGMENT)) {
233+
if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
236234
return createStreamNumericTermsAggregator(
237235
name,
238236
factories,
@@ -373,7 +371,7 @@ static SubAggCollectionMode pickSubAggCollectMode(AggregatorFactories factories,
373371
// We expect to return all buckets so delaying them won't save any time
374372
return SubAggCollectionMode.DEPTH_FIRST;
375373
}
376-
if (context.isStreamSearch()) {
374+
if (context.isStreamSearch() && (context.getFlushMode() == null || context.getFlushMode() == FlushMode.PER_SEGMENT)) {
377375
return SubAggCollectionMode.DEPTH_FIRST;
378376
}
379377
if (maxOrd == -1 || maxOrd > expectedSize) {

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
4343
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
4444
import org.opensearch.search.internal.SearchContext;
45+
import org.opensearch.search.streaming.FlushMode;
4546

4647
import java.io.IOException;
4748
import java.util.Locale;
@@ -104,7 +105,8 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
104105

105106
@Override
106107
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
107-
if (searchContext.isStreamSearch()) {
108+
if (searchContext.isStreamSearch()
109+
&& (searchContext.getFlushMode() == null || searchContext.getFlushMode() == FlushMode.PER_SEGMENT)) {
108110
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
109111
}
110112
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
@@ -117,7 +119,8 @@ protected Aggregator doCreateInternal(
117119
CardinalityUpperBound cardinality,
118120
Map<String, Object> metadata
119121
) throws IOException {
120-
if (searchContext.isStreamSearch()) {
122+
if (searchContext.isStreamSearch()
123+
&& (searchContext.getFlushMode() == null || searchContext.getFlushMode() == FlushMode.PER_SEGMENT)) {
121124
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
122125
}
123126
return queryShardContext.getValuesSourceRegistry()

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.startree.StarTreeQueryHelper;
60+
import org.opensearch.search.streaming.Streamable;
61+
import org.opensearch.search.streaming.StreamingCostMetrics;
6062

6163
import java.io.IOException;
6264
import java.util.Arrays;
@@ -71,7 +73,7 @@
7173
*
7274
* @opensearch.internal
7375
*/
74-
class MaxAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector {
76+
class MaxAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector, Streamable {
7577

7678
final ValuesSource.Numeric valuesSource;
7779
final DocValueFormat formatter;
@@ -280,4 +282,9 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
280282
public void doReset() {
281283
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
282284
}
285+
286+
@Override
287+
public StreamingCostMetrics getStreamingCostMetrics() {
288+
return new StreamingCostMetrics(true, 1, 1, 1, 1);
289+
}
283290
}

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.startree.StarTreeQueryHelper;
60+
import org.opensearch.search.streaming.Streamable;
61+
import org.opensearch.search.streaming.StreamingCostMetrics;
6062

6163
import java.io.IOException;
6264
import java.util.Map;
@@ -70,7 +72,7 @@
7072
*
7173
* @opensearch.internal
7274
*/
73-
class MinAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector {
75+
class MinAggregator extends NumericMetricsAggregator.SingleValue implements StarTreePreComputeCollector, Streamable {
7476
private static final int MAX_BKD_LOOKUPS = 1024;
7577

7678
final ValuesSource.Numeric valuesSource;
@@ -271,4 +273,14 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
271273
(bucket, metricValue) -> mins.set(bucket, Math.min(mins.get(bucket), NumericUtils.sortableLongToDouble(metricValue)))
272274
);
273275
}
276+
277+
@Override
278+
public void doReset() {
279+
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
280+
}
281+
282+
@Override
283+
public StreamingCostMetrics getStreamingCostMetrics() {
284+
return new StreamingCostMetrics(true, 1, 1, 1, 1);
285+
}
274286
}

server/src/main/java/org/opensearch/search/profile/aggregation/ProfilingAggregator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ public static Aggregator unwrap(Aggregator agg) {
165165
return agg;
166166
}
167167

168+
public Aggregator getDelegate() {
169+
return delegate;
170+
}
171+
168172
@Override
169173
public StreamingCostMetrics getStreamingCostMetrics() {
170174
return delegate instanceof Streamable ? ((Streamable) delegate).getStreamingCostMetrics() : StreamingCostMetrics.nonStreamable();

server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.common.settings.Setting;
1717
import org.opensearch.search.aggregations.AggregatorBase;
1818
import org.opensearch.search.aggregations.MultiBucketCollector;
19+
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
1920

2021
/**
2122
* Analyzes collector trees to determine optimal {@link FlushMode} for streaming aggregations.
@@ -147,6 +148,9 @@ private static Collector[] getChildren(Collector collector) {
147148
if (collector instanceof MultiBucketCollector) {
148149
return ((MultiBucketCollector) collector).getCollectors();
149150
}
151+
if (collector instanceof ProfilingAggregator) {
152+
return getChildren(((ProfilingAggregator) collector).getDelegate());
153+
}
150154
return new Collector[0];
151155
}
152156

server/src/test/java/org/opensearch/search/aggregations/metrics/MaxAggregatorTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,4 +996,62 @@ public void testScriptCaching() throws Exception {
996996
directory.close();
997997
unmappedDirectory.close();
998998
}
999+
1000+
public void testStreamingCostMetrics() throws IOException {
1001+
Directory directory = newDirectory();
1002+
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
1003+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1)));
1004+
indexWriter.close();
1005+
1006+
IndexReader indexReader = DirectoryReader.open(directory);
1007+
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
1008+
1009+
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
1010+
MaxAggregationBuilder aggregationBuilder = new MaxAggregationBuilder("max").field("value");
1011+
1012+
MaxAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
1013+
1014+
// Test streaming cost metrics
1015+
org.opensearch.search.streaming.StreamingCostMetrics metrics = aggregator.getStreamingCostMetrics();
1016+
assertNotNull(metrics);
1017+
assertTrue("MaxAggregator should be streamable", metrics.streamable());
1018+
assertEquals(1, metrics.topNSize());
1019+
assertEquals(1, metrics.estimatedBucketCount());
1020+
assertEquals(1, metrics.segmentCount());
1021+
assertEquals(1, metrics.estimatedDocCount());
1022+
1023+
indexReader.close();
1024+
directory.close();
1025+
}
1026+
1027+
public void testDoReset() throws IOException {
1028+
Directory directory = newDirectory();
1029+
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
1030+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 5)));
1031+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 10)));
1032+
indexWriter.close();
1033+
1034+
IndexReader indexReader = DirectoryReader.open(directory);
1035+
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
1036+
1037+
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
1038+
MaxAggregationBuilder aggregationBuilder = new MaxAggregationBuilder("max").field("value");
1039+
1040+
MaxAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
1041+
1042+
aggregator.preCollection();
1043+
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
1044+
aggregator.postCollection();
1045+
1046+
InternalMax result1 = (InternalMax) aggregator.buildAggregation(0L);
1047+
assertEquals(10.0, result1.getValue(), 0);
1048+
1049+
aggregator.doReset();
1050+
1051+
InternalMax result2 = (InternalMax) aggregator.buildAggregation(0L);
1052+
assertEquals(Double.NEGATIVE_INFINITY, result2.getValue(), 0);
1053+
1054+
indexReader.close();
1055+
directory.close();
1056+
}
9991057
}

server/src/test/java/org/opensearch/search/aggregations/metrics/MinAggregatorTests.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,4 +777,62 @@ protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
777777
protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
778778
return new MinAggregationBuilder("foo").field(fieldName);
779779
}
780+
781+
public void testStreamingCostMetrics() throws IOException {
782+
Directory directory = newDirectory();
783+
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
784+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 1)));
785+
indexWriter.close();
786+
787+
IndexReader indexReader = DirectoryReader.open(directory);
788+
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
789+
790+
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
791+
MinAggregationBuilder aggregationBuilder = new MinAggregationBuilder("min").field("value");
792+
793+
MinAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
794+
795+
// Test streaming cost metrics
796+
org.opensearch.search.streaming.StreamingCostMetrics metrics = aggregator.getStreamingCostMetrics();
797+
assertNotNull(metrics);
798+
assertTrue("MinAggregator should be streamable", metrics.streamable());
799+
assertEquals(1, metrics.topNSize());
800+
assertEquals(1, metrics.estimatedBucketCount());
801+
assertEquals(1, metrics.segmentCount());
802+
assertEquals(1, metrics.estimatedDocCount());
803+
804+
indexReader.close();
805+
directory.close();
806+
}
807+
808+
public void testDoReset() throws IOException {
809+
Directory directory = newDirectory();
810+
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
811+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 5)));
812+
indexWriter.addDocument(singleton(new NumericDocValuesField("value", 10)));
813+
indexWriter.close();
814+
815+
IndexReader indexReader = DirectoryReader.open(directory);
816+
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
817+
818+
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("value", NumberFieldMapper.NumberType.INTEGER);
819+
MinAggregationBuilder aggregationBuilder = new MinAggregationBuilder("min").field("value");
820+
821+
MinAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
822+
823+
aggregator.preCollection();
824+
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
825+
aggregator.postCollection();
826+
827+
InternalMin result1 = (InternalMin) aggregator.buildAggregation(0L);
828+
assertEquals(5.0, result1.getValue(), 0);
829+
830+
aggregator.doReset();
831+
832+
InternalMin result2 = (InternalMin) aggregator.buildAggregation(0L);
833+
assertEquals(Double.POSITIVE_INFINITY, result2.getValue(), 0);
834+
835+
indexReader.close();
836+
directory.close();
837+
}
780838
}

0 commit comments

Comments (0)