Skip to content

Wire Percentiles aggregator into new VS framework #51639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(PercentilesAggregationBuilder.NAME, PercentilesAggregationBuilder::new,
PercentilesAggregationBuilder::parse)
.addResultReader(InternalTDigestPercentiles.NAME, InternalTDigestPercentiles::new)
.addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new));
.addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new)
.setAggregatorRegistrar(PercentilesAggregationBuilder::registerAggregators));
registerAggregation(new AggregationSpec(PercentileRanksAggregationBuilder.NAME, PercentileRanksAggregationBuilder::new,
PercentileRanksAggregationBuilder::parse)
.addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource, PercentilesAggregationBuilder> {
Expand Down Expand Up @@ -118,6 +120,13 @@ public static AggregationBuilder parse(String aggregationName, XContentParser pa
return returnedAgg;
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true) == true) {
PercentilesAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
}

private static <T> void setIfNotNull(Consumer<T> consumer, T value) {
if (value != null) {
consumer.accept(value);
Expand Down Expand Up @@ -271,16 +280,18 @@ protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardC
ValuesSourceConfig config,
AggregatorFactory parent,
Builder subFactoriesBuilder) throws IOException {
switch (method) {
case TDIGEST:
return new TDigestPercentilesAggregatorFactory(name, config, percents, compression, keyed, queryShardContext, parent,
subFactoriesBuilder, metaData);
case HDR:
return new HDRPercentilesAggregatorFactory(name, config, percents,
numberOfSignificantValueDigits, keyed, queryShardContext, parent, subFactoriesBuilder, metaData);
default:
PercentilesConfig percentilesConfig;
if (method.equals(PercentilesMethod.TDIGEST)) {
percentilesConfig = new PercentilesConfig.TDigestConfig(compression);
} else if (method.equals(PercentilesMethod.HDR)) {
percentilesConfig = new PercentilesConfig.HdrHistoConfig(numberOfSignificantValueDigits);
} else {
throw new IllegalStateException("Illegal method [" + method + "]");
}

return new PercentilesAggregatorFactory(name, config, percents, percentilesConfig, keyed, queryShardContext, parent,
subFactoriesBuilder, metaData);

}

@Override
Expand Down Expand Up @@ -365,4 +376,47 @@ public InternalBuilder method(PercentilesMethod method) {
}
}
}

/**
* A small config object that carries algo-specific settings. This allows the factory to have
* a single unified constructor for both algos, but internally switch execution
* depending on which algo is selected
*/
abstract static class PercentilesConfig {
private final PercentilesMethod method;

PercentilesConfig(PercentilesMethod method) {
this.method = method;
}

public PercentilesMethod getMethod() {
return method;
}

static class TDigestConfig extends PercentilesConfig {
private final double compression;

TDigestConfig(double compression) {
super(PercentilesMethod.TDIGEST);
this.compression = compression;
}

public double getCompression() {
return compression;
}
}

static class HdrHistoConfig extends PercentilesConfig {
private final int numberOfSignificantValueDigits;

HdrHistoConfig(int numberOfSignificantValueDigits) {
super(PercentilesMethod.HDR);
this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
}

public int getNumberOfSignificantValueDigits() {
return numberOfSignificantValueDigits;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.metrics;

import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder.PercentilesConfig;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregatorSupplier;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* This factory is used to generate both TDigest and HDRHisto aggregators, depending
* on the selected method
*/
class PercentilesAggregatorFactory extends ValuesSourceAggregatorFactory {

private final double[] percents;
private final PercentilesConfig percentilesConfig;
private final boolean keyed;

static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
valuesSourceRegistry.register(PercentilesAggregationBuilder.NAME,
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.HISTOGRAM),
new PercentilesAggregatorSupplier() {
@Override
public Aggregator build(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] percents, PercentilesConfig percentilesConfig, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

if (percentilesConfig.getMethod().equals(PercentilesMethod.TDIGEST)) {
double compression = ((PercentilesConfig.TDigestConfig)percentilesConfig).getCompression();
return new TDigestPercentilesAggregator(name, valuesSource, context, parent, percents, compression, keyed,
formatter, pipelineAggregators, metaData);
} else if (percentilesConfig.getMethod().equals(PercentilesMethod.HDR)) {
int numSigFig = ((PercentilesConfig.HdrHistoConfig)percentilesConfig).getNumberOfSignificantValueDigits();
return new HDRPercentilesAggregator(name, valuesSource, context, parent, percents, numSigFig, keyed,
formatter, pipelineAggregators, metaData);
}

// This should already have thrown but just in case
throw new IllegalStateException("Unknown percentiles method: [" + percentilesConfig.getMethod().toString() + "]");
}
}
);
}

PercentilesAggregatorFactory(String name, ValuesSourceConfig config, double[] percents,
PercentilesConfig percentilesConfig, boolean keyed, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
this.percents = percents;
this.percentilesConfig = percentilesConfig;
this.keyed = keyed;
}

@Override
protected Aggregator createUnmapped(SearchContext searchContext,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
if (percentilesConfig.getMethod().equals(PercentilesMethod.TDIGEST)) {
double compression = ((PercentilesConfig.TDigestConfig)percentilesConfig).getCompression();
return new TDigestPercentilesAggregator(name, null, searchContext, parent, percents, compression, keyed, config.format(),
pipelineAggregators, metaData);
} else if (percentilesConfig.getMethod().equals(PercentilesMethod.HDR)) {
int numSigFig = ((PercentilesConfig.HdrHistoConfig)percentilesConfig).getNumberOfSignificantValueDigits();
return new HDRPercentilesAggregator(name, null, searchContext, parent, percents, numSigFig, keyed,
config.format(), pipelineAggregators, metaData);
}

// This should already have thrown but just in case
throw new IllegalStateException("Unknown percentiles method: [" + percentilesConfig.getMethod().toString() + "]");
}

@Override
protected Aggregator doCreateInternal(ValuesSource valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {

AggregatorSupplier aggregatorSupplier = ValuesSourceRegistry.getInstance().getAggregator(config.valueSourceType(),
PercentilesAggregationBuilder.NAME);

if (aggregatorSupplier instanceof PercentilesAggregatorSupplier == false) {
throw new AggregationExecutionException("Registry miss-match - expected PercentilesAggregatorSupplier, found [" +
aggregatorSupplier.getClass().toString() + "]");
}
PercentilesAggregatorSupplier percentilesAggregatorSupplier = (PercentilesAggregatorSupplier) aggregatorSupplier;
return percentilesAggregatorSupplier.build(name, valuesSource, searchContext, parent, percents, percentilesConfig, keyed,
config.format(), pipelineAggregators, metaData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;

import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder.PercentilesConfig;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public interface PercentilesAggregatorSupplier extends AggregatorSupplier {
Aggregator build(String name,
ValuesSource valuesSource,
SearchContext context,
Aggregator parent,
double[] percents,
PercentilesConfig percentilesConfig,
boolean keyed,
DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException;
}
Loading