Skip to content

Plumb ValuesSourceRegistry through to QuerySearchContext #51710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -381,23 +382,21 @@ public static Type defaultStoreType(final boolean allowMmap) {
}
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled)
throws IOException {
public IndexService newIndexService(IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
Expand All @@ -424,7 +423,7 @@ public IndexService newIndexService(
new SimilarityService(indexSettings, scriptService, similarities), shardStoreDeleter, indexAnalyzers,
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled);
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, valuesSourceRegistry);
success = true;
return indexService;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Client client;
private final CircuitBreakerService circuitBreakerService;
private Supplier<Sort> indexSortSupplier;
private ValuesSourceRegistry valuesSourceRegistry;

public IndexService(
IndexSettings indexSettings,
Expand All @@ -166,13 +168,15 @@ public IndexService(
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled) {
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry) {
super(indexSettings);
this.indexSettings = indexSettings;
this.xContentRegistry = xContentRegistry;
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
this.valuesSourceRegistry = valuesSourceRegistry;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
Expand Down Expand Up @@ -568,7 +572,7 @@ public QueryShardContext newQueryShardContext(int shardId, IndexSearcher searche
return new QueryShardContext(
shardId, indexSettings, bigArrays, indexCache.bitsetFilterCache(), indexFieldData::getForField, mapperService(),
similarityService(), scriptService, xContentRegistry, namedWriteableRegistry, client, searcher, nowInMillis, clusterAlias,
indexNameMatcher);
indexNameMatcher, valuesSourceRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptFactory;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.transport.RemoteClusterAware;

Expand Down Expand Up @@ -97,6 +98,7 @@ public class QueryShardContext extends QueryRewriteContext {
private boolean allowUnmappedFields;
private boolean mapUnmappedFieldAsString;
private NestedScope nestedScope;
private ValuesSourceRegistry valuesSourceRegistry;

public QueryShardContext(int shardId,
IndexSettings indexSettings,
Expand All @@ -112,18 +114,19 @@ public QueryShardContext(int shardId,
IndexSearcher searcher,
LongSupplier nowInMillis,
String clusterAlias,
Predicate<String> indexNameMatcher) {
Predicate<String> indexNameMatcher,
ValuesSourceRegistry valuesSourceRegistry) {
this(shardId, indexSettings, bigArrays, bitsetFilterCache, indexFieldDataLookup, mapperService, similarityService,
scriptService, xContentRegistry, namedWriteableRegistry, client, searcher, nowInMillis, indexNameMatcher,
new Index(RemoteClusterAware.buildRemoteIndexName(clusterAlias, indexSettings.getIndex().getName()),
indexSettings.getIndex().getUUID()));
indexSettings.getIndex().getUUID()), valuesSourceRegistry);
}

public QueryShardContext(QueryShardContext source) {
this(source.shardId, source.indexSettings, source.bigArrays, source.bitsetFilterCache, source.indexFieldDataService,
source.mapperService, source.similarityService, source.scriptService, source.getXContentRegistry(),
source.getWriteableRegistry(), source.client, source.searcher, source.nowInMillis, source.indexNameMatcher,
source.fullyQualifiedIndex);
source.fullyQualifiedIndex, source.valuesSourceRegistry);
}

private QueryShardContext(int shardId,
Expand All @@ -140,7 +143,8 @@ private QueryShardContext(int shardId,
IndexSearcher searcher,
LongSupplier nowInMillis,
Predicate<String> indexNameMatcher,
Index fullyQualifiedIndex) {
Index fullyQualifiedIndex,
ValuesSourceRegistry valuesSourceRegistry) {
super(xContentRegistry, namedWriteableRegistry, client, nowInMillis);
this.shardId = shardId;
this.similarityService = similarityService;
Expand All @@ -155,6 +159,7 @@ private QueryShardContext(int shardId,
this.searcher = searcher;
this.indexNameMatcher = indexNameMatcher;
this.fullyQualifiedIndex = fullyQualifiedIndex;
this.valuesSourceRegistry = valuesSourceRegistry;
}

private void reset() {
Expand Down Expand Up @@ -249,6 +254,10 @@ public Analyzer getSearchQuoteAnalyzer(MappedFieldType fieldType) {
return getMapperService().searchQuoteAnalyzer();
}

public ValuesSourceRegistry getValuesSourceRegistry() {
return valuesSourceRegistry;
}

public void setAllowUnmappedFields(boolean allowUnmappedFields) {
this.allowUnmappedFields = allowUnmappedFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
Expand Down Expand Up @@ -225,6 +226,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final EsThreadPoolExecutor danglingIndicesThreadPoolExecutor;
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private ValuesSourceRegistry valuesSourceRegistry;


@Override
Expand All @@ -239,12 +241,13 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories) {
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
this.nodeEnv = nodeEnv;
this.xContentRegistry = xContentRegistry;
this.valuesSourceRegistry = valuesSourceRegistry;
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
this.analysisRegistry = analysisRegistry;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand Down Expand Up @@ -609,7 +612,8 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry,
this::isIdFieldDataEnabled
this::isIdFieldDataEnabled,
valuesSourceRegistry
);
}

Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ protected Node(
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories);
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
searchModule.getValuesSourceRegistry());

final AliasValidator aliasValidator = new AliasValidator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ public class SearchModule {
private final Settings settings;
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
private ValuesSourceRegistry valuesSourceRegistry;

/**
* Constructs a new SearchModule object
Expand All @@ -298,6 +299,7 @@ public class SearchModule {
*/
public SearchModule(Settings settings, List<SearchPlugin> plugins) {
this.settings = settings;
this.valuesSourceRegistry = new ValuesSourceRegistry();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you still like the idea of changing the ctor to take an Iterable? Is that a thing for a follow up change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a clean way to do this. We already manually list all the aggregations once in registerAggregations, and that still feels like the right place to hook in the values source mappings for each aggregation. To do that in the constructor, I'd basically need to duplicate that list. I thought briefly about some kind of visitor pattern to cover both uses, but it seemed like a lot of boiler plate that didn't add any clarity. If you've got a good idea for how we could make this cleaner, I'm happy to work with you on it, but I don't see a better way than what we're doing now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is cool to merge as is and grab that in a follow up. Would it be ok if I gave it a shot myself once this is in?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

registerSuggesters(plugins);
highlighters = setupHighlighters(settings, plugins);
registerScoreFunctions(plugins);
Expand All @@ -322,6 +324,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContents() {
return namedXContents;
}

public ValuesSourceRegistry getValuesSourceRegistry() {
return valuesSourceRegistry;
}

/**
* Returns the {@link Highlighter} registry
*/
Expand Down Expand Up @@ -447,7 +453,7 @@ private void registerAggregation(AggregationSpec spec) {
}
Consumer<ValuesSourceRegistry> register = spec.getAggregatorRegistrar();
if (register != null) {
register.accept(ValuesSourceRegistry.getInstance());
register.accept(this.valuesSourceRegistry);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A builder for histograms on numeric fields. This builder can operate on either base numeric fields, or numeric range fields. IP range
Expand Down Expand Up @@ -88,11 +87,8 @@ public static HistogramAggregationBuilder parse(String aggregationName, XContent
return PARSER.parse(parser, new HistogramAggregationBuilder(aggregationName), null);
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true)) {
HistogramAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
HistogramAggregatorFactory.registerAggregators(valuesSourceRegistry);
}

private double interval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
return asMultiBucketAggregator(this, searchContext, parent);
}

AggregatorSupplier aggregatorSupplier = ValuesSourceRegistry.getInstance().getAggregator(config.valueSourceType(),
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
HistogramAggregationBuilder.NAME);
if (aggregatorSupplier instanceof HistogramAggregatorSupplier == false) {
throw new AggregationExecutionException("Registry miss-match - expected HistogramAggregatorSupplier, found [" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<TermsAggregationBuilder>
implements MultiBucketAggregationBuilder {
Expand Down Expand Up @@ -102,11 +101,8 @@ public static AggregationBuilder parse(String aggregationName, XContentParser pa
return PARSER.parse(parser, new TermsAggregationBuilder(aggregationName), null);
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true)) {
TermsAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
TermsAggregatorFactory.registerAggregators(valuesSourceRegistry);
}

private BucketOrder order = BucketOrder.compound(BucketOrder.count(false)); // automatically adds tie-breaker key asc order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
return asMultiBucketAggregator(this, searchContext, parent);
}

AggregatorSupplier aggregatorSupplier = ValuesSourceRegistry.getInstance().getAggregator(config.valueSourceType(),
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
TermsAggregationBuilder.NAME);
if (aggregatorSupplier instanceof TermsAggregatorSupplier == false) {
throw new AggregationExecutionException("Registry miss-match - expected TermsAggregatorSupplier, found [" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public final class CardinalityAggregationBuilder
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, CardinalityAggregationBuilder> {
Expand All @@ -63,11 +62,8 @@ public static AggregationBuilder parse(String aggregationName, XContentParser pa
return PARSER.parse(parser, new CardinalityAggregationBuilder(aggregationName), null);
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true)) {
CardinalityAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
CardinalityAggregatorFactory.registerAggregators(valuesSourceRegistry);
}

private Long precisionThreshold = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
AggregatorSupplier aggregatorSupplier = ValuesSourceRegistry.getInstance().getAggregator(config.valueSourceType(),
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
CardinalityAggregationBuilder.NAME);
if (aggregatorSupplier instanceof CardinalityAggregatorSupplier == false) {
throw new AggregationExecutionException("Registry miss-match - expected CardinalityAggregatorSupplier, found [" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource, PercentilesAggregationBuilder> {
Expand Down Expand Up @@ -119,11 +118,8 @@ public static AggregationBuilder parse(String aggregationName, XContentParser pa
return returnedAgg;
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true)) {
PercentilesAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
PercentilesAggregatorFactory.registerAggregators(valuesSourceRegistry);
}

private static <T> void setIfNotNull(Consumer<T> consumer, T value) {
Expand Down
Loading