Skip to content

Commit 7e316ba

Browse files
committed
Aggregation refactor: make aggregationFactory implement NamedWritable
Also makes AggregatorFactories implement Writable
1 parent 8f63bb2 commit 7e316ba

File tree

68 files changed

+1781
-166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1781
-166
lines changed

core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.elasticsearch.common.text.Text;
3737
import org.elasticsearch.index.query.QueryBuilder;
3838
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
39+
import org.elasticsearch.search.aggregations.AggregatorFactory;
40+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
3941
import org.joda.time.DateTime;
4042
import org.joda.time.DateTimeZone;
4143

@@ -605,6 +607,20 @@ <C> C readNamedWriteable(@SuppressWarnings("unused") Class<C> categoryClass) thr
605607
throw new UnsupportedOperationException("can't read named writeable from StreamInput");
606608
}
607609

610+
/**
611+
* Reads a {@link AggregatorFactory} from the current stream
612+
*/
613+
public AggregatorFactory readAggregatorFactory() throws IOException {
614+
return readNamedWriteable(AggregatorFactory.class);
615+
}
616+
617+
/**
618+
* Reads a {@link PipelineAggregatorFactory} from the current stream
619+
*/
620+
public PipelineAggregatorFactory readPipelineAggregatorFactory() throws IOException {
621+
return readNamedWriteable(PipelineAggregatorFactory.class);
622+
}
623+
608624
/**
609625
* Reads a {@link QueryBuilder} from the current stream
610626
*/

core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.elasticsearch.common.text.Text;
3535
import org.elasticsearch.index.query.QueryBuilder;
3636
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
37+
import org.elasticsearch.search.aggregations.AggregatorFactory;
38+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
3739
import org.joda.time.ReadableInstant;
3840

3941
import java.io.EOFException;
@@ -593,6 +595,20 @@ void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException {
593595
namedWriteable.writeTo(this);
594596
}
595597

598+
/**
599+
* Writes a {@link AggregatorFactory} to the current stream
600+
*/
601+
public void writeAggregatorFactory(AggregatorFactory factory) throws IOException {
602+
writeNamedWriteable(factory);
603+
}
604+
605+
/**
606+
* Writes a {@link PipelineAggregatorFactory} to the current stream
607+
*/
608+
public void writePipelineAggregatorFactory(PipelineAggregatorFactory factory) throws IOException {
609+
writeNamedWriteable(factory);
610+
}
611+
596612
/**
597613
* Writes a {@link QueryBuilder} to the current stream
598614
*/

core/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public interface Parser {
6161
*/
6262
AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException;
6363

64+
/**
65+
* @return an empty {@link AggregatorFactory} instance for this parser
66+
* that can be used for deserialization
67+
*/
68+
AggregatorFactory getFactoryPrototype();
6469
}
6570

6671
/**

core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java

Lines changed: 97 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
*/
1919
package org.elasticsearch.search.aggregations;
2020

21+
import org.elasticsearch.action.support.ToXContentToBytes;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
25+
import org.elasticsearch.common.xcontent.XContentBuilder;
2126
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2227
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
2328
import org.elasticsearch.search.aggregations.support.AggregationContext;
@@ -26,19 +31,23 @@
2631

2732
import java.io.IOException;
2833
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.Collections;
2936
import java.util.HashMap;
3037
import java.util.HashSet;
3138
import java.util.LinkedList;
3239
import java.util.List;
3340
import java.util.Map;
41+
import java.util.Objects;
3442
import java.util.Set;
3543

3644
/**
3745
*
3846
*/
39-
public class AggregatorFactories {
47+
public class AggregatorFactories extends ToXContentToBytes implements Writeable<AggregatorFactories> {
4048

41-
public static final AggregatorFactories EMPTY = new Empty();
49+
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0],
50+
new ArrayList<PipelineAggregatorFactory>());
4251

4352
private AggregatorFactory parent;
4453
private AggregatorFactory[] factories;
@@ -48,7 +57,8 @@ public static Builder builder() {
4857
return new Builder();
4958
}
5059

51-
private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregatorFactory> pipelineAggregators) {
60+
private AggregatorFactories(AggregatorFactory[] factories,
61+
List<PipelineAggregatorFactory> pipelineAggregators) {
5262
this.factories = factories;
5363
this.pipelineAggregatorFactories = pipelineAggregators;
5464
}
@@ -115,33 +125,12 @@ public void validate() {
115125
}
116126
}
117127

118-
private final static class Empty extends AggregatorFactories {
119-
120-
private static final AggregatorFactory[] EMPTY_FACTORIES = new AggregatorFactory[0];
121-
private static final Aggregator[] EMPTY_AGGREGATORS = new Aggregator[0];
122-
private static final List<PipelineAggregatorFactory> EMPTY_PIPELINE_AGGREGATORS = new ArrayList<>();
123-
124-
private Empty() {
125-
super(EMPTY_FACTORIES, EMPTY_PIPELINE_AGGREGATORS);
126-
}
127-
128-
@Override
129-
public Aggregator[] createSubAggregators(Aggregator parent) {
130-
return EMPTY_AGGREGATORS;
131-
}
132-
133-
@Override
134-
public Aggregator[] createTopLevelAggregators() {
135-
return EMPTY_AGGREGATORS;
136-
}
137-
138-
}
139-
140128
public static class Builder {
141129

142130
private final Set<String> names = new HashSet<>();
143131
private final List<AggregatorFactory> factories = new ArrayList<>();
144132
private final List<PipelineAggregatorFactory> pipelineAggregatorFactories = new ArrayList<>();
133+
private boolean skipResolveOrder;
145134

146135
public Builder addAggregator(AggregatorFactory factory) {
147136
if (!names.add(factory.name)) {
@@ -156,15 +145,29 @@ public Builder addPipelineAggregator(PipelineAggregatorFactory pipelineAggregato
156145
return this;
157146
}
158147

148+
/**
149+
* FOR TESTING ONLY
150+
*/
151+
Builder skipResolveOrder() {
152+
this.skipResolveOrder = true;
153+
return this;
154+
}
155+
159156
public AggregatorFactories build() {
160157
if (factories.isEmpty() && pipelineAggregatorFactories.isEmpty()) {
161158
return EMPTY;
162159
}
163-
List<PipelineAggregatorFactory> orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorFactories, this.factories);
160+
List<PipelineAggregatorFactory> orderedpipelineAggregators = null;
161+
if (skipResolveOrder) {
162+
orderedpipelineAggregators = new ArrayList<>(pipelineAggregatorFactories);
163+
} else {
164+
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorFactories, this.factories);
165+
}
164166
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedpipelineAggregators);
165167
}
166168

167-
private List<PipelineAggregatorFactory> resolvePipelineAggregatorOrder(List<PipelineAggregatorFactory> pipelineAggregatorFactories, List<AggregatorFactory> aggFactories) {
169+
private List<PipelineAggregatorFactory> resolvePipelineAggregatorOrder(List<PipelineAggregatorFactory> pipelineAggregatorFactories,
170+
List<AggregatorFactory> aggFactories) {
168171
Map<String, PipelineAggregatorFactory> pipelineAggregatorFactoriesMap = new HashMap<>();
169172
for (PipelineAggregatorFactory factory : pipelineAggregatorFactories) {
170173
pipelineAggregatorFactoriesMap.put(factory.getName(), factory);
@@ -259,4 +262,71 @@ List<PipelineAggregatorFactory> getPipelineAggregatorFactories() {
259262
return this.pipelineAggregatorFactories;
260263
}
261264
}
265+
266+
@Override
267+
public AggregatorFactories readFrom(StreamInput in) throws IOException {
268+
int factoriesSize = in.readVInt();
269+
AggregatorFactory[] factoriesList = new AggregatorFactory[factoriesSize];
270+
for (int i = 0; i < factoriesSize; i++) {
271+
AggregatorFactory factory = in.readAggregatorFactory();
272+
factoriesList[i] = factory;
273+
}
274+
int pipelineFactoriesSize = in.readVInt();
275+
List<PipelineAggregatorFactory> pipelineAggregatorFactoriesList = new ArrayList<PipelineAggregatorFactory>(pipelineFactoriesSize);
276+
for (int i = 0; i < pipelineFactoriesSize; i++) {
277+
PipelineAggregatorFactory factory = in.readPipelineAggregatorFactory();
278+
pipelineAggregatorFactoriesList.add(factory);
279+
}
280+
AggregatorFactories aggregatorFactories = new AggregatorFactories(factoriesList,
281+
Collections.unmodifiableList(pipelineAggregatorFactoriesList));
282+
return aggregatorFactories;
283+
}
284+
285+
@Override
286+
public void writeTo(StreamOutput out) throws IOException {
287+
out.writeVInt(this.factories.length);
288+
for (AggregatorFactory factory : factories) {
289+
out.writeAggregatorFactory(factory);
290+
}
291+
out.writeVInt(this.pipelineAggregatorFactories.size());
292+
for (PipelineAggregatorFactory factory : pipelineAggregatorFactories) {
293+
out.writePipelineAggregatorFactory(factory);
294+
}
295+
}
296+
297+
@Override
298+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
299+
builder.startObject();
300+
if (factories != null) {
301+
for (AggregatorFactory subAgg : factories) {
302+
subAgg.toXContent(builder, params);
303+
}
304+
}
305+
if (pipelineAggregatorFactories != null) {
306+
for (PipelineAggregatorFactory subAgg : pipelineAggregatorFactories) {
307+
subAgg.toXContent(builder, params);
308+
}
309+
}
310+
builder.endObject();
311+
return builder;
312+
}
313+
314+
@Override
315+
public int hashCode() {
316+
return Objects.hash(Arrays.hashCode(factories), pipelineAggregatorFactories);
317+
}
318+
319+
@Override
320+
public boolean equals(Object obj) {
321+
if (obj == null)
322+
return false;
323+
if (getClass() != obj.getClass())
324+
return false;
325+
AggregatorFactories other = (AggregatorFactories) obj;
326+
if (!Objects.deepEquals(factories, other.factories))
327+
return false;
328+
if (!Objects.equals(pipelineAggregatorFactories, other.pipelineAggregatorFactories))
329+
return false;
330+
return true;
331+
}
262332
}

0 commit comments

Comments
 (0)