Skip to content

Begin to drop pipeline aggs from the result tree #54311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -43,8 +42,8 @@ public class InternalMatrixStats extends InternalAggregation implements MatrixSt

/** per shard ctor */
InternalMatrixStats(String name, long count, RunningStats multiFieldStatsResults, MatrixStatsResults results,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
super(name, pipelineAggregators, metadata);
Map<String, Object> metadata) {
super(name, metadata);
assert count >= 0;
this.stats = multiFieldStatsResults;
this.results = results;
Expand Down Expand Up @@ -240,7 +239,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce

// return empty result iff all stats are null
if (aggs.isEmpty()) {
return new InternalMatrixStats(name, 0, null, new MatrixStatsResults(), pipelineAggregators(), getMetadata());
return new InternalMatrixStats(name, 0, null, new MatrixStatsResults(), getMetadata());
}

RunningStats runningStats = new RunningStats();
Expand All @@ -250,9 +249,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce

if (reduceContext.isFinalReduce()) {
MatrixStatsResults results = new MatrixStatsResults(runningStats);
return new InternalMatrixStats(name, results.getDocCount(), runningStats, results, pipelineAggregators(), getMetadata());
return new InternalMatrixStats(name, results.getDocCount(), runningStats, results, getMetadata());
}
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, pipelineAggregators(), getMetadata());
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, getMetadata());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ public InternalAggregation buildAggregation(long bucket) {
if (valuesSources == null || bucket >= stats.size()) {
return buildEmptyAggregation();
}
return new InternalMatrixStats(name, stats.size(), stats.get(bucket), null, pipelineAggregators(), metadata());
return new InternalMatrixStats(name, stats.size(), stats.get(bucket), null, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalMatrixStats(name, 0, null, null, pipelineAggregators(), metadata());
return new InternalMatrixStats(name, 0, null, null, metadata());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.matrix.stats.InternalMatrixStats.Fields;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.InternalAggregationTestCase;

Expand Down Expand Up @@ -69,8 +68,7 @@ protected List<NamedXContentRegistry.Entry> getNamedXContents() {
}

@Override
protected InternalMatrixStats createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metadata) {
protected InternalMatrixStats createTestInstance(String name, Map<String, Object> metadata) {
double[] values = new double[fields.length];
for (int i = 0; i < fields.length; i++) {
values[i] = randomDouble();
Expand All @@ -79,7 +77,7 @@ protected InternalMatrixStats createTestInstance(String name, List<PipelineAggre
RunningStats runningStats = new RunningStats();
runningStats.add(fields, values);
MatrixStatsResults matrixStatsResults = hasMatrixStatsResults ? new MatrixStatsResults(runningStats) : null;
return new InternalMatrixStats(name, 1L, runningStats, matrixStatsResults, Collections.emptyList(), metadata);
return new InternalMatrixStats(name, 1L, runningStats, matrixStatsResults, metadata);
}

@Override
Expand Down Expand Up @@ -125,7 +123,7 @@ protected InternalMatrixStats mutateInstance(InternalMatrixStats instance) {
metadata.put(randomAlphaOfLength(15), randomInt());
break;
}
return new InternalMatrixStats(name, docCount, runningStats, matrixStatsResults, Collections.emptyList(), metadata);
return new InternalMatrixStats(name, docCount, runningStats, matrixStatsResults, metadata);
}

@Override
Expand All @@ -149,14 +147,14 @@ public void testReduceRandom() {

runningStats.add(new String[]{"a", "b"}, new double[]{valueA, valueB});
if (++valuePerShardCounter == valuesPerShard) {
shardResults.add(new InternalMatrixStats("_name", 1L, runningStats, null, Collections.emptyList(), Collections.emptyMap()));
shardResults.add(new InternalMatrixStats("_name", 1L, runningStats, null, Collections.emptyMap()));
runningStats = new RunningStats();
valuePerShardCounter = 0;
}
}

if (valuePerShardCounter != 0) {
shardResults.add(new InternalMatrixStats("_name", 1L, runningStats, null, Collections.emptyList(), Collections.emptyMap()));
shardResults.add(new InternalMatrixStats("_name", 1L, runningStats, null, Collections.emptyMap()));
}
MultiPassStats multiPassStats = new MultiPassStats("a", "b");
multiPassStats.computeStats(aValues, bValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,19 @@ public void consumeBucketsAndMaybeBreak(int size) {
*
* @param name The name of the aggregation.
*/
protected InternalAggregation(String name, Map<String, Object> metadata) {
this.name = name;
this.pipelineAggregators = emptyList();
this.metadata = metadata;
}

/**
* Constructs an aggregation result with a given name.
*
* @param name The name of the aggregation.
* @deprecated pipelines are being removed from the aggregation tree. Use the other ctor.
*/
@Deprecated
protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
this.name = name;
this.pipelineAggregators = pipelineAggregators;
Expand All @@ -171,6 +184,7 @@ public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.Pipeli
forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree));
}


/**
* Read from a stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
B extends InternalMultiBucketAggregation.InternalBucket>
extends InternalAggregation implements MultiBucketsAggregation {

public InternalMultiBucketAggregation(String name, Map<String, Object> metadata) {
super(name, metadata);
}

/**
* @deprecated being removed
*/
@Deprecated
public InternalMultiBucketAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
super(name, pipelineAggregators, metadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
bucketAggregations(bucketOrd), keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed, pipelineAggregators(), metadata());
return new InternalFilters(name, buckets, keyed, metadata());
}

@Override
Expand All @@ -198,7 +198,7 @@ public InternalAggregation buildEmptyAggregation() {
buckets.add(bucket);
}

return new InternalFilters(name, buckets, keyed, pipelineAggregators(), metadata());
return new InternalFilters(name, buckets, keyed, metadata());
}

final long bucketOrd(long owningBucketOrdinal, int filterOrd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -126,9 +125,8 @@ public int hashCode() {
// bucketMap gets lazily initialized from buckets in getBucketByKey()
private transient Map<String, InternalBucket> bucketMap;

public InternalFilters(String name, List<InternalBucket> buckets, boolean keyed, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metadata) {
super(name, pipelineAggregators, metadata);
public InternalFilters(String name, List<InternalBucket> buckets, boolean keyed, Map<String, Object> metadata) {
super(name, metadata);
this.buckets = buckets;
this.keyed = keyed;
}
Expand Down Expand Up @@ -164,7 +162,7 @@ public String getWriteableName() {

@Override
public InternalFilters create(List<InternalBucket> buckets) {
return new InternalFilters(this.name, buckets, this.keyed, this.pipelineAggregators(), this.metadata);
return new InternalFilters(name, buckets, keyed, metadata);
}

@Override
Expand Down Expand Up @@ -209,8 +207,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
}

reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
InternalFilters reduced = new InternalFilters(name, new ArrayList<>(bucketsList.size()), keyed, pipelineAggregators(),
getMetadata());
InternalFilters reduced = new InternalFilters(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
for (List<InternalBucket> sameRangeList : bucketsList) {
reduced.buckets.add(reduceBucket(sameRangeList, reduceContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ public void collect(int doc, long bucket) throws IOException {
};
}

abstract T buildAggregation(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata);
abstract T buildAggregation(String name, int requiredSize, List<InternalGeoGridBucket> buckets, Map<String, Object> metadata);

/**
* This method is used to return a re-usable instance of the bucket when building
Expand Down Expand Up @@ -132,12 +131,12 @@ public InternalGeoGrid buildAggregation(long owningBucketOrdinal) throws IOExcep
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
return buildAggregation(name, requiredSize, Arrays.asList(list), pipelineAggregators(), metadata());
return buildAggregation(name, requiredSize, Arrays.asList(list), metadata());
}

@Override
public InternalGeoGrid buildEmptyAggregation() {
return buildAggregation(name, requiredSize, Collections.emptyList(), pipelineAggregators(), metadata());
return buildAggregation(name, requiredSize, Collections.emptyList(), metadata());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid

@Override
InternalGeoHashGrid buildAggregation(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalGeoHashGrid(name, requiredSize, buckets, pipelineAggregators, metadata);
Map<String, Object> metadata) {
return new InternalGeoHashGrid(name, requiredSize, buckets, metadata);
}

@Override
public InternalGeoHashGrid buildEmptyAggregation() {
return new InternalGeoHashGrid(name, requiredSize, Collections.emptyList(), pipelineAggregators(), metadata());
return new InternalGeoHashGrid(name, requiredSize, Collections.emptyList(), metadata());
}

InternalGeoGridBucket newEmptyBucket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyList;

public class GeoHashGridAggregatorFactory extends ValuesSourceAggregatorFactory {

private final int precision;
Expand All @@ -66,8 +67,7 @@ protected Aggregator createUnmapped(SearchContext searchContext,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metadata) throws IOException {
final InternalAggregation aggregation = new InternalGeoHashGrid(name, requiredSize,
Collections.emptyList(), pipelineAggregators, metadata);
final InternalAggregation aggregation = new InternalGeoHashGrid(name, requiredSize, emptyList(), metadata);
return new NonCollectingAggregator(name, searchContext, parent, pipelineAggregators, metadata) {
@Override
public InternalAggregation buildEmptyAggregation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid

@Override
InternalGeoTileGrid buildAggregation(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalGeoTileGrid(name, requiredSize, buckets, pipelineAggregators, metadata);
Map<String, Object> metadata) {
return new InternalGeoTileGrid(name, requiredSize, buckets, metadata);
}

@Override
public InternalGeoTileGrid buildEmptyAggregation() {
return new InternalGeoTileGrid(name, requiredSize, Collections.emptyList(), pipelineAggregators(), metadata());
return new InternalGeoTileGrid(name, requiredSize, Collections.emptyList(), metadata());
}

InternalGeoGridBucket newEmptyBucket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ protected Aggregator createUnmapped(SearchContext searchContext,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metadata) throws IOException {
final InternalAggregation aggregation = new InternalGeoTileGrid(name, requiredSize,
Collections.emptyList(), pipelineAggregators, metadata);
final InternalAggregation aggregation = new InternalGeoTileGrid(name, requiredSize, Collections.emptyList(), metadata);
return new NonCollectingAggregator(name, searchContext, parent, pipelineAggregators, metadata) {
@Override
public InternalAggregation buildEmptyAggregation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -48,9 +47,8 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket>
protected final int requiredSize;
protected final List<InternalGeoGridBucket> buckets;

InternalGeoGrid(String name, int requiredSize, List<InternalGeoGridBucket> buckets, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metadata) {
super(name, pipelineAggregators, metadata);
InternalGeoGrid(String name, int requiredSize, List<InternalGeoGridBucket> buckets, Map<String, Object> metadata) {
super(name, metadata);
this.requiredSize = requiredSize;
this.buckets = buckets;
}
Expand All @@ -72,8 +70,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
out.writeList(buckets);
}

abstract InternalGeoGrid create(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata);
abstract InternalGeoGrid create(String name, int requiredSize, List<InternalGeoGridBucket> buckets, Map<String, Object> metadata);

@Override
public List<InternalGeoGridBucket> getBuckets() {
Expand Down Expand Up @@ -115,7 +112,7 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceCont
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
return create(getName(), requiredSize, Arrays.asList(list), pipelineAggregators(), getMetadata());
return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
Expand All @@ -33,9 +32,8 @@
*/
public class InternalGeoHashGrid extends InternalGeoGrid<InternalGeoHashGridBucket> {

InternalGeoHashGrid(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
super(name, requiredSize, buckets, pipelineAggregators, metadata);
InternalGeoHashGrid(String name, int requiredSize, List<InternalGeoGridBucket> buckets, Map<String, Object> metadata) {
super(name, requiredSize, buckets, metadata);
}

public InternalGeoHashGrid(StreamInput in) throws IOException {
Expand All @@ -44,7 +42,7 @@ public InternalGeoHashGrid(StreamInput in) throws IOException {

@Override
public InternalGeoGrid create(List<InternalGeoGridBucket> buckets) {
return new InternalGeoHashGrid(name, requiredSize, buckets, pipelineAggregators(), metadata);
return new InternalGeoHashGrid(name, requiredSize, buckets, metadata);
}

@Override
Expand All @@ -53,8 +51,8 @@ public InternalGeoGridBucket createBucket(InternalAggregations aggregations, Int
}

@Override
InternalGeoGrid create(String name, int requiredSize, List buckets, List list, Map metadata) {
return new InternalGeoHashGrid(name, requiredSize, buckets, list, metadata);
InternalGeoGrid create(String name, int requiredSize, List buckets, Map metadata) {
return new InternalGeoHashGrid(name, requiredSize, buckets, metadata);
}

@Override
Expand Down
Loading