Skip to content

Commit 97cbdb8

Browse files
committed
Aggregations Refactor: Refactor Range Aggregations
1 parent 97140f8 commit 97cbdb8

File tree

17 files changed

+1095
-407
lines changed

17 files changed

+1095
-407
lines changed

core/src/main/java/org/elasticsearch/common/network/Cidrs.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,8 @@ static String octetsToCIDR(int[] octets, int networkMask) {
113113
assert octets.length == 4;
114114
return octetsToString(octets) + "/" + networkMask;
115115
}
116+
117+
public static String createCIDR(long ipAddress, int networkMask) {
118+
return octetsToCIDR(longToOctets(ipAddress), networkMask);
119+
}
116120
}

core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
3030
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
3131
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
32+
import org.elasticsearch.search.aggregations.support.ValueType;
33+
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
3234
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3335
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
3436

@@ -225,6 +227,14 @@ public Type type() {
225227
return TYPE;
226228
}
227229

230+
public ValuesSourceType getValueSourceType() {
231+
return ValuesSourceType.NUMERIC;
232+
}
233+
234+
public ValueType getValueType() {
235+
return ValueType.NUMERIC;
236+
}
237+
228238
public R create(String name, List<B> ranges, ValueFormatter formatter, boolean keyed, List<PipelineAggregator> pipelineAggregators,
229239
Map<String, Object> metaData) {
230240
return (R) new InternalRange<>(name, ranges, formatter, keyed, pipelineAggregators, metaData);

core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java

Lines changed: 192 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020

2121
import org.apache.lucene.index.LeafReaderContext;
2222
import org.apache.lucene.util.InPlaceMergeSorter;
23+
import org.elasticsearch.common.ParseField;
24+
import org.elasticsearch.common.ParseFieldMatcher;
25+
import org.elasticsearch.common.io.stream.StreamInput;
26+
import org.elasticsearch.common.io.stream.StreamOutput;
27+
import org.elasticsearch.common.io.stream.Writeable;
28+
import org.elasticsearch.common.xcontent.ToXContent;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
30+
import org.elasticsearch.common.xcontent.XContentParser;
2331
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
2432
import org.elasticsearch.search.aggregations.Aggregator;
2533
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -31,9 +39,11 @@
3139
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
3240
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
3341
import org.elasticsearch.search.aggregations.support.AggregationContext;
42+
import org.elasticsearch.search.aggregations.support.ValueType;
3443
import org.elasticsearch.search.aggregations.support.ValuesSource;
44+
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
3545
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
36-
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
46+
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
3747
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
3848
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
3949
import org.elasticsearch.search.aggregations.support.format.ValueParser;
@@ -43,21 +53,38 @@
4353
import java.util.ArrayList;
4454
import java.util.List;
4555
import java.util.Map;
56+
import java.util.Objects;
4657

4758
/**
4859
*
4960
*/
5061
public class RangeAggregator extends BucketsAggregator {
5162

52-
public static class Range {
63+
public static final ParseField RANGES_FIELD = new ParseField("ranges");
64+
public static final ParseField KEYED_FIELD = new ParseField("keyed");
5365

54-
public String key;
55-
public double from = Double.NEGATIVE_INFINITY;
56-
String fromAsStr;
57-
public double to = Double.POSITIVE_INFINITY;
58-
String toAsStr;
66+
public static class Range implements Writeable<Range>, ToXContent {
5967

60-
public Range(String key, double from, String fromAsStr, double to, String toAsStr) {
68+
public static final Range PROTOTYPE = new Range(null, -1, null, -1, null);
69+
public static final ParseField KEY_FIELD = new ParseField("key");
70+
public static final ParseField FROM_FIELD = new ParseField("from");
71+
public static final ParseField TO_FIELD = new ParseField("to");
72+
73+
protected String key;
74+
protected double from = Double.NEGATIVE_INFINITY;
75+
protected String fromAsStr;
76+
protected double to = Double.POSITIVE_INFINITY;
77+
protected String toAsStr;
78+
79+
public Range(String key, double from, double to) {
80+
this(key, from, null, to, null);
81+
}
82+
83+
public Range(String key, String from, String to) {
84+
this(key, Double.NEGATIVE_INFINITY, from, Double.POSITIVE_INFINITY, to);
85+
}
86+
87+
protected Range(String key, double from, String fromAsStr, double to, String toAsStr) {
6188
this.key = key;
6289
this.from = from;
6390
this.fromAsStr = fromAsStr;
@@ -83,6 +110,99 @@ public void process(ValueParser parser, SearchContext context) {
83110
to = parser.parseDouble(toAsStr, context);
84111
}
85112
}
113+
114+
@Override
115+
public Range readFrom(StreamInput in) throws IOException {
116+
String key = in.readOptionalString();
117+
String fromAsStr = in.readOptionalString();
118+
String toAsStr = in.readOptionalString();
119+
double from = in.readDouble();
120+
double to = in.readDouble();
121+
return new Range(key, from, fromAsStr, to, toAsStr);
122+
}
123+
124+
@Override
125+
public void writeTo(StreamOutput out) throws IOException {
126+
out.writeOptionalString(key);
127+
out.writeOptionalString(fromAsStr);
128+
out.writeOptionalString(toAsStr);
129+
out.writeDouble(from);
130+
out.writeDouble(to);
131+
}
132+
133+
public Range fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
134+
135+
XContentParser.Token token;
136+
String currentFieldName = null;
137+
double from = Double.NEGATIVE_INFINITY;
138+
String fromAsStr = null;
139+
double to = Double.POSITIVE_INFINITY;
140+
String toAsStr = null;
141+
String key = null;
142+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
143+
if (token == XContentParser.Token.FIELD_NAME) {
144+
currentFieldName = parser.currentName();
145+
} else if (token == XContentParser.Token.VALUE_NUMBER) {
146+
if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) {
147+
from = parser.doubleValue();
148+
} else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) {
149+
to = parser.doubleValue();
150+
}
151+
} else if (token == XContentParser.Token.VALUE_STRING) {
152+
if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) {
153+
fromAsStr = parser.text();
154+
} else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) {
155+
toAsStr = parser.text();
156+
} else if (parseFieldMatcher.match(currentFieldName, KEY_FIELD)) {
157+
key = parser.text();
158+
}
159+
}
160+
}
161+
return new Range(key, from, fromAsStr, to, toAsStr);
162+
}
163+
164+
@Override
165+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
166+
builder.startObject();
167+
if (key != null) {
168+
builder.field(KEY_FIELD.getPreferredName(), key);
169+
}
170+
if (Double.isFinite(from)) {
171+
builder.field(FROM_FIELD.getPreferredName(), from);
172+
}
173+
if (Double.isFinite(to)) {
174+
builder.field(TO_FIELD.getPreferredName(), to);
175+
}
176+
if (fromAsStr != null) {
177+
builder.field(FROM_FIELD.getPreferredName(), fromAsStr);
178+
}
179+
if (toAsStr != null) {
180+
builder.field(TO_FIELD.getPreferredName(), toAsStr);
181+
}
182+
builder.endObject();
183+
return builder;
184+
}
185+
186+
@Override
187+
public int hashCode() {
188+
return Objects.hash(key, from, fromAsStr, to, toAsStr);
189+
}
190+
191+
@Override
192+
public boolean equals(Object obj) {
193+
if (obj == null) {
194+
return false;
195+
}
196+
if (getClass() != obj.getClass()) {
197+
return false;
198+
}
199+
Range other = (Range) obj;
200+
return Objects.equals(key, other.key)
201+
&& Objects.equals(from, other.from)
202+
&& Objects.equals(fromAsStr, other.fromAsStr)
203+
&& Objects.equals(to, other.to)
204+
&& Objects.equals(toAsStr, other.toAsStr);
205+
}
86206
}
87207

88208
final ValuesSource.Numeric valuesSource;
@@ -94,7 +214,7 @@ public void process(ValueParser parser, SearchContext context) {
94214
final double[] maxTo;
95215

96216
public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, ValueFormat format,
97-
InternalRange.Factory rangeFactory, List<Range> ranges, boolean keyed, AggregationContext aggregationContext,
217+
InternalRange.Factory rangeFactory, List<? extends Range> ranges, boolean keyed, AggregationContext aggregationContext,
98218
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
99219

100220
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
@@ -245,12 +365,13 @@ protected int compare(int i, int j) {
245365

246366
public static class Unmapped extends NonCollectingAggregator {
247367

248-
private final List<RangeAggregator.Range> ranges;
368+
private final List<? extends RangeAggregator.Range> ranges;
249369
private final boolean keyed;
250370
private final InternalRange.Factory factory;
251371
private final ValueFormatter formatter;
252372

253-
public Unmapped(String name, List<RangeAggregator.Range> ranges, boolean keyed, ValueFormat format, AggregationContext context,
373+
public Unmapped(String name, List<? extends RangeAggregator.Range> ranges, boolean keyed, ValueFormat format,
374+
AggregationContext context,
254375
Aggregator parent, InternalRange.Factory factory, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
255376
throws IOException {
256377

@@ -279,17 +400,27 @@ public InternalAggregation buildEmptyAggregation() {
279400
public static class Factory extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
280401

281402
private final InternalRange.Factory rangeFactory;
282-
private final List<Range> ranges;
283-
private final boolean keyed;
403+
private final List<? extends Range> ranges;
404+
private boolean keyed = false;
405+
406+
public Factory(String name, List<? extends Range> ranges) {
407+
this(name, InternalRange.FACTORY, ranges);
408+
}
284409

285-
public Factory(String name, ValuesSourceParser.Input<ValuesSource.Numeric> valueSourceInput, InternalRange.Factory rangeFactory,
286-
List<Range> ranges, boolean keyed) {
287-
super(name, rangeFactory.type(), valueSourceInput);
410+
protected Factory(String name, InternalRange.Factory rangeFactory, List<? extends Range> ranges) {
411+
super(name, rangeFactory.type(), rangeFactory.getValueSourceType(), rangeFactory.getValueType());
288412
this.rangeFactory = rangeFactory;
289413
this.ranges = ranges;
414+
}
415+
416+
public void keyed(boolean keyed) {
290417
this.keyed = keyed;
291418
}
292419

420+
public boolean keyed() {
421+
return keyed;
422+
}
423+
293424
@Override
294425
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
295426
Map<String, Object> metaData) throws IOException {
@@ -301,6 +432,51 @@ protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggrega
301432
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
302433
return new RangeAggregator(name, factories, valuesSource, config.format(), rangeFactory, ranges, keyed, aggregationContext, parent, pipelineAggregators, metaData);
303434
}
435+
436+
@Override
437+
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
438+
builder.field(RANGES_FIELD.getPreferredName(), ranges);
439+
builder.field(KEYED_FIELD.getPreferredName(), keyed);
440+
return builder;
441+
}
442+
443+
@Override
444+
protected ValuesSourceAggregatorFactory<Numeric> innerReadFrom(String name, ValuesSourceType valuesSourceType,
445+
ValueType targetValueType, StreamInput in) throws IOException {
446+
Factory factory = createFactoryFromStream(name, in);
447+
factory.keyed = in.readBoolean();
448+
return factory;
449+
}
450+
451+
protected Factory createFactoryFromStream(String name, StreamInput in) throws IOException {
452+
int size = in.readVInt();
453+
List<Range> ranges = new ArrayList<>(size);
454+
for (int i = 0; i < size; i++) {
455+
ranges.add(Range.PROTOTYPE.readFrom(in));
456+
}
457+
return new Factory(name, ranges);
458+
}
459+
460+
@Override
461+
protected void innerWriteTo(StreamOutput out) throws IOException {
462+
out.writeVInt(ranges.size());
463+
for (Range range : ranges) {
464+
range.writeTo(out);
465+
}
466+
out.writeBoolean(keyed);
467+
}
468+
469+
@Override
470+
protected int innerHashCode() {
471+
return Objects.hash(ranges, keyed);
472+
}
473+
474+
@Override
475+
protected boolean innerEquals(Object obj) {
476+
Factory other = (Factory) obj;
477+
return Objects.equals(ranges, other.ranges)
478+
&& Objects.equals(keyed, other.keyed);
479+
}
304480
}
305481

306482
}

0 commit comments

Comments
 (0)