Skip to content

Commit 4fbfde6

Browse files
committed
Aggregations Refactor: Refactor Filters Aggregation
1 parent b147bf3 commit 4fbfde6

File tree

4 files changed

+298
-29
lines changed

4 files changed

+298
-29
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java

Lines changed: 187 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@
2323
import org.apache.lucene.search.Query;
2424
import org.apache.lucene.search.Weight;
2525
import org.apache.lucene.util.Bits;
26+
import org.elasticsearch.common.ParseField;
27+
import org.elasticsearch.common.io.stream.StreamInput;
28+
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.io.stream.Writeable;
2630
import org.elasticsearch.common.lucene.Lucene;
31+
import org.elasticsearch.common.xcontent.ToXContent;
32+
import org.elasticsearch.common.xcontent.XContentBuilder;
33+
import org.elasticsearch.index.query.QueryBuilder;
2734
import org.elasticsearch.search.aggregations.Aggregator;
2835
import org.elasticsearch.search.aggregations.AggregatorFactories;
2936
import org.elasticsearch.search.aggregations.AggregatorFactory;
@@ -39,21 +46,72 @@
3946
import java.util.ArrayList;
4047
import java.util.List;
4148
import java.util.Map;
49+
import java.util.Objects;
4250

4351
/**
4452
*
4553
*/
4654
public class FiltersAggregator extends BucketsAggregator {
4755

48-
static class KeyedFilter {
56+
public static final ParseField FILTERS_FIELD = new ParseField("filters");
57+
public static final ParseField OTHER_BUCKET_FIELD = new ParseField("other_bucket");
58+
public static final ParseField OTHER_BUCKET_KEY_FIELD = new ParseField("other_bucket_key");
4959

50-
final String key;
51-
final Query filter;
60+
public static class KeyedFilter implements Writeable<KeyedFilter>, ToXContent {
5261

53-
KeyedFilter(String key, Query filter) {
62+
static final KeyedFilter PROTOTYPE = new KeyedFilter(null, null);
63+
private final String key;
64+
private final QueryBuilder<?> filter;
65+
66+
public KeyedFilter(String key, QueryBuilder<?> filter) {
5467
this.key = key;
5568
this.filter = filter;
5669
}
70+
71+
public String key() {
72+
return key;
73+
}
74+
75+
public QueryBuilder<?> filter() {
76+
return filter;
77+
}
78+
79+
@Override
80+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
81+
builder.field(key, filter);
82+
return builder;
83+
}
84+
85+
@Override
86+
public KeyedFilter readFrom(StreamInput in) throws IOException {
87+
String key = in.readString();
88+
QueryBuilder<?> filter = in.readQuery();
89+
return new KeyedFilter(key, filter);
90+
}
91+
92+
@Override
93+
public void writeTo(StreamOutput out) throws IOException {
94+
out.writeString(key);
95+
out.writeQuery(filter);
96+
}
97+
98+
@Override
99+
public int hashCode() {
100+
return Objects.hash(key, filter);
101+
}
102+
103+
@Override
104+
public boolean equals(Object obj) {
105+
if (obj == null) {
106+
return false;
107+
}
108+
if (getClass() != obj.getClass()) {
109+
return false;
110+
}
111+
KeyedFilter other = (KeyedFilter) obj;
112+
return Objects.equals(key, other.key)
113+
&& Objects.equals(filter, other.filter);
114+
}
57115
}
58116

59117
private final String[] keys;
@@ -81,7 +139,8 @@ public FiltersAggregator(String name, AggregatorFactories factories, List<KeyedF
81139
for (int i = 0; i < filters.size(); ++i) {
82140
KeyedFilter keyedFilter = filters.get(i);
83141
this.keys[i] = keyedFilter.key;
84-
this.filters[i] = aggregationContext.searchContext().searcher().createNormalizedWeight(keyedFilter.filter, false);
142+
Query filter = keyedFilter.filter.toFilter(context.searchContext().indexShard().getQueryShardContext());
143+
this.filters[i] = aggregationContext.searchContext().searcher().createNormalizedWeight(filter, false);
85144
}
86145
}
87146

@@ -146,20 +205,138 @@ final long bucketOrd(long owningBucketOrdinal, int filterOrd) {
146205
public static class Factory extends AggregatorFactory {
147206

148207
private final List<KeyedFilter> filters;
149-
private boolean keyed;
150-
private String otherBucketKey;
208+
private final boolean keyed;
209+
private boolean otherBucket = false;
210+
private String otherBucketKey = "_other_";
151211

152-
public Factory(String name, List<KeyedFilter> filters, boolean keyed, String otherBucketKey) {
212+
public Factory(String name, List<KeyedFilter> filters) {
153213
super(name, InternalFilters.TYPE);
154214
this.filters = filters;
155-
this.keyed = keyed;
215+
this.keyed = true;
216+
}
217+
218+
public Factory(String name, QueryBuilder<?>... filters) {
219+
super(name, InternalFilters.TYPE);
220+
List<KeyedFilter> keyedFilters = new ArrayList<>(filters.length);
221+
for (int i = 0; i < filters.length; i++) {
222+
keyedFilters.add(new KeyedFilter(String.valueOf(i), filters[i]));
223+
}
224+
this.filters = keyedFilters;
225+
this.keyed = false;
226+
}
227+
228+
/**
229+
* Set whether to include a bucket for documents not matching any filter
230+
*/
231+
public void otherBucket(boolean otherBucket) {
232+
this.otherBucket = otherBucket;
233+
}
234+
235+
/**
236+
* Get whether to include a bucket for documents not matching any filter
237+
*/
238+
public boolean otherBucket() {
239+
return otherBucket;
240+
}
241+
242+
/**
243+
* Set the key to use for the bucket for documents not matching any
244+
* filter.
245+
*/
246+
public void otherBucketKey(String otherBucketKey) {
156247
this.otherBucketKey = otherBucketKey;
157248
}
158249

250+
/**
251+
* Get the key to use for the bucket for documents not matching any
252+
* filter.
253+
*/
254+
public String otherBucketKey() {
255+
return otherBucketKey;
256+
}
257+
159258
@Override
160259
public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
161260
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
162-
return new FiltersAggregator(name, factories, filters, keyed, otherBucketKey, context, parent, pipelineAggregators, metaData);
261+
return new FiltersAggregator(name, factories, filters, keyed, otherBucket ? otherBucketKey : null, context, parent,
262+
pipelineAggregators, metaData);
263+
}
264+
265+
@Override
266+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
267+
builder.startObject();
268+
if (keyed) {
269+
builder.startObject(FILTERS_FIELD.getPreferredName());
270+
for (KeyedFilter keyedFilter : filters) {
271+
builder.field(keyedFilter.key(), keyedFilter.filter());
272+
}
273+
builder.endObject();
274+
} else {
275+
builder.startArray(FILTERS_FIELD.getPreferredName());
276+
for (KeyedFilter keyedFilter : filters) {
277+
builder.value(keyedFilter.filter());
278+
}
279+
builder.endArray();
280+
}
281+
builder.field(OTHER_BUCKET_FIELD.getPreferredName(), otherBucket);
282+
builder.field(OTHER_BUCKET_KEY_FIELD.getPreferredName(), otherBucketKey);
283+
builder.endObject();
284+
return builder;
285+
}
286+
287+
@Override
288+
protected AggregatorFactory doReadFrom(String name, StreamInput in) throws IOException {
289+
Factory factory;
290+
if (in.readBoolean()) {
291+
int size = in.readVInt();
292+
List<KeyedFilter> filters = new ArrayList<>(size);
293+
for (int i = 0; i < size; i++) {
294+
filters.add(KeyedFilter.PROTOTYPE.readFrom(in));
295+
}
296+
factory = new Factory(name, filters);
297+
} else {
298+
int size = in.readVInt();
299+
QueryBuilder<?>[] filters = new QueryBuilder<?>[size];
300+
for (int i = 0; i < size; i++) {
301+
filters[i] = in.readQuery();
302+
}
303+
factory = new Factory(name, filters);
304+
}
305+
factory.otherBucket = in.readBoolean();
306+
factory.otherBucketKey = in.readString();
307+
return factory;
308+
}
309+
310+
@Override
311+
protected void doWriteTo(StreamOutput out) throws IOException {
312+
out.writeBoolean(keyed);
313+
if (keyed) {
314+
out.writeVInt(filters.size());
315+
for (KeyedFilter keyedFilter : filters) {
316+
keyedFilter.writeTo(out);
317+
}
318+
} else {
319+
out.writeVInt(filters.size());
320+
for (KeyedFilter keyedFilter : filters) {
321+
out.writeQuery(keyedFilter.filter());
322+
}
323+
}
324+
out.writeBoolean(otherBucket);
325+
out.writeString(otherBucketKey);
326+
}
327+
328+
@Override
329+
protected int doHashCode() {
330+
return Objects.hash(filters, keyed, otherBucket, otherBucketKey);
331+
}
332+
333+
@Override
334+
protected boolean doEquals(Object obj) {
335+
Factory other = (Factory) obj;
336+
return Objects.equals(filters, other.filters)
337+
&& Objects.equals(keyed, other.keyed)
338+
&& Objects.equals(otherBucket, other.otherBucket)
339+
&& Objects.equals(otherBucketKey, other.otherBucketKey);
163340
}
164341
}
165342

core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersParser.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
package org.elasticsearch.search.aggregations.bucket.filters;
2121

2222
import org.elasticsearch.common.ParseField;
23-
import org.elasticsearch.common.lucene.search.Queries;
23+
import org.elasticsearch.common.inject.Inject;
2424
import org.elasticsearch.common.xcontent.XContentParser;
25-
import org.elasticsearch.index.query.ParsedQuery;
25+
import org.elasticsearch.index.query.QueryBuilder;
26+
import org.elasticsearch.index.query.QueryBuilders;
27+
import org.elasticsearch.index.query.QueryParseContext;
28+
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
2629
import org.elasticsearch.search.SearchParseException;
2730
import org.elasticsearch.search.aggregations.Aggregator;
2831
import org.elasticsearch.search.aggregations.AggregatorFactory;
2932
import org.elasticsearch.search.internal.SearchContext;
3033

3134
import java.io.IOException;
3235
import java.util.ArrayList;
36+
import java.util.Collections;
3337
import java.util.List;
3438

3539
/**
@@ -40,6 +44,12 @@ public class FiltersParser implements Aggregator.Parser {
4044
public static final ParseField FILTERS_FIELD = new ParseField("filters");
4145
public static final ParseField OTHER_BUCKET_FIELD = new ParseField("other_bucket");
4246
public static final ParseField OTHER_BUCKET_KEY_FIELD = new ParseField("other_bucket_key");
47+
private final IndicesQueriesRegistry queriesRegistry;
48+
49+
@Inject
50+
public FiltersParser(IndicesQueriesRegistry queriesRegistry) {
51+
this.queriesRegistry = queriesRegistry;
52+
}
4353

4454
@Override
4555
public String type() {
@@ -49,13 +59,13 @@ public String type() {
4959
@Override
5060
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
5161

52-
List<FiltersAggregator.KeyedFilter> filters = new ArrayList<>();
62+
List<FiltersAggregator.KeyedFilter> keyedFilters = null;
63+
List<QueryBuilder<?>> nonKeyedFilters = null;
5364

5465
XContentParser.Token token = null;
5566
String currentFieldName = null;
56-
Boolean keyed = null;
5767
String otherBucketKey = null;
58-
boolean otherBucket = false;
68+
Boolean otherBucket = false;
5969
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
6070
if (token == XContentParser.Token.FIELD_NAME) {
6171
currentFieldName = parser.currentName();
@@ -69,21 +79,24 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
6979
} else if (token == XContentParser.Token.VALUE_STRING) {
7080
if (context.parseFieldMatcher().match(currentFieldName, OTHER_BUCKET_KEY_FIELD)) {
7181
otherBucketKey = parser.text();
72-
otherBucket = true;
7382
} else {
7483
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
7584
+ currentFieldName + "].", parser.getTokenLocation());
7685
}
7786
} else if (token == XContentParser.Token.START_OBJECT) {
7887
if (context.parseFieldMatcher().match(currentFieldName, FILTERS_FIELD)) {
79-
keyed = true;
88+
keyedFilters = new ArrayList<>();
8089
String key = null;
8190
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
8291
if (token == XContentParser.Token.FIELD_NAME) {
8392
key = parser.currentName();
8493
} else {
85-
ParsedQuery filter = context.indexShard().getQueryShardContext().parseInnerFilter(parser);
86-
filters.add(new FiltersAggregator.KeyedFilter(key, filter == null ? Queries.newMatchAllQuery() : filter.query()));
94+
QueryParseContext queryParseContext = new QueryParseContext(queriesRegistry);
95+
queryParseContext.reset(parser);
96+
queryParseContext.parseFieldMatcher(context.parseFieldMatcher());
97+
QueryBuilder<?> filter = queryParseContext.parseInnerQueryBuilder();
98+
keyedFilters
99+
.add(new FiltersAggregator.KeyedFilter(key, filter == null ? QueryBuilders.matchAllQuery() : filter));
87100
}
88101
}
89102
} else {
@@ -92,13 +105,13 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
92105
}
93106
} else if (token == XContentParser.Token.START_ARRAY) {
94107
if (context.parseFieldMatcher().match(currentFieldName, FILTERS_FIELD)) {
95-
keyed = false;
96-
int idx = 0;
108+
nonKeyedFilters = new ArrayList<>();
97109
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
98-
ParsedQuery filter = context.indexShard().getQueryShardContext().parseInnerFilter(parser);
99-
filters.add(new FiltersAggregator.KeyedFilter(String.valueOf(idx), filter == null ? Queries.newMatchAllQuery()
100-
: filter.query()));
101-
idx++;
110+
QueryParseContext queryParseContext = new QueryParseContext(queriesRegistry);
111+
queryParseContext.reset(parser);
112+
queryParseContext.parseFieldMatcher(context.parseFieldMatcher());
113+
QueryBuilder<?> filter = queryParseContext.parseInnerQueryBuilder();
114+
nonKeyedFilters.add(filter == null ? QueryBuilders.matchAllQuery() : filter);
102115
}
103116
} else {
104117
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
@@ -114,13 +127,24 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
114127
otherBucketKey = "_other_";
115128
}
116129

117-
return new FiltersAggregator.Factory(aggregationName, filters, keyed, otherBucketKey);
130+
FiltersAggregator.Factory factory;
131+
if (keyedFilters != null) {
132+
factory = new FiltersAggregator.Factory(aggregationName, keyedFilters);
133+
} else {
134+
factory = new FiltersAggregator.Factory(aggregationName, nonKeyedFilters.toArray(new QueryBuilder<?>[nonKeyedFilters.size()]));
135+
}
136+
if (otherBucket != null) {
137+
factory.otherBucket(otherBucket);
138+
}
139+
if (otherBucketKey != null) {
140+
factory.otherBucketKey(otherBucketKey);
141+
}
142+
return factory;
118143
}
119144

120-
// NORELEASE implement this method when refactoring this aggregation
121145
@Override
122146
public AggregatorFactory[] getFactoryPrototypes() {
123-
return null;
147+
return new AggregatorFactory[] { new FiltersAggregator.Factory(null, Collections.emptyList()) };
124148
}
125149

126150
}

core/src/test/java/org/elasticsearch/search/aggregations/bucket/FiltersIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public void testOtherNamedBucket() throws Exception {
308308
SearchResponse response = client()
309309
.prepareSearch("idx")
310310
.addAggregation(
311-
filters("tags").otherBucketKey("foobar")
311+
filters("tags").otherBucket(true).otherBucketKey("foobar")
312312
.filter("tag1", termQuery("tag", "tag1"))
313313
.filter("tag2", termQuery("tag", "tag2")))
314314
.execute().actionGet();

0 commit comments

Comments
 (0)