Skip to content

Remove serialization from pipeline aggregator #55026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 0 additions & 67 deletions server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,46 +368,6 @@ public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> agg
class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder,
ContextParser<String, ? extends PipelineAggregationBuilder>> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
* {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
Expand All @@ -417,18 +377,12 @@ public PipelineAggregationSpec(String name,
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
* pipelines implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -439,18 +393,12 @@ public PipelineAggregationSpec(ParseField name,
* {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
* implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -461,17 +409,14 @@ public PipelineAggregationSpec(String name,
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -482,16 +427,13 @@ public PipelineAggregationSpec(ParseField name,
* {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -510,15 +452,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
return this;
}

/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
return aggregatorReader;
}

/**
* Get the readers that must be registered for this aggregation's results.
*/
Expand Down
39 changes: 3 additions & 36 deletions server/src/main/java/org/elasticsearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,52 +193,37 @@
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketParser;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalDerivative;
import org.elasticsearch.search.aggregations.pipeline.InternalExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.InternalPercentilesBucket;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.InternalStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
import org.elasticsearch.search.fetch.subphase.ExplainPhase;
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
import org.elasticsearch.search.fetch.subphase.FetchScorePhase;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.fetch.subphase.FetchVersionPhase;
import org.elasticsearch.search.fetch.subphase.MatchedQueriesPhase;
import org.elasticsearch.search.fetch.subphase.FetchScorePhase;
import org.elasticsearch.search.fetch.subphase.ScriptFieldsPhase;
import org.elasticsearch.search.fetch.subphase.SeqNoPrimaryTermPhase;
import org.elasticsearch.search.fetch.subphase.FetchVersionPhase;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
Expand Down Expand Up @@ -502,82 +487,68 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
registerPipelineAggregation(new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregator::new,
DerivativePipelineAggregationBuilder::parse)
.addResultReader(InternalDerivative::new));
registerPipelineAggregation(new PipelineAggregationSpec(
MaxBucketPipelineAggregationBuilder.NAME,
MaxBucketPipelineAggregationBuilder::new,
MaxBucketPipelineAggregator::new,
MaxBucketPipelineAggregationBuilder.PARSER)
// This bucket is used by many pipeline aggreations.
.addResultReader(InternalBucketMetricValue.NAME, InternalBucketMetricValue::new));
registerPipelineAggregation(new PipelineAggregationSpec(
MinBucketPipelineAggregationBuilder.NAME,
MinBucketPipelineAggregationBuilder::new,
MinBucketPipelineAggregator::new,
MinBucketPipelineAggregationBuilder.PARSER)
/* Uses InternalBucketMetricValue */);
registerPipelineAggregation(new PipelineAggregationSpec(
AvgBucketPipelineAggregationBuilder.NAME,
AvgBucketPipelineAggregationBuilder::new,
AvgBucketPipelineAggregator::new,
AvgBucketPipelineAggregationBuilder.PARSER)
// This bucket is used by many pipeline aggreations.
.addResultReader(InternalSimpleValue.NAME, InternalSimpleValue::new));
registerPipelineAggregation(new PipelineAggregationSpec(
SumBucketPipelineAggregationBuilder.NAME,
SumBucketPipelineAggregationBuilder::new,
SumBucketPipelineAggregator::new,
SumBucketPipelineAggregationBuilder.PARSER)
/* Uses InternalSimpleValue */);
registerPipelineAggregation(new PipelineAggregationSpec(
StatsBucketPipelineAggregationBuilder.NAME,
StatsBucketPipelineAggregationBuilder::new,
StatsBucketPipelineAggregator::new,
StatsBucketPipelineAggregationBuilder.PARSER)
.addResultReader(InternalStatsBucket::new));
registerPipelineAggregation(new PipelineAggregationSpec(
ExtendedStatsBucketPipelineAggregationBuilder.NAME,
ExtendedStatsBucketPipelineAggregationBuilder::new,
ExtendedStatsBucketPipelineAggregator::new,
new ExtendedStatsBucketParser())
.addResultReader(InternalExtendedStatsBucket::new));
registerPipelineAggregation(new PipelineAggregationSpec(
PercentilesBucketPipelineAggregationBuilder.NAME,
PercentilesBucketPipelineAggregationBuilder::new,
PercentilesBucketPipelineAggregator::new,
PercentilesBucketPipelineAggregationBuilder.PARSER)
.addResultReader(InternalPercentilesBucket::new));
registerPipelineAggregation(new PipelineAggregationSpec(
CumulativeSumPipelineAggregationBuilder.NAME,
CumulativeSumPipelineAggregationBuilder::new,
CumulativeSumPipelineAggregator::new,
CumulativeSumPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
BucketScriptPipelineAggregationBuilder.NAME,
BucketScriptPipelineAggregationBuilder::new,
BucketScriptPipelineAggregator::new,
BucketScriptPipelineAggregationBuilder.PARSER));
registerPipelineAggregation(new PipelineAggregationSpec(
BucketSelectorPipelineAggregationBuilder.NAME,
BucketSelectorPipelineAggregationBuilder::new,
BucketSelectorPipelineAggregator::new,
BucketSelectorPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
BucketSortPipelineAggregationBuilder.NAME,
BucketSortPipelineAggregationBuilder::new,
BucketSortPipelineAggregator::new,
BucketSortPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
SerialDiffPipelineAggregationBuilder.NAME,
SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregator::new,
SerialDiffPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
MovFnPipelineAggregator::new,
MovFnPipelineAggregationBuilder.PARSER));

registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation);
Expand All @@ -588,10 +559,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
(p, c) -> spec.getParser().parse(p, (String) c)));
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
if (spec.getAggregatorReader() != null) {
namedWriteables.add(new NamedWriteableRegistry.Entry(
PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
namedWriteables
.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public void execute(SearchContext context) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
context.queryResult().aggregations(new InternalAggregations(aggregations,
context.request().source().aggregations()::buildPipelineTree));
context.queryResult().aggregations(new InternalAggregations(aggregations));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
Expand Down
Loading