Skip to content

Revert "Fix pipeline agg serialization for ccs (backport of #54282) (#54468)" #54668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -148,7 +147,6 @@ public void consumeBucketsAndMaybeBreak(int size) {
protected final Map<String, Object> metadata;

private final List<PipelineAggregator> pipelineAggregators;
private List<PipelineAggregator> pipelineAggregatorsForBwcSerialization;

/**
* Constructs an aggregation result with a given name.
Expand All @@ -161,25 +159,16 @@ protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggr
this.metadata = metadata;
}

/**
 * Folds the given {@linkplain PipelineAggregator.PipelineTree} into this
 * aggregation result tree before serializing to a node older than 7.8.0,
 * which expects pipeline aggregators to travel inline with each result.
 * Recurses into every bucket so the whole tree is prepared.
 */
public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
    pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators();
    forEachBucket(subAggs -> subAggs.mergePipelineTreeForBWCSerialization(pipelineTree));
}

/**
* Read from a stream.
*/
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metadata = in.readMap();
pipelineAggregators = emptyList();
if (in.getVersion().before(Version.V_7_8_0)) {
in.readNamedWriteableList(PipelineAggregator.class);
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
} else {
pipelineAggregators = emptyList();
}
}

Expand All @@ -188,9 +177,7 @@ public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metadata);
if (out.getVersion().before(Version.V_7_8_0)) {
assert pipelineAggregatorsForBwcSerialization != null :
"serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization";
out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization);
out.writeNamedWriteableList(pipelineAggregators);
}
doWriteTo(out);
}
Expand Down Expand Up @@ -225,11 +212,6 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
"Aggregation [" + getName() + "] must be a bucket aggregation but was [" + getWriteableName() + "]");
}

/**
 * Run a {@linkplain Consumer} over all buckets in this aggregation.
 * The default implementation is a no-op because a plain aggregation has no
 * buckets; bucketed subclasses override this to visit their buckets.
 */
public void forEachBucket(Consumer<InternalAggregations> consumer) {}

/**
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
* be called after all aggregations have been fully reduced
Expand Down Expand Up @@ -296,23 +278,10 @@ public Map<String, Object> getMetadata() {
return metadata;
}

/**
 * The pipeline aggregators attached to this result.
 * @deprecated soon to be removed because it is no longer needed
 */
@Deprecated
public List<PipelineAggregator> pipelineAggregators() {
    return this.pipelineAggregators;
}

/**
 * The {@linkplain PipelineAggregator}s written when serializing to nodes
 * running a version of Elasticsearch older than 7.8.0.
 * @deprecated only use these for serializing to older Elasticsearch versions
 */
@Deprecated
public List<PipelineAggregator> pipelineAggregatorsForBwcSerialization() {
    return this.pipelineAggregatorsForBwcSerialization;
}

@Override
public String getType() {
return getWriteableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.support.AggregationPath;

import java.io.IOException;
Expand Down Expand Up @@ -101,38 +100,18 @@ public InternalAggregations(StreamInput in) throws IOException {
}

@Override
@SuppressWarnings("unchecked")
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_7_8_0)) {
out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
if (out.getVersion().before(Version.V_7_8_0) && out.getVersion().onOrAfter(Version.V_6_7_0)) {
if (pipelineTreeForBwcSerialization == null) {
mergePipelineTreeForBWCSerialization(PipelineTree.EMPTY);
out.writeNamedWriteableList(getInternalAggregations());
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeNamedWriteableList(emptyList());
}
out.writeNamedWriteableList(emptyList());
} else {
PipelineAggregator.PipelineTree pipelineTree = pipelineTreeForBwcSerialization.get();
mergePipelineTreeForBWCSerialization(pipelineTree);
out.writeNamedWriteableList(getInternalAggregations());
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeNamedWriteableList(emptyList());
}
out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators());
}
} else {
out.writeNamedWriteableList(getInternalAggregations());
}
}

/**
 * Folds a {@linkplain PipelineAggregator.PipelineTree} into every
 * aggregation in this result set before serializing to a node older than
 * 7.8.0, handing each aggregation its matching sub-tree by name.
 */
public void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
    for (InternalAggregation aggregation : getInternalAggregations()) {
        aggregation.mergePipelineTreeForBWCSerialization(pipelineTree.subTree(aggregation.getName()));
    }
}

/**
* Make a mutable copy of the aggregation results.
* <p>
Expand All @@ -156,14 +135,6 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
.collect(toList());
}

/**
 * The transient pipeline tree used to serialize pipeline aggregators to
 * old nodes.
 * @deprecated only needed while serialization to pre-7.8.0 nodes is supported
 */
@Deprecated
Supplier<PipelineAggregator.PipelineTree> getPipelineTreeForBwcSerialization() {
    return this.pipelineTreeForBwcSerialization;
}

@SuppressWarnings("unchecked")
private List<InternalAggregation> getInternalAggregations() {
return (List<InternalAggregation>) aggregations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation,
Expand Down Expand Up @@ -146,7 +145,7 @@ public static int countInnerBucket(Aggregation agg) {
}

/**
* A multi-bucket agg needs to first reduce the buckets and *their* pipelines
* A multi-bucket agg needs to first reduce the buckets and *their* pipelines
* before allowing sibling pipelines to materialize.
*/
@Override
Expand Down Expand Up @@ -174,13 +173,6 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
return modified ? create(newBuckets) : this;
}

/**
 * Visits the sub-aggregations of every bucket owned by this multi-bucket
 * aggregation.
 */
@Override
public void forEachBucket(Consumer<InternalAggregations> consumer) {
    getBuckets().forEach(bucket -> consumer.accept((InternalAggregations) bucket.getAggregations()));
}

private List<B> reducePipelineBuckets(ReduceContext reduceContext, PipelineTree pipelineTree) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -182,11 +181,6 @@ public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations
return create(rewritten);
}

@Override
public void forEachBucket(Consumer<InternalAggregations> consumer) {
    // A single-bucket aggregation has exactly one bucket: its own sub-aggregations.
    consumer.accept(this.aggregations);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() {
}

public static InternalAggregations createTestInstance() throws Exception {
return createTestInstance(randomPipelineTree());
}

public static InternalAggregations createTestInstance(PipelineAggregator.PipelineTree pipelineTree) throws Exception {
List<InternalAggregation> aggsList = new ArrayList<>();
if (randomBoolean()) {
StringTermsTests stringTermsTests = new StringTermsTests();
Expand All @@ -108,7 +104,7 @@ public static InternalAggregations createTestInstance(PipelineAggregator.Pipelin
InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests();
aggsList.add(simpleValueTests.createTestInstance());
}
return new InternalAggregations(aggsList, () -> pipelineTree);
return new InternalAggregations(aggsList);
}

private static PipelineAggregator.PipelineTree randomPipelineTree() {
Expand All @@ -133,9 +129,11 @@ public void testSerialization() throws Exception {
}

public void testGetTopLevelPipelineAggregators() throws Exception {
PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree();
InternalAggregations aggs = createTestInstance(pipelineTree);
assertThat(aggs.getTopLevelPipelineAggregators(), equalTo(pipelineTree.aggregators()));
InternalAggregations orig = createTestInstance();
PipelineAggregator.PipelineTree tree = randomPipelineTree();
InternalAggregations withPipelines = new InternalAggregations(orig.copyResults(), () -> tree);
assertThat(withPipelines.aggregations, equalTo(orig.aggregations));
assertThat(withPipelines.getTopLevelPipelineAggregators(), equalTo(tree.aggregators()));
}

private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
Expand All @@ -148,14 +146,8 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration
InternalAggregations deserialized = new InternalAggregations(in);
assertEquals(aggregations.aggregations, deserialized.aggregations);
if (iteration < 2) {
/*
* Add the pipeline tree for bwc serialization just like we
* do when we merge the aggregation. Without that we can't
* properly serialize to older versions.
*/
InternalAggregations asThoughReduced = new InternalAggregations(
deserialized.copyResults(), aggregations.getPipelineTreeForBwcSerialization());
writeToAndReadFrom(asThoughReduced, iteration + 1);
//serialize this enough times to make sure that we are able to write again what we read
writeToAndReadFrom(deserialized, iteration + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ public void testStreamResponse() throws Exception {
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(version);
if (version.before(Version.V_7_8_0)) {
sigTerms.mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree.EMPTY);
}
out.writeNamedWriteable(sigTerms);

// read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,10 @@
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
Expand All @@ -142,7 +140,6 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -156,7 +153,6 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket;
Expand Down Expand Up @@ -388,52 +384,6 @@ public final void testFromXContentWithRandomFields() throws IOException {
assertFromXContent(aggregation, parsedAggregation);
}

/**
 * Merging a random pipeline tree into a random aggregation should attach
 * the matching aggregators at every level of the result tree.
 */
public void testMergePipelineTreeForBWCSerialization() {
    T aggregation = createTestInstance();
    PipelineAggregator.PipelineTree tree = randomPipelineTree(aggregation);
    aggregation.mergePipelineTreeForBWCSerialization(tree);
    assertMergedPipelineTreeForBWCSerialization(aggregation, tree);
}

/**
 * Builds a random {@linkplain PipelineAggregator.PipelineTree} whose shape
 * mirrors the bucket structure of the given aggregation: one sub-tree per
 * distinct sub-aggregation name, plus a random set of pipeline aggregators
 * at this level.
 */
public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) {
    Map<String, PipelineTree> subTree = new HashMap<>();
    aggregation.forEachBucket(bucketAggs -> {
        for (Aggregation subAgg : bucketAggs) {
            if (subTree.containsKey(subAgg.getName())) {
                continue;
            }
            subTree.put(subAgg.getName(), randomPipelineTree((InternalAggregation) subAgg));
        }
    });
    // Use the recursively built sub-trees: the original returned emptyMap()
    // here, silently discarding subTree and making the recursion dead code.
    return new PipelineAggregator.PipelineTree(subTree, randomPipelineAggregators());
}

/**
 * A random, possibly empty, list of bucket-metric pipeline aggregators.
 */
public static List<PipelineAggregator> randomPipelineAggregators() {
    List<PipelineAggregator> aggregators = new ArrayList<>();
    // Guard clause keeps the exact same sequence of randomBoolean() draws.
    if (false == randomBoolean()) {
        return aggregators;
    }
    if (randomBoolean()) {
        aggregators.add(new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create());
    }
    if (randomBoolean()) {
        aggregators.add(new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create());
    }
    if (randomBoolean()) {
        aggregators.add(new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
    }
    return aggregators;
}

/**
 * Asserts that every level of the aggregation tree picked up the pipeline
 * aggregators from the matching level of the pipeline tree.
 */
@SuppressWarnings("deprecation")
private void assertMergedPipelineTreeForBWCSerialization(InternalAggregation aggregation, PipelineAggregator.PipelineTree tree) {
    assertThat(aggregation.pipelineAggregatorsForBwcSerialization(), equalTo(tree.aggregators()));
    aggregation.forEachBucket(subAggs -> {
        for (Aggregation child : subAggs) {
            assertMergedPipelineTreeForBWCSerialization((InternalAggregation) child, tree.subTree(child.getName()));
        }
    });
}

protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;

@SuppressWarnings("unchecked")
Expand Down