Skip to content

Commit

Permalink
Add cluster setting to dynamically configure filter rewrite optimizat…
Browse files Browse the repository at this point in the history
…ion (#13179)


---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored Apr 16, 2024
1 parent b59c572 commit 035d8b8
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
Expand All @@ -34,6 +35,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down Expand Up @@ -93,7 +95,7 @@ public void testMinDocCountOnDateHistogram() throws Exception {
final SearchResponse allResponse = client().prepareSearch("idx")
.setSize(0)
.setQuery(QUERY)
.addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0))
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
.get();

final Histogram allHisto = allResponse.getAggregations().get("histo");
Expand All @@ -104,4 +106,36 @@ public void testMinDocCountOnDateHistogram() throws Exception {
assertEquals(entry.getValue(), results.get(entry.getKey()));
}
}

public void testDisableOptimizationGivesSameResults() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.setSize(0)
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
.get();

final Histogram allHisto1 = response.getAggregations().get("histo");

final ClusterUpdateSettingsResponse updateSettingResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(MAX_AGGREGATION_REWRITE_FILTERS.getKey(), 0))
.get();

assertEquals(updateSettingResponse.getTransientSettings().get(MAX_AGGREGATION_REWRITE_FILTERS.getKey()), "0");

response = client().prepareSearch("idx")
.setSize(0)
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
.get();

final Histogram allHisto2 = response.getAggregations().get("histo");

assertEquals(allHisto1, allHisto2);

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(MAX_AGGREGATION_REWRITE_FILTERS.getKey()))
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;

/**
* The main search context used during search phase
Expand Down Expand Up @@ -187,6 +188,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int maxAggRewriteFilters;

DefaultSearchContext(
ReaderContext readerContext,
Expand Down Expand Up @@ -240,6 +242,8 @@ final class DefaultSearchContext extends SearchContext {
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
}

@Override
Expand Down Expand Up @@ -994,4 +998,16 @@ public boolean shouldUseTimeSeriesDescSortOptimization() {
&& sort.isSortOnTimeSeriesField()
&& sort.sort.getSort()[0].getReverse() == false;
}

@Override
public int maxAggRewriteFilters() {
return maxAggRewriteFilters;
}

private int evaluateFilterRewriteSetting() {
if (clusterService != null) {
return clusterService.getClusterSettings().get(MAX_AGGREGATION_REWRITE_FILTERS);
}
return 0;
}
}
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

// value 0 means rewrite filters optimization in aggregations will be disabled
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
"search.max_aggregation_rewrite_filters",
72,
0,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ private FastFilterRewriteHelper() {}

private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class);

private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;

// Initialize the wrapper map for unwrapping the query
Expand Down Expand Up @@ -186,8 +185,9 @@ private static Weight[] createFilterForAggregations(
int bucketCount = 0;
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
bucketCount++;
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
int maxNumFilterBuckets = context.maxAggRewriteFilters();
if (bucketCount > maxNumFilterBuckets) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
return null;
}
// Below rounding is needed as the interval could return in
Expand Down Expand Up @@ -254,6 +254,8 @@ public void setAggregationType(AggregationType aggregationType) {
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
if (context.maxAggRewriteFilters() == 0) return false;

boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength);
logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId());
this.rewriteable = rewriteable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,4 +522,8 @@ public String toString() {
public abstract int getTargetMaxSliceCount();

public abstract boolean shouldUseTimeSeriesDescSortOptimization();

public int maxAggRewriteFilters() {
return 0;
}
}

0 comments on commit 035d8b8

Please sign in to comment.