Skip to content

Deprecate serializing PipelineAggregators #54926

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,47 @@ public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> agg
class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder,
ContextParser<String, ? extends PipelineAggregationBuilder>> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
* {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
*
Expand All @@ -380,7 +419,10 @@ class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBui
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
* pipelines implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
Expand All @@ -399,7 +441,10 @@ public PipelineAggregationSpec(ParseField name,
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
* implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
Expand Down Expand Up @@ -466,8 +511,10 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
}

/**
* The reader for the {@link PipelineAggregator}.
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
return aggregatorReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,10 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
(p, c) -> spec.getParser().parse(p, (String) c)));
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
if (spec.getAggregatorReader() != null) {
namedWriteables.add(new NamedWriteableRegistry.Entry(
PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
namedWriteables
.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

/**
* An immutable collection of {@link AggregatorFactories}.
*/
public class AggregatorFactories {
public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+");

Expand Down Expand Up @@ -155,26 +158,16 @@ private static AggregatorFactories.Builder parseAggregators(XContentParser parse
return factories;
}

public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>());
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]);

private AggregatorFactory[] factories;
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;

public static Builder builder() {
return new Builder();
}

private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregationBuilder> pipelineAggregators) {
private AggregatorFactories(AggregatorFactory[] factories) {
this.factories = factories;
this.pipelineAggregatorFactories = pipelineAggregators;
}

public List<PipelineAggregator> createPipelineAggregators() {
List<PipelineAggregator> pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size());
for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) {
pipelineAggregators.add(factory.create());
}
return pipelineAggregators;
}

/**
Expand Down Expand Up @@ -216,13 +209,16 @@ public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throw
}

/**
* @return the number of sub-aggregator factories not including pipeline
* aggregator factories
* @return the number of sub-aggregator factories
*/
public int countAggregators() {
return factories.length;
}

/**
* A mutable collection of {@link AggregationBuilder}s and
* {@link PipelineAggregationBuilder}s.
*/
public static class Builder implements Writeable, ToXContentObject {
private final Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -333,16 +329,13 @@ public AggregatorFactories build(QueryShardContext queryShardContext, Aggregator
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
return EMPTY;
}
List<PipelineAggregationBuilder> orderedPipelineAggregators =
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];

int i = 0;
for (AggregationBuilder agg : aggregationBuilders) {
aggFactories[i] = agg.build(queryShardContext, parent);
++i;
}
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
return new AggregatorFactories(aggFactories);
}

private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.pipeline;


import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -113,22 +114,54 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map<String, Obj

/**
* Read from a stream.
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
*/
@Deprecated
protected PipelineAggregator(StreamInput in) throws IOException {
name = in.readString();
bucketsPaths = in.readStringArray();
metadata = in.readMap();
if (in.getVersion().before(Version.V_7_8_0)) {
name = in.readString();
bucketsPaths = in.readStringArray();
metadata = in.readMap();
} else {
throw new IllegalStateException("Cannot deserialize pipeline [" + getClass() + "] from before 7.8.0");
}
}

/**
* {@inheritDoc}
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
*/
@Override
@Deprecated
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metadata);
doWriteTo(out);
if (out.getVersion().before(Version.V_7_8_0)) {
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metadata);
doWriteTo(out);
} else {
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
}
}

/**
* Write the body of the aggregation to the wire.
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
*/
@Deprecated
protected void doWriteTo(StreamOutput out) throws IOException {
}

/**
* The name of the writeable object.
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
*/
@Override
@Deprecated
public String getWriteableName() {
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
}

protected abstract void doWriteTo(StreamOutput out) throws IOException;

public String name() {
return name;
Expand Down