From 4980ae24448c6c9d5d61f80e539caefdffe9cfd9 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 22 Mar 2022 00:11:23 -0400 Subject: [PATCH] Introduce QueryPhaseSearcher extension point (SearchPlugin) (#1931) Signed-off-by: Andriy Redko (cherry picked from commit 82fb7abb7158788fa727543dbcc89cffc318c67a) --- .../main/java/org/opensearch/node/Node.java | 14 +++- .../org/opensearch/plugins/SearchPlugin.java | 36 ++++++++++ .../search/DefaultSearchContext.java | 7 +- .../org/opensearch/search/SearchModule.java | 52 ++++++++++++++ .../org/opensearch/search/SearchService.java | 11 ++- .../opensearch/search/query/QueryPhase.java | 70 ++++++++++++++++++- .../search/query/QueryPhaseSearcher.java | 41 +++++++++++ .../search/DefaultSearchContextTests.java | 18 +++-- .../snapshots/SnapshotResiliencyTests.java | 5 +- .../java/org/opensearch/node/MockNode.java | 11 ++- .../opensearch/search/MockSearchService.java | 15 +++- 11 files changed, 259 insertions(+), 21 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/query/QueryPhaseSearcher.java diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b8068a92443bc..72c291fceccd8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -175,6 +175,7 @@ import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.fetch.FetchPhase; +import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.RestoreService; import org.opensearch.snapshots.SnapshotShardsService; @@ -211,6 +212,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -843,9 +845,11 @@ protected Node( threadPool, scriptService, bigArrays, + searchModule.getQueryPhase(), searchModule.getFetchPhase(), responseCollectorService, - circuitBreakerService + circuitBreakerService, + searchModule.getIndexSearcherExecutor(threadPool) ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -1402,9 +1406,11 @@ protected SearchService newSearchService( ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, + QueryPhase queryPhase, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, - CircuitBreakerService circuitBreakerService + CircuitBreakerService circuitBreakerService, + Executor indexSearcherExecutor ) { return new SearchService( clusterService, @@ -1412,9 +1418,11 @@ protected SearchService newSearchService( threadPool, scriptService, bigArrays, + queryPhase, fetchPhase, responseCollectorService, - circuitBreakerService + circuitBreakerService, + indexSearcherExecutor ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index 3a5b965d07fb6..f650a1ce5512f 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -32,6 +32,7 @@ package org.opensearch.plugins; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.Sort; import org.opensearch.common.CheckedFunction; @@ -40,6 +41,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.lucene.search.function.ScoreFunction; +import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.ContextParser; import org.opensearch.common.xcontent.XContent; import org.opensearch.common.xcontent.XContentParser; @@ -61,6 +63,7 @@ import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.highlight.Highlighter; +import org.opensearch.search.query.QueryPhaseSearcher; import org.opensearch.search.rescore.Rescorer; import org.opensearch.search.rescore.RescorerBuilder; import org.opensearch.search.sort.SortBuilder; @@ -68,11 +71,14 @@ import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.Suggester; import org.opensearch.search.suggest.SuggestionBuilder; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -178,6 +184,36 @@ default List> getRescorers() { return emptyList(); } + /** + * The new {@link QueryPhaseSearcher} added by this plugin. At the moment, only one {@link QueryPhaseSearcher} is supported per + * instance, the {@link IllegalStateException} is going to be thrown if more then one plugin tries to register + * {@link QueryPhaseSearcher} implementation. + */ + default Optional getQueryPhaseSearcher() { + return Optional.empty(); + } + + /** + * The executor service provider (registered through {@link Plugin#getExecutorBuilders(Settings)} to be used at search + * time by {@link IndexSearcher}. The {@link IllegalStateException} is going to be thrown if more then one + * plugin tries to register index searcher executor. + */ + default Optional getIndexSearcherExecutorProvider() { + return Optional.empty(); + } + + /** + * Executor service provider + */ + interface ExecutorServiceProvider { + /** + * Provides an executor service instance + * @param threadPool thread pool + * @return executor service instance + */ + ExecutorService getExecutor(ThreadPool threadPool); + } + /** * Specification of custom {@link ScoreFunction}. */ diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 23d32521d3853..5d938683ae29a 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -95,6 +95,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.LongSupplier; final class DefaultSearchContext extends SearchContext { @@ -178,7 +179,8 @@ final class DefaultSearchContext extends SearchContext { FetchPhase fetchPhase, boolean lowLevelCancellation, Version minNodeVersion, - boolean validate + boolean validate, + Executor executor ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -199,7 +201,8 @@ final class DefaultSearchContext extends SearchContext { engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(), - lowLevelCancellation + lowLevelCancellation, + executor ); this.relativeTimeSupplier = relativeTimeSupplier; this.timeout = timeout; diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index cf6aceeb52452..938a11ea505cc 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.BooleanQuery; import org.opensearch.common.NamedRegistry; +import org.opensearch.common.Nullable; import org.opensearch.common.ParseField; import org.opensearch.common.geo.GeoShapeType; import org.opensearch.common.geo.ShapesAvailability; @@ -274,6 +275,8 @@ import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter; import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter; +import org.opensearch.search.query.QueryPhase; +import org.opensearch.search.query.QueryPhaseSearcher; import org.opensearch.search.rescore.QueryRescorerBuilder; import org.opensearch.search.rescore.RescorerBuilder; import org.opensearch.search.sort.FieldSortBuilder; @@ -294,11 +297,14 @@ import org.opensearch.search.suggest.phrase.StupidBackoff; import org.opensearch.search.suggest.term.TermSuggestion; import org.opensearch.search.suggest.term.TermSuggestionBuilder; +import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Function; @@ -331,6 +337,8 @@ public class SearchModule { private final List namedWriteables = new ArrayList<>(); private final List namedXContents = new ArrayList<>(); private final ValuesSourceRegistry valuesSourceRegistry; + private final QueryPhaseSearcher queryPhaseSearcher; + private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider; /** * Constructs a new SearchModule object @@ -359,6 +367,8 @@ public SearchModule(Settings settings, boolean transportClient, List spec) { ); } + private QueryPhaseSearcher registerQueryPhaseSearcher(List plugins) { + QueryPhaseSearcher searcher = null; + + for (SearchPlugin plugin : plugins) { + final Optional searcherOpt = plugin.getQueryPhaseSearcher(); + + if (searcher == null) { + searcher = searcherOpt.orElse(null); + } else if (searcherOpt.isPresent()) { + throw new IllegalStateException("Only one QueryPhaseSearcher is allowed, but more than one are provided by the plugins"); + } + } + + return searcher; + } + + private SearchPlugin.ExecutorServiceProvider registerIndexSearcherExecutorProvider(List plugins) { + SearchPlugin.ExecutorServiceProvider provider = null; + + for (SearchPlugin plugin : plugins) { + final Optional providerOpt = plugin.getIndexSearcherExecutorProvider(); + + if (provider == null) { + provider = providerOpt.orElse(null); + } else if (providerOpt.isPresent()) { + throw new IllegalStateException( + "The index searcher executor is already assigned but more than one are provided by the plugins" + ); + } + } + + return provider; + } + public FetchPhase getFetchPhase() { return new FetchPhase(fetchSubPhases); } + + public QueryPhase getQueryPhase() { + return (queryPhaseSearcher == null) ? new QueryPhase() : new QueryPhase(queryPhaseSearcher); + } + + public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) { + return (indexSearcherExecutorProvider == null) ? null : indexSearcherExecutorProvider.getExecutor(pool); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index de4586efd60b1..a9154171f79c7 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -256,6 +256,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicInteger openScrollContexts = new AtomicInteger(); private final String sessionId = UUIDs.randomBase64UUID(); + private final Executor indexSearcherExecutor; public SearchService( ClusterService clusterService, @@ -263,9 +264,11 @@ public SearchService( ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, + QueryPhase queryPhase, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, - CircuitBreakerService circuitBreakerService + CircuitBreakerService circuitBreakerService, + Executor indexSearcherExecutor ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -274,13 +277,14 @@ public SearchService( this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.bigArrays = bigArrays; - this.queryPhase = new QueryPhase(); + this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; this.multiBucketConsumerService = new MultiBucketConsumerService( clusterService, settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST) ); + this.indexSearcherExecutor = indexSearcherExecutor; TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings)); @@ -884,7 +888,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear fetchPhase, lowLevelCancellation, clusterService.state().nodes().getMinNodeVersion(), - validate + validate, + indexSearcherExecutor ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 2e4b159b7db17..2de2c54b1df81 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -76,6 +76,7 @@ import java.io.IOException; import java.util.LinkedList; +import java.util.Objects; import java.util.concurrent.ExecutorService; import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext; @@ -92,12 +93,19 @@ public class QueryPhase { private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class); // TODO: remove this property public static final boolean SYS_PROP_REWRITE_SORT = Booleans.parseBoolean(System.getProperty("opensearch.search.rewrite_sort", "true")); + public static final QueryPhaseSearcher DEFAULT_QUERY_PHASE_SEARCHER = new DefaultQueryPhaseSearcher(); + private final QueryPhaseSearcher queryPhaseSearcher; private final AggregationPhase aggregationPhase; private final SuggestPhase suggestPhase; private final RescorePhase rescorePhase; public QueryPhase() { + this(DEFAULT_QUERY_PHASE_SEARCHER); + } + + public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) { + this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required"); this.aggregationPhase = new AggregationPhase(); this.suggestPhase = new SuggestPhase(); this.rescorePhase = new RescorePhase(); @@ -143,7 +151,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep // request, preProcess is called on the DFS phase phase, this is why we pre-process them // here to make sure it happens during the QUERY phase aggregationPhase.preProcess(searchContext); - boolean rescore = executeInternal(searchContext); + boolean rescore = executeInternal(searchContext, queryPhaseSearcher); if (rescore) { // only if we do a regular search rescorePhase.execute(searchContext); @@ -163,6 +171,15 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep * @return whether the rescoring phase should be executed */ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException { + return executeInternal(searchContext, QueryPhase.DEFAULT_QUERY_PHASE_SEARCHER); + } + + /** + * In a package-private method so that it can be tested without having to + * wire everything (mapperService, etc.) + * @return whether the rescoring phase should be executed + */ + static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException { final ContextIndexSearcher searcher = searchContext.searcher(); final IndexReader reader = searcher.getIndexReader(); QuerySearchResult queryResult = searchContext.queryResult(); @@ -266,13 +283,22 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe } try { - boolean shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet); + boolean shouldRescore = queryPhaseSearcher.searchWith( + searchContext, + searcher, + query, + collectors, + hasFilterCollector, + timeoutSet + ); + ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); if (executor instanceof QueueResizingOpenSearchThreadPoolExecutor) { QueueResizingOpenSearchThreadPoolExecutor rExecutor = (QueueResizingOpenSearchThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } + return shouldRescore; } finally { // Search phase has finished, no longer need to check for timeout @@ -384,5 +410,43 @@ private static boolean canEarlyTerminate(IndexReader reader, SortAndFormats sort return true; } - private static class TimeExceededException extends RuntimeException {} + /** + * The exception being raised when search timeout is reached. + */ + public static class TimeExceededException extends RuntimeException { + private static final long serialVersionUID = 1L; + } + + /** + * Default {@link QueryPhaseSearcher} implementation which delegates to the {@link QueryPhase}. + */ + public static class DefaultQueryPhaseSearcher implements QueryPhaseSearcher { + /** + * Please use {@link QueryPhase#DEFAULT_QUERY_PHASE_SEARCHER} + */ + protected DefaultQueryPhaseSearcher() {} + + @Override + public boolean searchWith( + SearchContext searchContext, + ContextIndexSearcher searcher, + Query query, + LinkedList collectors, + boolean hasFilterCollector, + boolean hasTimeout + ) throws IOException { + return searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); + } + + protected boolean searchWithCollector( + SearchContext searchContext, + ContextIndexSearcher searcher, + Query query, + LinkedList collectors, + boolean hasFilterCollector, + boolean hasTimeout + ) throws IOException { + return QueryPhase.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); + } + } } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcher.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcher.java new file mode 100644 index 0000000000000..9a48f030fe147 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcher.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query; + +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.Query; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.LinkedList; + +/** + * The extension point which allows to plug in custom search implementation to be + * used at {@link QueryPhase}. + */ +public interface QueryPhaseSearcher { + /** + * Perform search using {@link CollectorManager} + * @param searchContext search context + * @param searcher context index searcher + * @param query query + * @param hasTimeout "true" if timeout was set, "false" otherwise + * @return is rescoring required or not + * @throws IOException IOException + */ + boolean searchWith( + SearchContext searchContext, + ContextIndexSearcher searcher, + Query query, + LinkedList collectors, + boolean hasFilterCollector, + boolean hasTimeout + ) throws IOException; +} diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index f01a4ff2ea104..3011cb3c8e4dd 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -182,7 +182,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); contextWithoutScroll.from(300); contextWithoutScroll.close(); @@ -223,7 +224,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); context1.from(300); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); @@ -292,7 +294,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); SliceBuilder sliceBuilder = mock(SliceBuilder.class); @@ -330,7 +333,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false); @@ -360,7 +364,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false); Query query1 = context4.query(); @@ -439,7 +444,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) { null, false, Version.CURRENT, - false + false, + null ); assertThat(context.searcher().hasCancellations(), is(false)); context.searcher().addQueryCancellation(() -> {}); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9496054e80ad8..88dff785af929 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -197,6 +197,7 @@ import org.opensearch.search.SearchService; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.FetchPhase; +import org.opensearch.search.query.QueryPhase; import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; @@ -1979,9 +1980,11 @@ public void onFailure(final Exception e) { threadPool, scriptService, bigArrays, + new QueryPhase(), new FetchPhase(Collections.emptyList()), responseCollectorService, - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + null ); SearchPhaseController searchPhaseController = new SearchPhaseController( writableRegistry(), diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 858d9f1e5e8e8..17a949ee129a1 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -58,6 +58,7 @@ import org.opensearch.search.MockSearchService; import org.opensearch.search.SearchService; import org.opensearch.search.fetch.FetchPhase; +import org.opensearch.search.query.QueryPhase; import org.opensearch.test.MockHttpTransport; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.ThreadPool; @@ -70,6 +71,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.function.Function; /** @@ -147,9 +149,11 @@ protected SearchService newSearchService( ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, + QueryPhase queryPhase, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, - CircuitBreakerService circuitBreakerService + CircuitBreakerService circuitBreakerService, + Executor indexSearcherExecutor ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -158,9 +162,11 @@ protected SearchService newSearchService( threadPool, scriptService, bigArrays, + queryPhase, fetchPhase, responseCollectorService, - circuitBreakerService + circuitBreakerService, + indexSearcherExecutor ); } return new MockSearchService( @@ -169,6 +175,7 @@ protected SearchService newSearchService( threadPool, scriptService, bigArrays, + queryPhase, fetchPhase, circuitBreakerService ); diff --git a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java index a05066cef1804..fd522cd298632 100644 --- a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java @@ -41,6 +41,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.internal.ReaderContext; +import org.opensearch.search.query.QueryPhase; import org.opensearch.threadpool.ThreadPool; import java.util.HashMap; @@ -91,10 +92,22 @@ public MockSearchService( ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, + QueryPhase queryPhase, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService ) { - super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null, circuitBreakerService); + super( + clusterService, + indicesService, + threadPool, + scriptService, + bigArrays, + queryPhase, + fetchPhase, + null, + circuitBreakerService, + null + ); } @Override