Skip to content

Commit f386661

Browse files
committed
Aggregations Refactor: Refactor Avg Bucket, Min Bucket, Max Bucket, Sum Bucket, Percentiles Bucket, Stats Bucket and Extended Stats Bucket Aggregations
1 parent 3bbb65e commit f386661

24 files changed

+751
-126
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics;
21+
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.xcontent.XContentBuilder;
25+
import org.elasticsearch.search.aggregations.AggregatorFactory;
26+
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
27+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
28+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
29+
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
30+
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
31+
32+
import java.io.IOException;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Objects;
36+
37+
public abstract class BucketMetricsFactory extends PipelineAggregatorFactory {
38+
39+
private String format = null;
40+
private GapPolicy gapPolicy = GapPolicy.SKIP;
41+
42+
public BucketMetricsFactory(String name, String type, String[] bucketsPaths) {
43+
super(name, type, bucketsPaths);
44+
}
45+
46+
/**
47+
* Sets the format to use on the output of this aggregation.
48+
*/
49+
public void format(String format) {
50+
this.format = format;
51+
}
52+
53+
/**
54+
* Gets the format to use on the output of this aggregation.
55+
*/
56+
public String format() {
57+
return format;
58+
}
59+
60+
protected ValueFormatter formatter() {
61+
if (format != null) {
62+
return ValueFormat.Patternable.Number.format(format).formatter();
63+
} else {
64+
return ValueFormatter.RAW;
65+
}
66+
}
67+
68+
/**
69+
* Sets the gap policy to use for this aggregation.
70+
*/
71+
public void gapPolicy(GapPolicy gapPolicy) {
72+
this.gapPolicy = gapPolicy;
73+
}
74+
75+
/**
76+
* Gets the gap policy to use for this aggregation.
77+
*/
78+
public GapPolicy gapPolicy() {
79+
return gapPolicy;
80+
}
81+
82+
@Override
83+
protected abstract PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException;
84+
85+
@Override
86+
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories,
87+
List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
88+
if (bucketsPaths.length != 1) {
89+
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
90+
+ " must contain a single entry for aggregation [" + name + "]");
91+
}
92+
}
93+
94+
@Override
95+
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
96+
if (format != null) {
97+
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
98+
}
99+
if (gapPolicy != null) {
100+
builder.field(BucketMetricsParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
101+
}
102+
doXContentBody(builder, params);
103+
return builder;
104+
}
105+
106+
protected abstract XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException;
107+
108+
@Override
109+
protected final PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException {
110+
BucketMetricsFactory factory = innerReadFrom(name, bucketsPaths, in);
111+
factory.format = in.readOptionalString();
112+
factory.gapPolicy = GapPolicy.readFrom(in);
113+
return factory;
114+
}
115+
116+
protected abstract BucketMetricsFactory innerReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException;
117+
118+
@Override
119+
protected final void doWriteTo(StreamOutput out) throws IOException {
120+
innerWriteTo(out);
121+
out.writeOptionalString(format);
122+
gapPolicy.writeTo(out);
123+
}
124+
125+
protected abstract void innerWriteTo(StreamOutput out) throws IOException;
126+
127+
@Override
128+
protected final int doHashCode() {
129+
return Objects.hash(format, gapPolicy, innerHashCode());
130+
}
131+
132+
protected abstract int innerHashCode();
133+
134+
@Override
135+
protected final boolean doEquals(Object obj) {
136+
BucketMetricsFactory other = (BucketMetricsFactory) obj;
137+
return Objects.equals(format, other.format)
138+
&& Objects.equals(gapPolicy, other.gapPolicy)
139+
&& innerEquals(other);
140+
}
141+
142+
protected abstract boolean innerEquals(BucketMetricsFactory other);
143+
144+
}

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,10 @@
2121

2222
import org.elasticsearch.common.ParseField;
2323
import org.elasticsearch.common.xcontent.XContentParser;
24-
import org.elasticsearch.common.xcontent.XContentParser.Token;
2524
import org.elasticsearch.search.SearchParseException;
2625
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
2726
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2827
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
29-
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
30-
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3128
import org.elasticsearch.search.internal.SearchContext;
3229

3330
import java.io.IOException;
@@ -54,7 +51,7 @@ public final PipelineAggregatorFactory parse(String pipelineAggregatorName, XCon
5451
String currentFieldName = null;
5552
String[] bucketsPaths = null;
5653
String format = null;
57-
GapPolicy gapPolicy = GapPolicy.SKIP;
54+
GapPolicy gapPolicy = null;
5855
Map<String, Object> leftover = new HashMap<>(5);
5956

6057
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -91,16 +88,15 @@ public final PipelineAggregatorFactory parse(String pipelineAggregatorName, XCon
9188
+ "] for aggregation [" + pipelineAggregatorName + "]", parser.getTokenLocation());
9289
}
9390

94-
ValueFormatter formatter = null;
95-
if (format != null) {
96-
formatter = ValueFormat.Patternable.Number.format(format).formatter();
97-
} else {
98-
formatter = ValueFormatter.RAW;
99-
}
100-
101-
PipelineAggregatorFactory factory = null;
91+
BucketMetricsFactory factory = null;
10292
try {
103-
factory = buildFactory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, leftover);
93+
factory = buildFactory(pipelineAggregatorName, bucketsPaths, leftover);
94+
if (format != null) {
95+
factory.format(format);
96+
}
97+
if (gapPolicy != null) {
98+
factory.gapPolicy(gapPolicy);
99+
}
104100
} catch (ParseException exception) {
105101
throw new SearchParseException(context, "Could not parse settings for aggregation ["
106102
+ pipelineAggregatorName + "].", null, exception);
@@ -114,7 +110,7 @@ public final PipelineAggregatorFactory parse(String pipelineAggregatorName, XCon
114110
return factory;
115111
}
116112

117-
protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
118-
ValueFormatter formatter, Map<String, Object> unparsedParams) throws ParseException;
113+
protected abstract BucketMetricsFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths,
114+
Map<String, Object> unparsedParams) throws ParseException;
119115

120116
}

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketParser.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg;
2121

22-
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
2322
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
23+
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsFactory;
2424
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
25-
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
2625

2726
import java.util.Map;
2827

@@ -33,14 +32,11 @@ public String type() {
3332
}
3433

3534
@Override
36-
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
37-
ValueFormatter formatter, Map<String, Object> unparsedParams) {
38-
return new AvgBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
35+
protected BucketMetricsFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, Map<String, Object> unparsedParams) {
36+
return new AvgBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths);
3937
}
40-
41-
// NORELEASE implement this method when refactoring this aggregation
4238
@Override
4339
public PipelineAggregatorFactory getFactoryPrototype() {
44-
return null;
40+
return new AvgBucketPipelineAggregator.Factory(null, null);
4541
}
4642
}

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg;
2121

2222
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.xcontent.XContentBuilder;
2325
import org.elasticsearch.search.aggregations.AggregatorFactory;
2426
import org.elasticsearch.search.aggregations.InternalAggregation;
2527
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
@@ -28,6 +30,7 @@
2830
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2931
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
3032
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
33+
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsFactory;
3134
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
3235
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3336

@@ -86,20 +89,15 @@ protected InternalAggregation buildAggregation(List<PipelineAggregator> pipeline
8689
return new InternalSimpleValue(name(), avgValue, formatter, pipelineAggregators, metadata);
8790
}
8891

89-
public static class Factory extends PipelineAggregatorFactory {
92+
public static class Factory extends BucketMetricsFactory {
9093

91-
private final ValueFormatter formatter;
92-
private final GapPolicy gapPolicy;
93-
94-
public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter) {
94+
public Factory(String name, String[] bucketsPaths) {
9595
super(name, TYPE.name(), bucketsPaths);
96-
this.gapPolicy = gapPolicy;
97-
this.formatter = formatter;
9896
}
9997

10098
@Override
10199
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
102-
return new AvgBucketPipelineAggregator(name, bucketsPaths, gapPolicy, formatter, metaData);
100+
return new AvgBucketPipelineAggregator(name, bucketsPaths, gapPolicy(), formatter(), metaData);
103101
}
104102

105103
@Override
@@ -110,6 +108,31 @@ public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactorie
110108
}
111109
}
112110

111+
@Override
112+
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
113+
return builder;
114+
}
115+
116+
@Override
117+
protected BucketMetricsFactory innerReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException {
118+
return new Factory(name, bucketsPaths);
119+
}
120+
121+
@Override
122+
protected void innerWriteTo(StreamOutput out) throws IOException {
123+
// Do nothing, no extra state to write to stream
124+
}
125+
126+
@Override
127+
protected int innerHashCode() {
128+
return 0;
129+
}
130+
131+
@Override
132+
protected boolean innerEquals(BucketMetricsFactory other) {
133+
return true;
134+
}
135+
113136
}
114137

115138
}

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketParser.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max;
2121

22-
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
2322
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
23+
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsFactory;
2424
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
25-
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
2625

2726
import java.util.Map;
2827

@@ -34,15 +33,13 @@ public String type() {
3433
}
3534

3635
@Override
37-
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
38-
ValueFormatter formatter, Map<String, Object> unparsedParams) {
39-
return new MaxBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
36+
protected BucketMetricsFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, Map<String, Object> unparsedParams) {
37+
return new MaxBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths);
4038
}
4139

42-
// NORELEASE implement this method when refactoring this aggregation
4340
@Override
4441
public PipelineAggregatorFactory getFactoryPrototype() {
45-
return null;
42+
return new MaxBucketPipelineAggregator.Factory(null, null);
4643
}
4744

4845
}

0 commit comments

Comments
 (0)