Save a little space in agg tree #53730

Merged (11 commits) on Mar 25, 2020
build.gradle (2 additions & 2 deletions)
@@ -222,8 +222,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

-boolean bwc_tests_enabled = true
-final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
+boolean bwc_tests_enabled = false
+final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/54161" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
@@ -15,15 +15,15 @@
 refresh: true
 body:
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "local_cluster", "filter_field": 0}'
+- '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "local_cluster", "filter_field": 1}'
+- '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "local_cluster", "filter_field": 0}'
+- '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "local_cluster", "filter_field": 1}'
+- '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "local_cluster", "filter_field": 0}'
+- '{"f1": "local_cluster", "animal": "pig", "filter_field": 0}'

- do:
search:
@@ -115,6 +115,87 @@
- match: { aggregations.cluster.buckets.0.key: "local_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 5 }

# once more, this time with a top level pipeline agg
- do:
Contributor commented:

Should we have a similar test for the rolling-upgrade module? Theoretically it should be the same as CCS, but it might also smoke out different issues due to heterogeneous serialization inside the same cluster (instead of funneling through a gateway).

Member Author replied:

It'd be great to have a "mixed cluster CCS" test. I talked that one through with @javanna and we don't have one now and probably don't want to build one just for this.

search:
rest_total_hits_as_int: true
index: test_index,my_remote_cluster:test_index
body:
seq_no_primary_term: true
aggs:
cluster:
terms:
field: f1.keyword
aggs:
s:
Contributor commented:

Should we add a non-top-level pipeline agg just to confirm they aren't affected?

Member Author replied:

I figured I'd get it in my next PR about non-top-level pipeline aggs, but I'm happy to do it now!

sum:
field: filter_field
average_sum:
avg_bucket:
buckets_path: cluster.s

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.0.s.value: 2 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.s.value: 2 }
- match: { aggregations.average_sum.value: 2 }

# and now a non-top-level pipeline agg!
- do:
search:
rest_total_hits_as_int: true
index: test_index,my_remote_cluster:test_index
body:
seq_no_primary_term: true
aggs:
cluster:
terms:
field: f1.keyword
aggs:
animal:
terms:
field: animal.keyword
aggs:
s:
sum:
field: filter_field
average_sum:
avg_bucket:
buckets_path: animal.s

- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.key: "chicken" }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.doc_count: 4 }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.s.value: 1 }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.key: "pig" }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.doc_count: 2 }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.s.value: 1 }
- match: { aggregations.cluster.buckets.0.average_sum.value: 1 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.key: "dog" }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.doc_count: 4 }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.s.value: 2 }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.key: "pig" }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.doc_count: 1 }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.s.value: 0 }
- match: { aggregations.cluster.buckets.1.average_sum.value: 1 }
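The expected pipeline values above can be rechecked by hand: each `s` is the sum of `filter_field` over a bucket's documents, and `avg_bucket` averages those sums across sibling buckets. A minimal plain-Java sketch (class and method names are illustrative, independent of Elasticsearch) reproduces the arithmetic:

```java
import java.util.Arrays;

// Recomputes the avg_bucket expectations from the indexed documents above.
public class AvgBucketMath {
    // "s" in the test: the sum of filter_field over one bucket's docs.
    static double sum(int... filterFieldValues) {
        return Arrays.stream(filterFieldValues).sum();
    }

    // avg_bucket: the average of the metric across sibling buckets.
    static double avgBucket(double... bucketSums) {
        return Arrays.stream(bucketSums).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // Remote cluster: chicken docs carry filter_field 0,1,0,0; pig docs carry 0,1.
        System.out.println(avgBucket(sum(0, 1, 0, 0), sum(0, 1))); // prints 1.0
        // Local cluster: dog docs carry 0,1,0,1; the lone pig doc carries 0.
        System.out.println(avgBucket(sum(0, 1, 0, 1), sum(0)));    // prints 1.0
    }
}
```

Both averages come out to 1, matching the `average_sum.value: 1` assertions for each cluster bucket.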

---
"Add transient remote cluster based on the preset cluster":
- do:
@@ -91,17 +91,17 @@
 refresh: true
 body:
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 0}'
+- '{"f1": "remote_cluster", "animal": "pig", "filter_field": 0}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 1}'
+- '{"f1": "remote_cluster", "animal": "pig", "filter_field": 1}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 0}'
+- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 1}'
+- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 1}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 0}'
+- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
 - '{"index": {"_index": "test_index"}}'
-- '{"f1": "remote_cluster", "filter_field": 0}'
+- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'

- do:
search:
@@ -1209,7 +1209,8 @@ public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchRe
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
-return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService);
+return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService,
+() -> requestToPipelineTree(request));
}

@Override
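The change above hands the pipeline tree to the *partial* reduction path as well, because partially reduced results may still need to serialize pipelines to older nodes. A rough sketch of the two-phase idea (illustrative names only, not the real Elasticsearch API): partial reductions just merge intermediate results, while the pipeline conceptually runs once, at the final reduction.

```java
import java.util.List;
import java.util.function.Function;

// Illustrative two-phase reduce: partials merge cheaply, the pipeline
// (e.g. an avg_bucket-style computation) applies only at the end.
public class TwoPhaseReduce {
    // A partial reduction merges whatever intermediate results have arrived.
    static int partialReduce(List<Integer> shardSums) {
        return shardSums.stream().mapToInt(Integer::intValue).sum();
    }

    // The final reduction merges the partials, then applies the pipeline step.
    static double finalReduce(List<Integer> partialSums, Function<Integer, Double> pipeline) {
        return pipeline.apply(partialReduce(partialSums));
    }

    public static void main(String[] args) {
        List<Integer> partials = List.of(partialReduce(List.of(1, 2)), partialReduce(List.of(3)));
        System.out.println(finalReduce(partials, merged -> merged / 2.0)); // prints 3.0
    }
}
```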
@@ -133,17 +133,16 @@ public void execute(SearchContext context) {
}
}
 List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
-List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
 for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
-if (pipelineAggregator instanceof SiblingPipelineAggregator) {
-siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
-} else {
+if (false == pipelineAggregator instanceof SiblingPipelineAggregator) {
+// TODO move this to request validation after #53669
 throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name()
 + "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are "
 + "allowed at the top level");
 }
 }
-context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
+context.queryResult().aggregations(new InternalAggregations(aggregations,
+context.request().source().aggregations()::buildPipelineTree));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
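With the collected sibling list gone, the loop above only validates; the structure can be sketched with self-contained stand-in types (these names mirror the real classes, but this is an illustrative sketch, not the Elasticsearch API):

```java
import java.util.List;

// Minimal stand-ins for the Elasticsearch pipeline-aggregator types.
interface PipelineAggregator { String name(); }
interface SiblingPipelineAggregator extends PipelineAggregator {}

public class TopLevelValidation {
    // Mirrors the rewritten loop: nothing is collected any more; the loop
    // only rejects aggregators that are not sibling pipeline aggregators.
    static void validateTopLevel(List<PipelineAggregator> pipelineAggregators) {
        for (PipelineAggregator p : pipelineAggregators) {
            if (false == p instanceof SiblingPipelineAggregator) {
                throw new IllegalArgumentException("Invalid pipeline aggregation named ["
                    + p.name() + "]. Only sibling pipeline aggregations are allowed at the top level");
            }
        }
    }

    public static void main(String[] args) {
        validateTopLevel(List.of((SiblingPipelineAggregator) () -> "avg_bucket")); // passes silently
    }
}
```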
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;

+import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -37,7 +38,9 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntConsumer;
+import java.util.function.Supplier;

+import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

/**
@@ -62,12 +65,19 @@ public static class ReduceContext {
private final ScriptService scriptService;
private final IntConsumer multiBucketConsumer;
private final PipelineTree pipelineTreeRoot;
+/**
+ * Supplies the pipelines when the result of the reduce is serialized
+ * to node versions that need pipeline aggregators to be serialized
+ * to them.
+ */
+private final Supplier<PipelineTree> pipelineTreeForBwcSerialization;

/**
* Build a {@linkplain ReduceContext} to perform a partial reduction.
*/
-public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService) {
-return new ReduceContext(bigArrays, scriptService, (s) -> {}, null);
+public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService,
+Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
+return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization);
}

/**
@@ -77,15 +87,16 @@ public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptServi
public static ReduceContext forFinalReduction(BigArrays bigArrays, ScriptService scriptService,
IntConsumer multiBucketConsumer, PipelineTree pipelineTreeRoot) {
return new ReduceContext(bigArrays, scriptService, multiBucketConsumer,
-requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"));
+requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"), () -> pipelineTreeRoot);
}

private ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer,
-PipelineTree pipelineTreeRoot) {
+PipelineTree pipelineTreeRoot, Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
+this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
}

/**
@@ -112,6 +123,15 @@ public PipelineTree pipelineTreeRoot() {
return pipelineTreeRoot;
}

+/**
+ * Supplies the pipelines when the result of the reduce is serialized
+ * to node versions that need pipeline aggregators to be serialized
+ * to them.
+ */
+public Supplier<PipelineTree> pipelineTreeForBwcSerialization() {
+return pipelineTreeForBwcSerialization;
+}
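Handing the pipelines over as a `Supplier` rather than an eager value means the tree is only materialized if serialization to an older node actually asks for it. A minimal sketch of that deferred-computation pattern (the `PipelineTree` here is a stand-in, not the real class):

```java
import java.util.function.Supplier;

// Shows why the BWC pipelines are supplied lazily: nothing is built unless
// a pre-8.0 serialization path calls get().
public class LazyPipelineTree {
    static int buildCount = 0;

    static class PipelineTree {}

    static Supplier<PipelineTree> treeSupplier() {
        return () -> {
            buildCount++; // the (potentially expensive) build happens here
            return new PipelineTree();
        };
    }

    public static void main(String[] args) {
        Supplier<PipelineTree> supplier = treeSupplier();
        System.out.println(buildCount); // prints 0: nothing built yet
        supplier.get();                 // an old-node serialization path asks
        System.out.println(buildCount); // prints 1
    }
}
```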

/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
@@ -129,9 +149,9 @@ public void consumeBucketsAndMaybeBreak(int size) {
private final List<PipelineAggregator> pipelineAggregators;

/**
-* Constructs an get with a given name.
+* Constructs an aggregation result with a given name.
 *
-* @param name The name of the get.
+* @param name The name of the aggregation.
*/
protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
this.name = name;
@@ -145,14 +165,20 @@ protected InternalAggregation(StreamInput in) throws IOException {
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
-pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
+if (in.getVersion().before(Version.V_8_0_0)) {
+pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
+} else {
+pipelineAggregators = emptyList();
+}
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
-out.writeNamedWriteableList(pipelineAggregators);
+if (out.getVersion().before(Version.V_8_0_0)) {
+out.writeNamedWriteableList(pipelineAggregators);
+}
doWriteTo(out);
}
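This is the space saving in the PR title: reader and writer apply the same version gate, so the pipeline-aggregator list simply never hits the wire between 8.0+ nodes. A self-contained sketch of the pattern, using `DataOutputStream`/`DataInputStream` as stand-ins for Elasticsearch's `StreamOutput`/`StreamInput` (version ids illustrative, not the real constants):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Version-gated wire format: a legacy field is written only for old peers.
public class VersionGatedWire {
    static final int V_8_0_0 = 80000;

    // Writes the name always, but the legacy pipeline-aggregator list only
    // when the stream targets a pre-8.0 node.
    static byte[] write(int streamVersion, String name, List<String> legacyList) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(name);
            if (streamVersion < V_8_0_0) {
                out.writeInt(legacyList.size());
                for (String item : legacyList) {
                    out.writeUTF(item);
                }
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The reader applies the exact same gate, so both sides agree on whether
    // the list is present; newer streams fall back to an empty list.
    static List<String> readLegacyList(int streamVersion, byte[] payload) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            in.readUTF(); // the name
            if (streamVersion >= V_8_0_0) {
                return List.of();
            }
            int size = in.readInt();
            List<String> items = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                items.add(in.readUTF());
            }
            return items;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldWire = write(70800, "cluster", List.of("avg_bucket"));
        byte[] newWire = write(V_8_0_0, "cluster", List.of("avg_bucket"));
        // The 8.0 payload is smaller: the pipeline list is never serialized.
        System.out.println(newWire.length < oldWire.length); // prints true
    }
}
```

The crucial property is symmetry: the gate must key off the shared stream version, never each node's own version, or reader and writer would disagree on the payload layout.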
