Skip to content

Commit 123ca13

Browse files
committed
Aggregations Refactor: Refactor Serial Differencing Aggregation
1 parent 50462bb commit 123ca13

File tree

5 files changed

+159
-22
lines changed

5 files changed

+159
-22
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
2626
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2727
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
28-
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
29-
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3028
import org.elasticsearch.search.internal.SearchContext;
3129

3230
import java.io.IOException;
@@ -50,8 +48,8 @@ public PipelineAggregatorFactory parse(String reducerName, XContentParser parser
5048
String currentFieldName = null;
5149
String[] bucketsPaths = null;
5250
String format = null;
53-
GapPolicy gapPolicy = GapPolicy.SKIP;
54-
int lag = 1;
51+
GapPolicy gapPolicy = null;
52+
Integer lag = null;
5553

5654
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
5755
if (token == XContentParser.Token.FIELD_NAME) {
@@ -102,20 +100,22 @@ public PipelineAggregatorFactory parse(String reducerName, XContentParser parser
102100
+ "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
103101
}
104102

105-
ValueFormatter formatter;
103+
SerialDiffPipelineAggregator.Factory factory = new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths);
104+
if (lag != null) {
105+
factory.lag(lag);
106+
}
106107
if (format != null) {
107-
formatter = ValueFormat.Patternable.Number.format(format).formatter();
108-
} else {
109-
formatter = ValueFormatter.RAW;
108+
factory.format(format);
110109
}
111-
112-
return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag);
110+
if (gapPolicy != null) {
111+
factory.gapPolicy(gapPolicy);
112+
}
113+
return factory;
113114
}
114115

115-
// NORELEASE implement this method when refactoring this aggregation
116116
@Override
117117
public PipelineAggregatorFactory getFactoryPrototype() {
118-
return null;
118+
return new SerialDiffPipelineAggregator.Factory(null, null);
119119
}
120120

121121
}

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,29 @@
2323
import org.elasticsearch.common.collect.EvictingQueue;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.io.stream.StreamOutput;
26+
import org.elasticsearch.common.xcontent.XContentBuilder;
2627
import org.elasticsearch.search.aggregations.InternalAggregation;
2728
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
2829
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
2930
import org.elasticsearch.search.aggregations.InternalAggregations;
3031
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
32+
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3133
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
3234
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
3335
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
3436
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
37+
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
3538
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3639
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
3740

3841
import java.io.IOException;
3942
import java.util.ArrayList;
4043
import java.util.List;
4144
import java.util.Map;
45+
import java.util.Objects;
4246
import java.util.stream.Collectors;
4347
import java.util.stream.StreamSupport;
4448

45-
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
4649
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
4750

4851
public class SerialDiffPipelineAggregator extends PipelineAggregator {
@@ -144,20 +147,105 @@ public void doWriteTo(StreamOutput out) throws IOException {
144147

145148
public static class Factory extends PipelineAggregatorFactory {
146149

147-
private final ValueFormatter formatter;
148-
private GapPolicy gapPolicy;
149-
private int lag;
150+
private String format;
151+
private GapPolicy gapPolicy = GapPolicy.SKIP;
152+
private int lag = 1;
150153

151-
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) {
154+
public Factory(String name, String[] bucketsPaths) {
152155
super(name, TYPE.name(), bucketsPaths);
153-
this.formatter = formatter;
154-
this.gapPolicy = gapPolicy;
156+
}
157+
158+
/**
159+
* Sets the lag to use when calculating the serial difference.
160+
*/
161+
public void lag(int lag) {
155162
this.lag = lag;
156163
}
157164

165+
/**
166+
* Gets the lag to use when calculating the serial difference.
167+
*/
168+
public int lag() {
169+
return lag;
170+
}
171+
172+
/**
173+
* Sets the format to use on the output of this aggregation.
174+
*/
175+
public void format(String format) {
176+
this.format = format;
177+
}
178+
179+
/**
180+
* Gets the format to use on the output of this aggregation.
181+
*/
182+
public String format() {
183+
return format;
184+
}
185+
186+
/**
187+
* Sets the GapPolicy to use on the output of this aggregation.
188+
*/
189+
public void gapPolicy(GapPolicy gapPolicy) {
190+
this.gapPolicy = gapPolicy;
191+
}
192+
193+
/**
194+
* Gets the GapPolicy to use on the output of this aggregation.
195+
*/
196+
public GapPolicy gapPolicy() {
197+
return gapPolicy;
198+
}
199+
200+
protected ValueFormatter formatter() {
201+
if (format != null) {
202+
return ValueFormat.Patternable.Number.format(format).formatter();
203+
} else {
204+
return ValueFormatter.RAW;
205+
}
206+
}
207+
158208
@Override
159209
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
160-
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData);
210+
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter(), gapPolicy, lag, metaData);
211+
}
212+
213+
@Override
214+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
215+
if (format != null) {
216+
builder.field(SerialDiffParser.FORMAT.getPreferredName(), format);
217+
}
218+
builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
219+
builder.field(SerialDiffParser.LAG.getPreferredName(), lag);
220+
return builder;
221+
}
222+
223+
@Override
224+
protected PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException {
225+
Factory factory = new Factory(name, bucketsPaths);
226+
factory.format = in.readOptionalString();
227+
factory.gapPolicy = GapPolicy.readFrom(in);
228+
factory.lag = in.readVInt();
229+
return factory;
230+
}
231+
232+
@Override
233+
protected void doWriteTo(StreamOutput out) throws IOException {
234+
out.writeOptionalString(format);
235+
gapPolicy.writeTo(out);
236+
out.writeVInt(lag);
237+
}
238+
239+
@Override
240+
protected int doHashCode() {
241+
return Objects.hash(format, gapPolicy, lag);
242+
}
243+
@Override
244+
protected boolean doEquals(Object obj) {
245+
Factory other = (Factory) obj;
246+
return Objects.equals(format, other.format)
247+
&& Objects.equals(gapPolicy, other.gapPolicy)
248+
&& Objects.equals(lag, other.lag);
161249
}
162250

163251
}

core/src/test/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3333
import org.elasticsearch.common.io.stream.StreamInput;
3434
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.settings.SettingsFilter;
3536
import org.elasticsearch.common.settings.SettingsModule;
3637
import org.elasticsearch.common.xcontent.XContentFactory;
3738
import org.elasticsearch.common.xcontent.XContentParser;
@@ -100,7 +101,7 @@ public static void init() throws IOException {
100101
index = new Index("test");
101102
injector = new ModulesBuilder().add(
102103
new EnvironmentModule(new Environment(settings)),
103-
new SettingsModule(settings),
104+
new SettingsModule(settings, new SettingsFilter(settings)),
104105
new ThreadPoolModule(new ThreadPool(settings)),
105106
new ScriptModule(settings),
106107
new IndicesModule() {

core/src/test/java/org/elasticsearch/search/aggregations/BasePipelineAggregationTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3333
import org.elasticsearch.common.io.stream.StreamInput;
3434
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.settings.SettingsFilter;
3536
import org.elasticsearch.common.settings.SettingsModule;
3637
import org.elasticsearch.common.xcontent.XContentFactory;
3738
import org.elasticsearch.common.xcontent.XContentParser;
@@ -101,7 +102,7 @@ public static void init() throws IOException {
101102
index = new Index("test");
102103
injector = new ModulesBuilder().add(
103104
new EnvironmentModule(new Environment(settings)),
104-
new SettingsModule(settings),
105+
new SettingsModule(settings, new SettingsFilter(settings)),
105106
new ThreadPoolModule(new ThreadPool(settings)),
106107
new ScriptModule(settings),
107108
new IndicesModule() {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.pipeline;
21+
22+
import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase;
23+
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
24+
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
25+
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator.Factory;
26+
27+
public class SerialDifferenceTests extends BasePipelineAggregationTestCase<SerialDiffPipelineAggregator.Factory> {
28+
29+
@Override
30+
protected Factory createTestAggregatorFactory() {
31+
String name = randomAsciiOfLengthBetween(3, 20);
32+
String[] bucketsPaths = new String[1];
33+
bucketsPaths[0] = randomAsciiOfLengthBetween(3, 20);
34+
Factory factory = new Factory(name, bucketsPaths);
35+
if (randomBoolean()) {
36+
factory.format(randomAsciiOfLengthBetween(1, 10));
37+
}
38+
if (randomBoolean()) {
39+
factory.gapPolicy(randomFrom(GapPolicy.values()));
40+
}
41+
if (randomBoolean()) {
42+
factory.lag(randomIntBetween(1, 1000));
43+
}
44+
return factory;
45+
}
46+
47+
}

0 commit comments

Comments
 (0)